It's the exact same reason you wrote:

(acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),

right? The first function establishes the initial value of the
accumulator for a key: it is (1,1) if the first value seen for the key
is greater than 0, and (0,1) otherwise.
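
Putting it together (a sketch against the newlist example from your last
message; the corrected initializer is the only change), this would be:

val part1 = plist.combineByKey(
  (v: Int) => (if (v > 0) 1 else 0, 1),
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

part1.collect().foreach(println)   // (LAX,(2,3)) and (SFO,(1,3))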

You're otherwise using the method just fine. You can write this
function in a lot of ways; this version is a bit verbose but probably
efficient.
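
For example, if your Spark version has aggregateByKey, the same thing
can be written with an explicit zero value instead of a createCombiner
function (an untested sketch):

val counts = plist.aggregateByKey((0, 0))(
  (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)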

Yana's version is also distributed; it just uses simple Scala functions
within map(). That works too, although groupByKey() can be a problem
because it requires holding all of the values for a key in memory,
whereas your combineByKey does not.
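
Yana's code isn't quoted here, but a groupByKey version of the same
count would look roughly like:

val counts = plist.groupByKey().mapValues(vs => (vs.count(_ > 0), vs.size))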

On Fri, Oct 10, 2014 at 5:28 AM, HARIPRIYA AYYALASOMAYAJULA
<aharipriy...@gmail.com> wrote:
> Sean,
>
> Thank you. It works. But I am still confused about the function. Can you
> kindly throw some light on it?
> I was going through the example mentioned in
> https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
>
> Is there any better source through which I can learn more about these
> functions? It would be helpful if I could look at more examples.
> Also, I assume using combineByKey lets us solve this in parallel, rather
> than using the simple Scala functions Yana mentioned. Am I correct?
>
> On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>> Oh duh, sorry. The initialization should of course be
>> (v) => (if (v > 0) 1 else 0, 1)
>> This gives the answer you are looking for. I don't see what Part2 is
>> supposed to do differently.
>>
>> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
>> <aharipriy...@gmail.com> wrote:
>> > Hello Sean,
>> >
>> > Thank you, but changing from v to 1 doesn't help me either.
>> >
>> > I am trying to count the number of non-zero values using the first
>> > accumulator.
>> > val newlist = List(("LAX",6), ("LAX",0), ("LAX",7),
>> >                    ("SFO",0), ("SFO",0), ("SFO",9))
>> >
>> > val plist = sc.parallelize(newlist)
>> >
>> > val part1 = plist.combineByKey(
>> >   (v) => (1, 1),
>> >   (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
>> > )
>> >
>> > val Part2 = part1.map { case (key, value) => (key, (value._1, value._2)) }
>> >
>> > This should give me the result
>> > (LAX,(2,3))
>> > (SFO,(1,3))
>> >
>> >
>> >
>> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <so...@cloudera.com> wrote:
>> >>
>> >> You have a typo in your code at "var acc:", and the map from opPart1
>> >> to opPart2 looks like a no-op, but I don't think those are the
>> >> problem. It sounds like you intend the first element of each pair to
>> >> be a count of nonzero values, but you initialize the first element of
>> >> the pair to v, not 1, in v => (v,1). Try v => (1,1).
>> >>
>> >>
>> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
>> >> <aharipriy...@gmail.com> wrote:
>> >> >
>> >> > I am a beginner to Spark and finding it difficult to implement a
>> >> > very simple reduce operation. I read that it is ideal to use
>> >> > combineByKey for complex reduce operations.
>> >> >
>> >> > My input:
>> >> >
>> >> > val input = sc.parallelize(List(
>> >> >   ("LAX",6), ("LAX",8), ("LAX",7),
>> >> >   ("SFO",0), ("SFO",1), ("SFO",9),
>> >> >   ("PHX",65), ("PHX",88),
>> >> >   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
>> >> >   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
>> >> >   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>> >> >
>> >> >
>> >> > val opPart1 = input.combineByKey(
>> >> >   (v) => (v, 1),
>> >> >   (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>> >> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
>> >> > )
>> >> >
>> >> > val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
>> >> >
>> >> > opPart2.collectAsMap().map(println(_))
>> >> >
>> >> > If the value is greater than 0, the first accumulator should be
>> >> > incremented by 1; otherwise it remains the same. The second
>> >> > accumulator is a simple counter for each value. I am getting
>> >> > incorrect output (garbage values) for the first accumulator.
>> >> > Please help.
>> >> >
>> >> > The equivalent reduce operation in Hadoop MapReduce is:
>> >> >
>> >> > public static class PercentageCalcReducer
>> >> >     extends Reducer<Text, IntWritable, Text, FloatWritable> {
>> >> >
>> >> >   private FloatWritable pdelay = new FloatWritable();
>> >> >
>> >> >   public void reduce(Text key, Iterable<IntWritable> values, Context context)
>> >> >       throws IOException, InterruptedException {
>> >> >     int acc1 = 0;   // count of values > 0
>> >> >     int acc2 = 0;   // count of all values
>> >> >     for (IntWritable val : values) {
>> >> >       if (val.get() > 0) {
>> >> >         acc1++;
>> >> >       }
>> >> >       acc2++;
>> >> >     }
>> >> >     float frac_delay = (float) acc1 / acc2;
>> >> >     float percentage_delay = frac_delay * 100;
>> >> >     pdelay.set(percentage_delay);
>> >> >     context.write(key, pdelay);
>> >> >   }
>> >> > }
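>> >> >
>> >> > Once the counts are correct, I assume the final percentage step in
>> >> > Spark would be something like:
>> >> >
>> >> > val percentages = opPart1.mapValues { case (acc1, acc2) => 100f * acc1 / acc2 }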
>> >> >
>> >> >
>> >> > Please help. Thank you for your time.
>> >> >
>> >> > --
>> >> >
>> >> > Regards,
>> >> >
>> >> > Haripriya Ayyalasomayajula
>> >> > contact : 650-796-7112
>> >
>> >
>> >
>> >
>> > --
>> > Regards,
>> > Haripriya Ayyalasomayajula
>> > contact : 650-796-7112
>
>
>
>
> --
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112
