[ https://issues.apache.org/jira/browse/SPARK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen updated SPARK-13758:
------------------------------
    Assignee: Mao, Wei

> Error message is misleading when RDD refer to null spark context
> ----------------------------------------------------------------
>
>                 Key: SPARK-13758
>                 URL: https://issues.apache.org/jira/browse/SPARK-13758
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Spark Core, Streaming
>            Reporter: Mao, Wei
>            Assignee: Mao, Wei
>            Priority: Trivial
>             Fix For: 2.0.0
>
>
> We have a recoverable Spark Streaming job with checkpointing enabled. It 
> runs correctly the first time, but throws the following exception when 
> restarted and recovered from the checkpoint.
> {noformat}
> org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
>       at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
>       at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
>       at org.apache.spark.rdd.RDD.union(RDD.scala:565)
>       at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
>       at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
>       at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
> {noformat}
> According to the exception, I invoked transformations and actions inside 
> other transformations, but I did not. The real cause is that I used an 
> external RDD in a DStream operation. External RDD data is not stored in the 
> checkpoint, so during recovery the RDD's internal SparkContext reference 
> (_sc) is null, which triggers the exception above. The error message is 
> misleading: it says nothing about the real issue.
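> A clearer message would name checkpoint recovery as a possible cause. As a 
> minimal sketch of one possible shape for the fix (illustrative wording, not 
> necessarily the final patch), the null check in the private sc accessor in 
> RDD.scala could distinguish the two cases:
> {code:java}
> // Sketch only: the private sc accessor in org.apache.spark.rdd.RDD,
> // where _sc is the SparkContext field that comes back null after
> // checkpoint recovery of an externally defined RDD.
> private def sc: SparkContext = {
>   if (_sc == null) {
>     throw new SparkException(
>       "This RDD lacks a SparkContext. It could happen in the following cases:\n" +
>       "(1) RDD transformations and actions are NOT invoked by the driver, but inside " +
>       "of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) " +
>       "is invalid because the values transformation and count action cannot be " +
>       "performed inside of the rdd1.map transformation. For more information, " +
>       "see SPARK-5063.\n" +
>       "(2) When a Spark Streaming job recovers from checkpoint, this exception will " +
>       "be hit if a reference to an RDD not defined by the streaming job is used in " +
>       "DStream operations. For more information, see SPARK-13758.")
>   }
>   _sc
> }
> {code}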
> Here is the code to reproduce it.
> {code:java}
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object Repo {
>   def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {
>     println("Creating new context")
>     val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     ssc.checkpoint(checkpointDirectory)
>     // External RDD created outside the DStream operations; it is captured by
>     // the closure below, but its data is not part of the checkpoint.
>     var cached = ssc.sparkContext.parallelize(Seq("apple", "banana"))
>     val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
>     words.foreachRDD((rdd: RDD[String]) => {
>       val res = rdd.map(word => (word, word.length)).collect()
>       println("words: " + res.mkString(", "))
>       // After recovery from checkpoint, `cached` has a null SparkContext,
>       // so this union throws the misleading exception above.
>       cached = cached.union(rdd)
>       cached.checkpoint()
>       println("cached words: " + cached.collect().mkString(", "))
>     })
>     ssc
>   }
>
>   def main(args: Array[String]) {
>     val ip = "localhost"
>     val port = 9999
>     val dir = "/home/maowei/tmp"
>     val ssc = StreamingContext.getOrCreate(dir, () => createContext(ip, port, dir))
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
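>
> A workaround on the application side (a sketch under the same setup as 
> above, not part of this issue's fix) is to avoid capturing an externally 
> created RDD in the DStream closure: keep only plain data there and rebuild 
> the RDD from each batch's own SparkContext via rdd.sparkContext:
> {code:java}
> // Hypothetical workaround sketch: plain data survives checkpoint recovery,
> // and the extra RDD is rebuilt per batch instead of being closed over.
> val seed = Seq("apple", "banana")
> words.foreachRDD((rdd: RDD[String]) => {
>   val cached = rdd.sparkContext.parallelize(seed).union(rdd)
>   println("cached words: " + cached.collect().mkString(", "))
> })
> {code}
> Note this rebuilds the union per batch; accumulating state across batches 
> would more naturally use a checkpointable mechanism such as updateStateByKey.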



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
