I have found a solution. Strangely, this aspect works [1], but these
pointcuts [2] don't. If anyone could explain why, I would appreciate it.
[1] Aspect that works
pointcut printWrite(): call(* *.write(..));
before(): printWrite() {
    System.out.println("Busted2");
}
[2] Pointcuts that don't work
pointcut printWrite(): call(* write(..));
pointcut printWrite(): call(* Context.write(..));
On 09/25/2015 03:12 PM, xeonmailinglist wrote:
In my example, I have the following snippet [1]. I want my aspect to run
before the call to |context.write(this.word, one)| in the example, so I
created this aspect [2] based on your solution.
The |context| variable is of type |public abstract class
TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
TaskAttemptContext implements Progressable|, whose superclass is
declared as |public class TaskAttemptContext extends JobContext
implements Progressable|.
After compiling my code, I can see that the advice call was not woven
in before |context.write| in the |class| file [3].
Why is the |call| designator not working?
[1] Snippet of the wordcount example.
while(itr.hasMoreTokens()) {
    this.word.set(itr.nextToken());
    context.write(this.word, one);
}
[2] My aspect
before(): call(* *Context.write(..)) {
    System.out.println("Aspects Write");
    System.out.println(thisJoinPoint.getSourceLocation().getFileName());
}
[3] Compiled version of the job. This is the |class| file.
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    MapReduceCalls.aspectOf().ajc$before$org_apache_hadoop_mapred_aspects_MapReduceCalls$1$9e048e72();
    StringTokenizer itr = new StringTokenizer(value.toString());
    while(itr.hasMoreTokens()) {
        this.word.set(itr.nextToken());
        context.write(this.word, one);
    }
}
Cheers,
On 09/23/2015 04:33 PM, Andy Clement wrote:
With pure AspectJ, if you want to intercept when map/reduce/cleanup
are running, you can use something like this:
=== Foo.aj ===
aspect Foo {
    before(): execution(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): execution(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj MyWordCount.java -showWeaveInfo
This will compile and weave MyWordCount - when you run it you will
see the advice output.
Alternatively if you want to advise the call side of these things,
you will need to weave the hadoop jar (whatever that is):
=== Foo.aj ===
aspect Foo {
    before(): call(* map(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* reduce(..)) {
        System.out.println(thisJoinPoint);
    }
    before(): call(* cleanup(..)) {
        System.out.println(thisJoinPoint);
    }
}
===
ajc -1.8 Foo.aj -inpath hadoop.jar -outjar woven_hadoop.jar -showWeaveInfo
Now when you run your MyWordCount program you need to be using the
woven_hadoop jar instead of the regular hadoop jar.
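The difference between the two approaches above can be pictured with a small plain-Java analogy. This is only a hand-written sketch, not what ajc actually generates, and every name in it (AdvicePlacementSketch, UserMapper, Framework, and their methods) is hypothetical. It assumes the only point of interest is where the extra statement ends up: inside the callee for execution(), at the call site for call().

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java analogy only; ajc generates different code. All names are hypothetical.
class AdvicePlacementSketch {
    // Shared event log so the order of advice and method body is visible.
    static final List<String> log = new ArrayList<>();

    // Stands in for user code such as the mapper in MyWordCount.
    static class UserMapper {
        // The method as the user wrote it.
        void map(String value) {
            log.add("map body: " + value);
        }

        // Roughly what execution(* map(..)) advice amounts to: the extra
        // statement lives with the callee, so only the user's class changes.
        void mapWithExecutionAdvice(String value) {
            log.add("before execution(map)");
            map(value);
        }
    }

    // Stands in for framework code that invokes map().
    static class Framework {
        // Unmodified caller: callee-side advice still runs, because it
        // travels with the method being invoked.
        void run(UserMapper mapper, String value) {
            mapper.mapWithExecutionAdvice(value);
        }

        // Roughly what call(* map(..)) advice amounts to: the extra statement
        // sits at the call site, so the caller's class must be rewritten.
        void runWithCallAdvice(UserMapper mapper, String value) {
            log.add("before call(map)");
            mapper.map(value);
        }
    }

    public static void main(String[] args) {
        UserMapper mapper = new UserMapper();
        Framework framework = new Framework();
        framework.run(mapper, "hello");
        framework.runWithCallAdvice(mapper, "world");
        log.forEach(System.out::println);
    }
}
```

In this picture, Framework plays the role of the Hadoop classes: the call-side variant changes Framework itself, which is why the hadoop jar has to go through the weaver in the second approach.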
cheers,
Andy
On Sep 23, 2015, at 7:34 AM, xeonmailinglist wrote:
Hi,
I am trying to write pointcuts for the calls to |map| and |reduce| in
Hadoop MapReduce.
When you write a program for the MapReduce framework, you need to
define |map| and |reduce| functions, which the framework then calls.
Here is an example of a user-written MapReduce program [1].
I am using Spring AOP to intercept the calls to the |map|, |reduce|,
and |cleanup| functions, but it is not working. Maybe I must use
AspectJ, but I don't know how to do this with AspectJ.
Can anyone help me make this work?
[1] Wordcount program
package org.apache.hadoop.mapred.examples;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.ReduceContext;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

/**
 * My example of a common wordcount. Compare with the official
 * WordCount.class to understand the differences between both classes.
 */
public class MyWordCount {
    public static class MyMap extends Mapper {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        private MedusaDigests parser = new MedusaDigests();

        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKeyValue()) {
                    System.out.println("Key: " + context.getCurrentKey() + " Value: " + context.getCurrentValue());
                    map(context.getCurrentKey(), context.getCurrentValue(), context);
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Mapper.Context context) {}
    }

    public static class MyReducer extends Reducer {
        private IntWritable result = new IntWritable();
        MedusaDigests parser = new MedusaDigests();

        public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                System.out.println(" - key ( " + key.getClass().toString() + "): " + key.toString()
                        + " value ( " + val.getClass().toString() + " ): " + val.toString());
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }

        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            try {
                while (context.nextKey()) {
                    System.out.println("Key: " + context.getCurrentKey());
                    reduce(context.getCurrentKey(), context.getValues(), context);
                    // If a back up store is used, reset it
                    Iterator<IntWritable> iter = context.getValues().iterator();
                    if(iter instanceof ReduceContext.ValueIterator) {
                        ((ReduceContext.ValueIterator<IntWritable>)iter).resetBackupStore();
                    }
                }
            } finally {
                cleanup(context);
            }
        }

        public void cleanup(Reducer.Context context) {}
    }

    public static void main(String[] args) throws Exception {
        GenericOptionsParser parser = new GenericOptionsParser(new Configuration(), args);
        String[] otherArgs = parser.getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: wordcount [<in>...] <out>");
            System.exit(2);
        }

        // first map tasks
        JobConf conf = new JobConf(MyWordCount.class);
        conf.setJobName("wordcount");
        conf.setClass("mapreduce.job.map.identity.class", MyFullyIndentityMapper.class, Mapper.class);
        conf.setJarByClass(MyWordCount.class);
        conf.setPartitionerClass(HashPartitioner.class);
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        conf.setNumReduceTasks(1);

        Path[] inputPaths = new Path[otherArgs.length-1];
        for (int i = 0; i < otherArgs.length - 1; ++i) {
            inputPaths[i] = new Path(otherArgs[i]);
        }
        Path outputPath = new Path(otherArgs[otherArgs.length - 1]);
        FileInputFormat.setInputPaths(conf, inputPaths);
        FileOutputFormat.setOutputPath(conf, outputPath);

        // launch the job directly
        Medusa.runJob(conf);
        System.exit(0);
    }
}
Thanks,
_______________________________________________
aspectj-users mailing list
[email protected] <mailto:[email protected]>
To change your delivery options, retrieve your password, or
unsubscribe from this list, visit
https://dev.eclipse.org/mailman/listinfo/aspectj-users