[ https://issues.apache.org/jira/browse/SPARK-18560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Genmao Yu updated SPARK-18560: ------------------------------ Description: My spark streaming job can run correctly on Spark 1.6.1, but it can not run properly on Spark 2.0.1, with following exception: {code} 16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} Go deep into relevant implementation, I find the type of data received by {{Receiver}} is erased. And in Spark2.x, framework can choose a appropriate {{Serializer}} from {{JavaSerializer}} and {{KryoSerializer}} base on the type of data. At the {{Receiver}} side, the type of data is erased to be {{Object}}, so framework will choose {{JavaSerializer}}, with following code: {code} def canUseKryo(ct: ClassTag[_]): Boolean = { primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag } def getSerializer(ct: ClassTag[_]): Serializer = { if (canUseKryo(ct)) { kryoSerializer } else { defaultSerializer } } {code} At task side, we can get correct data type, and framework will choose {{KryoSerializer}} if possible, with following supported type: {code} private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]] private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = { val primitiveClassTags = Set[ClassTag[_]]( ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double, ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short ) val arrayClassTags = primitiveClassTags.map(_.wrap) primitiveClassTags ++ arrayClassTags } {code} In my case, the type of data is Byte Array. This problem stems from SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. was: My spark streaming job can run correctly on Spark 1.6.1, but it can not run properly on Spark 2.0.1, with following exception: {code} 16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87) com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:108) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) {code} Go deep into relevant implementation, I find the type of data received by {{Receiver}} is erased. And in Spark2.x, framework can choose a appropriate {{Serializer}} from {{JavaSerializer}} and {{KryoSerializer}} base on the type of data. At the {{Receiver}} side, the type of data is erased to be {{Object}}, so framework will choose {{JavaSerializer}}, with following code: {code} def canUseKryo(ct: ClassTag[_]): Boolean = { primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag } def getSerializer(ct: ClassTag[_]): Serializer = { if (canUseKryo(ct)) { kryoSerializer } else { defaultSerializer } } {code} At task side, we can get correct data type, and framework will choose {{KryoSerializer}} if possible, with following supported type: {code} private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]] private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = { val primitiveClassTags = Set[ClassTag[_]]( ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double, ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short ) val arrayClassTags = primitiveClassTags.map(_.wrap) primitiveClassTags ++ arrayClassTags } {code} In my case, the type of data is Byte Array. This problem stems from #SPARK-13990, a patch to have Spark automatically pick the "best" serializer when caching RDDs. > Receiver data can not be dataSerialized properly. > ------------------------------------------------- > > Key: SPARK-18560 > URL: https://issues.apache.org/jira/browse/SPARK-18560 > Project: Spark > Issue Type: Bug > Components: Spark Core, Structured Streaming > Affects Versions: 2.0.2 > Reporter: Genmao Yu > Priority: Critical > > My spark streaming job can run correctly on Spark 1.6.1, but it can not run > properly on Spark 2.0.1, with following exception: > {code} > 16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 > (TID 87) > com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: > 13994 > at > com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137) > at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670) > at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781) > at > org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243) > at > org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169) > at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) > at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760) > at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150) > at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943) > at > org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943) > at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) > at org.apache.spark.scheduler.Task.run(Task.scala:108) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > {code} > Go deep into relevant implementation, I find the type of data received by > {{Receiver}} is erased. And in Spark2.x, framework can choose a appropriate > {{Serializer}} from {{JavaSerializer}} and {{KryoSerializer}} base on the > type of data. > At the {{Receiver}} side, the type of data is erased to be {{Object}}, so > framework will choose {{JavaSerializer}}, with following code: > {code} > def canUseKryo(ct: ClassTag[_]): Boolean = { > primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag > } > def getSerializer(ct: ClassTag[_]): Serializer = { > if (canUseKryo(ct)) { > kryoSerializer > } else { > defaultSerializer > } > } > {code} > At task side, we can get correct data type, and framework will choose > {{KryoSerializer}} if possible, with following supported type: > {code} > private[this] val stringClassTag: ClassTag[String] = > implicitly[ClassTag[String]] > private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = { > val primitiveClassTags = Set[ClassTag[_]]( > ClassTag.Boolean, > ClassTag.Byte, > ClassTag.Char, > ClassTag.Double, > ClassTag.Float, > ClassTag.Int, > ClassTag.Long, > ClassTag.Null, > ClassTag.Short > ) > val arrayClassTags = primitiveClassTags.map(_.wrap) > primitiveClassTags ++ arrayClassTags > } > {code} > In my case, the type of data is Byte Array. > This problem stems from SPARK-13990, a patch to have Spark automatically pick > the "best" serializer when caching RDDs. -- This message was sent by Atlassian JIRA (v6.3.4#6332) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org