Putting your code into a file, I find the following on line 17:
stepAcc = new StepAccumulator();
However, I don't think that is where the NPE was thrown: a constructor call
by itself can't raise a NullPointerException (unless something inside the
StepAccumulator constructor dereferences null).
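My guess, from reading the code rather than running it: the culprit is
zero() returning null. As far as I remember the 1.3 / 1.4 code, Spark uses
the result of zero() to initialize the accumulator's value when a task
deserializes it on an executor, and that value is then handed to
addAccumulator() as stepAccumulatorMap, so stepAccumulatorMap.get(value._1())
would throw exactly this NPE. If that's right, returning an empty map instead
of null should clear it, along these lines (a sketch, not tested against your
build):

    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
            Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        // return a real identity value instead of null so that
        // addAccumulator() always receives a map it can fill
        return new Object2ObjectOpenHashMap<String, StepAccumulator>();
    }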
Another thing I didn't understand at first: there are two addAccumulator()
frames at the top of the stack trace, yet in your code addAccumulator()
never calls itself.
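The likely explanation is type erasure: because DriverAccumulator implements
the generic AccumulableParam interface with concrete type arguments, javac
emits a synthetic bridge method, also named addAccumulator, that casts the
erased Object parameters and forwards to your typed method. Decompiled, the
bridge looks roughly like this (a sketch of compiler output, not your
source):

    public Object addAccumulator(Object map, Object value) {
        // synthetic bridge: cast the erased arguments and delegate,
        // which is why the trace shows addAccumulator calling addAccumulator
        return addAccumulator(
                (Object2ObjectOpenHashMap<String, StepAccumulator>) map,
                (Tuple4<String, String, String, Double>) value);
    }

That would also explain why the second frame points at DriverAccumulator.java:11,
which I suspect is the class declaration rather than a real call site.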
FYI
On Mon, Aug 3, 2015 at 3:22 PM, Anubhav Agarwal <[email protected]> wrote:
> The code was written against Spark 1.4, but I am compiling and running it with 1.3.
>
> import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
> import org.apache.spark.AccumulableParam;
> import scala.Tuple4;
> import thomsonreuters.trailblazer.operation.DriverCalc;
> import thomsonreuters.trailblazer.operation.StepAccumulator;
>
> // Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
>
> class DriverAccumulator implements
>         AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>,
>                 Tuple4<String, String, String, Double>> {
>
>     private static final Object _lockObj = new Object();
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(
>             Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap,
>             Tuple4<String, String, String, Double> value) {
>         if (value == null) return stepAccumulatorMap;
>         synchronized (_lockObj) {
>             StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
>             if (stepAcc == null) {
>                 stepAcc = new StepAccumulator();
>                 stepAccumulatorMap.put(value._1(), stepAcc);
>             }
>             DriverCalc dc = stepAcc.stepRows.get(value._2());
>             if (dc == null) {
>                 dc = new DriverCalc();
>                 dc._denominator = value._4();
>                 if (value._3() != null) dc._numerator.put(value._3(), value._4());
>                 stepAcc.stepRows.put(value._2(), dc);
>             } else {
>                 dc._denominator = dc._denominator + value._4();
>                 if (value._3() != null) {
>                     Double val = dc._numerator.get(value._3());
>                     dc._numerator.put(value._3(),
>                             new Double(val != null ? val + value._4() : value._4()));
>                 }
>             }
>         }
>         return stepAccumulatorMap;
>     }
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(
>             Object2ObjectOpenHashMap<String, StepAccumulator> r1,
>             Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
>         r2.forEach((k, v) -> r1.merge(k, v, this::mergeAcc));
>         return r1;
>     }
>
>     private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
>         source2.stepRows.forEach((k, v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
>         return source1;
>     }
>
>     private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
>         driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
>         driverCalc2._numerator.forEach((k, v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
>         return driverCalc1;
>     }
>
>     private Double numeratorMerge(Double d1, Double d2) {
>         return d1 + d2;
>     }
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
>             Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
>         return null;
>     }
> }
>
> On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <[email protected]> wrote:
>
>> Can you show the related code in DriverAccumulator.java?
>>
>> Which Spark release do you use ?
>>
>> Cheers
>>
>> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <[email protected]>
>> wrote:
>>
>>> Hi,
>>> I am trying to modify my code to use HDFS and multiple nodes. The code
>>> works fine when I run it locally on a single machine with a single worker,
>>> but after modifying it for the cluster I get the following error. Any hint
>>> would be helpful.
>>>
>>> java.lang.NullPointerException
>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>>>     at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>>>     at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>>>     at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>>>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> failed in write bolt execute null
>>> failed in write bolt execute null
>>> java.lang.NullPointerException
>>>
>>>
>>
>