[ https://issues.apache.org/jira/browse/SPARK-27598?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16830758#comment-16830758 ]
Stavros Kontopoulos edited comment on SPARK-27598 at 4/30/19 11:39 PM:
-----------------------------------------------------------------------

Btw, if I apply the trick and put the mapping function in an object like this, then with Spark 2.3.3 on restart I get:
{quote}
def createContext(checkpointDirectory: String, inputDirectory: String, outputDirectory: String): StreamingContext = {
  ...
  object T extends Serializable {
    // Update the cumulative count using mapWithState
    // This will give a DStream made of state (which is the cumulative count of the words)
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      val output = (word, sum)
      state.update(sum)
      output
    }
  }

  val stateDstream = words.mapWithState(
    StateSpec.function(T.mappingFunc).initialState(initialRDD))
}
{quote}
{quote}
2019-05-01 02:36:14 WARN BatchedWriteAheadLog:66 - BatchedWriteAheadLog Writer queue interrupted.
org.apache.spark.SparkException: This RDD lacks a SparkContext. It could happen in the following cases:
(1) RDD transformations and actions are NOT invoked by the driver, but inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.
(2) When a Spark Streaming job recovers from checkpoint, this exception will be hit if a reference to an RDD not defined by the streaming job is used in DStream operations. For more information, See SPARK-13758.
at org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$sc(RDD.scala:90)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
at org.apache.spark.rdd.PairRDDFunctions.partitionBy(PairRDDFunctions.scala:529)
at org.apache.spark.streaming.rdd.MapWithStateRDD$.createFromPairRDD(MapWithStateRDD.scala:193)
at org.apache.spark.streaming.dstream.InternalMapWithStateDStream.compute(MapWithStateDStream.scala:146)
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:342)
{quote}
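For reference, here is a minimal sketch of the layout that the SPARK-13758 pointer in the message above refers to: the mapping function lives in a top-level serializable object, the initial-state RDD is created inside createContext (so nothing from a previous SparkContext is captured), and the driver goes through StreamingContext.getOrCreate. All names, paths and the batch interval are illustrative, not taken from the gist, and this is only the commonly suggested arrangement, not a claim that it avoids the failure shown above.
{quote}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}

object StatefulWordCount {

  // Mapping function defined at top level, outside createContext.
  object WordCount extends Serializable {
    val mappingFunc = (word: String, one: Option[Int], state: State[Int]) => {
      val sum = one.getOrElse(0) + state.getOption.getOrElse(0)
      state.update(sum)
      (word, sum)
    }
  }

  def createContext(checkpointDir: String, inputDir: String): StreamingContext = {
    val conf = new SparkConf().setAppName("stateful-wordcount")
    val ssc = new StreamingContext(conf, Seconds(1))
    ssc.checkpoint(checkpointDir)
    // Initial-state RDD built from the new context, not referenced from outside createContext.
    val initialRDD = ssc.sparkContext.parallelize(Seq(("hello", 1), ("world", 1)))
    val words = ssc.textFileStream(inputDir).flatMap(_.split(" ")).map((_, 1))
    val stateDstream = words.mapWithState(
      StateSpec.function(WordCount.mappingFunc).initialState(initialRDD))
    stateDstream.print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    val checkpointDir = "/tmp/checkpoint"   // illustrative path
    val inputDir = "/tmp/input"             // illustrative path
    val ssc = StreamingContext.getOrCreate(checkpointDir,
      () => createContext(checkpointDir, inputDir))
    ssc.start()
    ssc.awaitTermination()
  }
}
{quote}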
> DStreams checkpointing does not work with the Spark Shell
> ----------------------------------------------------------
>
>                 Key: SPARK-27598
>                 URL: https://issues.apache.org/jira/browse/SPARK-27598
>             Project: Spark
>          Issue Type: Bug
>          Components: DStreams
>    Affects Versions: 2.4.0, 2.4.1, 2.4.2, 3.0.0
>            Reporter: Stavros Kontopoulos
>            Priority: Major
>
> When I restarted a stream with checkpointing enabled I got this:
> {quote}19/04/29 22:45:06 WARN CheckpointReader: Error reading checkpoint from file file:/tmp/checkpoint/checkpoint-1556566950000.bk
> java.io.IOException: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.streaming.dstream.FileInputDStream.filter of type scala.Function1 in instance of org.apache.spark.streaming.dstream.FileInputDStream
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1322)
> at org.apache.spark.streaming.dstream.FileInputDStream.readObject(FileInputDStream.scala:314)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> {quote}
> It seems that the closure is stored in serialized form (java.lang.invoke.SerializedLambda) and cannot be assigned back to a scala.Function1.
> Details of how to reproduce it here:
> https://gist.github.com/skonto/87d5b2368b0bf7786d9dd992a710e4e6
> Maybe this is spark-shell specific and is not expected to work anyway, as I don't see this issue with a normal jar.
> Note that this still does not work with Spark 2.3.3 either, but with a different error.
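For anyone who does not want to open the gist, below is a rough sketch of the kind of spark-shell session that exercises this path; it is not the gist itself, and the paths, batch interval, and filter predicate are made up. It just shows where the FileInputDStream.filter field from the ClassCastException comes from: the filter argument to fileStream is a scala.Function1 that gets written into the checkpoint and then fails to deserialize on restart in the shell.
{quote}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

def createContext(checkpointDir: String, inputDir: String): StreamingContext = {
  val ssc = new StreamingContext(sc, Seconds(5))   // `sc` is the shell's SparkContext
  ssc.checkpoint(checkpointDir)
  // The filter lambda below is stored in FileInputDStream.filter and serialized
  // into the checkpoint along with the DStream graph.
  val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](
    inputDir, (path: Path) => !path.getName.startsWith("."), newFilesOnly = true)
  lines.map(_._2.toString).count().print()
  ssc
}

val ssc = StreamingContext.getOrCreate("/tmp/checkpoint",
  () => createContext("/tmp/checkpoint", "/tmp/input"))
ssc.start()
{quote}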