Hi,

I am getting Nullpointer exception due to Kryo Serialization issue, while
trying to read a BiMap broadcast variable. Attached is the code snippets.
Pointers shared here didn't help - link1
<http://stackoverflow.com/questions/33156095/spark-serialization-issue-with-hashmap>,
link2
<http://stackoverflow.com/questions/23962796/kryo-readobject-cause-nullpointerexception-with-arraylist>.
Spark version used is 1.6.x, but this was working with 1.3.x version.

Any help in this regard is much appreciated.

Exception:

com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1238)
App > at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
App > at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
App > at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
App > at
com.manthan.aaas.algo.associationmining.impl.Test.lambda$execute$6abf5fd0$1(Test.java:39)
App > at
org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1015)
App > at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
App > at scala.collection.Iterator$class.foreach(Iterator.scala:727)
App > at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
App > at
scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
App > at
scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
App > at scala.collection.TraversableOnce$class.to
(TraversableOnce.scala:273)
App > at scala.collection.AbstractIterator.to(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
App > at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
App > at
scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
App > at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1978)
App > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
App > at org.apache.spark.scheduler.Task.run(Task.scala:89)
App > at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
App > at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
App > at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
App > at java.lang.Thread.run(Thread.java:745)
App > Caused by: com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
App > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
App > at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:228)
App > at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:217)
App > at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:178)
App > at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1231)
App > ... 29 more
App > Caused by: java.lang.NullPointerException
App > at com.google.common.collect.HashBiMap.seekByKey(HashBiMap.java:180)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:230)
App > at com.google.common.collect.HashBiMap.put(HashBiMap.java:218)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:135)
App > at
com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:17)
App > at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
App > at
com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
App > ... 35 more
App >
App > 16/11/02 18:39:01 dispatcher-event-loop-2 INFO TaskSetManager:
Starting task 17.1 in stage 1.0 (TID 19, ip-10-0-1-237.ec2.internal,
partition 17,PROCESS_LOCAL, 2076 bytes)
App > 16/11/02 18:39:01 task-result-getter-3 INFO TaskSetManager: Lost task
17.1 in stage 1.0 (TID 19) on executor ip-10-0-1-237.ec2.internal:
java.io.IOException (com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)) [duplicate 1]
App > 16/11/02 18:39:01 dispatcher-event-loop-4 INFO TaskSetManager:
Starting task 17.2 in stage 1.0 (TID 20, ip-10-0-1-237.ec2.internal,
partition 17,PROCESS_LOCAL, 2076 bytes)
App > 16/11/02 18:39:01 task-result-getter-0 INFO TaskSetManager: Lost task
17.2 in stage 1.0 (TID 20) on executor ip-10-0-1-237.ec2.internal:
java.io.IOException (com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)) [duplicate 2]
App > 16/11/02 18:39:01 dispatcher-event-loop-1 INFO TaskSetManager:
Starting task 17.3 in stage 1.0 (TID 21, ip-10-0-1-237.ec2.internal,
partition 17,PROCESS_LOCAL, 2076 bytes)
App > 16/11/02 18:39:01 task-result-getter-1 INFO TaskSetManager: Lost task
17.3 in stage 1.0 (TID 21) on executor ip-10-0-1-237.ec2.internal:
java.io.IOException (com.esotericsoftware.kryo.KryoException:
java.lang.NullPointerException
App > Serialization trace:
App > value (com.demo.BiMapWrapper)) [duplicate 3]

Best,
Kalpana
public class Test {
        BiMapWrapper biMapWrapper;
        
        

        public Test() {
                BiMap<String, Long> biMap = HashBiMap.create();
                biMap.put("k1", 1L);
                biMap.put("k2", 2L);
                this.biMapWrapper = new BiMapWrapper(biMap);
        }



        public void execute() {
                
                Broadcast<BiMapWrapper> broadcast = 
CurrentContext.getCurrentContext().broadcast(biMapWrapper);
                
                List<String> temp = new ArrayList<String>();
                temp.add("k1");
                
                JavaRDD<String> tempRDD = 
CurrentContext.getCurrentContext().parallelize(temp);
                
                JavaRDD<Long> tempRDD2 = tempRDD.map(item -> {
                        
                        if(broadcast.value().getVal().containsKey(item)){
                                return 1L;
                        }
                        return 0L;
                });
                
                System.out.println("tempRDD2 :: " + tempRDD2.collect());
                
        }

}

======================


public class BiMapWrapper {

        BiMap<String, Long> value = HashBiMap.create();

        public BiMapWrapper(BiMap<String, Long> value) {
                this.value.putAll(value);
        }
        
        public BiMap<String, Long> getVal(){
                return value;
        }
}

====================

public class MyKryoRegistrator  implements KryoRegistrator, Serializable{
         @Override
          public void registerClasses(Kryo kryo) {
              // Product POJO associated to a product Row from the DataFrame    
  
                 kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
              kryo.register(BiMapWrapper.class); 
          }
         
         
}
---------------------------------------------------------------------
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org

Reply via email to