Thank you guys!

It was very helpful and now I understand it better.
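
For anyone finding this thread later, here is a minimal sketch of the counting logic discussed below, written against plain Scala collections so it runs without a Spark cluster. The object name NonZeroCounts and the helpers countPositives and percentPositive are made up for illustration. The groupBy plus reduce step only stands in for what reduceByKey does per key on an RDD; it is not how Spark implements it.

```scala
object NonZeroCounts {
  // Per key, build (count of values > 0, count of all values).
  // Each value is first turned into a (0 or 1, 1) pair, then pairs are summed,
  // which mirrors the mapValues + reduceByKey version suggested in the thread.
  def countPositives(pairs: Seq[(String, Int)]): Map[String, (Int, Int)] =
    pairs
      .map { case (k, v) => (k, (if (v > 0) 1 else 0, 1)) }
      .groupBy(_._1)
      .map { case (k, kvs) =>
        k -> kvs.map(_._2).reduce((x, y) => (x._1 + y._1, x._2 + y._2))
      }

  // Percentage of values > 0, as in the Hadoop reducer quoted in the thread.
  def percentPositive(counts: (Int, Int)): Float =
    counts._1.toFloat / counts._2 * 100

  def main(args: Array[String]): Unit = {
    val newlist = List(("LAX", 6), ("LAX", 0), ("LAX", 7),
      ("SFO", 0), ("SFO", 0), ("SFO", 9))
    countPositives(newlist).toSeq.sortBy(_._1).foreach(println)
  }
}
```

With the small LAX/SFO list from the thread, this prints (LAX,(2,3)) and (SFO,(1,3)), which is the result I was after.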




On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu <dav...@databricks.com> wrote:

> Maybe this version is easier to use:
>
> plist.mapValues((v) => (if (v >0) 1 else 0, 1)).reduceByKey((x, y) =>
> (x._1 + y._1, x._2 + y._2))
>
> It behaves similarly to combineByKey() and will be faster than the
> groupByKey() version.
>
> On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
> <aharipriy...@gmail.com> wrote:
> > Sean,
> >
> > Thank you. It works. But I am still confused about the function. Can you
> > kindly throw some light on it?
> > I was going through the example mentioned in
> > https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
> >
> > Is there any better source through which I can learn more about these
> > functions? It would be helpful if I can get a chance to look at more
> > examples.
> > Also, I assume that using combineByKey lets us solve this in parallel, rather
> > than using the simple functions provided by Scala that Yana mentioned. Am I correct?
> >
> > On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> Oh duh, sorry. The initialization should of course be (v) => (if (v >
> >> 0) 1 else 0, 1)
> >> This gives the answer you are looking for. I don't see what Part2 is
> >> supposed to do differently.
> >>
> >> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
> >> <aharipriy...@gmail.com> wrote:
> >> > Hello Sean,
> >> >
> >> > Thank you, but changing from v to 1 doesn't help me either.
> >> >
> >> > I am trying to count the number of non-zero values using the first
> >> > accumulator.
> >> > val newlist = List(("LAX",6), ("LAX",0), ("LAX",7),
> >> >   ("SFO",0), ("SFO",0), ("SFO",9))
> >> >
> >> > val plist = sc.parallelize(newlist)
> >> >
> >> >  val part1 = plist.combineByKey(
> >> >    (v) => (1, 1),
> >> >    (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
> >> >    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> >> >    )
> >> >
> >> >    val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) }
> >> >
> >> > This should give me the result
> >> > (LAX,(2,3))
> >> > (SFO,(1,3))
> >> >
> >> >
> >> >
> >> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <so...@cloudera.com> wrote:
> >> >>
> >> >> You have a typo in your code at "var acc:", and the map from opPart1
> >> >> to opPart2 looks like a no-op, but those aren't the problem I think.
> >> >> It sounds like you intend the first element of each pair to be a count
> >> >> of nonzero values, but you initialize the first element of the pair to
> >> >> v, not 1, in v => (v,1). Try v => (1,1)
> >> >>
> >> >>
> >> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
> >> >> <aharipriy...@gmail.com> wrote:
> >> >> >
> >> >> > I am a beginner to Spark and am finding it difficult to implement a very
> >> >> > simple reduce operation. I read that it is ideal to use combineByKey for
> >> >> > complex reduce operations.
> >> >> >
> >> >> > My input:
> >> >> >
> >> >> > val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
> >> >> >   ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88),
> >> >> >   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
> >> >> >   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
> >> >> >   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
> >> >> >
> >> >> >
> >> >> >  val opPart1 = input.combineByKey(
> >> >> >    (v) => (v, 1),
> >> >> >    (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
> >> >> >    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> >> >> >    )
> >> >> >
> >> >> >    val opPart2 = opPart1.map{ case (key, value) => (key, (value._1,value._2)) }
> >> >> >
> >> >> >  opPart2.collectAsMap().map(println(_))
> >> >> >
> >> >> > If the value is greater than 0, the first accumulator should be
> >> >> > incremented by 1; otherwise it remains the same. The second accumulator
> >> >> > is a simple counter for each value. I am getting incorrect output
> >> >> > (garbage values) for the first accumulator. Please help.
> >> >> >
> >> >> > The equivalent reduce operation in Hadoop MapReduce is:
> >> >> >
> >> >> > public static class PercentageCalcReducer
> >> >> >     extends Reducer<Text, IntWritable, Text, FloatWritable> {
> >> >> >
> >> >> >   private FloatWritable pdelay = new FloatWritable();
> >> >> >
> >> >> >   public void reduce(Text key, Iterable<IntWritable> values, Context context)
> >> >> >       throws IOException, InterruptedException {
> >> >> >     int acc2 = 0;
> >> >> >     float frac_delay, percentage_delay;
> >> >> >     int acc1 = 0;
> >> >> >
> >> >> >     for (IntWritable val : values) {
> >> >> >       if (val.get() > 0) {
> >> >> >         acc1++;
> >> >> >       }
> >> >> >       acc2++;
> >> >> >     }
> >> >> >
> >> >> >     frac_delay = (float) acc1 / acc2;
> >> >> >     percentage_delay = frac_delay * 100;
> >> >> >     pdelay.set(percentage_delay);
> >> >> >     context.write(key, pdelay);
> >> >> >   }
> >> >> > }
> >> >> >
> >> >> >
> >> >> > Please help. Thank you for your time.
> >> >> >
> >> >> > --
> >> >> >
> >> >> > Regards,
> >> >> >
> >> >> > Haripriya Ayyalasomayajula
> >> >> > contact : 650-796-7112



-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
