Hi Tushar,

The most scalable option is probably to do some approximation.  E.g.,
sample the data first to come up with the bucket boundaries.  Then you can
assign data points to buckets without needing to do a full groupByKey.  You
could even add more passes to correct any errors in your approximation
(e.g., see how sortByKey() works, and how it samples the underlying RDD
when constructing the RangePartitioner).  Though it's more passes through
the data, it will probably be much faster since you avoid the expensive
groupByKey().
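
To make that concrete, here is a rough sketch of the sampling approach,
assuming the (colIndex, (rowIndex, value)) layout your t3 RDD already has;
the helper name approxBins, the 1% sample fraction, and the fixed bin count
standing in for your 5% rule are placeholders of mine, not anything from
your code:

import org.apache.spark.rdd.RDD

// Sketch: derive approximate per-column bin boundaries from a small
// sample, then assign bins in one map pass -- no full groupByKey.
def approxBins(t3: RDD[(Int, (Long, Double))], numBins: Int = 20)
    : RDD[(Int, (Long, Double, Int))] = {
  // 1. Sample (colIdx, value) pairs and sort them per column on the
  //    driver; only the small sample crosses the network.
  val sampled = t3.sample(withReplacement = false, fraction = 0.01)
    .map { case (col, (_, v)) => (col, v) }
    .collect()
  val perCol: Map[Int, Array[Double]] =
    sampled.groupBy(_._1).map { case (col, vs) => col -> vs.map(_._2).sorted }

  // 2. Take ~numBins equal-frequency cut points from each sorted sample.
  val boundaries: Map[Int, Array[Double]] = perCol.map { case (col, s) =>
    col -> (1 until numBins).map(i => s((i * s.length) / numBins)).distinct.toArray
  }
  val bc = t3.sparkContext.broadcast(boundaries)

  // 3. One cheap pass: binary-search each value into its bin. Equal
  //    values always land in the same bin, since bins are value ranges.
  t3.map { case (col, (row, v)) =>
    val cuts = bc.value.getOrElse(col, Array.empty[Double])
    val pos = java.util.Arrays.binarySearch(cuts, v)
    val bin = if (pos >= 0) pos else -(pos + 1)  // insertion point
    (col, (row, v, bin))
  }
}

A follow-up pass could count the actual bin sizes and split or merge the
ones that drift too far from your 5% target, much like RangePartitioner
refines the boundaries it gets from sampling.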

Imran

On Thu, Feb 26, 2015 at 3:38 AM, Tushar Sharma <tushars...@gmail.com> wrote:

> Hi,
>
> I am trying to apply binning to a large CSV dataset. Here are the steps I
> am taking:
>
> 1. Emit each value of the CSV as (ColIndex, (RowIndex, value)).
> 2. groupByKey (here on ColIndex), so all values of a particular column
> land on one node, since I have to work on the collection of all values.
> 3. Apply my binning algorithm, which is as follows:
>     a. Sort the values.
>     b. Iterate through the values and check whether each one differs from
> the previous one:
>         if not, add it to the same bin;
>         if so, check the size of the current bin: if it is greater than a
> particular size (say 5% of the whole dataset), start a new bin, else keep
> the same bin.
>     c. Repeat for each column.
>
> Because of this algorithm I can't compute it per partition and merge the
> partial results. But even with groupByKey I expect it to work, maybe
> slowly, but it should finish. I increased the number of partitions to
> reduce the output of each groupByKey task, hoping that would let the
> process complete. But even with that it is stuck at the same stage. The
> executor log says:
>
> ExternalAppendOnlyMap (spilling to disk) (Trying ...)
>
> The code works for small CSV files but can't complete for big files.
>
> import org.apache.spark.rdd.RDD
> import scala.util.control.Exception.allCatch
>
> val inputfile = "hdfs://hm41:9000/user/file1"
> val table = sc.textFile(inputfile, 1000)
>
> val withoutHeader: RDD[String] = dropHeader(table)
>
> val kvPairs = withoutHeader.flatMap(retAtrTuple)  // (colIdx, value) for every cell
>
> //val filter_na = kvPairs.map { case (x, y) => (x, if (y == "NA") "" else y) }
>
> // a column counts as numeric only if every one of its values parses
> val isNum = kvPairs.map { case (x, y) => (x, isNumeric(y)) }.reduceByKey(_ && _)
>
> val numeric_indexes = isNum.filter { case (x, y) => y }.sortByKey()
>   .map { case (x, y) => x }.collect()
> //val isNum_Arr = isNum.sortByKey().collect()
>
> val kvidx = withoutHeader.zipWithIndex  // pair each row with its index
> //val t = kvidx.map { case (a, b) => retAtrTuple(a).map(x => (x, b)) }
>
> // ((colIdx, value), rowIdx) for every cell, keeping numeric columns only
> val t = kvidx.flatMap { case (a, b) => retAtrTuple(a).map(x => (x, b)) }
> val t2 = t.filter { case (a, b) => numeric_indexes contains a._1 }
> //val t2 = t.filter { case (a, b) => a._1 == 0 }
>
> // reshape to (colIdx, (rowIdx, value)); the groupByKey below is the
> // expensive step that pulls a whole column onto one node
> val t3 = t2.map { case ((a, b), c) => (a, (c, b.toDouble)) }
> //val t4 = t3.sortBy(_._2._1)
> val t4 = t3.groupByKey.map { case (a, b) =>
>   (a, classing_summary(b.toArray.sortBy(_._2))) }
>
> // drop the first line (the CSV header) of the first partition
> def dropHeader(data: RDD[String]): RDD[String] =
>   data.mapPartitionsWithIndex { (idx, lines) =>
>     if (idx == 0) lines.drop(1) else lines
>   }
>
>
>   // emit (colIdx, value) for each field of a line; note that a plain
>   // split(',') does not handle quoted fields containing commas
>   def retAtrTuple(x: String) = {
>     val fields = x.split(',')
>     for (h <- 0 until fields.length) yield (h, fields(h))
>   }
>
> // true iff the string parses as a Double (allCatch is imported above)
> def isNumeric(s: String): Boolean =
>     (allCatch opt s.toDouble).isDefined
>
> // bin a column's (rowIdx, value) pairs, pre-sorted by value: equal values
> // always share a bin, and a new bin starts only once the current bin
> // holds at least 5% of the rows and the value changes
> def classing_summary(arr: Array[(Long, Double)]) = {
>   var prevValue = Double.MinValue
>   var counter = 1       // current bin number
>   var classSize = 0.0   // fraction of all rows in the current bin
>   val size = arr.length
>
>   val output = for ((idx, value) <- arr) yield {
>     if (value == prevValue || classSize < 0.05) {
>       // same value, or bin still under the 5% cap: stay in this bin
>       classSize += 1.0 / size
>     } else {
>       // value changed and the bin is full: open a new bin
>       classSize = 1.0 / size
>       counter += 1
>     }
>     prevValue = value
>     (idx, value, counter, classSize)
>   }
>   output
> }
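>
> A quick sanity check on a tiny array (the values are only illustrative):
>
> val demo = classing_summary(Array((0L, 1.0), (1L, 1.0), (2L, 2.0), (3L, 3.0)))
> // size = 4, so every element adds 0.25 to classSize; the two equal values
> // share bin 1, then each new distinct value opens a bin: 1, 1, 2, 3
> demo.foreach(println)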
>
> Thanks in advance,
>
> Tushar Sharma
>
