I am a beginner with Spark and am finding it difficult to implement a very simple reduce operation. I read that it is ideal to use combineByKey for complex reduce operations.
My input:

    val input = sc.parallelize(List(
      ("LAX",6), ("LAX",8), ("LAX",7),
      ("SFO",0), ("SFO",1), ("SFO",9),
      ("PHX",65), ("PHX",88),
      ("KX",7), ("KX",6), ("KX",1), ("KX",9),
      ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
      ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))

    val opPart1 = input.combineByKey(
      (v) => (v, 1),
      (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
      (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
    )

    val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }

    opPart2.collectAsMap().map(println(_))

If a value is greater than 0, the first accumulator should be incremented by 1; otherwise it should stay the same. The second accumulator is a simple counter over all values for the key. I am getting incorrect output (garbage values) for the first accumulator.

The equivalent reduce operation in Hadoop MapReduce is:

    public static class PercentageCalcReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {

        private FloatWritable pdelay = new FloatWritable();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int acc2 = 0;
            float frac_delay, percentage_delay;
            int acc1 = 0;
            for (IntWritable val : values) {
                if (val.get() > 0) {
                    acc1++;
                }
                acc2++;
            }
            frac_delay = (float) acc1 / acc2;
            percentage_delay = frac_delay * 100;
            pdelay.set(percentage_delay);
            context.write(key, pdelay);
        }
    }

Please help. Thank you for your time.
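To make the result I am after more concrete, here is a small plain-Scala sketch (no Spark) of the per-key logic I am trying to express. The names data, counts and percentages are mine, purely for illustration, and the list is only a subset of my input:

    // For each key I expect: (number of values > 0, total number of values).
    val data = List(
      ("LAX", 6), ("LAX", 8), ("LAX", 7),
      ("SFO", 0), ("SFO", 1), ("SFO", 9),
      ("MA", 563), ("MA", 545), ("MA", 5), ("MA", 0), ("MA", 0))

    val counts: Map[String, (Int, Int)] =
      data.groupBy(_._1).map { case (key, pairs) =>
        (key, (pairs.count(_._2 > 0), pairs.size))
      }

    // Mirrors the percentage computed in the Hadoop reducer above.
    val percentages = counts.map { case (key, (positives, total)) =>
      (key, positives.toFloat / total * 100)
    }

    counts.foreach(println)        // e.g. (SFO,(2,3)) and (MA,(3,5)) are what I expect
    percentages.foreach(println)   // e.g. (MA,60.0)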