[ https://issues.apache.org/jira/browse/SPARK-3070?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14105536#comment-14105536 ]

Daniel Darabos commented on SPARK-3070:
---------------------------------------

I think this is almost certainly a duplicate of 
https://issues.apache.org/jira/browse/SPARK-2878, which is FIXED, thanks to 
Graham Dennis! Could you please check the repro against the fixed code to see 
whether this issue can be closed? Thanks :).

> Kryo deserialization without using the custom registrator
> ---------------------------------------------------------
>
>                 Key: SPARK-3070
>                 URL: https://issues.apache.org/jira/browse/SPARK-3070
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.0.2
>            Reporter: Andras Nemeth
>
> If an RDD partition is cached on executor1 and used by a task on executor2, 
> the partition needs to be serialized and sent over. For this particular 
> serialization/deserialization use case, when using Kryo, it appears that the 
> custom registrator is not used on the deserialization side. This of course 
> results in some totally misleading Kryo deserialization errors.
> The cause for this behavior seems to be that the thread running this 
> deserialization has a classloader which does not have the jars specified in 
> the SparkConf on its classpath. So it fails to load the registrator with a 
> ClassNotFoundException, but it catches the exception and happily continues 
> without a registrator. (A bug in its own right, in my opinion.)
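> To illustrate, here is a minimal sketch of the failure mode as I understand 
> it (my own illustration, not the actual Spark source; the lookup and 
> instantiation details are assumptions):
> {code:scala}
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.SparkConf
> import org.apache.spark.serializer.KryoRegistrator
> // Hypothetical sketch: the registrator is resolved by name on the
> // deserializing thread, and a failed lookup is silently ignored.
> def setupRegistrator(conf: SparkConf, kryo: Kryo): Unit = {
>   val registratorName = conf.get("spark.kryo.registrator", "")
>   try {
>     val clazz =
>       Thread.currentThread.getContextClassLoader.loadClass(registratorName)
>     clazz.newInstance.asInstanceOf[KryoRegistrator].registerClasses(kryo)
>   } catch {
>     case _: ClassNotFoundException =>
>       // Swallowed: deserialization continues with an unregistered Kryo
>       // instance and fails later with misleading errors.
>   }
> }
> {code}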
> To reproduce, have two RDDs partitioned the same way (i.e. with the same 
> partitioner) but with corresponding partitions cached on different machines, 
> then join them. See below for a somewhat convoluted way to achieve this. If 
> you run the program below on a Spark cluster with two workers, each with one 
> core, you will be able to trigger the bug. Basically it runs two counts in 
> parallel, which ensures that the two RDDs are computed in parallel and, as a 
> consequence, on different executors.
> {code:scala}
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.HashPartitioner
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.serializer.KryoRegistrator
> import scala.actors.Actor
> case class MyClass(a: Int)
> class MyKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[MyClass])
>   }
> }
> class CountActor(rdd: RDD[_]) extends Actor {
>   def act() {
>     println("Start count")
>     println(rdd.count)
>     println("Stop count")
>   }
> }
> object KryBugExample  {
>   def main(args: Array[String]) {
>     val sparkConf = new SparkConf()
>       .setMaster(args(0))
>       .setAppName("KryBugExample")
>       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>       .set("spark.kryo.registrator", "MyKryoRegistrator")
>       .setJars(Seq("target/scala-2.10/krybugexample_2.10-0.1-SNAPSHOT.jar"))
>     val sc = new SparkContext(sparkConf)
>     val partitioner = new HashPartitioner(1)
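>     // Both RDDs use the same single-partition partitioner, so the join has
>     // narrow dependencies on the cached partitions; the join task must then
>     // fetch one of them from the other executor, triggering the bug.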
>     val rdd1 = sc
>       .parallelize((0 until 100000).map(i => (i, MyClass(i))), 1)
>       .partitionBy(partitioner).cache
>     val rdd2 = sc
>       .parallelize((0 until 100000).map(i => (i, MyClass(i * 2))), 1)
>       .partitionBy(partitioner).cache
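>     // Count both RDDs in parallel so they are materialized (and cached)
>     // at the same time, and hence on different single-core executors.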
>     new CountActor(rdd1).start
>     new CountActor(rdd2).start
>     println(rdd1.join(rdd2).count)
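>     // Keep the driver alive so the executors and their caches stay up.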
>     while (true) {}
>   }
> }
> {code}
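> A possible workaround (my assumption, not verified): besides shipping the 
> jar with setJars, also put it on the executors' own classpath via 
> spark.executor.extraClassPath, so the registrator stays loadable even for 
> threads whose context classloader is missing the shipped jars. The path 
> below is illustrative and would have to exist on every worker machine.
> {code:scala}
> import org.apache.spark.SparkConf
> val conf = new SparkConf()
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryo.registrator", "MyKryoRegistrator")
>   // Hypothetical local path on the workers; adjust for your deployment.
>   .set("spark.executor.extraClassPath", "/opt/jars/krybugexample_2.10-0.1-SNAPSHOT.jar")
>   .setJars(Seq("target/scala-2.10/krybugexample_2.10-0.1-SNAPSHOT.jar"))
> {code}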


