If you just want the ratio of positive values to all values per key (if I'm reading you right), this works:
val reduced = input.groupByKey()
  .mapValues(vs => vs.filter(_ > 0).size.toFloat / vs.size)
reduced.foreach(println)

I don't think you need reduceByKey or combineByKey here, since none of the values depend on each other -- you're just counting.

On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA <
aharipriy...@gmail.com> wrote:

> I am a beginner to Spark and am finding it difficult to implement a very
> simple reduce operation. I read that it is ideal to use combineByKey for
> complex reduce operations.
>
> My input:
>
> val input = sc.parallelize(List(
>   ("LAX",6), ("LAX",8), ("LAX",7),
>   ("SFO",0), ("SFO",1), ("SFO",9),
>   ("PHX",65), ("PHX",88),
>   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
>   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
>   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>
> val opPart1 = input.combineByKey(
>   (v) => (v, 1),
>   (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> )
>
> val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
>
> opPart2.collectAsMap().map(println(_))
>
> If the value is greater than 0, the first accumulator should be
> incremented by 1; otherwise it stays the same. The second accumulator is a
> simple counter for each value. I am getting incorrect output (garbage
> values) for the first accumulator. Please help.
>
> The equivalent reduce operation in Hadoop MapReduce is:
>
> public static class PercentageCalcReducer
>     extends Reducer<Text, IntWritable, Text, FloatWritable> {
>
>   private FloatWritable pdelay = new FloatWritable();
>
>   public void reduce(Text key, Iterable<IntWritable> values, Context context)
>       throws IOException, InterruptedException {
>     int acc2 = 0;
>     float frac_delay, percentage_delay;
>     int acc1 = 0;
>     for (IntWritable val : values) {
>       if (val.get() > 0) {
>         acc1++;
>       }
>       acc2++;
>     }
>     frac_delay = (float) acc1 / acc2;
>     percentage_delay = frac_delay * 100;
>     pdelay.set(percentage_delay);
>     context.write(key, pdelay);
>   }
> }
>
> Please help. Thank you for your time.
>
> --
>
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112
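As an aside for readers hitting the same problem: the garbage values in the quoted combineByKey come from the createCombiner function. `(v) => (v, 1)` seeds the first accumulator with the raw value itself rather than a 0/1 positive count, so a key whose first value is, say, 563 starts its "positives" counter at 563. A minimal corrected sketch, assuming the `sc` and `input` definitions from the quoted mail:

```scala
// createCombiner must seed the positive-count from the first value's
// sign (0 or 1), not from the raw value itself.
val opPart1 = input.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),                         // createCombiner
  (acc: (Int, Int), v: Int) =>
    (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),            // mergeValue
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners
)

// Turn each (positives, total) pair into a per-key fraction.
val ratios = opPart1.mapValues { case (pos, total) => pos.toFloat / total }
ratios.collectAsMap().foreach(println)
```

The mergeValue and mergeCombiners functions in the quoted code were already correct; only the initial combiner needed to change.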
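For comparison, the percentage computation that the MapReduce reducer above performs can also be sketched with Spark's aggregateByKey, which takes an explicit zero value and so avoids writing a createCombiner at all (again assuming the `input` RDD from the quoted mail):

```scala
// Keep a (positiveCount, totalCount) pair per key, then scale it to a
// percentage, mirroring the acc1/acc2 logic of PercentageCalcReducer.
val percentages = input.aggregateByKey((0, 0))(
  (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),  // within a partition
  (a, b) => (a._1 + b._1, a._2 + b._2)                          // across partitions
).mapValues { case (pos, total) => 100f * pos / total }

percentages.collect().foreach(println)
```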