[ https://issues.apache.org/jira/browse/SPARK-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105536#comment-14105536 ]
Daniel Darabos commented on SPARK-3070:
---------------------------------------

I think this is almost certainly a duplicate of https://issues.apache.org/jira/browse/SPARK-2878, which is FIXED, thanks to Graham Dennis! Can you please check the repro against the fixed code to see if this can be closed? Thanks :).

> Kryo deserialization without using the custom registrator
> ----------------------------------------------------------
>
>                 Key: SPARK-3070
>                 URL: https://issues.apache.org/jira/browse/SPARK-3070
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.2
>            Reporter: Andras Nemeth
>
> If an RDD partition is cached on executor1 and used by a task on executor2,
> the partition needs to be serialized and sent over. For this particular
> serialization/deserialization use case, when using Kryo, it appears that the
> custom registrator is not used on the deserialization side. This of course
> results in some totally misleading Kryo deserialization errors.
> The cause of this behavior seems to be that the thread running the
> deserialization has a classloader which does not have the jars specified in
> the SparkConf on its classpath. So it fails to load the registrator with a
> ClassNotFoundException, but it catches the exception and happily continues
> without a registrator. (A bug in its own right, in my opinion.)
> To reproduce, have two RDDs partitioned the same way (i.e. with the same
> partitioner) but with corresponding partitions cached on different machines,
> then join them. See below a somewhat convoluted way to achieve this. If you
> run the program below on a Spark cluster with two workers, each with one
> core, you will be able to trigger the bug. Basically it runs two counts in
> parallel, which ensures that the two RDDs are computed in parallel, and as a
> consequence on different executors.
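As a side note, the exception-swallowing pattern described above (a registrator class that fails to load being silently ignored instead of failing the task) can be sketched outside of Spark. This is a hypothetical illustration, not Spark's actual code; `RegistratorLookup`, `loadRegistrator`, and the class names here are stand-ins:

```scala
// Hypothetical sketch of the reported failure mode: a registrator class name
// is resolved via reflection, and a ClassNotFoundException is caught and
// silently turned into "no registrator at all".
object RegistratorLookup {
  def loadRegistrator(className: String, loader: ClassLoader): Option[AnyRef] =
    try {
      // If `loader` lacks the user jars listed in SparkConf, this throws.
      Some(Class.forName(className, true, loader).newInstance().asInstanceOf[AnyRef])
    } catch {
      // Swallowing the exception means deserialization proceeds with an
      // unconfigured Kryo, producing the misleading errors described above.
      case _: ClassNotFoundException => None
    }
}
```

A lookup like this succeeds or fails depending purely on which classloader the deserializing thread happens to have, which matches the symptom: the registrator works for local tasks but silently disappears on the remote-fetch path.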
> {code:java}
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.HashPartitioner
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.serializer.KryoRegistrator
> import scala.actors.Actor
>
> case class MyClass(a: Int)
>
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[MyClass])
>   }
> }
>
> class CountActor(rdd: RDD[_]) extends Actor {
>   def act() {
>     println("Start count")
>     println(rdd.count)
>     println("Stop count")
>   }
> }
>
> object KryBugExample {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf()
>       .setMaster(args(0))
>       .setAppName("KryBugExample")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator", "MyKryoRegistrator")
>       .setJars(Seq("target/scala-2.10/krybugexample_2.10-0.1-SNAPSHOT.jar"))
>     val sc = new SparkContext(sparkConf)
>     val partitioner = new HashPartitioner(1)
>     val rdd1 = sc
>       .parallelize((0 until 100000).map(i => (i, MyClass(i))), 1)
>       .partitionBy(partitioner).cache
>     val rdd2 = sc
>       .parallelize((0 until 100000).map(i => (i, MyClass(i * 2))), 1)
>       .partitionBy(partitioner).cache
>     new CountActor(rdd1).start
>     new CountActor(rdd2).start
>     println(rdd1.join(rdd2).count)
>     while (true) {}
>   }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org