Hello Yana,

Thank you. Yes, that works. Could you please suggest some examples (or
links) showing how combineByKey is used?
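
To check my understanding of the contract: createCombiner turns the first
value seen for a key into an accumulator, mergeValue folds further values
into it within a partition, and mergeCombiners merges accumulators across
partitions. Would a minimal per-key average look something like this (just
a sketch, names illustrative)?

val pairs = sc.parallelize(List(("a", 1), ("a", 3), ("b", 5)))
// createCombiner: start a (sum, count) pair from the first value for a key
// mergeValue: fold one more value into the running (sum, count)
// mergeCombiners: merge partial (sum, count) pairs from different partitions
val sumCount = pairs.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),
  (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
)
val avgByKey = sumCount.mapValues { case (sum, count) => sum.toFloat / count }
avgByKey.collect().foreach(println)  // e.g. (a,2.0), (b,5.0)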

On Thu, Oct 9, 2014 at 12:03 PM, Yana Kadiyska <yana.kadiy...@gmail.com>
wrote:

> If you just want the ratio of positive to all values per key (if I'm
> reading right) this works
>
> val reduced = input.groupByKey().map(grp =>
>   (grp._1, grp._2.count(_ > 0).toFloat / grp._2.size))
> reduced.foreach(println)
>
> I don't think you need reduceByKey or combineByKey as you're not doing
> anything where the values depend on each other -- you're just counting...
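>
> (If the groups are large and you want to avoid materializing all the
> values with groupByKey, the same count can be sketched with
> aggregateByKey -- assuming your Spark version has it; the names below are
> illustrative:)
>
> // zero = (positives, total); seqOp folds one value into the pair,
> // combOp merges per-partition partial results
> val ratios = input.aggregateByKey((0, 0))(
>   (acc, v) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>   (a, b) => (a._1 + b._1, a._2 + b._2)
> ).mapValues { case (pos, total) => pos.toFloat / total }
> ratios.foreach(println)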
>
> On Thu, Oct 9, 2014 at 11:47 AM, HARIPRIYA AYYALASOMAYAJULA <
> aharipriy...@gmail.com> wrote:
>
>>
>> I am a beginner with Spark and am finding it difficult to implement a very
>> simple reduce operation. I read that combineByKey is the recommended tool
>> for complex reduce operations.
>>
>> My input:
>>
>> val input = sc.parallelize(List(
>>   ("LAX",6), ("LAX",8), ("LAX",7),
>>   ("SFO",0), ("SFO",1), ("SFO",9),
>>   ("PHX",65), ("PHX",88),
>>   ("KX",7), ("KX",6), ("KX",1), ("KX",9),
>>   ("HOU",56), ("HOU",5), ("HOU",59), ("HOU",0),
>>   ("MA",563), ("MA",545), ("MA",5), ("MA",0), ("MA",0)))
>>
>>
>>  val opPart1 = input.combineByKey(
>>    (v: Int) => (v, 1),
>>    (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>>    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
>>  )
>>
>>  val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }
>>
>>  opPart2.collectAsMap().map(println(_))
>>
>> If the value is greater than 0, the first accumulator should be
>> incremented by 1; otherwise it stays the same. The second accumulator is a
>> simple counter over all values. I am getting incorrect output (garbage
>> values) for the first accumulator.
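>>
>> To spell out the intent: per key I want (count of positive values, count
>> of all values). A standalone sketch of that intended logic (illustrative
>> only):
>>
>> // intended: first slot counts positives, second slot counts everything
>> val intended = input.combineByKey(
>>   (v: Int) => (if (v > 0) 1 else 0, 1),
>>   (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
>>   (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)
>> )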
>>
>>  The equivalent reduce operation in Hadoop MapReduce is:
>>
>> public static class PercentageCalcReducer
>>     extends Reducer<Text, IntWritable, Text, FloatWritable> {
>>
>>   private FloatWritable pdelay = new FloatWritable();
>>
>>   @Override
>>   public void reduce(Text key, Iterable<IntWritable> values, Context context)
>>       throws IOException, InterruptedException {
>>     int acc1 = 0;  // counts values greater than 0
>>     int acc2 = 0;  // counts all values
>>     float frac_delay, percentage_delay;
>>     for (IntWritable val : values) {
>>       if (val.get() > 0) {
>>         acc1++;
>>       }
>>       acc2++;
>>     }
>>     frac_delay = (float) acc1 / acc2;
>>     percentage_delay = frac_delay * 100;
>>     pdelay.set(percentage_delay);
>>     context.write(key, pdelay);
>>   }
>> }
>>
>>
>> Please help. Thank you for your time.
>>
>> --
>>
>> Regards,
>> Haripriya Ayyalasomayajula
>> contact : 650-796-7112
>>
>
>


-- 
Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
