[ https://issues.apache.org/jira/browse/SPARK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

mohan gaddam updated SPARK-3601:
--------------------------------
    Description: 
The Kryo serializer works well when Avro objects contain simple data. When the
same Avro object contains complex data (such as unions or arrays), however, Kryo
fails during output operations, while map operations work fine. Note that I
have registered all the Avro-generated classes with Kryo. I am using Java as
the programming language.
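Registering a class with Kryo covers that specific class. In Avro-generated Java
classes, however, a union with several non-null branches is typically declared
as java.lang.Object and an array as java.util.List. As a result, the objects
stored in those fields at runtime may belong to classes that never appear in the
generated code, for example whichever List or string implementation the decoder
produced. The following is a hedged diagnostic sketch. It walks a record by
reflection and collects the concrete class names it reaches, so they can be
compared with the classes registered with Kryo. RuntimeClassScanner and the
stand-in KeyValueObject, Datum and ResMsg classes are hypothetical
simplifications. They are not the generated code or any Spark, Kryo or Avro API.

```java
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, heavily simplified stand-ins for the Avro-generated classes:
// multi-branch unions are typed Object, arrays List, and strings CharSequence.
class KeyValueObject {
    Object name;
    Object value;
    KeyValueObject(Object name, Object value) { this.name = name; this.value = value; }
}

class Datum {
    Object value;
    Datum(Object value) { this.value = value; }
}

class ResMsg {
    CharSequence version;
    Object data;
    ResMsg(CharSequence version, Object data) { this.version = version; this.data = data; }
}

// Hypothetical diagnostic helper; not part of Spark, Kryo, or Avro.
final class RuntimeClassScanner {
    private RuntimeClassScanner() {}

    private static boolean isJdkClass(Class<?> c) {
        String n = c.getName();
        return n.startsWith("java.") || n.startsWith("javax.") || n.startsWith("jdk.") || n.startsWith("sun.");
    }

    /** Returns the sorted names of all concrete classes reachable from root. */
    static Set<String> reachableClassNames(Object root) {
        Set<String> names = new TreeSet<>();
        Set<Object> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Deque<Object> pending = new ArrayDeque<>();
        if (root != null) pending.push(root);
        while (!pending.isEmpty()) {
            Object current = pending.pop();
            if (!seen.add(current)) continue; // visit each instance once (handles cycles)
            Class<?> type = current.getClass();
            names.add(type.getName());
            List<Object> children = new ArrayList<>();
            if (current instanceof Iterable) {
                for (Object e : (Iterable<?>) current) children.add(e); // list elements
            } else if (current instanceof Map) {
                children.addAll(((Map<?, ?>) current).keySet());
                children.addAll(((Map<?, ?>) current).values());
            } else if (type.isArray() && !type.getComponentType().isPrimitive()) {
                children.addAll(Arrays.asList((Object[]) current));
            } else {
                // Read instance fields of non-JDK classes, including inherited ones.
                for (Class<?> c = type; c != null && !isJdkClass(c); c = c.getSuperclass()) {
                    for (Field f : c.getDeclaredFields()) {
                        if (Modifier.isStatic(f.getModifiers()) || f.getType().isPrimitive()) continue;
                        f.setAccessible(true);
                        try {
                            children.add(f.get(current));
                        } catch (IllegalAccessException e) {
                            throw new IllegalStateException(e);
                        }
                    }
                }
            }
            for (Object child : children) {
                if (child != null) pending.push(child);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<Object> items = new ArrayList<>();
        items.add(new KeyValueObject("Temperature", "30"));
        items.add(new KeyValueObject("Location",
                new ArrayList<>(Arrays.asList("+401213.1", "-0750015.1"))));
        ResMsg msg = new ResMsg("01", new Datum(items));
        // prints [Datum, KeyValueObject, ResMsg, java.lang.String, java.util.ArrayList]
        System.out.println(reachableClassNames(msg));
    }
}
```

Any name in the output that is not among the registered classes is worth a
closer look. The sketch only lists classes and does not by itself explain the
NPE.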

When a complex message is used, a NullPointerException is thrown with the
following stack trace:
==================================================
ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
value (xyz.Datum)
data (xyz.ResMsg)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 


In the above exception, Datum and ResMsg are project-specific classes generated
by Avro from the following AVDL snippet:
======================
record KeyValueObject {
    union {boolean, int, long, float, double, bytes, string} name;
    union {
        boolean, int, long, float, double, bytes, string,
        array<union {boolean, int, long, float, double, bytes, string, KeyValueObject}>,
        KeyValueObject
    } value;
}

record Datum {
    union {
        boolean, int, long, float, double, bytes, string,
        array<union {boolean, int, long, float, double, bytes, string, KeyValueObject}>,
        KeyValueObject
    } value;
}

record ResMsg {
    string version;
    string sequence;
    string resourceGUID;
    string GWID;
    string GWTimestamp;
    union {Datum, array<Datum>} data;
}
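The nesting in this schema is what separates a simple message from a complex
one. Datum.value may hold a primitive, a KeyValueObject, or an array whose
elements may themselves be KeyValueObjects, and a KeyValueObject's value can
again be any of these. The following plain-Java sketch, which has no Avro
dependency, checks whether a Java value fits that union. It assumes Avro's usual
Java mapping of bytes to java.nio.ByteBuffer and string to CharSequence. The
simplified KeyValueObject and the UnionCheck class are hypothetical
illustrations, not the Avro-generated classes.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java model of the KeyValueObject record in the AVDL snippet.
final class KeyValueObject {
    final Object name;  // union {boolean, int, long, float, double, bytes, string}
    final Object value; // the same branches plus array<...> and KeyValueObject
    KeyValueObject(Object name, Object value) {
        this.name = name;
        this.value = value;
    }
}

// Hypothetical checker for the unions declared in the snippet; not an Avro API.
final class UnionCheck {
    private UnionCheck() {}

    // boolean, int, long, float, double, bytes, string, assuming Avro's usual
    // Java mapping (bytes -> ByteBuffer, string -> CharSequence).
    static boolean isPrimitiveBranch(Object v) {
        return v instanceof Boolean || v instanceof Integer || v instanceof Long
                || v instanceof Float || v instanceof Double
                || v instanceof ByteBuffer || v instanceof CharSequence;
    }

    // A KeyValueObject needs a primitive name and a value matching the full union.
    static boolean isKeyValueObject(Object v) {
        if (!(v instanceof KeyValueObject)) return false;
        KeyValueObject kv = (KeyValueObject) v;
        return isPrimitiveBranch(kv.name) && isValueUnion(kv.value);
    }

    // Element type of array<union {..., KeyValueObject}>: no nested arrays.
    static boolean isArrayElement(Object v) {
        return isPrimitiveBranch(v) || isKeyValueObject(v);
    }

    // The union used by Datum.value and KeyValueObject.value.
    // null is rejected because none of these unions has a null branch.
    static boolean isValueUnion(Object v) {
        if (isPrimitiveBranch(v) || isKeyValueObject(v)) return true;
        if (!(v instanceof List)) return false;
        for (Object element : (List<?>) v) {
            if (!isArrayElement(element)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        // Like sample 1: data.value is a plain string.
        System.out.println(isValueUnion("30")); // true
        // Like sample 2: an array of KeyValueObjects, one of which holds an array.
        List<Object> sample2 = Arrays.asList(
                new KeyValueObject("Temperature", "30"),
                new KeyValueObject("Location", Arrays.asList("+401213.1", "-0750015.1")));
        System.out.println(isValueUnion(sample2)); // true
        // An array directly inside an array is not allowed by the schema.
        System.out.println(isValueUnion(Arrays.asList(Arrays.asList("x")))); // false
    }
}
```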

Sample Avro messages:
=====================
1)
{"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002",
 "GWTimestamp": "1409823150737", "data": {"value": "30"}}
2)
{"version": "01", "sequence": "00001", "resource": "sensor-001",
 "controller": "002", "controllerTimestamp": "1411038710358",
 "data": {"value": [
     {"name": "Temperature", "value": "30"},
     {"name": "Speed", "value": "60"},
     {"name": "Location", "value": ["+401213.1", "-0750015.1"]},
     {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}
 ]}}

Both messages 1 and 2 adhere to the Avro schema, so the decoder is able to
convert them into Avro objects in the Spark Streaming API. The messages were
pulled from a Kafka source and decoded using a Kafka decoder.
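The stack trace reports "Exception while getting task result". This appears to
come from the point where the driver reads back the results of a task, which
fits the observation that map operations succeed while output operations fail.
A common way to narrow down this kind of failure outside Spark is a round-trip
test: serialize each decoded record, deserialize it, and compare the copy with
the original. The following is a minimal sketch of that pattern under stated
assumptions. Codec, JavaSerializationCodec and RoundTrip are hypothetical names,
and java.io serialization is used only as a stand-in so that the example runs
without dependencies. It does not reproduce Kryo's behaviour. Testing the actual
problem would need a Kryo-backed Codec that uses the same registrations as the
Spark job.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical minimal serializer abstraction for the round-trip check.
interface Codec<T> {
    byte[] encode(T value);
    T decode(byte[] bytes);
}

// Stand-in codec based on java.io serialization so the sketch runs without
// dependencies; a Kryo-backed Codec would be needed to exercise the serializer
// from the report.
final class JavaSerializationCodec implements Codec<Serializable> {
    @Override
    public byte[] encode(Serializable value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    @Override
    public Serializable decode(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (Serializable) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

final class RoundTrip {
    private RoundTrip() {}

    // Returns the indexes of records that throw during encode/decode or that
    // come back unequal to the original.
    static <T> List<Integer> failingRecords(List<T> records, Codec<T> codec) {
        List<Integer> failures = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            T original = records.get(i);
            try {
                T copy = codec.decode(codec.encode(original));
                if (!Objects.equals(original, copy)) failures.add(i);
            } catch (RuntimeException e) {
                failures.add(i); // e.g. a failure while writing or reading a nested value
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<Serializable> records = Arrays.asList(
                "30",                                                            // simple value
                new ArrayList<String>(Arrays.asList("+401213.1", "-0750015.1")), // nested list
                new ArrayList<Object>(Arrays.asList("Location", new Object()))); // not serializable
        System.out.println(failingRecords(records, new JavaSerializationCodec())); // [2]
    }
}
```

Collecting the failing indexes instead of stopping at the first exception makes
it easy to see whether only the complex messages, such as message 2, fail.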


> Kryo NPE for output operations on Avro complex Objects even after registering.
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-3601
>                 URL: https://issues.apache.org/jira/browse/SPARK-3601
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: local, standalone cluster
>            Reporter: mohan gaddam
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
