There was a similar discussion here: http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccakz4c0s_cuo90q2jxudvx9wc4fwu033kx3-fjujytxxhr7p...@mail.gmail.com%3E
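If it helps, my read of the trace below: the NPE comes from org.apache.avro.generic.GenericData$Array.add. Kryo's default CollectionSerializer rebuilds the relations list reflectively, without running the constructor that gives GenericData.Array its Schema, and add() then dereferences that missing internal state. That is why the decode inside raw.map(...) works (no Kryo involved yet) but the task result fails to deserialize. A common workaround is to copy the Avro object into a plain Scala case class right after decoding, before anything is shuffled or returned as a task result. A minimal sketch, with hypothetical mirror classes standing in for your generated ones (the real version would take ObjectDetails and call getName / getType / getRelations; plain values are passed here so the sketch runs without the Avro classes on the classpath):

```scala
// Hypothetical plain-Scala mirrors of the Avro-generated classes, holding
// only the fields the downstream job needs. Kryo's FieldSerializer handles
// these without any Schema, so the task-result NPE cannot occur.
case class PlainRelation(relationType: String, objectType: String)
case class PlainObjectDetails(name: String, objType: String,
                              relations: List[PlainRelation])

object AvroCopy {
  // Copies field values out of a java.util.List (as getRelations returns)
  // into an immutable Scala List of plain case classes.
  def toPlain(name: String, objType: String,
              relations: java.util.List[Array[String]]): PlainObjectDetails = {
    val buf = scala.collection.mutable.ListBuffer.empty[PlainRelation]
    val it = relations.iterator()
    while (it.hasNext) {
      val r = it.next()
      buf += PlainRelation(r(0), r(1)) // copy out of the schema-backed list
    }
    PlainObjectDetails(name, objType, buf.toList)
  }
}

// Usage with values shaped like the payload in the question:
val rels = new java.util.ArrayList[Array[String]]()
rels.add(Array("parent", "Virtual Machine"))
val plain = AvroCopy.toPlain("Storage 1", "Storage", rels)
println(plain.relations.head.objectType) // prints Virtual Machine
```

The other common fix is registering an Avro-aware Kryo serializer for the generated classes (the chill-avro module provides one) instead of relying on registerKryoClasses, which only registers the classes with Kryo's default field serializer.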
Thanks
Best Regards

On Fri, May 1, 2015 at 7:12 PM, Todd Nist <tsind...@gmail.com> wrote:

> *Resending as I do not see that this made it to the mailing list; sorry if
> in fact it did and is just not reflected online yet.*
>
> I'm very perplexed by the following. I have a set of AVRO-generated
> objects that are sent to a Spark Streaming job via Kafka. The Spark
> Streaming job follows the receiver-based approach. I encounter the error
> below when I attempt to deserialize the payload:
>
> 15/04/30 17:49:25 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 9 to sparkExecutor@192.168.1.3:61051
> 15/04/30 17:49:25 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 140 bytes
> 15/04/30 17:49:25 ERROR TaskResultGetter: Exception while getting task result
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> relations (com.opsdatastore.model.ObjectDetails)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>     at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>     at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
>     at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
>     at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
>     at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>     at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
>     at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
>     at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
>     at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
>     at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
>     at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
>     ... 17 more
> 15/04/30 17:49:25 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool
>
> Basic code looks like this.
>
> Register the classes with Kryo as follows:
>
> val sc = new SparkConf(true)
>   .set("spark.streaming.unpersist", "true")
>   .setAppName("StreamingKafkaConsumer")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
> // register all related AVRO generated classes
> sc.registerKryoClasses(Array(
>   classOf[ConfigurationProperty],
>   classOf[Event],
>   classOf[Identifier],
>   classOf[Metric],
>   classOf[ObjectDetails],
>   classOf[Relation],
>   classOf[RelationProperty]
> ))
>
> Use the receiver-based approach to consume messages from Kafka:
>
> val messages = KafkaUtils.createStream[Array[Byte], Array[Byte],
>   DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics, storageLevel)
>
> Now process the received messages:
>
> val raw = messages.map(_._2)
> val dStream = raw.map(
>   byte => {
>     // Avro Decoder
>     println("Byte length: " + byte.length)
>     val decoder = new AvroDecoder[ObjectDetails](schema = ObjectDetails.getClassSchema)
>     val message = decoder.fromBytes(byte)
>     println(s"AvroMessage : Type : ${message.getType}, Payload : $message")
>     message
>   }
> )
>
> When I look in the logs of the workers, in standard out I can see the
> messages being printed; in fact, I'm even able to access the Type field
> without issue:
>
> Byte length: 315
> AvroMessage : Type : Storage, Payload : {"name": "Storage 1", "type": "Storage", "vendor": "6274g51cbkmkqisk", "model": "lk95hqk9m10btaot", "timestamp": 1430428565141, "identifiers": {"ID": {"name": "ID", "value": "Storage-1"}}, "configuration": null, "metrics": {"Disk Space Usage (GB)": {"name": "Disk Space Usage (GB)", "source": "Generated", "values": {"1430428565356": {"timestamp": 1430428565356, "value": 42.55948347907833}}}, "Disk Space Capacity (GB)": {"name": "Disk Space Capacity (GB)", "source": "Generated", "values": {"1430428565356": {"timestamp": 1430428565356, "value": 38.980024705429095}}}}, "relations": [{"type": "parent", "object_type": "Virtual Machine", "properties": {"ID": {"name": "ID", "value": "Virtual Machine-1"}}}], "events": [], "components": []}
>
> The ObjectDetails class, which is generated from AVRO, has a relations
> field of type java.util.List:
>
> /**
>  * Gets the value of the 'relations' field.
>  */
> public java.util.List<com.opsdatastore.model.Relation> getRelations() {
>   return relations;
> }
>
> So I'm at a loss at the moment as to what is causing the above NPE and
> exactly how to address it. I'm even more confused as I appear to be able
> to access the deserialized message within the raw.map(...) as shown above.
>
> Is there something special I need to do for the List? Am I missing
> something obvious here? If so, an example would be appreciated.
>
> TIA for the assistance, as I am stumped at the moment.
>
> -Todd
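For what it's worth, the failure mechanism can be illustrated without Spark, Kafka, or Avro at all. GenericData.Array receives its element Schema at construction time and consults internal state inside add(); a Kryo-style reflective instantiation never supplies that state, so add() hits a NullPointerException. The stand-in class below (not the real Avro type, just a sketch of the same shape) shows this:

```scala
// SchemaBackedList stands in for org.apache.avro.generic.GenericData.Array:
// it keeps a reference taken at construction time and dereferences it inside
// add(), just as Array.add consults state set up by its Schema-taking
// constructor.
class SchemaBackedList(private val schema: AnyRef)
    extends java.util.AbstractList[String] {
  private val buf = new java.util.ArrayList[String]()
  override def add(elem: String): Boolean = {
    schema.hashCode() // stands in for the schema-dependent work in add()
    buf.add(elem)
  }
  override def get(i: Int): String = buf.get(i)
  override def size(): Int = buf.size()
}

// Normal construction supplies the schema, so add() works:
val withSchema = new SchemaBackedList(new AnyRef)
withSchema.add("ok")

// A reflective, constructor-bypassing instantiation (what Kryo's default
// CollectionSerializer effectively does) leaves no schema behind, so add()
// throws the same NullPointerException the task-result deserializer reports:
val noSchema = new SchemaBackedList(null)
val failed =
  try { noSchema.add("x"); false }
  catch { case _: NullPointerException => true }
println(failed) // prints true
```

This is why registerKryoClasses alone is not enough here: it tells Kryo about the classes but still uses the default field/collection serializers, which cannot rebuild a schema-backed collection.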