Hi,
I'm processing the data from Hbase and, storing output in a
SequenceFile. BTW, results are directly stored by Mapper. (not through
reduce task) I couldn't find why reduce task didn't run.
Could anyone advice to me?
This is my code.
----
Job job = new Job(config, "Infinity Norm MR job : " + this.getPath());
Scan scan = new Scan();
scan.addFamily(Constants.COLUMNFAMILY);
org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.initTableMapperJob(
this.getPath(), scan,
MatrixNormMapReduce.MatrixInfinityNormMapper.class, IntWritable.class,
DoubleWritable.class, job);
job.setReducerClass(MatrixNormMapReduce.MatrixInfinityNormReducer.class);
job.setNumReduceTasks(1);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(DoubleWritable.class);
SequenceFileOutputFormat.setOutputPath(job, outDir);
job.waitForCompletion(true);
----
public static class MatrixInfinityNormMapper extends
TableMapper<IntWritable, DoubleWritable> {
@Override
public void map(ImmutableBytesWritable key, Result value, Context context)
throws IOException, InterruptedException {
double rowSum = 0;
NavigableMap<byte[], byte[]> v = value
.getFamilyMap(Constants.COLUMNFAMILY);
for (Map.Entry<byte[], byte[]> e : v.entrySet()) {
rowSum += Math.abs(BytesUtil.bytesToDouble(e.getValue()));
}
context.write(MatrixNormMapReduce.nKey, new DoubleWritable(rowSum));
}
}
----
public static class MatrixInfinityNormReducer extends
TableReducer<IntWritable, DoubleWritable, Writable> {
static final Logger LOG = Logger.getLogger(MatrixInfinityNormReducer.class);
private double max = 0;
public void reduce(IntWritable key, Iterator<DoubleWritable> values,
OutputCollector<IntWritable, DoubleWritable> output, Reporter reporter)
throws IOException {
while (values.hasNext()) {
LOG.info(">>>>>>> " + nKey + ", " + max);
max = Math.max(values.next().get(), max);
}
// Note: Tricky here. As we known, we collect each row's sum with key(-1).
// the reduce will just iterate through one key (-1)
// so we collect the max sum-value here
output.collect(MatrixNormMapReduce.nKey, new DoubleWritable(max));
}
--
Best Regards, Edward J. Yoon @ NHN, corp.
[email protected]
http://blog.udanax.org