[ https://issues.apache.org/jira/browse/SPARK-13758?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sean Owen updated SPARK-13758:
------------------------------
    Assignee: Mao, Wei

> Error message is misleading when RDD refer to null spark context
> ----------------------------------------------------------------
>
>                 Key: SPARK-13758
>                 URL: https://issues.apache.org/jira/browse/SPARK-13758
>             Project: Spark
>          Issue Type: Improvement
>          Components: Documentation, Spark Core, Streaming
>            Reporter: Mao, Wei
>            Assignee: Mao, Wei
>            Priority: Trivial
>             Fix For: 2.0.0
>
>
> We have a recoverable Spark Streaming job with checkpointing enabled. It runs
> correctly the first time, but throws the following exception when it is
> restarted and recovered from the checkpoint.
> {noformat}
> org.apache.spark.SparkException: RDD transformations and actions can only be
> invoked by the driver, not inside of other transformations; for example,
> rdd1.map(x => rdd2.values.count() * x) is invalid because the values
> transformation and count action cannot be performed inside of the rdd1.map
> transformation. For more information, see SPARK-5063.
>     at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:87)
>     at org.apache.spark.rdd.RDD.withScope(RDD.scala:352)
>     at org.apache.spark.rdd.RDD.union(RDD.scala:565)
>     at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:23)
>     at org.apache.spark.streaming.Repo$$anonfun$createContext$1.apply(Repo.scala:19)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
>     ...
> {noformat}
> According to the exception, I invoked transformations and actions inside
> other transformations, but I did not. The real cause is that I used an
> external RDD (here, cached, created with ssc.sparkContext.parallelize) inside
> a DStream operation. External RDD data is not stored in the checkpoint, so
> after recovery the _sc field of this RDD is null, which triggers the
> exception above. The error message is misleading: it says nothing about the
> real issue.
> Here is the code to reproduce it.
> {code:java}
> import org.apache.spark.SparkConf
> import org.apache.spark.rdd.RDD
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> object Repo {
>   def createContext(ip: String, port: Int, checkpointDirectory: String): StreamingContext = {
>     println("Creating new context")
>     val sparkConf = new SparkConf().setAppName("Repo").setMaster("local[2]")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     ssc.checkpoint(checkpointDirectory)
>     // External RDD created on the driver, outside the DStream graph
>     var cached = ssc.sparkContext.parallelize(Seq("apple, banana"))
>     val words = ssc.socketTextStream(ip, port).flatMap(_.split(" "))
>     words.foreachRDD((rdd: RDD[String]) => {
>       val res = rdd.map(word => (word, word.length)).collect()
>       println("words: " + res.mkString(", "))
>       // After recovery from the checkpoint, `cached` refers to an RDD whose
>       // SparkContext is null, so union() fails with the exception above
>       cached = cached.union(rdd)
>       cached.checkpoint()
>       println("cached words: " + cached.collect().mkString(", "))
>     })
>     ssc
>   }
>
>   def main(args: Array[String]): Unit = {
>     val ip = "localhost"
>     val port = 9999
>     val dir = "/home/maowei/tmp"
>     // Recovers from the checkpoint in `dir` if one exists; otherwise calls createContext
>     val ssc = StreamingContext.getOrCreate(dir, () => createContext(ip, port, dir))
>     ssc.start()
>     ssc.awaitTermination()
>   }
> }
> {code}
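The null-context mechanism described above can be reproduced without Spark. The sketch below is a minimal, self-contained model, assuming only that Spark's RDD keeps its SparkContext in a @transient field and checks it for null before use (the RDD.sc frame at the top of the stack trace). FakeContext, FakeRdd, TransientContextDemo and the exception text are hypothetical illustrations, not Spark APIs or Spark's actual wording; a plain Java serialization round trip stands in for writing and reading the checkpoint.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for SparkContext: driver-only and not serializable.
final class FakeContext(val appName: String)

// Hypothetical stand-in for an RDD. The context sits in a @transient field, so it is
// skipped during serialization and comes back as null after deserialization.
final class FakeRdd(@transient private val _ctx: FakeContext, val name: String) extends Serializable {

  // Null guard modelled loosely on RDD.sc; this message names both known causes.
  def ctx: FakeContext = {
    if (_ctx == null) {
      throw new IllegalStateException(
        s"RDD '$name' lacks a context. Possible causes: (1) it was used inside another " +
          "transformation (see SPARK-5063); (2) it was created outside the streaming job and " +
          "referenced in a DStream operation after recovery from a checkpoint (see SPARK-13758).")
    }
    _ctx
  }
}

object TransientContextDemo {
  // Serializes and deserializes an object, roughly what happens to a captured RDD
  // when a closure is written to a checkpoint and read back on restart.
  def roundTrip[T <: Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
      // Resolve classes with this program's class loader, falling back to the default.
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        try Class.forName(desc.getName, false, TransientContextDemo.getClass.getClassLoader)
        catch { case _: ClassNotFoundException => super.resolveClass(desc) }
    }
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val original = new FakeRdd(new FakeContext("Repo"), "cached")
    println(original.ctx.appName) // prints Repo
    val restored = roundTrip(original)
    println(restored.name) // prints cached: ordinary fields survive the round trip
    try restored.ctx
    catch { case e: IllegalStateException => println(e.getMessage) }
  }
}
```

The design point the model illustrates is that the failure surfaces far from its cause: the data field survives the round trip, and only the first call that needs the context (union in the repro above) hits the null guard, so the guard's message is the only place the real cause can be named.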