Hi Nils,
I am not 100% sure, but...

Did you configure your GiraphJob properly?

You need to tell Giraph you want to use your MyVertexWorkerContext.
You can do that using the GiraphJob.setWorkerContextClass(...) method
or the 'wc' option for the command line.
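
To illustrate why a missing worker context would plausibly produce exactly this
NullPointerException, here is a minimal, self-contained sketch. It does not use
Giraph at all: WorkerContextSketch, SumAggregator, the WorkerContext interface
and computeThrowsNpe are hypothetical stand-ins, and the assumption (not
verified against the Giraph source) is that getAggregator("sum") returns null
when nothing has registered "sum". If the job never learns about
MyVertexWorkerContext, preApplication() never runs, so the aggregator is never
registered and the first aggregate() call in compute() fails.

```java
import java.util.HashMap;
import java.util.Map;

public class WorkerContextSketch {
    /** Hypothetical stand-in for a sum aggregator (not the Giraph class). */
    static class SumAggregator {
        private long sum;
        void aggregate(long value) { sum += value; }
        long get() { return sum; }
    }

    /** Hypothetical stand-in for a worker context with a preApplication hook. */
    interface WorkerContext {
        void preApplication(Map<String, SumAggregator> registry);
    }

    /** Mirrors the idea of MyVertexWorkerContext: registers "sum" up front. */
    static class MyVertexWorkerContext implements WorkerContext {
        @Override
        public void preApplication(Map<String, SumAggregator> registry) {
            registry.put("sum", new SumAggregator());
        }
    }

    /**
     * Simulates one compute() call. The configured context may be null, which
     * models a job where the worker context class was never set.
     * Returns true if the aggregate() call hit a NullPointerException.
     */
    static boolean computeThrowsNpe(WorkerContext configured) {
        Map<String, SumAggregator> registry = new HashMap<>();
        if (configured != null) {
            configured.preApplication(registry); // only runs when configured
        }
        // Assumed behaviour: lookup of an unregistered name yields null.
        SumAggregator sumAggreg = registry.get("sum");
        try {
            sumAggreg.aggregate(1L);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Worker context not configured: the lookup yields null, so NPE.
        System.out.println(computeThrowsNpe(null));                        // true
        // Worker context configured: "sum" is registered, no NPE.
        System.out.println(computeThrowsNpe(new MyVertexWorkerContext())); // false
    }
}
```

In the real job the corresponding fix would be the
GiraphJob.setWorkerContextClass(...) call mentioned above, passing
MyVertexWorkerContext.class, made before the job is submitted.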

My 2 cents,
Paolo

Nils Rethmeier wrote:
> Hello everyone,
> 
> I ran into some issues while trying to figure out how to correctly use
> aggregators, since I want to implement a global priority queue that
> "schedules" processing on vertices. As a simple test to better
> understand aggregator usage I ended up modifying the
> SimpleShortestPathsVertex example and added the SumAggregator code
> from the SimplePageRankVertex example to it (WorkerContext and
> compute()) (code posted below).
> Though this test code does not do anything useful I was surprised to
> see the following worker NullPointerExceptions during execution.
> 
> 2012-05-23 14:44:59,267 INFO
> org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs'
> truncater with mapRetainSize=-1 and reduceRetainSize=-1
> 2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO:
> Initialized cache for UID to User mapping with a cache timeout of
> 14400 seconds.
> 2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO:
> Got UserName hadoop00 for UID 508 from the native implementation
> 2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child
> java.lang.NullPointerException
>     at org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104)
>     at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593)
>     at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648)
>     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
>     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
>     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:396)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
>     at org.apache.hadoop.mapred.Child.main(Child.java:253)
> 2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning
> cleanup for the task
> 
> 
> So my question is: what are the pitfalls (method call order, setup,
> superstep count) of aggregator usage? Following the description in
> useAggregator did not seem to help, so I am obviously missing some
> detail.
> 
> Java code:
> 
>  @Override
>  public void compute(Iterator<DoubleWritable> msgIterator) {
>  LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
>  if (getSuperstep() == 0) {
>    setVertexValue(new DoubleWritable(Double.MAX_VALUE));
>  }
>  double minDist = isSource() ? 0d : Double.MAX_VALUE;
>  while (msgIterator.hasNext()) {
>    minDist = Math.min(minDist, msgIterator.next().get());
>  }
>  if (LOG.isDebugEnabled()) {
>    LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
>        " vertex value = " + getVertexValue());
>  }
>  if (getSuperstep() >= 0) {
>      sumAggreg.aggregate(1L);      // NPE at Line 104
>  }
> 
>  if (minDist < getVertexValue().get()) {
>    setVertexValue(new DoubleWritable(minDist));
>    for (LongWritable targetVertexId : this) {
>      FloatWritable edgeValue = getEdgeValue(targetVertexId);
>      if (LOG.isDebugEnabled()) {
>        LOG.debug("Vertex " + getVertexId() + " sent to " +
>            targetVertexId + " = " +
>            (minDist + edgeValue.get()));
>      }
>      sendMsg(targetVertexId,
>          new DoubleWritable(minDist + edgeValue.get()));
>    }
>  }
>  voteToHalt();
>  }
> 
>  public static class MyVertexWorkerContext extends
>    WorkerContext {
>  /** Final sum value for verification for local jobs */
>  private static long FINAL_SUM;
> 
>  public static long getFinalSum() {
>    return FINAL_SUM;
>  }
> 
>  @Override
>  public void preApplication()
>    throws InstantiationException, IllegalAccessException {
>    registerAggregator("sum", LongSumAggregator.class);
>  }
> 
>  @Override
>  public void postApplication() {
>      System.out.println("PreApp");
>    LongSumAggregator sumAggreg =
>        (LongSumAggregator) getAggregator("sum");
> 
>    FINAL_SUM = sumAggreg.getAggregatedValue().get();
>    LOG.info("aggregatedNumVertices=" + FINAL_SUM);
>  }
> 
>  @Override
>  public void preSuperstep() {
>      System.out.println("PreSuperStep");
>    LongSumAggregator sumAggreg =
>        (LongSumAggregator) getAggregator("sum");
>    this.useAggregator("sum");
>    sumAggreg.setAggregatedValue(new LongWritable(0L));
>  }
> 
>  @Override
>  public void postSuperstep() {}
>  }
> 
> Rest as in SimpleShortestPathsVertex.
> 
> Regards,
> Nils
