Hi,

I am trying to apply binning to a large CSV dataset. Here are the steps I
am taking:

1. Emit each value of the CSV as (ColIndex, (RowIndex, value)).
2. groupByKey on ColIndex, so that all values of a particular column end up
   on one node, since I have to work on the collection of all values of that
   column.
3. Apply my binning algorithm, which is as follows (a toy version is
   sketched after this list):
    a. Sort the values.
    b. Iterate through the values and check whether each one differs from
       the previous one:
        if no, add it to the same bin;
        if yes, check the size of the current bin: if it is greater than a
        particular size (say 5% of the whole dataset), start a new bin,
        otherwise keep the same bin.
    c. Repeat for each column.
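
To make the algorithm concrete, here is a tiny self-contained toy version of
the bin-assignment rule (the values and the 5% threshold are made up for
illustration):

// toy, driver-side version of steps a and b (values made up)
val sorted = Array(1.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 3.0, 3.0, 4.0)
val threshold = 0.05
var bin = 1
var binFrac = 0.0
val bins = sorted.zipWithIndex.map { case (v, i) =>
  val isNewValue = i > 0 && v != sorted(i - 1)
  if (isNewValue && binFrac >= threshold) { bin += 1; binFrac = 0.0 }
  binFrac += 1.0 / sorted.length
  (v, bin)
}
// bins: (1.0,1) (1.0,1) (1.0,1) (2.0,2) (2.0,2) (3.0,3) ... (4.0,4)

Equal values always stay together, and a new distinct value only starts a
fresh bin once the current bin has reached the threshold.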

Because of this algorithm I can't compute the bins partition-wise and merge
the partial results. But even with groupByKey I expect it to work, maybe
slowly, but it should finish. I increased the number of partitions (sketched
below) to reduce the output of each groupByKey task, hoping that would let
the stage complete, but even then it gets stuck at the same point. The
executor log says:

ExternalAppendOnlyMap (spilling in-memory map to disk) (Trying ...)

The code works for small CSV files but can't complete for big files.
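
By increasing partitions I mean, besides the minPartitions argument to
textFile below, something along these lines for the shuffle itself (2000 is
just an example; t3 and classing_summary are from the full code that
follows):

// give groupByKey an explicit partition count so each reduce task
// holds fewer columns at once (the number is illustrative)
val t4 = t3.groupByKey(2000).map { case (col, cells) =>
  (col, classing_summary(cells.toArray.sortBy(_._2))) }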

import org.apache.spark.rdd.RDD
import scala.util.control.Exception.allCatch

val inputfile = "hdfs://hm41:9000/user/file1"
val table = sc.textFile(inputfile, 1000)

val withoutHeader: RDD[String] = dropHeader(table)

// (colIndex, value) for every cell
val kvPairs = withoutHeader.flatMap(retAtrTuple)

// a column counts as numeric only if every value in it parses as a Double
val isNum = kvPairs.map { case (col, v) => (col, isNumeric(v)) }.reduceByKey(_ && _)

val numeric_indexes = isNum.filter { case (_, numeric) => numeric }
  .sortByKey().map(_._1).collect()

// ((colIndex, value), rowIndex) for every cell
val kvidx = withoutHeader.zipWithIndex
val t = kvidx.flatMap { case (line, row) => retAtrTuple(line).map(cell => (cell, row)) }

// keep only the numeric columns; a Set makes the membership test O(1)
val numericSet = numeric_indexes.toSet
val t2 = t.filter { case ((col, _), _) => numericSet(col) }

// (colIndex, (rowIndex, numericValue))
val t3 = t2.map { case ((col, v), row) => (col, (row, v.toDouble)) }

// pull all values of a column together, sort them, and bin them
val t4 = t3.groupByKey.map { case (col, cells) =>
  (col, classing_summary(cells.toArray.sortBy(_._2))) }

def dropHeader(data: RDD[String]): RDD[String] = {
  data.mapPartitionsWithIndex { (idx, lines) =>
    // only the first partition starts with the header row;
    // the result of drop(1) must be returned, not discarded
    if (idx == 0) lines.drop(1) else lines
  }
}


def retAtrTuple(x: String) = {
  // naive split: does not handle quoted fields or embedded commas
  val fields = x.split(',')
  for (h <- 0 until fields.length) yield (h, fields(h))
}

def isNumeric(s: String): Boolean = {
  // allCatch comes from scala.util.control.Exception (imported above)
  (allCatch opt s.toDouble).isDefined
}

def classing_summary(arr: Array[(Long, Double)]) = {
  var prevValue = Double.MinValue
  var counter = 1       // current bin number
  var classSize = 0.0   // fraction of all values accumulated in the current bin
  val size = arr.length

  arr.map { case (idx, value) =>
    if (value == prevValue) {
      // identical values always share a bin, even past the size limit
      classSize += 1.0 / size
    } else if (classSize < 0.05) {
      // new value, but the current bin is under 5% of the dataset: reuse it
      classSize += 1.0 / size
    } else {
      // new value and the current bin is full: open the next bin
      classSize = 1.0 / size
      counter += 1
    }
    prevValue = value
    (idx, value, counter, classSize)
  }
}
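
As a sanity check, classing_summary on a tiny made-up column (already sorted
by value) behaves as intended:

val toy = Array((0L, 1.0), (1L, 1.0), (2L, 2.0), (3L, 2.0), (4L, 3.0))
classing_summary(toy).foreach(println)
// (0,1.0,1,0.2)
// (1,1.0,1,0.4)
// (2,2.0,2,0.2)
// (3,2.0,2,0.4)
// (4,3.0,3,0.2)

Each tuple is (rowIndex, value, binNumber, fractionOfDatasetInBinSoFar).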

Thanks in advance,

Tushar Sharma
