[ https://issues.apache.org/jira/browse/SPARK-3601?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon resolved SPARK-3601.
---------------------------------
    Resolution: Incomplete

> Kryo NPE for output operations on Avro complex Objects even after registering.
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-3601
>                 URL: https://issues.apache.org/jira/browse/SPARK-3601
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: local, standalone cluster
>            Reporter: mohan gaddam
>            Priority: Major
>              Labels: bulk-closed
>
> The Kryo serializer works well when the Avro objects contain simple data. But when
> the same Avro object contains complex data (such as unions or arrays), Kryo fails
> during output operations, although the mappings work fine. Note that I have
> registered all the Avro-generated classes with Kryo. I am using Java as the
> programming language.
> When a complex message is used, an NPE is thrown. The stack trace is as follows:
> ==================================================
> ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
> org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> value (xyz.Datum)
> data (xyz.ResMsg)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> In the above exception, Datum and ResMsg are project-specific classes
> generated by Avro from the avdl snippet below:
> ======================
> record KeyValueObject {
>     union {boolean, int, long, float, double, bytes, string} name;
>     union {boolean, int, long, float, double, bytes, string,
>            array<union {boolean, int, long, float, double, bytes, string,
>                         KeyValueObject}>, KeyValueObject} value;
> }
> record Datum {
>     union {boolean, int, long, float, double, bytes, string,
>            array<union {boolean, int, long, float, double, bytes, string,
>                         KeyValueObject}>, KeyValueObject} value;
> }
> record ResMsg {
>     string version;
>     string sequence;
>     string resourceGUID;
>     string GWID;
>     string GWTimestamp;
>     union {Datum, array<Datum>} data;
> }
>
> Sample Avro messages are as follows:
> ============================
> 1)
> {"version": "01", "sequence": "00001", "resourceGUID": "001", "GWID": "002",
>  "GWTimestamp": "1409823150737", "data": {"value": "30"}}
> 2)
> {"version": "01", "sequence": "00001", "resource": "sensor-001",
>  "controller": "002", "controllerTimestamp": "1411038710358", "data":
>  {"value": [ {"name": "Temperature", "value": "30"}, {"name": "Speed",
>  "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]},
>  {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}
>
> Both 1 and 2 adhere to the Avro schema, so the decoder is able to convert them
> into Avro objects in the Spark Streaming API.
> BTW, the messages were pulled from a Kafka source and decoded using a Kafka
> decoder.
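When reproducing a report like this one, it may help to first confirm that each sample's top-level keys line up with the ResMsg record as pasted. The following is only a minimal sketch, not part of the original report: the class name `ResMsgFieldCheck` and its helper methods are hypothetical, and the key lists are copied by hand from the two samples and the avdl snippet rather than parsed from JSON or from the Avro schema.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: compares a message's top-level keys with the ResMsg fields.
public class ResMsgFieldCheck {

    // Field names of the ResMsg record, copied from the avdl snippet in the report.
    static final List<String> RES_MSG_FIELDS = Arrays.asList(
            "version", "sequence", "resourceGUID", "GWID", "GWTimestamp", "data");

    // Schema fields that do not appear among the message's top-level keys.
    static Set<String> missingFields(Collection<String> messageKeys) {
        Set<String> missing = new LinkedHashSet<>(RES_MSG_FIELDS);
        missing.removeAll(messageKeys);
        return missing;
    }

    // Message keys that the ResMsg record does not declare.
    static Set<String> unknownKeys(Collection<String> messageKeys) {
        Set<String> unknown = new LinkedHashSet<>(messageKeys);
        unknown.removeAll(RES_MSG_FIELDS);
        return unknown;
    }

    public static void main(String[] args) {
        // Top-level keys of sample 1 and sample 2, copied by hand from the report.
        List<String> sample1 = Arrays.asList(
                "version", "sequence", "resourceGUID", "GWID", "GWTimestamp", "data");
        List<String> sample2 = Arrays.asList(
                "version", "sequence", "resource", "controller",
                "controllerTimestamp", "data");

        System.out.println("sample 1 missing: " + missingFields(sample1)
                + ", unknown: " + unknownKeys(sample1));
        System.out.println("sample 2 missing: " + missingFields(sample2)
                + ", unknown: " + unknownKeys(sample2));
    }
}
```

With the keys as pasted, sample 1 matches the record exactly, while sample 2 uses `resource`, `controller` and `controllerTimestamp` where the record declares `resourceGUID`, `GWID` and `GWTimestamp`. Sample 2 may simply have been pasted from a different schema revision, so this check only narrows down what to compare; it does not explain the NPE.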