Putting your code in a file, I find the following on line 17:
                stepAcc = new StepAccumulator();
However, I don't think that is where the NPE was thrown.
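
One thing that does stand out: your zero() returns null. AccumulableParam#zero
is supposed to supply the identity value, and when an Accumulable is
deserialized on an executor its current value is reset to zero(initialValue)
before any adds happen. A null initial map would then NPE on the first
dereference inside addAccumulator(), which might also explain why this only
shows up once you move beyond a single local worker. A minimal sketch of what
I mean, assuming an empty map is the right identity for your merge logic (not
a confirmed fix):

    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        // Return an empty map instead of null so the first addAccumulator()
        // call on an executor has a map to mutate.
        return new Object2ObjectOpenHashMap<String, StepAccumulator>();
    }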

Another thing that looks odd at first: there are two addAccumulator()
frames at the top of the stack trace, while in your code I don't see
addAccumulator() calling itself.
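
My guess is that the second frame is not recursion but the synthetic bridge
method javac generates when a class implements a generic interface: the
erased addAccumulator(Object, Object) delegates to your typed
addAccumulator(...), and both appear in the trace with different line
numbers. A toy example (class names are mine, purely for illustration):

    interface Adder<T> {
        T add(T a, T b);
    }

    class IntAdder implements Adder<Integer> {
        public Integer add(Integer a, Integer b) {
            return a + b; // throws NPE here if a or b is null (auto-unboxing)
        }
    }

    public class BridgeDemo {
        public static void main(String[] args) {
            Adder<Integer> adder = new IntAdder();
            // This call goes through the compiler-generated bridge
            // add(Object, Object), which delegates to add(Integer, Integer),
            // so the NPE stack trace shows IntAdder.add twice -- just like
            // your two addAccumulator() frames.
            adder.add(null, 1);
        }
    }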

FYI

On Mon, Aug 3, 2015 at 3:22 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:

> The code was written against Spark 1.4, but I am compiling and running it
> with Spark 1.3.
>
> import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
> import org.apache.spark.AccumulableParam;
> import scala.Tuple4;
> import thomsonreuters.trailblazer.operation.DriverCalc;
> import thomsonreuters.trailblazer.operation.StepAccumulator;
>
> //Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
>
> class DriverAccumulator implements AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>, Tuple4<String, String, String, Double>> {
>     private static final Object _lockObj = new Object();
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap, Tuple4<String, String, String, Double> value) {
>         if (value == null) return stepAccumulatorMap;
>         synchronized (_lockObj) {
>             StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
>             if (stepAcc == null) {
>                 stepAcc = new StepAccumulator();
>                 stepAccumulatorMap.put(value._1(), stepAcc);
>             }
>             DriverCalc dc = stepAcc.stepRows.get(value._2());
>             if (dc == null) {
>                 dc = new DriverCalc();
>                 dc._denominator = value._4();
>                 if (value._3() != null) dc._numerator.put(value._3(), value._4());
>                 stepAcc.stepRows.put(value._2(), dc);
>             } else {
>                 dc._denominator = dc._denominator + value._4();
>                 if (value._3() != null) {
>                     Double val = dc._numerator.get(value._3());
>                     dc._numerator.put(value._3(), new Double(val != null ? val + value._4() : value._4()));
>                 }
>             }
>         }
>         return stepAccumulatorMap;
>     }
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(Object2ObjectOpenHashMap<String, StepAccumulator> r1, Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
>         r2.forEach((k,v) -> r1.merge(k, v, this::mergeAcc));
>         return r1;
>     }
>
>     private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
>         source2.stepRows.forEach((k,v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
>         return source1;
>     }
>
>     private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
>         driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
>         driverCalc2._numerator.forEach((k,v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
>         return driverCalc1;
>     }
>
>     private Double numeratorMerge(Double d1, Double d2) {
>         return d1 + d2;
>     }
>
>     public Object2ObjectOpenHashMap<String, StepAccumulator> zero(Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
>         return null;
>     }
>
> }
>
> On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Can you show related code in DriverAccumulator.java ?
>>
>> Which Spark release do you use ?
>>
>> Cheers
>>
>> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <anubha...@gmail.com>
>> wrote:
>>
>>> Hi,
>>> I am trying to modify my code to use HDFS and multiple nodes. The code
>>> works fine when I run it locally on a single machine with a single worker,
>>> but after my changes I get the following error. Any hint would be helpful.
>>>
>>> java.lang.NullPointerException
>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>>>     at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>>>     at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>>>     at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>>>     at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>>>     at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>>>     at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>     at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>     at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>>     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> failed in write bolt execute null
>>> failed in write bolt execute null
>>> java.lang.NullPointerException
>>>
>>>
>>
>
