Hello Yana,

Thank you. Yes, it works. However, could you please suggest some examples (or links) on the usage of combineByKey?
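For reference, a minimal combineByKey version of this thread's computation (percentage of positive values per key) might look like the sketch below. It is written against the standard Spark RDD API; the (positives, total) pair accumulator and all names are illustrative, not the only way to do it:

    val input = sc.parallelize(List(
      ("LAX", 6), ("LAX", 8), ("SFO", 0), ("SFO", 1)))

    val counts = input.combineByKey(
      // createCombiner: turn the first value seen for a key (per partition)
      // into an accumulator -- count it, don't store it
      (v: Int) => (if (v > 0) 1 else 0, 1),
      // mergeValue: fold one more value into a partition-local accumulator
      (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
      // mergeCombiners: merge accumulators coming from different partitions
      (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
    )

    // (positives, total) -> percentage of positive values per key
    val percentages = counts.mapValues { case (pos, total) => 100f * pos / total }
    percentages.collect().foreach(println)

The three functions are exactly the contract combineByKey expects: create an accumulator from the first value, merge one more value into an accumulator, and merge two accumulators.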
On Thu, Oct 9, 2014 at 12:03 PM, Yana Kadiyska <yana.kadiy...@gmail.com> wrote:

> If you just want the ratio of positive values to all values per key (if
> I'm reading this right), this works:
>
>     val reduced = input.groupByKey().map(grp =>
>       (grp._1, grp._2.filter(v => v > 0).size.toFloat / grp._2.size))
>     reduced.foreach(println)
>
> I don't think you need reduceByKey or combineByKey, as you're not doing
> anything where the values depend on each other -- you're just counting...
>
> On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA
> <aharipriy...@gmail.com> wrote:
>
>> I am a beginner to Spark and am finding it difficult to implement a very
>> simple reduce operation. I read that it is ideal to use combineByKey for
>> complex reduce operations.
>>
>> My input:
>>
>>     val input = sc.parallelize(List(
>>       ("LAX", 6), ("LAX", 8), ("LAX", 7),
>>       ("SFO", 0), ("SFO", 1), ("SFO", 9),
>>       ("PHX", 65), ("PHX", 88),
>>       ("KX", 7), ("KX", 6), ("KX", 1), ("KX", 9),
>>       ("HOU", 56), ("HOU", 5), ("HOU", 59), ("HOU", 0),
>>       ("MA", 563), ("MA", 545), ("MA", 5), ("MA", 0), ("MA", 0)))
>>
>>     val opPart1 = input.combineByKey(
>>       (v) => (v, 1),
>>       (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>>       (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
>>     )
>>
>>     val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
>>
>>     opPart2.collectAsMap().map(println(_))
>>
>> If the value is greater than 0, the first accumulator should be
>> incremented by 1; otherwise it remains the same. The second accumulator
>> is a simple counter for each value. I am getting incorrect output
>> (garbage values) for the first accumulator. Please help.
>>
>> The equivalent reduce operation in Hadoop MapReduce is:
>>
>>     public static class PercentageCalcReducer
>>         extends Reducer<Text, IntWritable, Text, FloatWritable> {
>>
>>       private final FloatWritable pdelay = new FloatWritable();
>>
>>       public void reduce(Text key, Iterable<IntWritable> values, Context context)
>>           throws IOException, InterruptedException {
>>         int acc1 = 0;
>>         int acc2 = 0;
>>         float frac_delay, percentage_delay;
>>         for (IntWritable val : values) {
>>           if (val.get() > 0) {
>>             acc1++;
>>           }
>>           acc2++;
>>         }
>>         frac_delay = (float) acc1 / acc2;
>>         percentage_delay = frac_delay * 100;
>>         pdelay.set(percentage_delay);
>>         context.write(key, pdelay);
>>       }
>>     }
>>
>> Please help. Thank you for your time.
>>
>> --
>> Regards,
>> Haripriya Ayyalasomayajula
>> contact : 650-796-7112

--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
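A note for readers who hit the same garbage values: the createCombiner function (v) => (v, 1) in the quoted code seeds the first accumulator slot with the raw value itself rather than counting it, so each key's "positive count" starts from its first value instead of from 0 or 1. Seeding with (if (v > 0) 1 else 0, 1) fixes it. Alternatively, if your Spark version has aggregateByKey (added in Spark 1.0), it takes an explicit zero value and avoids this pitfall entirely; a sketch under the same assumptions as above:

    // Same (positives, total) counting with an explicit (0, 0) zero value,
    // so there is no separate "first value" case to get wrong.
    val counts = input.aggregateByKey((0, 0))(
      (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1), // within a partition
      (a, b) => (a._1 + b._1, a._2 + b._2)                         // across partitions
    )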