June 29, 2015

From Hadoop Streaming to RHadoop

The challenge of combining the statistical power of R with the "Big Data" capabilities of Hadoop is something that has always fascinated me. Over a year ago, I finally broke free from the tedium of the WordCount (and various other counting) programs and tried to solve a real-life retail problem with linear regression using R and Hadoop. This is documented in my blog post Forecasting Retail Sales -- Linear Regression with R and Hadoop. In that case, however, I had used the Hadoop Streaming API to call two separate R programs.

Subsequently I came across the Hortonworks HDP platform, which dramatically simplifies the process of installing and running Hadoop. This is explained in my blog post Big Data for the Non Geek, where, in addition to installing Hadoop, I also explain how to overcome the challenges of installing the RHadoop packages on top of Hadoop on the Hortonworks platform.
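
As a quick recap of that setup, the rmr2 package finds Hadoop through a pair of environment variables that must be set before the library is loaded. On HDP the settings looked roughly like this; the exact paths are version-specific, so treat them as placeholders.

# tell RHadoop where the Hadoop binary and the streaming jar live (HDP 2.2 paths)
Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
Sys.setenv(HADOOP_STREAMING="/usr/hdp/2.2.0.0-2041/hadoop-mapreduce/hadoop-streaming-2.6.0.2.2.0.0-2041.jar")
library(rhdfs)
hdfs.init()   # initialise HDFS access for the rhdfs package
library(rmr2)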

Hortonworks has a nice example of how to run an RHadoop program on the HDP platform, but I was very keen to see how to port my Retail Sales program from the traditional streaming mode to the RHadoop mode. This means merging the two R programs, LinReg-map.R and LinReg-red.R, into one R program, LinReg-MR.R, and running it not from the Linux command prompt but from RStudio itself.

This process is explained in this post.

First I had to check whether my original LinReg-map.R and LinReg-red.R would work on the Hortonworks HDP platform. Fortunately they did, but a small change was required in the command line -- note the two -file properties attached right at the very end:


# -- as used on Hortonworks HDP; the two -file options at the end are required
hdfs dfs -rm -r /user/ru1/retail/out900
# ---
hadoop jar /usr/hdp/2.2.0.0-2041/hadoop-mapreduce/hadoop-streaming-2.6.0.2.2.0.0-2041.jar -D mapred.job.name='RetailR' -mapper /home/ru1/retail/LinReg-map.R -reducer /home/ru1/retail/LinReg-red.R -input /user/ru1/retail/in-txt -output /user/ru1/retail/out900 -file LinReg-map.R -file LinReg-red.R 



The next thing we had to do was convert the five tab separated TXT files used as input into five corresponding comma separated CSV files. I am sure tab separated TXT files can also be used, but for the time being it was easier to convert the data files to CSV than to explore the TXT option.
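
The conversion itself is a few lines of R. Here is a minimal sketch; the file names are illustrative, not the actual names in the repository.

# rewrite each tab separated input file as a comma separated one;
# no header row, since the job treats every line as data
for (f in c("Sales1","Sales2","Sales3","Sales4","Sales5")) {
  d <- read.table(paste0(f, ".txt"), sep="\t",
                  col.names=c("date","sku","sale"))
  write.table(d, paste0(f, ".csv"), sep=",",
              row.names=FALSE, col.names=FALSE, quote=FALSE)
}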

Finally, the two R programs were merged into one R program, which is available in the rHadoop directory of the original Retail github repository.

The significant changes are as follows --

In the Map part

-- the original Map program

# read the whole input split from stdin as (date, sku, sale) rows
DailySales <- read.table("stdin",col.names=c("date","sku","sale"))
for(i in 1:nrow(DailySales)){
  # emit sku as the key and "date$sale" as the value, one line per row
  Key <- as.character(DailySales[i,]$sku)
  Val <- paste(as.character(DailySales[i,]$date),"$",as.character(DailySales[i,]$sale))
  cat(Key,gsub(" ","",Val),"\n")   # gsub strips the blanks that paste() inserts
}

-- the modified Map subroutine

mapper1 = function(null,line) {
    # line is a record from the CSV input: (date, sku, sale)
    ckey = line[[2]]                              # sku becomes the key
    cval = paste(line[[1]],line[[3]],sep = "$")   # "date$sale" becomes the value
    keyval(ckey,cval)                             # rmr2's replacement for the cat() emit
}
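
A pleasant side effect of this structure is that the mapper can be sanity-checked locally in RStudio before submitting a job. Here is a quick illustration with invented sample values; with the csv input format, rmr2 passes each chunk of records to the mapper as a data frame, so the call is vectorised over all rows.

# exercise mapper1 on a tiny hand-made data frame (illustrative values)
library(rmr2)
sample <- data.frame(date=c(1,2), sku=c("A101","A101"), sale=c(30,45),
                     stringsAsFactors=FALSE)
mapper1(NULL, sample)
# returns a keyval object pairing each sku with its "date$sale" string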


The Reduce part is greatly simplified

-- the original Reduce program

# read the sorted mapper output and group the values for each key manually
mapOut <- read.table("stdin",col.names=c("mapkey","mapval"))
CurrSKU <- as.character(mapOut[1,]$mapkey)
CurrVal <- ""
days <- ""
sale <- ""
FIRSTROW = TRUE
for(i in 1:nrow(mapOut)){
  SKU <- as.character(mapOut[i,]$mapkey)
  Val <- as.character(mapOut[i,]$mapval)
  DataVal <- unlist(strsplit(Val,"\\$"))
  if (identical(SKU,CurrSKU)){
    # still on the same key: accumulate days and sales
    CurrVal = paste(CurrVal, Val)
    if (FIRSTROW)  {
      days <- DataVal[1]
      sale <- DataVal[2]
      FIRSTROW = FALSE
    } else {
      days = paste(days,DataVal[1])
      sale = paste(sale,DataVal[2])
    }
  } else {
    # key changed: run the regression for the finished key, then reset
    EstValue(CurrSKU,days,sale,9)
    CurrSKU <- SKU
    CurrVal <- Val
    days <- DataVal[1]
    sale <- DataVal[2]
  }
}
EstValue(CurrSKU,days,sale,9)   # flush the last key


-- the modified Reduce subroutine

reducer1 = function(key,val.list) {
  # rmr2 hands the reducer all the values for one key at once,
  # so there is no need to detect key changes manually
  days <- ""
  sale <- ""
  for(line in val.list) {
    DataVal <- unlist(strsplit(line, split="\\$"))
    days <- paste(days,DataVal[[1]])
    sale <- paste(sale,DataVal[[2]])
  }

  retVal <- EstValue(key,days,sale,9)
  keyval(key,retVal)
}




the "key" difference is that instead of using the cat command to emit the key-value pair,  we use the keyval function of the rmr2 package to move data from the mapper to the reducer. Also the reducer gets all the values for one key and so no sequential processing required to isolate the values associated with one key.

The actual linear regression is done by the EstValue function, and ideally it should have needed no changes at all. However, there was ONE change that was required, and it is shown here.

#---------------------------------------
#PastSale = Reduce("+",sale)                               # original streaming version
PastSale = 0
for (j in 2:length(sale))PastSale = PastSale + sale[j]     # RHadoop version: skip element 1
#PastSale= sale[2]
#----------------------------------------


The total of past sales, though not required for the regression, was being calculated with the Reduce function, but somehow this would never work in RHadoop. It had to be replaced with a manual loop, and one starting from 2 at that! The answer in both cases -- streaming and RHadoop -- is nevertheless the same.
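
My best guess at the culprit: reducer1 builds days and sale by pasting onto an initially empty string, so the value string carries a leading blank; after strsplit, the first element is "" and becomes NA under as.numeric, which poisons Reduce. The snippet below is a standalone illustration of that behaviour in plain R, not code from the repository.

# paste() onto an empty string leaves a leading blank
sale <- unlist(strsplit(paste("", 10, 20, 30), " "))
sale              # ""  "10" "20" "30" -- first element is empty
s <- as.numeric(sale)
Reduce("+", s)    # NA: the empty string became NA and poisons the sum
sum(s[-1])        # 60: skipping element 1 works, hence the loop from 2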

To be honest, RHadoop is not really different from the Hadoop streaming process; under the hood, rmr2 submits the job through the very same streaming jar. There are some small changes that one needs to make, and the benefit is that one can work from the familiar confines of RStudio, as you can see from this screenshot.



Update: A very comprehensive tutorial on RHadoop is available here. Here is a guide on how to run RHadoop on Amazon AWS EMR.
