In my example, I have the following snippet [1]. I want my aspect to run
before the call to |context.write(this.word, one)| in that snippet, so I
created the aspect in [2], based on your solution.
The |context| variable has the type |public abstract class
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
TaskAttemptContext implements Progressable|, whose superclass is
|public class TaskAttemptContext extends JobContext implements
Progressable|.
After compiling my code, I can see that the advice invocation was not
placed before that call in the |class| file [3].
Why is the |call| designator not working?
[1] Snippet of the wordcount example.
while(itr.hasMoreTokens()) {
    this.word.set(itr.nextToken());
    context.write(this.word, one);
}
[2] My aspect
|before(): call(* *Context.write(..)) {
    System.out.println("Aspects Write");
    System.out.println(thisJoinPoint.getSourceLocation().getFileName());
}|
[3] Compiled version of the job. This is the |class| file.
|public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
    MapReduceCalls.aspectOf().ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72();
    StringTokenizer itr = new StringTokenizer(value.toString());
    while(itr.hasMoreTokens()) {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
    }
}|
Cheers,
On 09/23/2015 04:33 PM, Andy Clement wrote:
With pure AspectJ, if you want to intercept when map/reduce/cleanup
are running, you can use something like this:
=== Foo.aj ===
aspect Foo {
    before(): execution(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo
This will compile and weave MyWordCount - when you run it you will see
the advice output.
Alternatively if you want to advise the call side of these things, you
will need to weave the hadoop jar (whatever that is):
=== Foo.aj ===
aspect Foo {
    before(): call(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo
Now when you run your MyWordCount program you need to be using the
woven_hadoop jar instead of the regular hadoop jar.
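To illustrate why the call-side approach needs the woven jar: AspectJ weaves execution advice into the body of the advised method, while call advice is woven at each call site, inside the calling class. The plain-Java sketch below imitates both placements by hand. It is only an analogy; |UserMapper|, |Framework|, and the log strings are hypothetical stand-ins, not Hadoop or AspectJ APIs.

```java
import java.util.ArrayList;
import java.util.List;

class WeavingPlacementDemo {
    static final List<String> log = new ArrayList<>();

    // Hypothetical stand-in for user code such as MyWordCount (compiled by ajc).
    static class UserMapper {
        void map(String value) {
            // execution(* map(..)) advice would be woven here, inside the callee.
            log.add("execution advice");
            log.add("map body: " + value);
        }
    }

    // Hypothetical stand-in for framework code that lives in another jar.
    static class Framework {
        void runTask(UserMapper mapper) {
            // call(* map(..)) advice would be woven here, at the call site,
            // so the class containing this method must itself be woven.
            log.add("call advice");
            mapper.map("hello");
        }
    }

    /** Runs one task and returns a copy of the recorded events in order. */
    static List<String> run() {
        log.clear();
        new Framework().runTask(new UserMapper());
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

If only the user class is compiled with ajc, only the "execution advice" line has a place to go; the "call advice" line needs the class that makes the call to pass through the weaver as well.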
cheers,
Andy
On Sep 23, 2015, at 7:34 AM, xeonmailinglist <[email protected]> wrote:
Hi,
I am trying to write pointcuts for the calls to |map| and |reduce| in
Hadoop MapReduce.
When you write a program for the MapReduce framework, you need to
define |map| and |reduce| functions, which the framework then calls.
Here is an example of a user-written MapReduce program [1].
I am using Spring AOP to intercept the calls to the |map|, |reduce|,
and |cleanup| functions, but it is not working. Maybe I must use
AspectJ instead, but I don't know how to do this with AspectJ.
Can anyone help me make this work?
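One likely reason the Spring AOP attempt fails: Spring AOP is proxy-based, so advice runs only when a call goes through a proxy that Spring created, and Hadoop builds its mapper and reducer instances itself. The sketch below reproduces the effect with a plain JDK dynamic proxy rather than Spring; |Task|, |PlainTask|, and |ProxyInterceptionDemo| are hypothetical names used only for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Hadoop task; not a real Hadoop type.
interface Task {
    void run();
    void map(String value);
}

class PlainTask implements Task {
    @Override
    public void run() {
        // Self-invocation: this goes to this.map(), never through a proxy.
        map("hello");
    }

    @Override
    public void map(String value) {
        // user logic would go here
    }
}

class ProxyInterceptionDemo {
    /** Returns the names of the methods that the proxy's handler observed. */
    static List<String> interceptedCalls() {
        List<String> seen = new ArrayList<>();
        Task target = new PlainTask();
        InvocationHandler handler = (proxy, method, args) -> {
            seen.add(method.getName());          // the "advice"
            return method.invoke(target, args);  // proceed to the real object
        };
        Task proxied = (Task) Proxy.newProxyInstance(
                Task.class.getClassLoader(), new Class<?>[] {Task.class}, handler);

        // Intercepted, but the nested map() call inside run() is not.
        proxied.run();
        // An instance created without the proxy (the assumption here is that
        // this mirrors how the framework creates tasks) is never intercepted.
        new PlainTask().run();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(interceptedCalls());
    }
}
```

Only |run| is recorded, and only for the proxied instance. Compile-time or binary weaving with AspectJ changes the classes themselves, so it does not depend on who creates the object.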
[1] Wordcount program
|package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official
 * WordCount.class to understand the differences between both classes.
 */
public class MyWordCount {
    public static class MyMap extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value,
                OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " +
                            context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " +
                        key.toString() + " value ( " + val.getClass().toString() + " ): " +
                        val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }
        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class",
                MyFullyIndentityMapper.class, Mapper.class);
        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);
        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            inputPaths[i] = new Path(otherArgs[i]);
        }
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);
        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}|
Thanks,
_______________________________________________
aspectj-users mailing list
[email protected]
To change your delivery options, retrieve your password, or
unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users