Hello Sean,

Thank you, but changing v to 1 doesn't help either.

I am trying to count the number of non-zero values using the first
accumulator.
val newlist = List(("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0),
("SFO",9))

val plist = sc.parallelize(newlist)

val part1 = plist.combineByKey(
  (v) => (1, 1),
  (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val part2 = part1.map { case (key, value) => (key, (value._1, value._2)) }

This should give me the result:
(LAX,(2,3))
(SFO,(1,3))
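
The likely culprit is the initializer: combineByKey applies the first
function (createCombiner) to the first value it sees for a key in each
partition, so (v) => (1, 1) counts that value as non-zero even when it is
0. A minimal sketch of a consistent version, applying the same v > 0 test
in both places (assuming Spark's standard three-argument combineByKey):

  val part1 = plist.combineByKey(
    // First value for a key in a partition: mirror mergeValue's zero
    // test, or a zero that starts a partition is miscounted as non-zero.
    (v: Int) => (if (v > 0) 1 else 0, 1),
    // Fold another value in the same partition into the accumulator.
    (acc: (Int, Int), v: Int) =>
      (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
    // Merge accumulators from different partitions.
    (acc1: (Int, Int), acc2: (Int, Int)) =>
      (acc1._1 + acc2._1, acc1._2 + acc2._2)
  )
  part1.collect().foreach(println) // expect (LAX,(2,3)) and (SFO,(1,3))

With that change the counts no longer depend on how parallelize happens to
split the list across partitions.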



On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <so...@cloudera.com> wrote:

> You have a typo in your code at "var acc:", and the map from opPart1
> to opPart2 looks like a no-op, but I don't think those are the problem.
> It sounds like you intend the first element of each pair to be a count
> of non-zero values, but you initialize the first element of the pair to
> v, not 1, in v => (v,1). Try v => (1,1).
>
>
> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
> <aharipriy...@gmail.com> wrote:
> >
> > I am a beginner to Spark and am finding it difficult to implement a
> > very simple reduce operation. I read that it is ideal to use
> > combineByKey for complex reduce operations.
> >
> > My input:
> >
> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> > ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88), ("KX",7),
> > ("KX",6), ("KX",1), ("KX",9), ("HOU",56), ("HOU",5), ("HOU",59),
> > ("HOU",0), ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
> >
> >
> > val opPart1 = input.combineByKey(
> >   (v) => (v, 1),
> >   (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> > )
> >
> > val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
> >
> > opPart2.collectAsMap().map(println(_))
> >
> > If the value is greater than 0, the first accumulator should be
> > incremented by 1; otherwise it remains the same. The second accumulator
> > is a simple counter for each value. I am getting incorrect output
> > (garbage values) for the first accumulator. Please help.
> >
> > The equivalent reduce operation in Hadoop MapReduce is:
> >
> > public static class PercentageCalcReducer
> >     extends Reducer<Text, IntWritable, Text, FloatWritable> {
> >
> >   private FloatWritable pdelay = new FloatWritable();
> >
> >   public void reduce(Text key, Iterable<IntWritable> values, Context context)
> >       throws IOException, InterruptedException {
> >     int acc1 = 0;
> >     int acc2 = 0;
> >     float frac_delay, percentage_delay;
> >
> >     for (IntWritable val : values) {
> >       if (val.get() > 0) {
> >         acc1++;
> >       }
> >       acc2++;
> >     }
> >
> >     frac_delay = (float) acc1 / acc2;
> >     percentage_delay = frac_delay * 100;
> >     pdelay.set(percentage_delay);
> >     context.write(key, pdelay);
> >   }
> > }
> >
> >
> > Please help. Thank you for your time.
> >
> > --
> >
> > Regards,
> >
> > Haripriya Ayyalasomayajula
> > contact : 650-796-7112
>
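
For completeness, the percentage step in the quoted MapReduce reducer maps
directly onto the (non-zero count, total count) pairs; a sketch, assuming
part1 holds the corrected combineByKey output from above:

  // Percentage of non-zero values per key, mirroring the reducer's
  // frac_delay * 100 computation.
  val percentages = part1.mapValues { case (nonzero, total) =>
    100f * nonzero / total
  }
  percentages.collectAsMap().foreach(println)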



-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
