With pure AspectJ, if you want to intercept when map/reduce/cleanup are
running, you can use something like this:
=== Foo.aj ===
aspect Foo {
    before(): execution(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo
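(If MyWordCount.java does not compile on its own, the Hadoop jars it depends on
will also need to be on the compile classpath, e.g. something like
ajc -1.8 -classpath <hadoop jars> Foo.aj MyWordCount.java -showWeaveInfo
where <hadoop jars> is just a placeholder for whatever is right on your system.)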
This will compile and weave MyWordCount - when you run it you will see the
advice output.
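If you would rather write the aspect in the annotation style (which looks closer
to what Spring AOP uses), the same pointcuts can be expressed in plain Java and
compiled with ajc in exactly the same way. A minimal sketch, as an alternative to
Foo.aj rather than in addition to it (the class name is just illustrative):
=== FooAnnotationStyle.java ===
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

// Annotation-style (@AspectJ) equivalent of the code-style aspect above
@Aspect
public class FooAnnotationStyle {
    @Before("execution(* map(..))")
    public void beforeMap(JoinPoint jp) {
        System.out.println(jp);
    }

    @Before("execution(* reduce(..))")
    public void beforeReduce(JoinPoint jp) {
        System.out.println(jp);
    }

    @Before("execution(* cleanup(..))")
    public void beforeCleanup(JoinPoint jp) {
        System.out.println(jp);
    }
}
===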
Alternatively, if you want to advise the call side of these operations, you will
need to weave the hadoop jar (whichever jar actually contains the calling code):
=== Foo.aj ===
aspect Foo {
    before(): call(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo
Now when you run your MyWordCount program, you need to use the woven_hadoop jar
instead of the regular hadoop jar.
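One more note: with no declaring type in the pattern, execution(* map(..)) will
match any method called map anywhere in the woven code, not just the MapReduce
one. If that turns out to be too broad, the pointcuts can be narrowed to Hadoop's
Mapper/Reducer types. An untested sketch, assuming the org.apache.hadoop.mapreduce
API that MyWordCount extends:
=== Foo.aj ===
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

aspect Foo {
    // only map() declared on Mapper or one of its subclasses
    before(): execution(* Mapper+.map(..)) {
        System.out.println(thisJoinPoint);
    }
    // only reduce() declared on Reducer or one of its subclasses
    before(): execution(* Reducer+.reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    // cleanup() inside either of those type hierarchies
    before(): execution(* cleanup(..)) && (within(Mapper+) || within(Reducer+)) {
        System.out.println(thisJoinPoint);
    }
}
===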
cheers,
Andy
> On Sep 23, 2015, at 7:34 AM, xeonmailinglist <[email protected]> wrote:
>
> Hi,
>
> I am trying to pointcut the calls to map and reduce in Hadoop MapReduce.
>
> When you write a program for the MapReduce framework, you need to define map and
> reduce functions, and the framework calls them. Here is an example of a
> user-written MapReduce program [1].
>
> I am using Spring AOP to intercept the calls to the map, reduce, and cleanup
> functions, but it is not working. Maybe I need to use AspectJ instead, but I
> don’t know how to do it with AspectJ. Any help to make this work?
>
> [1] Wordcount program
>
>
> package org.apache.hadoop.mapred.examples;
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.fs.Path;
> import org.apache.hadoop.io.IntWritable;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapred.*;
> import org.apache.hadoop.mapred.lib.HashPartitioner;
> import org.apache.hadoop.mapreduce.Mapper;
> import org.apache.hadoop.mapreduce.ReduceContext;
> import org.apache.hadoop.mapreduce.Reducer;
> import org.apache.hadoop.util.GenericOptionsParser;
>
> import java.io.IOException;
> import java.util.Iterator;
> import java.util.StringTokenizer;
>
> /**
>  * My example of a common wordcount. Compare with the official WordCount.class
>  * to understand the differences between both classes.
>  */
> public class MyWordCount {
>     public static class MyMap extends Mapper {
>         private final static IntWritable one = new IntWritable(1);
>         private Text word = new Text();
>         private MedusaDigests parser = new MedusaDigests();
>
>         public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
>             StringTokenizer itr = new StringTokenizer(value.toString());
>             while (itr.hasMoreTokens()) {
>                 word.set(itr.nextToken());
>                 output.collect(word, one);
>             }
>         }
>
>         public void run(Context context) throws IOException, InterruptedException {
>             setup(context);
>             try {
>                 while (context.nextKeyValue()) {
>                     System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
>                     map(context.getCurrentKey(), context.getCurrentValue(), context);
>                 }
>             } finally {
>                 cleanup(context);
>             }
>         }
>
>         public void cleanup(Mapper.Context context) {}
>     }
>
>     public static class MyReducer extends Reducer {
>         private IntWritable result = new IntWritable();
>         MedusaDigests parser = new MedusaDigests();
>
>         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
>             int sum = 0;
>             for (IntWritable val : values) {
>                 System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
>                         + " value ( " + val.getClass().toString() + " ): " + val.toString());
>                 sum += val.get();
>             }
>             result.set(sum);
>             context.write(key, result);
>         }
>
>         public void run(Context context) throws IOException, InterruptedException {
>             setup(context);
>             try {
>                 while (context.nextKey()) {
>                     System.out.println("Key: " + context.getCurrentKey());
>                     reduce(context.getCurrentKey(), context.getValues(), context);
>                     // If a back up store is used, reset it
>                     Iterator<IntWritable> iter = context.getValues().iterator();
>                     if (iter instanceof ReduceContext.ValueIterator) {
>                         ((ReduceContext.ValueIterator<IntWritable>) iter).resetBackupStore();
>                     }
>                 }
>             } finally {
>                 cleanup(context);
>             }
>         }
>
>         public void cleanup(Reducer.Context context) {}
>     }
>
>     public static void main(String[] args) throws Exception {
>         GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
>
>         String[] otherArgs = parser.getRemainingArgs();
>         if (otherArgs.length < 2) {
>             System.err.println("Usage: wordcount [<in>...] <out>");
>             System.exit(2);
>         }
>
>         // first map tasks
>         JobConf conf = new JobConf(MyWordCount.class);
>         conf.setJobName("wordcount");
>         conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
>
>         conf.setJarByClass(MyWordCount.class);
>         conf.setPartitionerClass(HashPartitioner.class);
>         conf.setOutputKeyClass(Text.class);
>         conf.setOutputValueClass(IntWritable.class);
>         conf.setNumReduceTasks(1);
>
>         Path[] inputPaths = new Path[otherArgs.length - 1];
>         for (int i = 0; i < otherArgs.length - 1; ++i) { inputPaths[i] = new Path(otherArgs[i]); }
>         Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
>         FileInputFormat.setInputPaths(conf, inputPaths);
>         FileOutputFormat.setOutputPath(conf, outputPath);
>
>         // launch the job directly
>         Medusa.runJob(conf);
>         System.exit(0);
>     }
> }
> Thanks,
>
_______________________________________________
aspectj-users mailing list
[email protected]
To change your delivery options, retrieve your password, or unsubscribe from
this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users