Hi, I am trying to apply binning to a large CSV dataset. Here are the steps I am taking:
1. Emit each value of the CSV as (ColIndex, (RowIndex, value)). 2. Then I groupByKey (here ColumnIndex) and bring all values of a particular index to one node, as I have to work on the collection of all values. 3. I apply my binning algorithm, which is as follows: a. Sort the values. b. Iterate through the values and check whether each is different from the previous one; if not, add it to the same bin; if so, check the size of that bin — if it is greater than a particular size (say 5% of the whole dataset) then change the bin number, else keep the same bin. c. Repeat for each column. Because of this algorithm I can't calculate it partition-wise and merge for the final result. But even with groupByKey I expect it to work — maybe slowly, but it should finish. I increased the number of partitions to reduce the output of each groupByKey so that it helps the process complete successfully. But even with that it is stuck at the same stage. The executor log says: ExternalAppendOnlyMap (spilling to disk) (Trying ...) The code works for small CSV files but can't complete for big files. 
// Spark job: bin the numeric columns of a CSV by value, starting a new bin
// once the current bin holds at least 5% of the rows.
//
// NOTE(review): the groupByKey below materialises ALL values of a column on a
// single executor — that is the stage that spills
// (ExternalAppendOnlyMap "spilling to disk") and stalls on large files.
// Consider repartitionAndSortWithinPartitions (secondary sort) so each
// column's values arrive sorted without collecting them into one in-memory
// Iterable.

val inputfile = "hdfs://hm41:9000/user/file1"
val table = sc.textFile(inputfile, 1000)
val withoutHeader: RDD[String] = dropHeader(table)

// (columnIndex, cellValue) for every cell in the file.
val kvPairs = withoutHeader.flatMap(retAtrTuple)
//val filter_na = kvPairs.map{case (x,y) => (x,if(y == "NA") "" else y)}

// A column counts as numeric only if EVERY one of its values parses as Double.
val isNum = kvPairs.map { case (x, y) => (x, isNumeric(y)) }.reduceByKey(_ && _)
val numeric_indexes = isNum.filter { case (x, y) => y }.sortByKey().map { case (x, y) => x }.collect()
//val isNum_Arr = isNum.sortByKey().collect()

// Re-emit each cell tagged with its row index so bins can refer back to rows.
val kvidx = withoutHeader.zipWithIndex
//val t = kvidx.map{case (a,b) => retAtrTuple(a).map(x =>(x,b)) }
val t = kvidx.flatMap { case (a, b) => retAtrTuple(a).map(x => (x, b)) }

// Keep only the columns identified as numeric above.
val t2 = t.filter { case (a, b) => numeric_indexes contains a._1 }
//val t2 = t.filter{case (a,b) => a._1 ==0 }

// (columnIndex, (rowIndex, numericValue))
val t3 = t2.map { case ((a, b), c) => (a, (c, b.toDouble)) }
//val t4 = t3.sortBy(_._2._1)

// WARNING: groupByKey pulls the whole column into memory on one node before
// classing_summary runs — this is the scalability bottleneck.
val t4 = t3.groupByKey.map { case (a, b) => (a, classing_summary(b.toArray.sortBy(_._2))) }

/** Drops the CSV header (the first line of the first partition) from `data`. */
def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex((idx, lines) => {
    // BUG FIX: the original called lines.drop(1), DISCARDED the result, and
    // returned `lines`. Iterator.drop returns a new iterator; the advanced
    // iterator must be the value of the partition function, or the header
    // line survives.
    if (idx == 0) lines.drop(1) else lines
  })
}

/** Splits one CSV line into (columnIndex, cellValue) tuples. */
def retAtrTuple(x: String) = {
  val newX = x.split(',')
  for (h <- 0 until newX.length) yield (h, newX(h))
}

/** True when `s` parses as a Double (e.g. "1e3", "-2.5"). */
def isNumeric(s: String): Boolean = {
  (allCatch opt s.toDouble).isDefined
}

/**
 * Sequential binning of one column, pre-sorted ascending by value.
 *
 * Walks the sorted (rowIndex, value) pairs: equal consecutive values always
 * share a bin; a new value starts a new bin only once the current bin already
 * holds at least `threshold` of the rows (so a bin can exceed the threshold
 * when a run of equal values straddles it).
 *
 * @param arr       (rowIndex, value) pairs sorted ascending by value
 * @param threshold fraction of rows after which a new bin may start
 *                  (generalized from the hard-coded 0.05; default unchanged)
 * @return per input row: (rowIndex, value, binNumber, cumulative bin fraction)
 */
def classing_summary(arr: Array[(Long, Double)], threshold: Double = 0.05): Array[(Long, Double, Int, Double)] = {
  val size = arr.length
  var prevValue = Double.MinValue // sentinel: first row never matches it
  var counter = 1                 // current bin number (1-based)
  var classSize = 0.0             // fraction of rows accumulated in current bin
  val output = for (i <- 0 until arr.length) yield {
    val (idx, value) = arr(i)
    if (value == prevValue) {
      // Same value as the previous row: always stays in the current bin.
      classSize += 1.0 / size
    } else if (classSize < threshold) {
      // New value, but the current bin is still under the size threshold.
      classSize += 1.0 / size
    } else {
      // New value and the bin is full: open the next bin.
      classSize = 1.0 / size
      counter += 1
    }
    prevValue = value
    (idx, value, counter, classSize)
  }
  output.toArray
}

Thanks in advance, Tushar Sharma