GitHub user mwws opened a pull request:

    https://github.com/apache/spark/pull/11595

    [SPARK-13758][Streaming][Core] Enhance exception message to avoid misleading users

    We have a recoverable Spark Streaming job with checkpointing enabled. It runs correctly the first time it is submitted, but throws the following exception when it is restarted and recovered from the checkpoint.
    ```
    org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
        at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
        at org.apache.spark.rdd.RDD.union(RDD.scala:565)
        at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
        at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
    ```
    
    According to the exception, I invoked transformations and actions inside other transformations, but I did not. The real cause is that I used an external RDD in a DStream operation. External RDD data is not stored in the checkpoint, so during recovery the RDD's internal `_sc` (SparkContext) reference is null, which triggers the exception above. The error message is misleading: it says nothing about the real issue.
    Here is the code to reproduce it.
    
    ```scala
    import org.apache.spark.SparkConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object Repo {

      def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {
    
        println("Creating new context")
        val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf, Seconds(2))
        ssc.checkpoint(checkpointDirectory)
    
        // External RDD captured by the foreachRDD closure below; its data is
        // not stored in the streaming checkpoint, so after recovery it holds a
        // null SparkContext.
        var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
    
        val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
        words.foreachRDD((rdd: RDD[String]) => {
          val res = rdd.map(word => (word, word.length)).collect()
          println("words: " + res.mkString(", "))
    
          // Throws the misleading SparkException above when the job is
          // recovered from the checkpoint.
          cached = cached.union(rdd)
          cached.checkpoint()
          println("cached words: " + cached.collect.mkString(", "))
        })
        ssc
      }
    
      def main(args: Array[String]) {
    
        val ip = "localhost"
        val port = 9999
        val dir = "/home/maowei/tmp"
    
        val ssc = StreamingContext.getOrCreate(dir,
          () => {
            createContext(ip, port, dir)
          })
        ssc.start()
        ssc.awaitTermination()
      }
    }
    ```
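
    For context, a minimal sketch of a workaround (an assumption on my part, not part of this patch): derive the extra RDD from `rdd.sparkContext` inside the `foreachRDD` closure, so that no RDD created by the pre-restart context is captured across recovery.

    ```scala
    // Hypothetical workaround sketch, not part of this PR: rebuild the helper
    // RDD from the batch RDD's own (live) SparkContext on every batch instead
    // of capturing one created outside the closure.
    words.foreachRDD((rdd: RDD[String]) => {
      // rdd.sparkContext is the recovered, live context; an RDD created by the
      // old context before the restart would have a null SparkContext here.
      val seed = rdd.sparkContext.parallelize(Seq("apple", "banana"))
      val combined = seed.union(rdd)
      println("combined words: " + combined.collect().mkString(", "))
    })
    ```

    This loses the cross-batch accumulation of the original `cached` variable; state that must survive batches and restarts belongs in the DStream lineage itself (for example via `updateStateByKey`), not in a captured RDD.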

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mwws/spark SPARK-MissleadingLog

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/11595.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #11595
    
----
commit 65cc20f9d0b8e71e7a0a298466190e73dd0c853e
Author: mwws <wei....@intel.com>
Date:   2016-03-09T02:05:47Z

    enhance exception message to avoid misleading

----

