There was a similar discussion over here
http://mail-archives.us.apache.org/mod_mbox/spark-user/201411.mbox/%3ccakz4c0s_cuo90q2jxudvx9wc4fwu033kx3-fjujytxxhr7p...@mail.gmail.com%3E
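If that thread doesn't get you there: registering the Avro classes alone only tells Kryo to use its default FieldSerializer for them, and that serializer cannot rebuild Avro's GenericData$Array (it constructs the list without the Schema it needs, hence the NPE in add()). One option, sketched below and untested against your classes, is a custom KryoRegistrator that hands the Avro types to a serializer that understands them, e.g. the one from the third-party chill-avro library (the serializer name here is an assumption based on that project, not something I have verified against your build):

```scala
// Sketch only: assumes the com.twitter chill-avro artifact is on the
// classpath and that ObjectDetails is an Avro SpecificRecord subclass.
import com.esotericsoftware.kryo.Kryo
import com.twitter.chill.avro.AvroSerializer
import org.apache.spark.serializer.KryoRegistrator
import com.opsdatastore.model.ObjectDetails

class AvroKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Round-trips the record through Avro's own binary encoding, so the
    // Schema is always available on the deserializing side.
    kryo.register(classOf[ObjectDetails],
      AvroSerializer.SpecificRecordBinarySerializer[ObjectDetails])
  }
}
```

You would then point Spark at it with `.set("spark.kryo.registrator", "your.package.AvroKryoRegistrator")` in place of (or alongside) the `registerKryoClasses` call.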

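Another angle, if you would rather not touch the serializer config: since the record reads fine inside `raw.map(...)`, you could convert it to plain JDK types right there, before anything crosses a shuffle or is collected back to the driver, so Kryo never sees Avro's schema-backed list. A minimal sketch of the copy step (the helper name is made up):

```scala
import java.util.{ArrayList => JArrayList, List => JList}

// Hypothetical helper: copy a possibly Avro-backed java.util.List into a
// plain ArrayList, which Kryo's stock CollectionSerializer can rebuild
// without needing an Avro Schema. Call it on each record inside raw.map(...).
def toPlainList[T](maybeAvroList: JList[T]): JList[T] =
  if (maybeAvroList == null) new JArrayList[T]()
  else new JArrayList[T](maybeAvroList)
```

For example, map each `ObjectDetails` into a small case class holding `message.getType`, `toPlainList(message.getRelations)`, and so on, and stream that case class instead of the Avro object.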
Thanks
Best Regards

On Fri, May 1, 2015 at 7:12 PM, Todd Nist <tsind...@gmail.com> wrote:

> *Resending as I do not see that this made it to the mailing list; sorry if
> in fact it did and is just not reflected online yet.*
>
> I’m very perplexed with the following. I have a set of AVRO generated
> objects that are sent to a SparkStreaming job via Kafka. The SparkStreaming
> job follows the receiver-based approach. I am encountering the error below
> when I attempt to deserialize the payload:
>
> 15/04/30 17:49:25 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 9 to sparkExecutor@192.168.1.3:61051
> 15/04/30 17:49:25 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 9 is 140 bytes
> 15/04/30 17:49:25 ERROR TaskResultGetter: Exception while getting task result
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> relations (com.opsdatastore.model.ObjectDetails)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:173)
> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:79)
> at org.apache.spark.scheduler.TaskSetManager.handleSuccessfulTask(TaskSetManager.scala:621)
> at org.apache.spark.scheduler.TaskSchedulerImpl.handleSuccessfulTask(TaskSchedulerImpl.scala:379)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:82)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1618)
> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NullPointerException
> at org.apache.avro.generic.GenericData$Array.add(GenericData.java:200)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:109)
> at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:18)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
> at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
> ... 17 more
> 15/04/30 17:49:25 INFO TaskSchedulerImpl: Removed TaskSet 20.0, whose tasks have all completed, from pool
>
> Basic code looks like this.
>
> Register the class with Kryo as follows:
>
> val sc = new SparkConf(true)
>   .set("spark.streaming.unpersist", "true")
>   .setAppName("StreamingKafkaConsumer")
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>
> // register all related AVRO generated classes
> sc.registerKryoClasses(Array(
>     classOf[ConfigurationProperty],
>     classOf[Event],
>     classOf[Identifier],
>     classOf[Metric],
>     classOf[ObjectDetails],
>     classOf[Relation],
>     classOf[RelationProperty]
>     ))
>
> Use the receiver based approach to consume messages from Kafka:
>
> val messages = KafkaUtils.createStream[Array[Byte], Array[Byte],
>   DefaultDecoder, DefaultDecoder](ssc, kafkaParams, topics, storageLevel)
>
> Now process the received messages:
>
> val raw = messages.map(_._2)
> val dStream = raw.map(
>   byte => {
>     // Avro Decoder
>     println("Byte length: " + byte.length)
>     val decoder = new AvroDecoder[ObjectDetails](schema = ObjectDetails.getClassSchema)
>     val message = decoder.fromBytes(byte)
>     println(s"AvroMessage : Type : ${message.getType}, Payload : $message")
>     message
>   }
> )
>
> When I look in the logs of the workers, in standard out I can see the
> messages being printed; in fact, I’m even able to access the Type field
> without issue:
>
> Byte length: 315
> AvroMessage : Type : Storage, Payload : {"name": "Storage 1", "type": 
> "Storage", "vendor": "6274g51cbkmkqisk", "model": "lk95hqk9m10btaot", 
> "timestamp": 1430428565141, "identifiers": {"ID": {"name": "ID", "value": 
> "Storage-1"}}, "configuration": null, "metrics": {"Disk Space Usage (GB)": 
> {"name": "Disk Space Usage (GB)", "source": "Generated", "values": 
> {"1430428565356": {"timestamp": 1430428565356, "value": 42.55948347907833}}}, 
> "Disk Space Capacity (GB)": {"name": "Disk Space Capacity (GB)", "source": 
> "Generated", "values": {"1430428565356": {"timestamp": 1430428565356, 
> "value": 38.980024705429095}}}}, "relations": [{"type": "parent", 
> "object_type": "Virtual Machine", "properties": {"ID": {"name": "ID", 
> "value": "Virtual Machine-1"}}}], "events": [], "components": []}
>
> The ObjectDetails which is generated from AVRO, has a relations field
> which is of type java.util.List:
>
>  /**
>    * Gets the value of the 'relations' field.
>    */
>   public java.util.List<com.opsdatastore.model.Relation> getRelations() {
>     return relations;
>   }
>
> So I’m at a loss at the moment as to what is causing the above NPE and
> exactly how to address this. I’m even more confused as I appear to be able
> to access the deserialized message within the raw.map(...) as shown above.
>
> Is there something special I need to do for the List? Am I missing
> something obvious here? If so, an example would be appreciated.
>
> TIA for the assistance as I am stumped at the moment.
>
> -Todd
>
