Hi,
I am trying to write pointcuts that intercept calls to the |map| and
|reduce| methods in Hadoop MapReduce.
When you write a program for the MapReduce framework, you need to
define |map| and |reduce| functions. These functions are not called by
your own code; they are called by the MapReduce framework. Here is an
example of a user-written MapReduce program [1].
I am using Spring AOP to intercept the calls to the |map|, |reduce|, and
|cleanup| functions, but it is not working. Maybe I must use AspectJ
instead. The problem is that I don’t know how to do this with AspectJ.
Can anyone help me make this work?
[1] Wordcount program
|package org.apache.hadoop.mapred.examples; import
org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable; import
org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*; import
org.apache.hadoop.mapred.lib.HashPartitioner; import
org.apache.hadoop.mapreduce.Mapper; import
org.apache.hadoop.mapreduce.ReduceContext; import
org.apache.hadoop.mapreduce.Reducer; import
org.apache.hadoop.util.GenericOptionsParser; import java.io.IOException;
import java.util.Iterator; import java.util.StringTokenizer; /** * My
example of a common wordcount. Compare with the official WordCount.class
to understand the differences between both classes. */ public class
MyWordCount { public static class MyMap extends Mapper { private final
static IntWritable one = new IntWritable(1); private Text word = new
Text(); private MedusaDigests parser = new MedusaDigests(); public void
map(LongWritable key, Text value, OutputCollector<Text, IntWritable>
output, Reporter reporter) throws IOException { StringTokenizer itr =
new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) {
word.set(itr.nextToken()); output.collect(word, one); } } public void
run(Context context) throws IOException, InterruptedException {
setup(context); try { while (context.nextKeyValue()) {
System.out.println("Key: " + context.getCurrentKey() + " Value: " +
context.getCurrentValue()); map(context.getCurrentKey(),
context.getCurrentValue(), context); } } finally { cleanup(context); } }
public void cleanup(Mapper.Context context) {} } public static class
MyReducer extends Reducer { private IntWritable result = new
IntWritable(); MedusaDigests parser = new MedusaDigests(); public void
reduce(Text key, Iterable<IntWritable> values, Context context ) throws
IOException, InterruptedException { int sum = 0; for (IntWritable val :
values) { System.out.println(" - key ( " + key.getClass().toString() +
"): " + key.toString() + " value ( " + val.getClass().toString() + " ):
" + val.toString()); sum += val.get(); } result.set(sum);
context.write(key, result); } public void run(Context context) throws
IOException, InterruptedException { setup(context); try { while
(context.nextKey()) { System.out.println("Key: " +
context.getCurrentKey()); reduce(context.getCurrentKey(),
context.getValues(), context); // If a back up store is used, reset it
Iterator<IntWritable> iter = context.getValues().iterator(); if(iter
instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore(); } }
} finally { cleanup(context); } } public void cleanup(Reducer.Context
context) {} } public static void main(String[] args) throws Exception {
GenericOptionsParser parser = new GenericOptionsParser(new
Configuration(), args); String[] otherArgs = parser.getRemainingArgs();
if (otherArgs.length < 2) { System.err.println("Usage: wordcount
[<in>...] <out>"); System.exit(2); } // first map tasks JobConf conf =
new JobConf(MyWordCount.class); conf.setJobName("wordcount");
conf.setClass("mapreduce.job.map.identity.class",
MyFullyIndentityMapper.class, Mapper.class);
conf.setJarByClass(MyWordCount.class);
conf.setPartitionerClass(HashPartitioner.class);
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class); conf.setNumReduceTasks(1);
Path[] inputPaths = new Path[otherArgs.length-1]; for (int i = 0; i <
otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
FileInputFormat.setInputPaths(conf, inputPaths);
FileOutputFormat.setOutputPath(conf, outputPath); // launch the job
directly Medusa.runJob(conf); System.exit(0); } } |
Thanks,
_______________________________________________
aspectj-users mailing list
aspectj-users@eclipse.org
To change your delivery options, retrieve your password, or unsubscribe from
this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users