[ https://issues.apache.org/jira/browse/SPARK-18560?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Shixiong Zhu reopened SPARK-18560:
----------------------------------

> Receiver data can not be dataSerialized properly.
> -------------------------------------------------
>
>                 Key: SPARK-18560
>                 URL: https://issues.apache.org/jira/browse/SPARK-18560
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.0.2
>            Reporter: Genmao Yu
>            Priority: Critical
>             Fix For: 2.0.3, 2.1.0
>
>
> My Spark Streaming job runs correctly on Spark 1.6.1, but it fails on
> Spark 2.0.1 with the following exception:
> {code}
> 16/11/22 19:20:15 ERROR executor.Executor: Exception in task 4.3 in stage 6.0 (TID 87)
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
>     at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:137)
>     at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:670)
>     at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:781)
>     at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:243)
>     at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:169)
>     at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
>     at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1760)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
>     at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1150)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
>     at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1943)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
>     at org.apache.spark.scheduler.Task.run(Task.scala:108)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> Digging into the relevant implementation, I found that the type of the data
> received by {{Receiver}} is erased. In Spark 2.x, the framework chooses an
> appropriate {{Serializer}}, either {{JavaSerializer}} or {{KryoSerializer}},
> based on the type of the data.
> On the {{Receiver}} side, the data type is erased to {{Object}}, so the
> framework chooses {{JavaSerializer}}, according to the following code:
> {code}
> def canUseKryo(ct: ClassTag[_]): Boolean = {
>   primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag
> }
>
> def getSerializer(ct: ClassTag[_]): Serializer = {
>   if (canUseKryo(ct)) {
>     kryoSerializer
>   } else {
>     defaultSerializer
>   }
> }
> {code}
> On the task side, the correct data type is available, and the framework
> chooses {{KryoSerializer}} when possible, that is, for the following
> supported types:
> {code}
> private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
> private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
>   val primitiveClassTags = Set[ClassTag[_]](
>     ClassTag.Boolean,
>     ClassTag.Byte,
>     ClassTag.Char,
>     ClassTag.Double,
>     ClassTag.Float,
>     ClassTag.Int,
>     ClassTag.Long,
>     ClassTag.Null,
>     ClassTag.Short
>   )
>   val arrayClassTags = primitiveClassTags.map(_.wrap)
>   primitiveClassTags ++ arrayClassTags
> }
> {code}
> In my case, the data type is a byte array, so the received blocks are written
> with {{JavaSerializer}} on the {{Receiver}} side but read back with
> {{KryoSerializer}} on the task side.
> This problem stems from SPARK-13990, a patch to have Spark automatically pick
> the "best" serializer when caching RDDs.
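The mismatch described in the issue can be reproduced in isolation with a small sketch. It copies the selection logic quoted above into a standalone object and asks it which serializer would be picked for an erased type and for `Array[Byte]`. The object name `SerializerChoiceSketch` and the method `serializerFor` are hypothetical, and the serializers are represented by plain strings rather than Spark's classes, so this only illustrates the decision and not Spark's actual implementation.

```scala
import scala.reflect.ClassTag

// Hypothetical stand-in for the selection logic quoted in the issue.
// It returns a name instead of a real Spark Serializer instance.
object SerializerChoiceSketch {
  private val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]

  // Same set as in the quoted snippet: primitives plus arrays of primitives.
  private val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
    val primitiveClassTags = Set[ClassTag[_]](
      ClassTag.Boolean, ClassTag.Byte, ClassTag.Char, ClassTag.Double,
      ClassTag.Float, ClassTag.Int, ClassTag.Long, ClassTag.Null, ClassTag.Short
    )
    val arrayClassTags = primitiveClassTags.map(_.wrap)
    primitiveClassTags ++ arrayClassTags
  }

  def canUseKryo(ct: ClassTag[_]): Boolean =
    primitiveAndPrimitiveArrayClassTags.contains(ct) || ct == stringClassTag

  // "kryo" and "java" are placeholders for kryoSerializer and defaultSerializer.
  def serializerFor(ct: ClassTag[_]): String =
    if (canUseKryo(ct)) "kryo" else "java"

  def main(args: Array[String]): Unit = {
    // Receiver side: the element type has been erased, so only Any is known.
    val writeSide = serializerFor(implicitly[ClassTag[Any]])
    // Task side: the real element type, Array[Byte], is known.
    val readSide = serializerFor(implicitly[ClassTag[Array[Byte]]])
    println(s"write side (erased type): $writeSide")
    println(s"read side (Array[Byte]):  $readSide")
  }
}
```

Running `main` should report `java` for the erased type and `kryo` for `Array[Byte]`. Bytes written by one serializer and read by the other are consistent with the `Encountered unregistered class ID` error in the stack trace.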