[ 
https://issues.apache.org/jira/browse/SPARK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hyukjin Kwon resolved SPARK-3601.
---------------------------------
    Resolution: Incomplete

> Kryo NPE for output operations on Avro complex Objects even after registering.
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-3601
>                 URL: https://issues.apache.org/jira/browse/SPARK-3601
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: local, standalone cluster
>            Reporter: mohan gaddam
>            Priority: Major
>              Labels: bulk-closed
>
> The Kryo serializer works well when Avro objects carry simple data, but when 
> the same Avro object contains complex data (such as unions or arrays), Kryo 
> fails during output operations; map transformations work fine. Note that I 
> have registered all the Avro-generated classes with Kryo, and I am using Java 
> as the programming language.
> When a complex message is used, an NPE is thrown with the following stack trace:
> ==================================================
> ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> value (xyz.Datum)
> data (xyz.ResMsg)
> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
> at scala.Option.foreach(Option.scala:236)
> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> In the above exception, Datum and ResMsg are project-specific classes 
> generated by Avro from the AVDL snippet below:
> ======================
> record KeyValueObject { 
>         union{boolean, int, long, float, double, bytes, string} name; 
>         union {boolean, int, long, float, double, bytes, string, 
> array<union{boolean, int, long, float, double, bytes, string, 
> KeyValueObject}>, KeyValueObject} value; 
> } 
> record Datum { 
>         union {boolean, int, long, float, double, bytes, string, 
> array<union{boolean, int, long, float, double, bytes, string, 
> KeyValueObject}>, KeyValueObject} value; 
> } 
> record ResMsg { 
>                 string version; 
>                 string sequence; 
>                 string resourceGUID; 
>                 string GWID; 
>                 string GWTimestamp; 
>                 union {Datum, array<Datum>} data; 
> }
> Sample Avro messages are as follows:
> ============================
> 1)
> {"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002", 
> "GWTimestamp": "1409823150737", "data": {"value": "30"}} 
> 2)
> {"version": "01", "sequence": "00001", "resource": "sensor-001", 
> "controller": "002", "controllerTimestamp": "1411038710358", "data": 
> {"value": [ {"name": "Temperature", "value": "30"}, {"name": "Speed", 
> "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]}, 
> {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}
> Both 1 and 2 adhere to the Avro schema, so the decoder is able to convert 
> them into Avro objects in the Spark Streaming API. The messages were pulled 
> from a Kafka source and decoded with a Kafka decoder.
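
For reference, in Spark 1.x the usual way to register Avro-generated classes with Kryo is a custom `KryoRegistrator` wired in through the `spark.kryo.registrator` property. A minimal sketch of that wiring, assuming the generated classes live in the `xyz` package named in the serialization trace above:

```java
import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.SparkConf;
import org.apache.spark.serializer.KryoRegistrator;

// Sketch only: registers the Avro-generated classes named in the
// serialization trace (xyz.Datum, xyz.ResMsg, xyz.KeyValueObject).
public class AvroKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(xyz.Datum.class);
        kryo.register(xyz.ResMsg.class);
        kryo.register(xyz.KeyValueObject.class);
    }
}
```

The registrator is enabled on the driver's `SparkConf` with `conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")` and `conf.set("spark.kryo.registrator", "AvroKryoRegistrator")`. Note that registration alone only assigns the classes compact IDs; Kryo still serializes them with its default FieldSerializer, which can fail on Avro's union and array fields, consistent with the NPE reported here.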



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
