Putting your code in a file, I find the following on line 17:

    stepAcc = new StepAccumulator();

However, I don't think that was where the NPE was thrown.
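One possibility (a guess from the posted code, not something established in this thread): zero() returns null, and Spark re-initializes an accumulator from zero() on each executor, so on a worker addAccumulator() would receive a null map and throw on the first dereference. A minimal sketch of that interaction — plain HashMap stands in for Object2ObjectOpenHashMap, and the class and method names here are illustrative, not from the project:

```java
import java.util.HashMap;
import java.util.Map;

public class NullZeroDemo {
    static class StepAccumulator { }

    // Mirrors the shape of the posted addAccumulator(): the map is
    // dereferenced immediately, so a null map throws NullPointerException.
    static Map<String, StepAccumulator> addAccumulator(
            Map<String, StepAccumulator> map, String key) {
        StepAccumulator acc = map.get(key);   // NPE here if map == null
        if (acc == null) {
            map.put(key, new StepAccumulator());
        }
        return map;
    }

    // The posted zero() returns null; executors would then start from null.
    static Map<String, StepAccumulator> zeroAsPosted(
            Map<String, StepAccumulator> initial) {
        return null;                          // suspected bug
    }

    // Returning an empty map instead gives every executor a usable value.
    static Map<String, StepAccumulator> zeroFixed(
            Map<String, StepAccumulator> initial) {
        return new HashMap<>();
    }

    public static void main(String[] args) {
        try {
            addAccumulator(zeroAsPosted(new HashMap<>()), "step1");
        } catch (NullPointerException e) {
            System.out.println("NPE, matching the posted trace");
        }
        Map<String, StepAccumulator> ok =
                addAccumulator(zeroFixed(new HashMap<>()), "step1");
        System.out.println(ok.containsKey("step1")); // prints true
    }
}
```

If this is the cause, having zero() return a fresh empty Object2ObjectOpenHashMap instead of null should make the error disappear.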
Another thing I don't understand is that there are two addAccumulator() frames at the top of the stack trace, while in your code I don't see addAccumulator() calling itself.

FYI

On Mon, Aug 3, 2015 at 3:22 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:
> The code was written in 1.4 but I am compiling it and running it with 1.3.
>
> import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
> import org.apache.spark.AccumulableParam;
> import scala.Tuple4;
> import thomsonreuters.trailblazer.operation.DriverCalc;
> import thomsonreuters.trailblazer.operation.StepAccumulator;
>
> //Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
>
> class DriverAccumulator implements AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>, Tuple4<String, String, String, Double>> {
>     private static final Object _lockObj = new Object();
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap, Tuple4<String, String, String, Double> value) {
>         if (value == null) return stepAccumulatorMap;
>         synchronized (_lockObj) {
>             StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
>             if (stepAcc == null) {
>                 stepAcc = new StepAccumulator();
>                 stepAccumulatorMap.put(value._1(), stepAcc);
>             }
>             DriverCalc dc = stepAcc.stepRows.get(value._2());
>             if (dc == null) {
>                 dc = new DriverCalc();
>                 dc._denominator = value._4();
>                 if (value._3() != null) dc._numerator.put(value._3(), value._4());
>                 stepAcc.stepRows.put(value._2(), dc);
>             } else {
>                 dc._denominator = dc._denominator + value._4();
>                 if (value._3() != null) {
>                     Double val = dc._numerator.get(value._3());
>                     dc._numerator.put(value._3(), new Double(val != null ? val + value._4() : value._4()));
>                 }
>             }
>         }
>         return stepAccumulatorMap;
>     }
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(Object2ObjectOpenHashMap<String, StepAccumulator> r1, Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
>         r2.forEach((k, v) -> r1.merge(k, v, this::mergeAcc));
>         return r1;
>     }
>
>     private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
>         source2.stepRows.forEach((k, v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
>         return source1;
>     }
>
>     private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
>         driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
>         driverCalc2._numerator.forEach((k, v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
>         return driverCalc1;
>     }
>
>     private Double numeratorMerge(Double d1, Double d2) {
>         return d1 + d2;
>     }
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> zero(Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
>         return null;
>     }
> }
>
> On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>> Can you show related code in DriverAccumulator.java?
>>
>> Which Spark release do you use?
>>
>> Cheers
>>
>> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:
>>> Hi,
>>> I am trying to modify my code to use HDFS and multiple nodes. The code
>>> works fine when I run it locally on a single machine with a single worker.
>>> I have been trying to modify it and I get the following error. Any hint
>>> would be helpful.
>>>
>>> java.lang.NullPointerException
>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>>>     at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>>>     at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>>>     at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>>>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> failed in write bolt execute null
>>> failed in write bolt execute null
>>> java.lang.NullPointerException
>>>
>>
>
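A side note on the doubled addAccumulator() frames: when a class implements a generic interface such as AccumulableParam with concrete type arguments, javac emits a synthetic "bridge" method addAccumulator(Object, Object) that casts and delegates to the typed method, so the same method name can legitimately appear twice in a stack trace with no recursion involved. A small self-contained demonstration — the names here are illustrative, not from the thread:

```java
import java.lang.reflect.Method;

public class BridgeDemo {
    // A generic interface analogous in shape to AccumulableParam.addAccumulator.
    interface Adder<T> {
        T add(T acc, T value);
    }

    // Implementing it with a concrete type argument makes javac emit a
    // synthetic bridge method add(Object, Object) that delegates to
    // add(String, String).
    static class StringAdder implements Adder<String> {
        public String add(String acc, String value) {
            return acc + value;
        }
    }

    // Count the declared methods named "add": the typed one plus the bridge.
    static long addMethodCount() {
        long n = 0;
        for (Method m : StringAdder.class.getDeclaredMethods()) {
            if (m.getName().equals("add")) n++;
        }
        return n;
    }

    public static void main(String[] args) {
        // The bridge calls the typed method, which is why two frames with the
        // same method name can stack up without any self-call in the source.
        System.out.println(addMethodCount()); // prints 2
        for (Method m : StringAdder.class.getDeclaredMethods()) {
            System.out.println(m.getName() + " bridge=" + m.isBridge());
        }
    }
}
```

That would also explain why the two frames report different line numbers (17 and 11): the bridge is attributed to a different source line than the typed method body.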