Thanks Paolo, adding the GiraphJob.setWorkerContextClass(...) line to my job setup fixed the issue! Without this configuration, the aggregator methods such as aggregate() and getAggregatedValue() kept throwing NullPointerExceptions when called.
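
For anyone who finds this thread later, here is a rough sketch of why the missing configuration shows up as a NullPointerException inside compute(). It is a toy model in plain Java, not Giraph code: ToySumAggregator, ToyWorker and AggregatorNpeDemo are made-up names, and the assumption (inferred from the stack trace, not from the Giraph sources) is that looking up an aggregator nobody registered simply yields null.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy stand-in for a sum aggregator; NOT Giraph's LongSumAggregator. */
final class ToySumAggregator {
    private long sum;

    void aggregate(long value) {
        sum += value;
    }

    long getAggregatedValue() {
        return sum;
    }
}

/** Toy worker: an aggregator exists only if a worker context registered it. */
final class ToyWorker {
    private final Map<String, ToySumAggregator> aggregators = new HashMap<>();

    /**
     * Plays the role of preApplication(): the registration step runs only
     * when a worker context class was configured on the job (assumption).
     */
    ToyWorker(boolean workerContextConfigured) {
        if (workerContextConfigured) {
            aggregators.put("sum", new ToySumAggregator());
        }
    }

    /** Returns null for a name that was never registered. */
    ToySumAggregator getAggregator(String name) {
        return aggregators.get(name);
    }
}

final class AggregatorNpeDemo {
    /** Mimics compute() being called once per vertex; returns the final sum. */
    static long runSuperstep(ToyWorker worker, int vertexCount) {
        ToySumAggregator sum = worker.getAggregator("sum");
        for (int i = 0; i < vertexCount; i++) {
            sum.aggregate(1L); // throws NullPointerException if "sum" was never registered
        }
        return sum.getAggregatedValue();
    }

    public static void main(String[] args) {
        // Worker context configured: the aggregator is registered and counts vertices.
        System.out.println(runSuperstep(new ToyWorker(true), 3));

        // Worker context missing: the lookup returns null and the first aggregate() fails.
        try {
            runSuperstep(new ToyWorker(false), 3);
        } catch (NullPointerException e) {
            System.out.println("NullPointerException without a worker context");
        }
    }
}
```

In the real job, the equivalent of passing true above is the call Paolo names, GiraphJob.setWorkerContextClass(...), with MyVertexWorkerContext as the argument.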
Nils

-------- Original Message --------
> Date: Mon, 28 May 2012 17:33:14 +0100
> From: Paolo Castagna <castagna.li...@googlemail.com>
> To: user@giraph.apache.org
> Subject: Re: correct usage of aggregators, pittfalls?

> Hi Nils,
> I am not 100% sure, but...
>
> Do you configure your GiraphJob properly?
>
> You need to tell Giraph you want to use your MyVertexWorkerContext.
> You can do that using the GiraphJob.setWorkerContextClass(...) method
> or the 'wc' option for the command line.
>
> My 2 cents,
> Paolo
>
> Nils Rethmeier wrote:
> > Hello everyone,
> >
> > I ran into some issues while trying to figure out how to correctly use
> > aggregators, since I want to implement a global priority queue that
> > "schedules" processing on vertices. As a simple test to better
> > understand aggregator usage, I ended up modifying the
> > SimpleShortestPathsVertex example and added the SumAggregator code
> > from the SimplePageRankVertex example to it (WorkerContext and
> > compute()) (code posted below).
> > Though this test code does not do anything useful, I was surprised to
> > see the following worker NullPointerException during execution.
> >
> > 2012-05-23 14:44:59,267 INFO org.apache.hadoop.mapred.TaskLogsTruncater: Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
> > 2012-05-23 14:44:59,469 INFO org.apache.hadoop.io.nativeio.NativeIO: Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
> > 2012-05-23 14:44:59,470 INFO org.apache.hadoop.io.nativeio.NativeIO: Got UserName hadoop00 for UID 508 from the native implementation
> > 2012-05-23 14:44:59,472 WARN org.apache.hadoop.mapred.Child: Error running child
> > java.lang.NullPointerException
> >     at org.apache.giraph.examples.linda.LindaAggregatorTest.compute(LindaAggregatorTest.java:104)
> >     at org.apache.giraph.graph.GraphMapper.map(GraphMapper.java:593)
> >     at org.apache.giraph.graph.GraphMapper.run(GraphMapper.java:648)
> >     at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:763)
> >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:369)
> >     at org.apache.hadoop.mapred.Child$4.run(Child.java:259)
> >     at java.security.AccessController.doPrivileged(Native Method)
> >     at javax.security.auth.Subject.doAs(Subject.java:396)
> >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
> >     at org.apache.hadoop.mapred.Child.main(Child.java:253)
> > 2012-05-23 14:44:59,475 INFO org.apache.hadoop.mapred.Task: Runnning cleanup for the task
> >
> >
> > So my question is: what are the pitfalls (method call order, setup,
> > superstep count) of aggregator usage? Following the description in
> > useAggregator did not seem to help, so I am obviously missing some
> > detail.
> >
> > Java code:
> >
> > @Override
> > public void compute(Iterator<DoubleWritable> msgIterator) {
> >   LongSumAggregator sumAggreg = (LongSumAggregator) getAggregator("sum");
> >   if (getSuperstep() == 0) {
> >     setVertexValue(new DoubleWritable(Double.MAX_VALUE));
> >   }
> >   double minDist = isSource() ? 0d : Double.MAX_VALUE;
> >   while (msgIterator.hasNext()) {
> >     minDist = Math.min(minDist, msgIterator.next().get());
> >   }
> >   if (LOG.isDebugEnabled()) {
> >     LOG.debug("Vertex " + getVertexId() + " got minDist = " + minDist +
> >         " vertex value = " + getVertexValue());
> >   }
> >   if (getSuperstep() >= 0) {
> >     sumAggreg.aggregate(1L); // NPE at Line 104
> >   }
> >
> >   if (minDist < getVertexValue().get()) {
> >     setVertexValue(new DoubleWritable(minDist));
> >     for (LongWritable targetVertexId : this) {
> >       FloatWritable edgeValue = getEdgeValue(targetVertexId);
> >       if (LOG.isDebugEnabled()) {
> >         LOG.debug("Vertex " + getVertexId() + " sent to " +
> >             targetVertexId + " = " +
> >             (minDist + edgeValue.get()));
> >       }
> >       sendMsg(targetVertexId,
> >           new DoubleWritable(minDist + edgeValue.get()));
> >     }
> >   }
> >   voteToHalt();
> > }
> >
> > public static class MyVertexWorkerContext extends
> >     WorkerContext {
> >   /** Final sum value for verification for local jobs */
> >   private static long FINAL_SUM;
> >
> >   public static long getFinalSum() {
> >     return FINAL_SUM;
> >   }
> >
> >   @Override
> >   public void preApplication()
> >       throws InstantiationException, IllegalAccessException {
> >     registerAggregator("sum", LongSumAggregator.class);
> >   }
> >
> >   @Override
> >   public void postApplication() {
> >     System.out.println("PreApp");
> >     LongSumAggregator sumAggreg =
> >         (LongSumAggregator) getAggregator("sum");
> >
> >     FINAL_SUM = sumAggreg.getAggregatedValue().get();
> >     LOG.info("aggregatedNumVertices=" + FINAL_SUM);
> >   }
> >
> >   @Override
> >   public void preSuperstep() {
> >     System.out.println("PreSuperStep");
> >     LongSumAggregator sumAggreg =
> >         (LongSumAggregator) getAggregator("sum");
> >     this.useAggregator("sum");
> >     sumAggreg.setAggregatedValue(new LongWritable(0L));
> >   }
> >
> >   @Override
> >   public void postSuperstep() {}
> > }
> >
> > Rest as in SimpleShortestPathsVertex.
> >
> > Regards,
> > Nils