The code was written for Spark 1.4, but I am compiling it and running it against Spark 1.3.
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import org.apache.spark.AccumulableParam;
import scala.Tuple4;
import thomsonreuters.trailblazer.operation.DriverCalc;
import thomsonreuters.trailblazer.operation.StepAccumulator;

// Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add
class DriverAccumulator implements
        AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>,
                Tuple4<String, String, String, Double>> {

    private static final Object _lockObj = new Object();

    // Folds one Tuple4 row into the per-step map: _1 selects the step,
    // _2 the denominator row key, _3 the optional numerator key, _4 the value.
    public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(
            Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap,
            Tuple4<String, String, String, Double> value) {
        if (value == null) return stepAccumulatorMap;
        synchronized (_lockObj) {
            StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
            if (stepAcc == null) {
                stepAcc = new StepAccumulator();
                stepAccumulatorMap.put(value._1(), stepAcc);
            }
            DriverCalc dc = stepAcc.stepRows.get(value._2());
            if (dc == null) {
                dc = new DriverCalc();
                dc._denominator = value._4();
                if (value._3() != null) dc._numerator.put(value._3(), value._4());
                stepAcc.stepRows.put(value._2(), dc);
            } else {
                dc._denominator = dc._denominator + value._4();
                if (value._3() != null) {
                    Double val = dc._numerator.get(value._3());
                    dc._numerator.put(value._3(), val != null ? val + value._4() : value._4());
                }
            }
        }
        return stepAccumulatorMap;
    }

    // Merges two partial results when Spark combines accumulators across tasks.
    public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(
            Object2ObjectOpenHashMap<String, StepAccumulator> r1,
            Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
        r2.forEach((k, v) -> r1.merge(k, v, this::mergeAcc));
        return r1;
    }

    private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
        source2.stepRows.forEach((k, v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
        return source1;
    }

    private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
        driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
        driverCalc2._numerator.forEach((k, v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
        return driverCalc1;
    }

    private Double numeratorMerge(Double d1, Double d2) {
        return d1 + d2;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
            Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        return null;
    }
}
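
One thing I am now wondering about: zero() above returns null. If I read the AccumulableParam contract correctly, Spark uses zero(initialValue) to seed each task-local copy of the accumulator on the executors, so a null there would make stepAccumulatorMap null inside addAccumulator on the workers, which would match the NullPointerException at DriverAccumulator.java:17 in the trace below. A minimal sketch of what I could try instead (returning an empty map rather than null; untested):

public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
        Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
    // Identity value used to initialize each task-local copy: an empty map, never null.
    return new Object2ObjectOpenHashMap<String, StepAccumulator>();
}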
On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <[email protected]> wrote:
> Can you show related code in DriverAccumulator.java ?
>
> Which Spark release do you use ?
>
> Cheers
>
> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <[email protected]>
> wrote:
>
>> Hi,
>> I am trying to modify my code to use HDFS and multiple nodes. The code
>> works fine when I run it locally on a single machine with a single worker.
>> While trying to modify it for the cluster, I get the following error. Any
>> hint would be helpful.
>>
>> java.lang.NullPointerException
>> at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>> at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>> at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>> at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>> at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>> at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>> at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>> at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>> at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>> at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>> at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>> at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:64)
>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> failed in write bolt execute null
>> failed in write bolt execute null
>> java.lang.NullPointerException