[ 
https://issues.apache.org/jira/browse/SPARK-2408?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14086150#comment-14086150
 ] 

Ravindra Pesala commented on SPARK-2408:
----------------------------------------

Yes, Daniel is right: this is a problem in the Spark shell. I went through the 
code of the Spark shell and ClosureCleaner. I don't think it is an issue with 
ClosureCleaner, as it keeps only the outer object. However, the Spark shell 
generates the class for the last line, str_rdd.map(test).count, in the manner 
shown below. For each line entered into the Spark shell, it creates one class 
and imports the previously defined attributes/names into it.

class $read extends Serializable {
  class $iwC extends Serializable {
val $VAL3 = $line3.$read.INSTANCE;
import $VAL3.$iw.$iw.`sc`;
class $iwC extends Serializable {
import org.apache.spark.SparkContext._
class $iwC extends Serializable {
val $VAL5 = $line5.$read.INSTANCE;
import $VAL5.$iw.$iw.$iw.$iw.`r`;
val $VAL9 = $line9.$read.INSTANCE;
import $VAL9.$iw.$iw.$iw.$iw.`str_arr`;
val $VAL10 = $line10.$read.INSTANCE;
import $VAL10.$iw.$iw.$iw.$iw.`str_rdd`;
val $VAL11 = $line11.$read.INSTANCE;
import $VAL11.$iw.$iw.$iw.$iw.`test`;
class $iwC extends Serializable {
       val res0 =
              str_rdd.map(test).count
     
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;
}
val $iw = new $iwC;

}
object $read {
  val INSTANCE = new $read();
}

Here the outer object contains all previous imports wrapped in classes. Because 
the import related to val r = new scala.util.Random() is present in one of the 
outer classes, Spark tries to serialize that object as well, and it fails. I 
think we should either somehow manage the imports from previous requests in the 
generated code, or handle this scenario in ClosureCleaner. Please comment.

> RDD.map(func) dependencies issue after checkpoint & count
> ---------------------------------------------------------
>
>                 Key: SPARK-2408
>                 URL: https://issues.apache.org/jira/browse/SPARK-2408
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 0.9.1, 1.0.0
>            Reporter: Daniel Fry
>
> i am noticing strange behavior with a simple example use of rdd.checkpoint(). 
> you can paste the following code into any spark-shell (e.g. with 
> MASTER=local[*]) 
> // build an array of 100 random lowercase strings of length 10
> val r = new scala.util.Random()
> val str_arr = (1 to 100).map(a => (1 to 10).map(b => new 
> Character(((Math.abs(r.nextInt) % 26) + 97).toChar)).mkString(""))
> // make this into an rdd
> val str_rdd = sc.parallelize(str_arr)
> // checkpoint & count
> sc.setCheckpointDir("hdfs://[namenode]:54310/path/to/some/spark_checkpoint_dir")
> str_rdd.checkpoint()
> str_rdd.count
> // rdd.map some dummy function
> def test(a : String) : String = { return a }
> str_rdd.map(test).count
> this results in a surprising exception! 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not 
> serializable: java.io.NotSerializableException: scala.util.Random
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>         at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>         at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>         at 
> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>         at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at 
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)



--
This message was sent by Atlassian JIRA
(v6.2#6252)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to