Hello everyone,

I ran into some issues while trying to figure out how to correctly use
aggregators, since I want to implement a global priority queue that
"schedules" processing on vertices. As a simple test to better
understand aggregator useage I ended up modifying the SimpleShortestPathsVertex 
example and added the SumAggregator code
from the SimplePageRankVertex example to it (Workercontext and
compute()) (code posted below).
Though this test code does not do anything useful I was surprised to
see the following worker NullPointerExceptions during execution.

2012-05-23 14:44:59,267 INFO
org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs'
truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO:
Initialized cache for UID to User mapping with a cache timeout of
14400 seconds.
2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO:
Got UserName hadoop00 for UID 508 from the native implementation
2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child
java.lang.NullPointerException
    at 
org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104)
    at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593)
    at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:253)
2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning
cleanup for the task


So my question is. What are the pitfalls (method call order, setup,
superstep count) of aggregator usage, as following the description in
useAggregator did
not seem to help, so I am obviously missing some detail.

JAVA Code:

 @Override
 public void compute(Iterator<DoubleWritable> msgIterator) {
 LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
 if (getSuperstep() == 0) {
   setVertexValue(new DoubleWritable(Double.MAX_VALUE));
 }
 double minDist = isSource() ? 0d : Double.MAX_VALUE;
 while (msgIterator.hasNext()) {
   minDist = Math.min(minDist, msgIterator.next().get());
 }
 if (LOG.isDebugEnabled()) {
   LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
       " vertex value = " + getVertexValue());
 }
 if (getSuperstep() >= 0) {
     sumAggreg.aggregate(1L);      // NPE at Line 104
 }

 if (minDist < getVertexValue().get()) {
   setVertexValue(new DoubleWritable(minDist));
   for (LongWritable targetVertexId : this) {
     FloatWritable edgeValue = getEdgeValue(targetVertexId);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Vertex " + getVertexId() + " sent to " +
           targetVertexId + " = " +
           (minDist + edgeValue.get()));
     }
     sendMsg(targetVertexId,
         new DoubleWritable(minDist + edgeValue.get()));
   }
 }
 voteToHalt();
 }

 public static class MyVertexWorkerContext extends
   WorkerContext {
 /** Final sum value for verification for local jobs */
 private static long FINAL_SUM;

 public static long getFinalSum() {
   return FINAL_SUM;
 }

 @Override
 public void preApplication()
   throws InstantiationException, IllegalAccessException {
   registerAggregator("sum", LongSumAggregator.class);
 }

 @Override
 public void postApplication() {
     System.out.println("PreApp");
   LongSumAggregator sumAggreg =
       (LongSumAggregator) getAggregator("sum");

   FINAL_SUM = sumAggreg.getAggregatedValue().get();
   LOG.info("aggregatedNumVertices=" + FINAL_SUM);
 }

 @Override
 public void preSuperstep() {
     System.out.println("PreSuperStep");
   LongSumAggregator sumAggreg =
       (LongSumAggregator) getAggregator("sum");
   this.useAggregator("sum");
   sumAggreg.setAggregatedValue(new LongWritable(0L));
 }

 @Override
 public void postSuperstep() {}
 }

Rest as in SimpleShortestPathsVertex.

Regards,
Nils
-- 
NEU: FreePhone 3-fach-Flat mit kostenlosem Smartphone!                          
        
Jetzt informieren: http://mobile.1und1.de/?ac=OM.PW.PW003K20328T7073a

Reply via email to