I am new to Spark and am finding it difficult to implement a very simple
reduce operation. I have read that combineByKey is ideal for complex
reduce operations.

My input:

val input = sc.parallelize(List(
  ("LAX", 6), ("LAX", 8), ("LAX", 7),
  ("SFO", 0), ("SFO", 1), ("SFO", 9),
  ("PHX", 65), ("PHX", 88),
  ("KX", 7), ("KX", 6), ("KX", 1), ("KX", 9),
  ("HOU", 56), ("HOU", 5), ("HOU", 59), ("HOU", 0),
  ("MA", 563), ("MA", 545), ("MA", 5), ("MA", 0), ("MA", 0)))


val opPart1 = input.combineByKey(
  (v: Int) => (v, 1),
  (acc: (Int, Int), v: Int) => (if (v > 0) acc._1 + 1 else acc._1, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)

val opPart2 = opPart1.map { case (key, value) => (key, (value._1, value._2)) }

 opPart2.collectAsMap().map(println(_))

If a value is greater than 0, the first accumulator should be incremented
by 1; otherwise it should stay the same. The second accumulator is a simple
count of the values for each key. I am getting incorrect output (garbage
values) for the first accumulator. Please help.
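
As a cross-check on what I expect, the simpler version below (mapping each
value to a 0/1 flag paired with a count of 1, then summing pairwise with
reduceByKey) gives the counts I am after, e.g. ("LAX", (3, 3)) and
("SFO", (2, 3)). This flag-and-count mapping is only my own sketch for
comparison; I would like to get the same result out of combineByKey:

// Cross-check of the intended result: turn each value into a
// (positiveFlag, 1) pair and sum the pairs per key,
// e.g. "LAX" -> (3, 3), "SFO" -> (2, 3).
val expected = input
  .mapValues(v => (if (v > 0) 1 else 0, 1))
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))

expected.collectAsMap().foreach(println)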

The equivalent reduce operation in Hadoop MapReduce is:

public static class PercentageCalcReducer
        extends Reducer<Text, IntWritable, Text, FloatWritable> {

    private FloatWritable pdelay = new FloatWritable();

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int acc2 = 0;
        float frac_delay, percentage_delay;
        int acc1 = 0;
        for (IntWritable val : values) {
            if (val.get() > 0) {
                acc1++;
            }
            acc2++;
        }
        frac_delay = (float) acc1 / acc2;
        percentage_delay = frac_delay * 100;
        pdelay.set(percentage_delay);
        context.write(key, pdelay);
    }
}
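
Once the two counts come out right, I was planning to compute the percentage
in Spark roughly as below; this final mapValues step is only my own sketch of
the percentage calculation done in the reducer above:

// Sketch of the last step: convert (positives, total) into a percentage,
// mirroring percentage_delay in the Hadoop reducer.
val percentageDelay = opPart2.mapValues { case (positives, total) =>
  positives.toFloat / total * 100
}

percentageDelay.collectAsMap().foreach(println)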


Thank you for your time.

-- 

Regards,
Haripriya Ayyalasomayajula
contact : 650-796-7112
