The code was written against Spark 1.4 but I am compiling and running it with Spark 1.3.

import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import org.apache.spark.AccumulableParam;
import scala.Tuple4;
import thomsonreuters.trailblazer.operation.DriverCalc;
import thomsonreuters.trailblazer.operation.StepAccumulator;
// Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
class DriverAccumulator implements
        AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>,
                         Tuple4<String, String, String, Double>> {

    private static final Object _lockObj = new Object();

    public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(
            Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap,
            Tuple4<String, String, String, Double> value) {
        if (value == null) return stepAccumulatorMap;
        synchronized (_lockObj) {
            // Look up (or create) the per-step accumulator keyed by step index.
            StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
            if (stepAcc == null) {
                stepAcc = new StepAccumulator();
                stepAccumulatorMap.put(value._1(), stepAcc);
            }
            // Look up (or create) the row keyed by the denominator key.
            DriverCalc dc = stepAcc.stepRows.get(value._2());
            if (dc == null) {
                dc = new DriverCalc();
                dc._denominator = value._4();
                if (value._3() != null) dc._numerator.put(value._3(), value._4());
                stepAcc.stepRows.put(value._2(), dc);
            } else {
                dc._denominator = dc._denominator + value._4();
                if (value._3() != null) {
                    Double val = dc._numerator.get(value._3());
                    dc._numerator.put(value._3(),
                            val != null ? val + value._4() : value._4());
                }
            }
        }
        return stepAccumulatorMap;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(
            Object2ObjectOpenHashMap<String, StepAccumulator> r1,
            Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
        // Merge r2 into r1, combining StepAccumulators that share a key.
        r2.forEach((k, v) -> r1.merge(k, v, this::mergeAcc));
        return r1;
    }

    private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
        source2.stepRows.forEach((k, v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
        return source1;
    }

    private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
        driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
        driverCalc2._numerator.forEach((k, v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
        return driverCalc1;
    }

    private Double numeratorMerge(Double d1, Double d2) {
        return d1 + d2;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
            Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        // NOTE: returns null rather than an empty map.
        return null;
    }
}

On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you show related code in DriverAccumulator.java ?
>
> Which Spark release do you use ?
>
> Cheers
>
> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <anubha...@gmail.com> wrote:
>
>> Hi,
>> I am trying to modify my code to use HDFS and multiple nodes. The code
>> works fine when I run it locally on a single machine with a single worker.
>> I have been trying to modify it and I get the following error. Any hint
>> would be helpful.
>>
>> java.lang.NullPointerException
>> at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>> at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>> at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>> at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>> at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>> at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>> at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>> at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>> at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> failed in write bolt execute null
>> failed in write bolt execute null
>> java.lang.NullPointerException
>>
>>
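For what it's worth, the NullPointerException at DriverAccumulator.java:17 is consistent with zero() returning null: when an accumulator is deserialized on an executor, Spark seeds the task-local value with param.zero(initialValue), so the map passed into addAccumulator on a worker would be null here. A minimal sketch of the suspected fix (seeding with an empty map is my assumption, not something confirmed in this thread):

public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
        Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
    // Return an empty map instead of null so that each task starts
    // with a valid map for addAccumulator to mutate on the executors.
    return new Object2ObjectOpenHashMap<>();
}

Separately, a jar built against Spark 1.4 but run on a 1.3 cluster can fail in other, less obvious ways, so aligning the two versions is worth doing regardless.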