Hello everyone, I ran into some issues while trying to figure out how to correctly use aggregators, since I want to implement a global priority queue that "schedules" processing on vertices. As a simple test to better understand aggregator useage I ended up modifying the SimpleShortestPathsVertex example and added the SumAggregator code from the SimplePageRankVertex example to it (Workercontext and compute()) (code posted below). Though this test code does not do anything useful I was surprised to see the following worker NullPointerExceptions during execution.
2012-05-23 14:44:59,267 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1 2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds. 2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop00 for UID 508 from the native implementation 2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child java.lang.NullPointerException at org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104) at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593) at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648) at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763) at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369) at org.apache.hadoop.mapred.Child$4.run(Child.java:259) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:396) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059) at org.apache.hadoop.mapred.Child.main(Child.java:253) 2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task So my question is. What are the pitfalls (method call order, setup, superstep count) of aggregator usage, as following the description in useAggregator did not seem to help, so I am obviously missing some detail. JAVA Code: @Override public void compute(Iterator<DoubleWritable> msgIterator) { LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum"); if (getSuperstep() == 0) { setVertexValue(new DoubleWritable(Double.MAX_VALUE)); } double minDist = isSource() ? 0d : Double.MAX_VALUE; while (msgIterator.hasNext()) { minDist = Math.min(minDist, msgIterator.next().get()); } if (LOG.isDebugEnabled()) { LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist + " vertex value = " + getVertexValue()); } if (getSuperstep() >= 0) { sumAggreg.aggregate(1L); // NPE at Line 104 } if (minDist < getVertexValue().get()) { setVertexValue(new DoubleWritable(minDist)); for (LongWritable targetVertexId : this) { FloatWritable edgeValue = getEdgeValue(targetVertexId); if (LOG.isDebugEnabled()) { LOG.debug("Vertex " + getVertexId() + " sent to " + targetVertexId + " = " + (minDist + edgeValue.get())); } sendMsg(targetVertexId, new DoubleWritable(minDist + edgeValue.get())); } } voteToHalt(); } public static class MyVertexWorkerContext extends WorkerContext { /** Final sum value for verification for local jobs */ private static long FINAL_SUM; public static long getFinalSum() { return FINAL_SUM; } @Override public void preApplication() throws InstantiationException, IllegalAccessException { registerAggregator("sum", LongSumAggregator.class); } @Override public void postApplication() { System.out.println("PreApp"); LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum"); FINAL_SUM = sumAggreg.getAggregatedValue().get(); LOG.info("aggregatedNumVertices=" + FINAL_SUM); } @Override public void preSuperstep() { System.out.println("PreSuperStep"); LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum"); this.useAggregator("sum"); sumAggreg.setAggregatedValue(new LongWritable(0L)); } @Override public void postSuperstep() {} } Rest as in SimpleShortestPathsVertex. Regards, Nils -- NEU: FreePhone 3-fach-Flat mit kostenlosem Smartphone! Jetzt informieren: http://mobile.1und1.de/?ac=OM.PW.PW003K20328T7073a