June 15, 2014

Introduction to Hadoop

My effort to learn Hadoop and Map Reduce resulted in the presentation. If you like it and find it useful do leave a comment.

June 11, 2014

Forecasting Retail Sales - Linear Regression with R and Hadoop

image from Responsemagazine.com
A retail store tracks the volume of sale for each stock-keeping-unit (SKU) that the store deals with. Given the sales for days 1 through 5, is it possible to predict the sales on days 6 through 10 ? Common sense dictates that sales will remain constant and the average sales per day for the first 5 days will be the same as the average sales per day for the next 5 days. However this may not always be the case, if there is a rising or falling trend. If there is a strongly rising trend, caused by a some strong promotional activity, then the assumption of constant sale will lead to a stock-out and loss of potential business. Similarly, if there is a strongly falling trend, then a similar assumption will lead to accumulation of dead stock and hence a loss related to excessive inventory. Instead of days, the same analysis can be done on the basis of weeks, fortnights or even month. Net-net given the sales over 5 periods of time, it is useful to be able to predict the same for the next 5 periods. How can we do this ? Without resorting to the simplistic "average daily sale" strategy ?

A simple solution is to use linear regression, a well known and widely used statistical tool. If you have the sales data for a particular SKU for the past 5 days, you can "fit" a regression line, determine the slope and the intercept of this line and use the resulting linear regression "model" to predict the expected sales for the next 5 days. Based on these predictions,  you can place orders for these SKUs so that the gap between expected and the actual is minimum. A software tool like R can be used to solve this problem very easily.

All this is well known. But when the number of SKUs is of the order of 50,000 - 70,000 then the time required  -- to build so many regression models, even with R, and then using each to estimate the sales quantity for different SKUs -- becomes enormous ! In fact, if one has to do this on a rolling-basis every day to predict the sales over the next 5 days, then it becomes impossible. Even before we have a solution to today's prediction, the next set of data is waiting and getting stale !

This is where Hadoop steps in. By splitting the regression problem for 50,000 - 70,000 SKUs across multiple computers, it is quite possible to solve the entire problem in a reasonable amount of time. This means that the person responsible for placing orders for the replenishment of inventory would know which of the SKUs would need to be ordered in a higher quantity and which to be ordered in a lower quantity. This is the Linear Regression problem that we will solve with R and Hadoop.

R is not necessary for regression. Any programming language like Java can be used but using R ( or for that matter, a similar tool like Python ) allows the ready-made function -- lm()  for linear regression -- to be used without re-inventing the wheel. In fact, R is a free and open-source statistical tool that is very widely used across the data analytics community. There are two ways to use R with Hadoop. First, we can use the streaming feature of Hadoop with R scripts or we can use the RHadoop set of packages from Revolution Analytics ( which include rhdfs, rmr2 and rhbase). The RHadoop path initially looks easier because it allows one to operate from within the familiar  R environment, but configuring RHadoop is difficult ( or at least, the author was unsuccessful despite a lot of effort). Moreover RHadoop is in reality using the same streaming feature of Hadoop to get the job done. So there is no loss if one ignores RHadoop and uses the native streaming feature of Hadoop directly.

So now we will see how to solve the Sales Forecasting Problem.

We have miniaturized the problem by assuming the Retail Store stocks and sells only 3 products, namely salt, soap and soda. On a particular day, arbitrarily designated as Day 08, the sales of these three SKUs was as follows and this was stored in a file called DailySales08.txt
8 soap 90
8 salt 90
8 soda 120
where the first column represents the day, the second the SKU name (or code) and the third column is the sales on Day 08. There are 4 other files, namely DailySales09.txt, DailySales10.txt, DailySales11.txt, DailySales12.txt.  In reality, each of these files will have very large number of records,  with one record for each SKU

Based on the data for 5 days, from day 08 to day 12, we need to estimate the data for 6th to 10th day, or for day 13 to day 17. Once we run the R program in Hadoop, the following output is generated
salt dates [ 8 - 12 ] : 400  next [ 6 - 10 ] 175 : 17  -- 575  
soap dates [ 8 - 12 ] : 600  next [ 6 - 10 ] 1025 : 239  -- 1625  
soda dates [ 8 - 12 ] : 620  next [ 6 - 10 ] 845 : 187  -- 1465 

where each row represents the picture for each SKU, where we can see in row 1
  • SKU is "salt"
  • cumulative actual sales on days 8 - 12 ( the first 5 days of the analysis ) is 400
  • cumulative expected sales from 6th to 10th day is 175
  • the estimated sale on the last, 10th day, that is day 17 is 17 ( just a coincidence !)
  • the total estimated sales over the 10 day period is 575 ( 400 actual, 175 estimated)
Why is the actual sales in the first 5 days 400 but the predicted sales in the next 5 days only 175 ? See what the regression data reveals :

The black dots represent the actual sales on the first 5 days [ day 8 - day 12 ] Based on this the model has created the regression line : sales = 170 -9*days and with this the estimated values for all the days can be calculated and shown as red dots on the graph. Because of the falling trend, the expected sale in the next 5 days is significantly lower than in the first 5 days. Or so says the regression data !

To run this program, a development environment was created an Ubuntu 14.04 laptop running R 3.0.2 and Hadoop 2.2.0 installed in a single cluster mode as described in my earlier post Demystifying Hadoop and MR with this DIY tutorial.

Section 4 of that tutorial showed how the Hadoop streaming utility was used to run a WordCount program in Python. The same strategy is used in this case, where we have replaced the python programs with two R scripts, LinReg-map.R and LinReg-red.R and a shell script runRetail.sh was used to execute the map-reduce job. The source code of all three scripts along with the 5 datafiles are available at the Git Repository prithwis/Retail.

Once the Mapper (LinReg-map.R) runs, the output looks like this, though in reality, this output will not be stored but instead "streamed" to the Reducer

salt 10$120 
salt 11$50 
salt 12$60 
salt 8$90 
salt 9$80 
soap 10$100 
soap 11$140 
soap 12$160 
soap 8$90 
soap 9$110 
soda 10$150 
soda 11$130 
soda 12$140 
soda 8$120 
soda 9$80 

here the Key is the SKU name, and the Value is a string formed by the concatenation of the date and the quantity sold, separated by the $ char.

In this case there were only 15 records ( 3 SKUs x 5 days ) but even if the number of SKUs is very high, the task of creating this sorted list of <key, value> can be distributed across multiple servers in the Hadoop cluster. This sorted list of records can now be distributed again to multiple servers for the second, reducer, program LinReg-red.R to execute. Hadoop ensures that all records pertaining to any one key ( or SKU) is sent to only one machine where the Linear Regression function is executed.

The reducer program reads through all the <Key, Value> pairs for each Key ( or SKU), splits the Val at the $ char isolate the date and the sale value for that date and create two lists one of dates and the other of the corresponding sale values. These two lists are passed, along with the key (SKU) to the user defined EstValue() function. The fourth parameter N, in our case 9, represents the number of days between the last day of the period and the first day for which data is available. In this case, first day was 8, N is 9, so the last day is the 10th day or day 17.

The EstValue() function is where the Linear Regression module lm() is finally called with the two lists for days, sales as input. For a quick recap of how Linear Regression is done in R, read this tutorial. A little bit of data manipulation is done in which, the days (8,9,10,11,12) are replaced by the more generic (1,2,3,4,5) and so the estimates are done for days (6,7,8,9,10) instead of (13,14,15,16,17). This transformation does not have any implication on the result.

There are 3 ways of testing / running this set of programs of which the first two can be done on a laptop

  • To test the R scripts without calling Hadoop, one can simply pipe the commands as follows : cat DailySales*.txt | ./LinReg-map.R | sort | ./LinReg-red.R > output.txt . This simulates the entire streaming process by sending the data from the 5 data files into the "stdin" of the mapper script that in turn streams the data to the Unix sort utility which in turn streams the sorted key-value pairs to the reducer script which in turn sends the "stdout" output into a file called output.txt This the output that you can see in the post above
  • To run the same scripts on the Hadoop Single Machine Cluster installed on a laptop, we use the following shell script runRetail.sh

#hdfs dfs -ls 
#hdfs dfs -mkdir /user/hduser/Retail-in
#hdfs dfs -copyFromLocal /home/hduser/RetailSales/DailySales*.txt /user/hduser/Retail-in
hdfs dfs -ls /user/hduser/Retail-in
hdfs dfs -rm -r /user/hduser/Retail-out
hadoop jar /usr/local/hadoop220/share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar -D mapred.job.name='RetailR' -mapper /home/hduser/RetailSales/LinReg-map.R -reducer /home/hduser/RetailSales/LinReg-red.R -input /user/hduser/Retail-in/* -output /user/hduser/Retail-out 
hdfs dfs -ls /user/hduser/Retail-out

this scripts creates the Retail-in directory in HDFS and loads the DailySales files from local directory to the HDFS filesystem. It deletes the output directory, if it exists, and then calls the Hadoop streaming program with the 4 mandatory parameters : mapper script, reducer script, input directory and output directory ( all with fully qualified names, to avoid any ambiguity ). The only additional parameter is the job name (RetailR) that helps track the job on http://localhost:8088 and http://localhost:50070

In both these cases, the output is the same. [Update] - To see how to do this directly in RHadoop, see this post.

Now that we know that the program works fine, how do we scale up ? When we have thousands of SKUs and we want to use data from, say 15 or 20 days to build the regression model, the number of records will go up dramatically. One can of course procure multiple servers and configure all of them with Ubuntu, R and Hadoop but this is a very big, complicated and error-prone task. The simple solution is to use the
  • Amazon Web Services Elastic Map Reduce ( AWS/EMR) services, where the Mapper and Reducer programs can be run without any change on the same ( or if necessary on much, much larger) data to get identical results obtained in the first two methods.
To try out AWS/EMR, you need to visit the AWS website with a credit card and sign up for a loginid. Then follow the steps given in this tutorial by Raffael Vogler to run the LinReg map and reduce scripts. Follow the steps but instead of Vogler's programs, use the ones described post. You should also ignore the Bootstrapping step as well two lines of -jobconf stream.num.map.output.key.fields=2 and
-jobconf map.output.key.field.separator=\t that were meant to be placed in the Arguments box since these are not required for the Linear Regression programs. Running the programs with the test data given in this post will take around 10 mins, 7 to provision and configure the machine and 3 mins to run the job. This will result in a charge of around US$ 2 or US$ 3 that will be billed to the credit card used to create the loginid. Should you use AWS/EMR do remember to terminate the cluster at the end of the exercise as otherwise the billing will continue.

AWS/EMR really removes the hassles of configuring Hadoop and makes running Map Reduce jobs as easy as, well almost, send a Gmail message ! Everything is GUI oriented. You choose the number of type of machines and input the location of the data files and the map and reduce scripts. So after building and testing your R scripts on a laptop, you can scale up to hundreds of servers in  a few minutes  and that too for only a few minutes ! Who could ask for anything more ?

In this post, we have defined a simple sales prediction problem that could be faced in any retail store and we have shown how it can be solved with Hadoop and R. The approach taken has been adopted from a YouTube video created and uploaded by Fady El-Rukby and even though he solves a completely different problem and uses native Java, not streaming R, we have used the same data and compared results to make sure that the Linear Regression function of R is working correctly. To learn more about R and Data Science in general, please read this post on Data Science - A DIY approach and to get business perspective join the Business Analytics Program at Praxis Business School, Calcutta

June 03, 2014

HIVE and PIG to simplify Hadoop

[Note] -- Hadoop, IMHO, is history. Rather than waste time with all this, suggest you check up my blog post on Spark with Python.

When I was doing engineering at IIT, Kharagpur, the computers that we had were not even as powerful as a low-cost non-smart phone today and other than the basic concept of programming, nothing that we learnt is of any relevance today. So when we start a teaching a course on Business Analytics, that lies at the bleeding edge of  current technology and business practices, there is simply no option but to take the Do-It-Yourself approach of first learning a subject and then teaching it to students. Fortunately, there are many kind and knowledgeable souls on this planet who have taken the pains to explain new and difficult concepts to ancients like us and thanks to Google, it is not too difficult to locate them.

Using this route, I first learnt what is Data Science and then created this compilation of tutorials and training materials that anyone can use to learn about this new subject in greater depth. The next big challenge was to Demystify Hadoop and Map Reduce as these two key concepts play a very significant role in this area of interest. Writing Map Reduce programs in java, as is the standard practice, is a non-trivial task and many people have sought to simplify matters by adopting other approaches. One is to use the Hadoop streaming API and use a program written in any executable language like Python or R. HIVE and PIG are two other products that have evolved to ease and facilitate the use of MR techniques with Hadoop systems.

HIVE simulates an SQL based query engine sitting on top of the data stored in HDFS file system on Hadoop. Anyone familiar with SQL will immediately feel at home with the DDL, DML (load, insert) and Select commands.

PIG (and its humourously named command prompt, GRUNT > ) is a scripting language that allows one to run queries on data stored on HDFS without writing complex MR programs in Java.

In this post we will

  1. Install HIVE and use SQL commands to load and retrieve data from an HDFS file system.
  2. Install PIG and use it to retrieve the same data 
  3. Do the same task with the usual Java program ( already shown in an earlier blog post.)
We assume that you have followed instructions in the earlier blog post and you single machine cluster of Hadoop installed on a Ubuntu ( preferably 14.04) machine.

Varad Meru of Orzota has created a set of four excellent tutorials that we will use to get a grip on PIG and HIVE.

The first one talks about installing Hadoop 1.0.3, but we will ignore that because we have already learnt to install Hadoop 2.2.0.

The data that is used in the three other tutorials is called the Book Crossing Dataset that you can download as a zip file and then extract ONLY the file called BX-Books.csv for the purpose of the next three tutorials.

From this file we will answer the question of how many books are published in each calendar year. Not really rocket-science but enough to meet the requirements of requirements of how HIVE and PIG work.

The second tutorial Hive for Beginners gives clear, step by step instructions to carry out the task. Almost every instruction works perfectly. The following listing show the shell script used for all three tutorials (HIVE, PIG, Java).


# --- hive and common data cleaning and loading

#hdfs dfs -mkdir /user/hive
#hdfs dfs -mkdir /user/hive/warehouse
#hdfs dfs -chmod g+w /tmp
#hdfs dfs -chmod g+w /user/hive/warehouse
#hdfs dfs -mkdir /user/hduser/BXData-in
#sed 's/&amp;/&/g' BX-Books.csv | sed -e '1d' |sed 's/;/$$$/g' | sed 's/"$$$"/";"/g' > BX-BooksCorrected.txt
#hdfs dfs -copyFromLocal /home/hduser/BXData/BX-BooksCorrected.txt /user/hduser/BXData-in

#hive -f goBX2.sql > goBX2.output

# ---- pig
#pig goBX3.pig

# --- java

#rm -rf LocalClasses
#mkdir LocalClasses
# ....
#javac -classpath $HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar -d LocalClasses BookXReducer.java
# .....
#javac -classpath $HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar -d LocalClasses BookXMapper.java
# .....
#javac -classpath $HADOOP_INSTALL/share/hadoop/common/hadoop-common-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_INSTALL/share/hadoop/common/lib/commons-cli-1.2.jar:LocalClasses -d LocalClasses BookXDriver.java && jar -cvf BookXDriver.jar -C LocalClasses/ .

#hadoop jar BookXDriver.jar BookXDriver /user/hduser/BXData-in /user/hduser/BXData-MR-out


instead of typing long HIVE commands by hand, we have created a file call goBX2.sql to store the various HIVE commands and by selectively un-commenting lines, we execute the different commands.


--use default;
--show databases;
--show tables;
--LOAD DATA INPATH '/user/hduser/BXData-in/BX-BooksCorrected.txt' OVERWRITE INTO TABLE BXDataSet;
select yearofpublication, count(booktitle) from bxdataset group by yearofpublication;


The only deviation from the instructions is
  1. One error in the CREATE TABLE command. Since ";" is the EOL for HIVE files, the first CREATE TABLE statement failed because it contained a ";" symbol. This problem was solved by changing it to "\;" before the execution could proceed.
Also note that output is stored in file goBX2.output.

After using HIVE, the same task is performed using PIG by following instructions given in the tutorial PIG for Beginners.

There were two deviations from the instructions
  1. PIG was throwing a fearful error ERROR org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl - Error whiletrying to run jobs.java.lang.IncompatibleClassChangeError: Found interface org.apache.hadoop.mapreduce.JobContext, but class was expected. that was causing a major abort. This was tracked down to this StackOverflow thread and the following command, issued from $PIG_HOME directory solved the problem : ant clean jar-all -Dhadoopversion=23 .. However please note that the command takes nearly 25 minutes to execute as it virtually rebuilds many Hadoop, PIG and related jars
  2. the PIG_CLASSPATH is set to the conf directory which in the case of Hadoop 2.2.0 is set to $HADOOP_INSTALL/etc/hadoop
  3. Also do note that after HIVE has loaded data into a table, it removes the data from the HDFS filesystem. So before PIG can start, the data has to be reloaded from the local file system to HDFS once again ! Simply uncomment the line in the shell script and run it once again
the PIG commands were stored in a file goBX3.pig and executed from the shell script goBX1.sh 

BookXRecords = LOAD '/user/hduser/BXData-in/BX-BooksCorrected.txt' USING PigStorage(';') AS (ISBN:chararray, BookTitle:chararray, BookAuthor:chararray, YearOfPublication:chararray, Publisher:chararray, ImageURLS:chararray, ImageURLM:chararray, ImageURLL:chararray);
GroupByYear = GROUP BookXRecords BY YearOfPublication;
CountByYear = FOREACH GroupByYear GENERATE CONCAT((chararray)$0,CONCAT(':',(chararray)COUNT($1)));
STORE CountByYear INTO '/user/hduser/BXData-out-pig/BXDataQueryResult' USING PigStorage('t');


In this case, the output is stored in the HDFS file system that can be accessed thorough the browser at localhost:50075 and downloaded.

Finally, after using HIVE and then PIG to generate the data, one can use the standard Java route as explained in this fourth and final tutorial. There is really no need to configure Eclipse with Hadoop Plug-in ( the version for Hadoop 2.2.0 is not yet ready or stable, as of now ). You can simply download the three java files : BookXDriver, BookXMapper, and BookXReducer and then use the javac command from the ubuntu prompt as given in the shell script above. Once again the output will be stored in the HDFS directory /user/hudser/BXData-MR-out ( as show in the diagram above ) and can be downloaded for comparison with the two other results.

Ok, here is the final screenshot of the applications console available in the browser at localhost:8088 that shows all the three jobs to have executed successfully.

If you find any errors in this post, please leave a comment. If you find it useful, do share it with your friends ... and also check out the Business Analytics program at Praxis Business School.

June 01, 2014

Consultant for Criminals or Criminal Consultants

In an earlier post on Extreme Konsulting, I had explored the characteristics and challenges of delivering management consulting services to reluctant and criminally corrupt government offices, but Sameer Kamat in his book Business Doctors, goes far, far deeper. In this new novel set in US West Coast, Sameer explores what it would, or could, mean to offer professional management consulting services to actual, law-breaking, criminals !

Best MBA Books | Business Doctors Management Consulting Gone WildAs one who has been in the consulting profession for a long time, Sameer knows too well that the business is not about rocket-science, earth-shattering ideas, but the ability to locate an action-template that has worked in the past and apply it, with some creative tweaks, to address the situation at hand.

This simple point-of-view has been expanded on and converted into a novel that explores the mutually awkward relationship that develops between an Ivy League educated, laptop-carrying management consultant and a mafia mob leader and his henchmen who have never seen anything like a Powerpoint Presentation !

But behind the initial sequence of hilarious situations, there is a deeper and then darker truth that Sameer's novel has pointed to. First, a criminal organisation is really no different from any run-of-the-mill corporate in terms of underlying business processes, like recruitment, money management, public relations and so on. It is only that the rules they break are from a different section of the statutes and some of them have more undesirable outcomes !

Which brings us to the second and deeper issue. Where is the dividing line between the consultant and the criminal ? Where does consulting to criminals end and the consultant becomes a criminal himself ? There are many instances, primarily in the US, where famous management consultants are now sitting in jail. Is it that the trajectory described in Business Doctors the one that has taken them there ? Read the novel and you may find an answer.

Those who are in the management consulting profession will love the way Sameer has mapped the well known tools of their trade to the world criminals. There are a few small plotholes but given the challenge of trying to fit a square peg into the round hole, we should be more than happy to give the author the licence to stretch facts to make ends meet. Net-net, quite a page turner that will keep you amused and intrigued till the very end.