Hi,

I am trying to write a pointcut that captures the calls to |map| and |reduce| in Hadoop MapReduce.

When you write a MapReduce program, you need to define |map| and |reduce| functions, which are then called by the MapReduce framework. An example of a user-written MapReduce program is shown below [1].

I am using Spring AOP to intercept the calls to the |map|, |reduce|, and |cleanup| functions, but it is not working. Maybe I have to use AspectJ instead; the problem is that I don't know how to do it with AspectJ. Any help to make this work would be appreciated.
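
My guess is that Spring AOP never sees these calls because the |Mapper| and |Reducer| instances are created by Hadoop itself, not by the Spring container, so proxy-based interception does not apply. With AspectJ I imagine I need something like the aspect below (just an untested sketch; the package and class names are mine), applied with load-time weaving, i.e. -javaagent:aspectjweaver.jar plus a META-INF/aop.xml that declares the aspect. I assume the agent option would also have to go into mapred.child.java.opts so that the task JVMs actually load the weaver.

package my.aspects; // hypothetical package, just for illustration

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

@Aspect
public class MapReduceTracer {

    // runs before any map(..) or cleanup(..) defined in a subclass of the new-API Mapper
    @Before("execution(* org.apache.hadoop.mapreduce.Mapper+.map(..)) || "
          + "execution(* org.apache.hadoop.mapreduce.Mapper+.cleanup(..))")
    public void beforeMap(JoinPoint jp) {
        System.out.println("intercepted " + jp.getSignature());
    }

    // runs before any reduce(..) or cleanup(..) defined in a subclass of Reducer
    @Before("execution(* org.apache.hadoop.mapreduce.Reducer+.reduce(..)) || "
          + "execution(* org.apache.hadoop.mapreduce.Reducer+.cleanup(..))")
    public void beforeReduceOrCleanup(JoinPoint jp) {
        System.out.println("intercepted " + jp.getSignature());
    }
}

Is this roughly the right direction, or would compile-time weaving with ajc be the better option here?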

[1] Wordcount program

package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official WordCount.class
 * to understand the differences between both classes.
 */
public class MyWordCount {

    public static class MyMap extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if (iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>) iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length - 1];
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            inputPaths[i] = new Path(otherArgs[i]);
        }
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}

Thanks,

