Mao, Wei created SPARK-13758:
--------------------------------

             Summary: Error message is misleading when RDD refer to null spark context
                 Key: SPARK-13758
                 URL: https://issues.apache.org/jira/browse/SPARK-13758
             Project: Spark
          Issue Type: Bug
          Components: Spark Core, Streaming
            Reporter: Mao, Wei


We have a recoverable Spark Streaming job with checkpointing enabled. It runs 
correctly the first time, but throws the following exception when it is 
restarted and recovers from the checkpoint.
{noformat}
org.apache.spark.SparkException: RDD transformations and actions can only be 
invoked by the driver, not inside of other transformations; for example, 
rdd1.map(x => rdd2.values.count() * x) is invalid because the values 
transformation and count action cannot be performed inside of the rdd1.map 
transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
        at org.apache.spark.rdd.RDD.union(RDD.scala:565)
        at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
        at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
...
{noformat}
According to the exception, I invoked transformations or actions inside another 
transformation, but I did not. The real cause is that I used an external RDD 
(one created outside the DStream graph) in a DStream operation. The external 
RDD's data is not stored in the checkpoint, so during recovery the _sc field of 
this RDD is initialized to null, which triggers the exception above. The error 
message is therefore misleading: it says nothing about the real problem.
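
The mechanism can be illustrated without Spark. The following is only a minimal 
sketch: FakeContext, FakeRdd and TransientDemo are hypothetical stand-ins, not 
Spark classes, and the wording of the exception is an assumption about what a 
more specific message might say. It shows that a @transient field comes back as 
null after a Java serialization round trip, which is the state the recovered 
external RDD ends up in.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.io.{ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a driver-side context; deliberately not Serializable.
class FakeContext(val name: String)

// Hypothetical stand-in for an RDD that keeps its context in a transient field.
class FakeRdd(@transient private var _ctx: FakeContext) extends Serializable {
  def ctx: FakeContext = {
    if (_ctx == null) {
      // Assumed wording: names the deserialization cause explicitly.
      throw new IllegalStateException(
        "This object has no context. It may have been deserialized, for " +
        "example while recovering from a checkpoint.")
    }
    _ctx
  }
}

object TransientDemo {
  // Serializes a value to bytes and reads it back, as a checkpoint would.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new FakeRdd(new FakeContext("driver"))
    println("before: " + original.ctx.name)

    // The transient field is skipped on write, so it is null after the read.
    val restored = roundTrip(original)
    try {
      restored.ctx
    } catch {
      case e: IllegalStateException => println("after: " + e.getMessage)
    }
  }
}
```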

Here is the code to reproduce it.
{code:java}
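import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object Repo {

  def createContext(ip: String, port: Int,
      checkpointDirectory: String): StreamingContext = {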

    println("Creating new context")
    val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    ssc.checkpoint(checkpointDirectory)

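    // External RDD: created outside the DStream graph, so its data is not
    // stored in the checkpoint and its _sc is null after recovery.
    var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))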

    val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
    words.foreachRDD((rdd: RDD[String]) => {
      val res = rdd.map(word => (word, word.length)).collect()
      println("words: " + res.mkString(", "))

      // After recovery from the checkpoint, this union throws the
      // SparkException shown above.
      cached = cached.union(rdd)
      cached.checkpoint()
      println("cached words: " + cached.collect().mkString(", "))
    })
    ssc
  }

  def main(args: Array[String]): Unit = {

    val ip = "localhost"
    val port = 9999
    val dir = "/home/maowei/tmp"

    val ssc = StreamingContext.getOrCreate(dir,
      () => {
        createContext(ip, port, dir)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
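
One possible way to avoid the exception in the reproduction above is to keep 
the external RDD in a lazily initialized driver-side singleton instead of 
capturing it in the foreachRDD closure. This is only a sketch under 
assumptions: CachedWords and RepoWorkaround are hypothetical names, and the 
approach follows the general pattern of re-creating driver-side state after a 
restart. It does not address the misleading message itself.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.DStream

// Hypothetical holder (not part of Spark). A Scala object is accessed
// statically, so it is not serialized with the foreachRDD closure.
object CachedWords {
  private var cached: RDD[String] = _

  // Builds the seed RDD on first use, including first use after a restart.
  def get(sc: SparkContext): RDD[String] = synchronized {
    if (cached == null) {
      cached = sc.parallelize(Seq("apple, banana"))
    }
    cached
  }

  // Unions the batch RDD into the held RDD, using the batch's live context.
  def append(rdd: RDD[String]): RDD[String] = synchronized {
    cached = get(rdd.sparkContext).union(rdd)
    cached
  }
}

object RepoWorkaround {
  // Wires the holder into a word stream such as `words` in createContext.
  def attach(words: DStream[String]): Unit = {
    words.foreachRDD { (rdd: RDD[String]) =>
      val all = CachedWords.append(rdd)
      println("cached words: " + all.collect().mkString(", "))
    }
  }
}
```

With this sketch, the words accumulated before a restart are not restored, 
because the holder starts empty in the new driver. It also leaves out 
cached.checkpoint(), so the lineage of the held RDD keeps growing.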



