Thank you guys! It was very helpful and now I understand it better.
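
For anyone finding this thread later, here is a minimal sketch of why the initializer matters. It mimics the three combineByKey steps on plain Scala collections, so it runs without a cluster. The helper `combineByKeyLocal` and the two-way partition split are my own assumptions for illustration; they are not Spark's API, and Spark may partition the data differently.

```scala
// Local stand-in for the three combineByKey steps, on plain Scala collections.
// `combineByKeyLocal` is a hypothetical helper for illustration, not part of Spark.
object CombineByKeySketch {

  def combineByKeyLocal[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Step 1: inside each partition, the first value seen for a key goes
    // through createCombiner; every later value goes through mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.get(k) match {
          case Some(c) => acc.updated(k, mergeValue(c, v))
          case None    => acc.updated(k, createCombiner(v))
        }
      }
    }
    // Step 2: combiners for the same key from different partitions are merged.
    perPartition.foldLeft(Map.empty[K, C]) { (merged, m) =>
      m.foldLeft(merged) { case (acc, (k, c)) =>
        acc.get(k) match {
          case Some(existing) => acc.updated(k, mergeCombiners(existing, c))
          case None           => acc.updated(k, c)
        }
      }
    }
  }

  // Data from the thread, split into two made-up partitions (an assumption).
  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("LAX", 6), ("LAX", 0), ("LAX", 7)),
    Seq(("SFO", 0), ("SFO", 0), ("SFO", 9))
  )

  // (nonZeroCount, totalCount) accumulator functions, as in the thread.
  val mergeValue = (acc: (Int, Int), v: Int) =>
    (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1)
  val mergeCombiners = (a: (Int, Int), b: (Int, Int)) =>
    (a._1 + b._1, a._2 + b._2)

  // Initializer (1, 1): counts the first value as non-zero even when it is 0.
  val naive: Map[String, (Int, Int)] =
    combineByKeyLocal[String, Int, (Int, Int)](
      partitions, v => (1, 1), mergeValue, mergeCombiners)

  // Corrected initializer: the first value gets the same test as the others.
  val counts: Map[String, (Int, Int)] =
    combineByKeyLocal[String, Int, (Int, Int)](
      partitions, v => (if (v > 0) 1 else 0, 1), mergeValue, mergeCombiners)

  // Same idea as mapValues + reduceByKey: map each value to a pair, then sum.
  val viaReduce: Map[String, (Int, Int)] = partitions.flatten
    .map { case (k, v) => (k, (if (v > 0) 1 else 0, 1)) }
    .groupBy(_._1)
    .map { case (k, kvs) => k -> kvs.map(_._2).reduce(mergeCombiners) }

  // Percentage of non-zero values per key, as in the Hadoop reducer.
  val percentages: Map[String, Float] =
    counts.map { case (k, (nonZero, total)) => k -> (nonZero.toFloat / total * 100) }

  def main(args: Array[String]): Unit = {
    counts.toSeq.sortBy(_._1).foreach(println)
    // (LAX,(2,3))
    // (SFO,(1,3))
    println(naive("SFO")) // (2,3): the leading 0 was counted as non-zero
  }
}
```

On a real RDD the same three functions would be passed to combineByKey. Which value of a key reaches the initializer first depends on partitioning, so the (1, 1) version can look correct on some inputs and wrong on others.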

On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu <dav...@databricks.com> wrote:
> Maybe this version is easier to use:
>
> plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
>
> It behaves similarly to combineByKey() and will be faster than the
> groupByKey() version.
>
> On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA
> <aharipriy...@gmail.com> wrote:
> > Sean,
> >
> > Thank you. It works. But I am still confused about the function. Can you
> > kindly throw some light on it?
> > I was going through the example mentioned in
> > https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html
> >
> > Is there any better source through which I can learn more about these
> > functions? It would be helpful if I could look at more examples.
> > Also, I assume using combineByKey helps us solve it in parallel, rather
> > than using the simple functions provided by Scala as mentioned by Yana.
> > Am I correct?
> >
> > On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen <so...@cloudera.com> wrote:
> >>
> >> Oh duh, sorry. The initialization should of course be
> >> (v) => (if (v > 0) 1 else 0, 1)
> >> This gives the answer you are looking for. I don't see what Part2 is
> >> supposed to do differently.
> >>
> >> On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA
> >> <aharipriy...@gmail.com> wrote:
> >> > Hello Sean,
> >> >
> >> > Thank you, but changing from v to 1 doesn't help me either.
> >> >
> >> > I am trying to count the number of non-zero values using the first
> >> > accumulator.
> >> >
> >> > val newlist = List(("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0), ("SFO",9))
> >> >
> >> > val plist = sc.parallelize(newlist)
> >> >
> >> > val part1 = plist.combineByKey(
> >> >   (v) => (1, 1),
> >> >   (acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
> >> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> >> > )
> >> >
> >> > val Part2 = part1.map{ case (key, value) => (key, (value._1,value._2)) }
> >> >
> >> > This should give me the result
> >> > (LAX,(2,3))
> >> > (SFO,(1,3))
> >> >
> >> >
> >> > On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen <so...@cloudera.com> wrote:
> >> >>
> >> >> You have a typo in your code at "var acc:", and the map from opPart1
> >> >> to opPart2 looks like a no-op, but those aren't the problem I think.
> >> >> It sounds like you intend the first element of each pair to be a count
> >> >> of nonzero values, but you initialize the first element of the pair to
> >> >> v, not 1, in v => (v,1). Try v => (1,1)
> >> >>
> >> >>
> >> >> On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA
> >> >> <aharipriy...@gmail.com> wrote:
> >> >> >
> >> >> > I am a beginner to Spark and am finding it difficult to implement a
> >> >> > very simple reduce operation. I read that it is ideal to use
> >> >> > combineByKey for complex reduce operations.
> >> >> >
> >> >> > My input:
> >> >> >
> >> >> > val input = sc.parallelize(List(
> >> >> >   ("LAX",6), ("LAX",8), ("LAX",7),
> >> >> >   ("SFO",0), ("SFO",1), ("SFO",9),
> >> >> >   ("PHX",65), ("PHX",88),
> >> >> >   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
> >> >> >   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
> >> >> >   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
> >> >> >
> >> >> > val opPart1 = input.combineByKey(
> >> >> >   (v) => (v, 1),
> >> >> >   (var acc: (Int, Int), v) => ( if(v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
> >> >> >   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> >> >> > )
> >> >> >
> >> >> > val opPart2 = opPart1.map{ case (key, value) => (key, (value._1,value._2)) }
> >> >> >
> >> >> > opPart2.collectAsMap().map(println(_))
> >> >> >
> >> >> > If the value is greater than 0, the first accumulator should be
> >> >> > incremented by 1, else it remains the same. The second accumulator is
> >> >> > a simple counter for each value. I am getting an incorrect output
> >> >> > (garbage values) for the first accumulator. Please help.
> >> >> >
> >> >> > The equivalent reduce operation in Hadoop MapReduce is:
> >> >> >
> >> >> > public static class PercentageCalcReducer extends
> >> >> >     Reducer<Text,IntWritable,Text,FloatWritable>
> >> >> > {
> >> >> >   private FloatWritable pdelay = new FloatWritable();
> >> >> >
> >> >> >   public void reduce(Text key, Iterable<IntWritable> values, Context context)
> >> >> >       throws IOException, InterruptedException
> >> >> >   {
> >> >> >     int acc2=0;
> >> >> >     float frac_delay, percentage_delay;
> >> >> >     int acc1=0;
> >> >> >     for(IntWritable val : values)
> >> >> >     {
> >> >> >       if(val.get() > 0)
> >> >> >       {
> >> >> >         acc1++;
> >> >> >       }
> >> >> >       acc2++;
> >> >> >     }
> >> >> >
> >> >> >     frac_delay = (float)acc1/acc2;
> >> >> >     percentage_delay = frac_delay * 100 ;
> >> >> >     pdelay.set(percentage_delay);
> >> >> >     context.write(key,pdelay);
> >> >> >   }
> >> >> > }
> >> >> >
> >> >> > Please help. Thank you for your time.
> >> >> >
> >> >> > --
> >> >> > Regards,
> >> >> > Haripriya Ayyalasomayajula
> >> >> > contact : 650-796-7112
>
--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112