Hello Sean,

Thank you, but changing from v to 1 doesn't help me either.
I am trying to count the number of non-zero values using the first accumulator.

val newlist = List(("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0), ("SFO",9))
val plist = sc.parallelize(newlist)

val part1 = plist.combineByKey(
  (v) => (1, 1),
  (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val Part2 = part1.map{ case (key, value) => (key, (value._1, value._2)) }

This should give me the result:

(LAX,(2,3))
(SFO,(1,3))

On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <so...@cloudera.com> wrote:

> You have a typo in your code at "var acc:", and the map from opPart1
> to opPart2 looks like a no-op, but those aren't the problem, I think.
> It sounds like you intend the first element of each pair to be a count
> of nonzero values, but you initialize the first element of the pair to
> v, not 1, in v => (v,1). Try v => (1,1)
>
> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
> <aharipriy...@gmail.com> wrote:
> >
> > I am a beginner to Spark and finding it difficult to implement a very
> > simple reduce operation. I read that it is ideal to use combineByKey
> > for complex reduce operations.
> >
> > My input:
> >
> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> >   ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88), ("KX",7),
> >   ("KX",6), ("KX",1), ("KX",9), ("HOU",56), ("HOU",5), ("HOU",59),
> >   ("HOU",0), ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
> >
> > val opPart1 = input.combineByKey(
> >   (v) => (v, 1),
> >   (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> > )
> >
> > val opPart2 = opPart1.map{ case (key, value) => (key, (value._1, value._2)) }
> >
> > opPart2.collectAsMap().map(println(_))
> >
> > If the value is greater than 0, the first accumulator should be incremented
> > by 1, else it remains the same. The second accumulator is a simple counter
> > for each value. I am getting an incorrect output (garbage values) for the
> > first accumulator. Please help.
> >
> > The equivalent reduce operation in Hadoop MapReduce is:
> >
> > public static class PercentageCalcReducer extends
> >     Reducer<Text, IntWritable, Text, FloatWritable> {
> >
> >   private FloatWritable pdelay = new FloatWritable();
> >
> >   public void reduce(Text key, Iterable<IntWritable> values, Context context)
> >       throws IOException, InterruptedException {
> >     int acc2 = 0;
> >     float frac_delay, percentage_delay;
> >     int acc1 = 0;
> >     for (IntWritable val : values) {
> >       if (val.get() > 0) {
> >         acc1++;
> >       }
> >       acc2++;
> >     }
> >     frac_delay = (float) acc1 / acc2;
> >     percentage_delay = frac_delay * 100;
> >     pdelay.set(percentage_delay);
> >     context.write(key, pdelay);
> >   }
> > }
> >
> > Please help. Thank you for your time.
> >
> > --
> > Regards,
> > Haripriya Ayyalasomayajula
> > contact : 650-796-7112

--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
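P.S. In case it helps pin down what I mean: one guess is that the first lambda passed to
combineByKey never looks at the value, so the first value seen for a key in a partition is
counted as non-zero even when it is 0. Below is a minimal sketch of the computation I am
after, assuming it runs in spark-shell (so sc already exists); the last line is only meant
to mirror the percentage step of the Hadoop reducer above.

val newlist = List(("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0), ("SFO",9))
val plist = sc.parallelize(newlist)

val counts = plist.combineByKey(
  // createCombiner: also check the first value, so a leading 0 is not counted as non-zero
  (v: Int) => (if (v > 0) 1 else 0, 1),
  // mergeValue: bump the non-zero count only when v > 0, always bump the total
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  // mergeCombiners: add the partial counts coming from different partitions
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

counts.collect().foreach(println)   // expected: (LAX,(2,3)) and (SFO,(1,3))

// percentage of non-zero values per key, analogous to the Hadoop reducer above
val pdelay = counts.mapValues { case (nonzero, total) => nonzero.toFloat / total * 100 }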