The code was written for Spark 1.4, but I am compiling and running it with 1.3.

import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import org.apache.spark.AccumulableParam;
import scala.Tuple4;
import thomsonreuters.trailblazer.operation.DriverCalc;
import thomsonreuters.trailblazer.operation.StepAccumulator;

// Tuple4<Allocation StepIndex.IF_Position, DenKey, NumKey, Value> - Allocation Step Add

class DriverAccumulator implements
        AccumulableParam<Object2ObjectOpenHashMap<String, StepAccumulator>,
                Tuple4<String, String, String, Double>> {
    private static final Object _lockObj = new Object();

    public Object2ObjectOpenHashMap<String, StepAccumulator> addAccumulator(
            Object2ObjectOpenHashMap<String, StepAccumulator> stepAccumulatorMap,
            Tuple4<String, String, String, Double> value) {
        if (value == null) return stepAccumulatorMap;
        synchronized (_lockObj) {
            StepAccumulator stepAcc = stepAccumulatorMap.get(value._1());
            if (stepAcc == null) {
                stepAcc = new StepAccumulator();
                stepAccumulatorMap.put(value._1(), stepAcc);
            }
            DriverCalc dc = stepAcc.stepRows.get(value._2());
            if (dc == null) {
                dc = new DriverCalc();
                dc._denominator = value._4();
                if (value._3() != null) dc._numerator.put(value._3(), value._4());
                stepAcc.stepRows.put(value._2(), dc);
            } else {
                dc._denominator = dc._denominator + value._4();
                if (value._3() != null) {
                    Double val = dc._numerator.get(value._3());
                    dc._numerator.put(value._3(),
                            new Double(val != null ? val + value._4() : value._4()));
                }
            }
        }
        return stepAccumulatorMap;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> addInPlace(
            Object2ObjectOpenHashMap<String, StepAccumulator> r1,
            Object2ObjectOpenHashMap<String, StepAccumulator> r2) {
        r2.forEach((k,v) -> r1.merge(k, v, this::mergeAcc));
        return r1;
    }

    private StepAccumulator mergeAcc(StepAccumulator source1, StepAccumulator source2) {
        source2.stepRows.forEach((k,v) -> source1.stepRows.merge(k, v, this::denominatorMerge));
        return source1;
    }

    private DriverCalc denominatorMerge(DriverCalc driverCalc1, DriverCalc driverCalc2) {
        driverCalc1._denominator = driverCalc1._denominator + driverCalc2._denominator;
        driverCalc2._numerator.forEach((k,v) -> driverCalc1._numerator.merge(k, v, this::numeratorMerge));
        return driverCalc1;
    }

    private Double numeratorMerge(Double d1, Double d2) {
        return d1 + d2;
    }

    public Object2ObjectOpenHashMap<String, StepAccumulator> zero(
            Object2ObjectOpenHashMap<String, StepAccumulator> initialValue) {
        return null;
    }

}
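
For reference, here is a minimal sketch of how an AccumulableParam like this is typically registered and fed in Spark 1.3/1.4. This is not the actual job code; the names sc, rows, DriverAccumulatorUsageSketch and the tuple values are placeholders.

import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap;
import org.apache.spark.Accumulable;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple4;
import thomsonreuters.trailblazer.operation.StepAccumulator;

class DriverAccumulatorUsageSketch {
    static void run(JavaSparkContext sc, JavaRDD<String> rows) {
        // Confirm which Spark release the driver is actually running against.
        System.out.println("Spark version: " + sc.version());

        // Register the accumulable on the driver. The initial value passed here is
        // also the argument that zero(initialValue) receives.
        Accumulable<Object2ObjectOpenHashMap<String, StepAccumulator>,
                Tuple4<String, String, String, Double>> driverAcc =
                sc.accumulable(new Object2ObjectOpenHashMap<String, StepAccumulator>(),
                        new DriverAccumulator());

        // Every add(...) call goes through DriverAccumulator.addAccumulator(...).
        rows.foreach(row -> driverAcc.add(
                new Tuple4<>("allocStep", "denKey", "numKey", 1.0d)));

        // Only the driver can read the merged result.
        System.out.println("steps accumulated: " + driverAcc.value().size());
    }
}

As far as I understand, when a task deserializes the accumulable on an executor, Spark re-seeds it with zero(initialValue), so whatever zero() returns is what addAccumulator receives as stepAccumulatorMap on the workers.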

On Mon, Aug 3, 2015 at 6:20 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Can you show related code in DriverAccumulator.java ?
>
> Which Spark release do you use ?
>
> Cheers
>
> On Mon, Aug 3, 2015 at 3:13 PM, Anubhav Agarwal <anubha...@gmail.com>
> wrote:
>
>> Hi,
>> I am trying to modify my code to use HDFS and multiple nodes. The code
>> works fine when I run it locally on a single machine with a single worker.
>> After modifying it, I get the following error. Any hint
>> would be helpful.
>>
>> java.lang.NullPointerException
>>      at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:17)
>>      at thomsonreuters.trailblazer.main.DriverAccumulator.addAccumulator(DriverAccumulator.java:11)
>>      at org.apache.spark.Accumulable.add(Accumulators.scala:73)
>>      at thomsonreuters.trailblazer.main.AllocationBolt.queueDriverRow(AllocationBolt.java:112)
>>      at thomsonreuters.trailblazer.main.AllocationBolt.executeRow(AllocationBolt.java:303)
>>      at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:49)
>>      at thomsonreuters.trailblazer.main.FileMapFunction.call(FileMapFunction.java:8)
>>      at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:996)
>>      at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>      at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:90)
>>      at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>      at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
>>      at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>>      at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
>>      at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
>>      at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
>>      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>>      at org.apache.spark.scheduler.Task.run(Task.scala:64)
>>      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
>>      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>      at java.lang.Thread.run(Thread.java:745)
>>
>> failed in write bolt execute null
>> failed in write bolt execute null
>> java.lang.NullPointerException
>>
>>
>
