Re: Help with using combineByKey
Thank you guys! It was very helpful and now I understand it better.

On Fri, Oct 10, 2014 at 1:38 AM, Davies Liu wrote:
> Maybe this version is easier to use:
>
> plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) =>
>   (x._1 + y._1, x._2 + y._2))
>
> It behaves like the combineByKey() version, and will be faster than a
> groupByKey() version.
Re: Help with using combineByKey
Maybe this version is easier to use:

plist.mapValues((v) => (if (v > 0) 1 else 0, 1)).reduceByKey((x, y) =>
  (x._1 + y._1, x._2 + y._2))

It behaves like the combineByKey() version, and will be faster than a
groupByKey() version.

On Thu, Oct 9, 2014 at 9:28 PM, HARIPRIYA AYYALASOMAYAJULA wrote:
> Sean,
>
> Thank you. It works. But I am still confused about the function. Can you
> kindly throw some light on it?
> Is there any better source through which I can learn more about these
> functions? It would be helpful if I can get a chance to look at more
> examples.
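For completeness, a minimal self-contained sketch of this suggestion (hedged:
it assumes a Spark shell where sc is already defined, and uses Haripriya's
smaller test list from later in the thread as input):

// Count (nonzero values, total values) per key without combineByKey:
// mapValues shapes each value into a (0-or-1, 1) pair, and reduceByKey
// then sums the pairs component-wise, combining on each partition
// before the shuffle.
val plist = sc.parallelize(List(("LAX", 6), ("LAX", 0), ("LAX", 7),
                                ("SFO", 0), ("SFO", 0), ("SFO", 9)))

val counts = plist
  .mapValues(v => (if (v > 0) 1 else 0, 1))
  .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))

counts.collect().foreach(println)   // (LAX,(2,3)) and (SFO,(1,3)), order may vary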
Re: Help with using combineByKey
It's for the exact same reason you wrote

(acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),

right? The first function establishes the initial value of the count for a
key: either (0,1) or (1,1), depending on whether the first value seen is 0
or not. You're otherwise using the method just fine.

You can write this function a lot of ways; this one is a bit verbose but
probably efficient.

Yana's version is also distributed; it just uses plain Scala functions
within map(). It works too, although the groupByKey() can be a problem,
as it requires putting all the values for a key in memory at once, whereas
your combineByKey() does not.

On Fri, Oct 10, 2014 at 5:28 AM, HARIPRIYA AYYALASOMAYAJULA wrote:
> Sean,
>
> Thank you. It works. But I am still confused about the function. Can you
> kindly throw some light on it?
> Is there any better source through which I can learn more about these
> functions? It would be helpful if I can get a chance to look at more
> examples.
> Also, I assume using combineByKey helps us solve it in parallel, unlike
> the plain Scala functions Yana suggested. Am I correct?
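To make the role of each of the three arguments concrete, here is the
corrected call again as a commented sketch (the names in the comments are
the parameter names from the combineByKey signature; plist is the test RDD
from earlier in the thread):

val part1 = plist.combineByKey(
  // createCombiner: builds the initial accumulator from the first value
  // seen for a key on a partition: (1,1) if it is nonzero, (0,1) if not
  (v: Int) => (if (v > 0) 1 else 0, 1),
  // mergeValue: folds each subsequent value on that partition into the
  // accumulator, bumping the nonzero count only when v > 0
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  // mergeCombiners: adds up the per-partition accumulators for a key
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)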
Re: Help with using combineByKey
Sean,

Thank you. It works. But I am still confused about the function. Can you
kindly throw some light on it?

I was going through the example mentioned in
https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/ch04.html

Is there any better source through which I can learn more about these
functions? It would be helpful if I can get a chance to look at more
examples.

Also, I assume using combineByKey helps us solve it in parallel, unlike the
plain Scala functions Yana suggested. Am I correct?

On Thu, Oct 9, 2014 at 12:30 PM, Sean Owen wrote:
> Oh duh, sorry. The initialization should of course be
> (v) => (if (v > 0) 1 else 0, 1)
> This gives the answer you are looking for. I don't see what Part2 is
> supposed to do differently.

--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
Re: Help with using combineByKey
Oh duh, sorry. The initialization should of course be

(v) => (if (v > 0) 1 else 0, 1)

This gives the answer you are looking for. I don't see what Part2 is
supposed to do differently.

On Thu, Oct 9, 2014 at 6:14 PM, HARIPRIYA AYYALASOMAYAJULA wrote:
> Hello Sean,
>
> Thank you, but changing from v to 1 doesn't help me either.
>
> I am trying to count the number of non-zero values using the first
> accumulator.
>
> val part1 = plist.combineByKey(
>   (v) => (1, 1),
>   (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> )
>
> val Part2 = part1.map { case (key, value) => (key, (value._1, value._2)) }
>
> This should give me the result
> (LAX,(2,3))
> (SFO,(1,3))
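On the Part2 point, a short sketch of why it changes nothing: the pattern
match rebuilds exactly the pair it took apart, so the combineByKey result
can be inspected directly (assuming the corrected part1 shown above):

// part1.map { case (key, value) => (key, (value._1, value._2)) }
// re-emits each (key, (count, total)) pair unchanged, so it can be skipped:
part1.collect().foreach(println)   // (LAX,(2,3)) and (SFO,(1,3)), order may vary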
Re: Help with using combineByKey
Hello Yana,

Thank you. Yes, it works. However, could you please suggest any examples
(or links) on the usage of combineByKey?

On Thu, Oct 9, 2014 at 12:03 PM, Yana Kadiyska wrote:
> If you just want the ratio of positive to all values per key (if I'm
> reading right), this works:
>
> val reduced = input.groupByKey().map(grp =>
>   grp._2.filter(v => v > 0).size.toFloat / grp._2.size)
> reduced.foreach(println)
>
> I don't think you need reduceByKey or combineByKey, as you're not doing
> anything where the values depend on each other; you're just counting...

--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
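Since more worked examples were asked for, here is one more hedged sketch in
the spirit of the per-key average example from the Learning Spark chapter
linked above; an average genuinely needs combineByKey (or the
mapValues/reduceByKey trick), because averages cannot be merged directly:

// Per-key average: the accumulator carries a (running sum, count) pair.
val scores = sc.parallelize(List(("LAX", 6), ("LAX", 8), ("SFO", 1), ("SFO", 9)))

val sumCount = scores.combineByKey(
  (v: Int) => (v, 1),                                           // first value for a key
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),        // fold in later values
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)  // merge partitions
)

val averages = sumCount.mapValues { case (sum, count) => sum.toDouble / count }
averages.collect().foreach(println)   // (LAX,7.0) and (SFO,5.0)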
Re: Help with using combineByKey
Hello Sean,

Thank you, but changing from v to 1 doesn't help me either.

I am trying to count the number of non-zero values using the first
accumulator.

val newlist = List(("LAX",6), ("LAX",0), ("LAX",7), ("SFO",0), ("SFO",0),
  ("SFO",9))

val plist = sc.parallelize(newlist)

val part1 = plist.combineByKey(
  (v) => (1, 1),
  (acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val Part2 = part1.map { case (key, value) => (key, (value._1, value._2)) }

This should give me the result
(LAX,(2,3))
(SFO,(1,3))

On Thu, Oct 9, 2014 at 11:48 AM, Sean Owen wrote:
> You have a typo in your code at "var acc:", and the map from opPart1
> to opPart2 looks like a no-op, but those aren't the problem, I think.
> It sounds like you intend the first element of each pair to be a count
> of nonzero values, but you initialize the first element of the pair to
> v, not 1, in v => (v,1). Try v => (1,1)

--
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
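For reference, a plain-Scala trace (no cluster needed) of why the
(v) => (1, 1) initializer above overcounts: the first value a key
contributes on a partition goes through the first function, so a leading
0 is silently counted as nonzero.

// The two functions from part1, applied by hand to SFO's values 0, 0, 9
// (assuming they land on one partition):
val createCombiner = (v: Int) => (1, 1)
val mergeValue = (acc: (Int, Int), v: Int) =>
  (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1)

val step1 = createCombiner(0)      // (1,1): the leading 0 is counted as nonzero
val step2 = mergeValue(step1, 0)   // (1,2)
val step3 = mergeValue(step2, 9)   // (2,3), but the expected result is (1,3)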
Re: Help with using combineByKey
If you just want the ratio of positive to all values per key (if I'm
reading right), this works:

val reduced = input.groupByKey().map(grp =>
  grp._2.filter(v => v > 0).size.toFloat / grp._2.size)
reduced.foreach(println)

I don't think you need reduceByKey or combineByKey, as you're not doing
anything where the values depend on each other; you're just counting...

On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA <
aharipriy...@gmail.com> wrote:
>
> I am a beginner to Spark and finding it difficult to implement a very
> simple reduce operation. I read that it is ideal to use combineByKey for
> complex reduce operations.
>
> My input:
>
> val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>   ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88), ("KX",7),
>   ("KX",6), ("KX",1), ("KX",9), ("HOU",56), ("HOU",5), ("HOU",59),
>   ("HOU",0), ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>
> val opPart1 = input.combineByKey(
>   (v) => (v, 1),
>   (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> )
>
> val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
>
> opPart2.collectAsMap().map(println(_))
>
> If the value is greater than 0, the first accumulator should be
> incremented by 1, else it remains the same. The second accumulator is a
> simple counter for each value. I am getting an incorrect output (garbage
> values) for the first accumulator. Please help.
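One caveat on the snippet above, as a hedged sketch: the map keeps only the
ratio and drops the airport key, so a key-preserving variant might look like
this (same groupByKey approach, just carrying grp._1 through):

// Ratio of positive values per key, keeping the key in the output:
val reduced = input.groupByKey().map(grp =>
  (grp._1, grp._2.count(_ > 0).toFloat / grp._2.size))
reduced.foreach(println)   // e.g. (LAX,1.0), (SFO,0.6666667), ...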
Re: Help with using combineByKey
You have a typo in your code at "var acc:", and the map from opPart1
to opPart2 looks like a no-op, but those aren't the problem, I think.
It sounds like you intend the first element of each pair to be a count
of nonzero values, but you initialize the first element of the pair to
v, not 1, in v => (v,1). Try v => (1,1)

On Thu, Oct 9, 2014 at 4:47 PM, HARIPRIYA AYYALASOMAYAJULA wrote:
>
> I am a beginner to Spark and finding it difficult to implement a very
> simple reduce operation. I read that it is ideal to use combineByKey for
> complex reduce operations.
>
> My input:
>
> val input = sc.parallelize(List(("LAX",6), ("LAX",8), ("LAX",7),
>   ("SFO",0), ("SFO",1), ("SFO",9), ("PHX",65), ("PHX",88), ("KX",7),
>   ("KX",6), ("KX",1), ("KX",9), ("HOU",56), ("HOU",5), ("HOU",59),
>   ("HOU",0), ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>
> val opPart1 = input.combineByKey(
>   (v) => (v, 1),
>   (var acc: (Int, Int), v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>   (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
> )
>
> val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
>
> opPart2.collectAsMap().map(println(_))
>
> If the value is greater than 0, the first accumulator should be
> incremented by 1, else it remains the same. The second accumulator is a
> simple counter for each value. I am getting an incorrect output (garbage
> values) for the first accumulator. Please help.
>
> The equivalent reduce operation in Hadoop MapReduce is:
>
> public static class PercentageCalcReducer extends
>     Reducer<Text, IntWritable, Text, FloatWritable>
> {
>     private FloatWritable pdelay = new FloatWritable();
>
>     public void reduce(Text key, Iterable<IntWritable> values, Context context)
>         throws IOException, InterruptedException
>     {
>         int acc2 = 0;
>         float frac_delay, percentage_delay;
>         int acc1 = 0;
>         for (IntWritable val : values)
>         {
>             if (val.get() > 0)
>             {
>                 acc1++;
>             }
>             acc2++;
>         }
>
>         frac_delay = (float) acc1 / acc2;
>         percentage_delay = frac_delay * 100;
>         pdelay.set(percentage_delay);
>         context.write(key, pdelay);
>     }
> }
>
> Please help. Thank you for your time.
>
> --
> Regards,
> Haripriya Ayyalasomayajula
> contact : 650-796-7112
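Finally, the percentage step of the Hadoop reducer quoted above has a direct
Spark counterpart. A hedged sketch, assuming opPart1 is built with the
corrected (v) => (if (v > 0) 1 else 0, 1) initializer that comes up
elsewhere in this thread:

// Equivalent of frac_delay * 100 from the MapReduce reducer:
// each (nonzero, total) pair becomes a percentage of positive values.
val percentages = opPart1.mapValues { case (nonzero, total) =>
  nonzero.toFloat / total * 100
}
percentages.collectAsMap().foreach(println)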