[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008993#comment-15008993 ]
Glyton Camilleri commented on SPARK-6847:
-----------------------------------------

Hi Yunjie,

Whether 50 seconds is a good checkpoint interval depends largely on the batch interval of the stream; if the stream is set to execute jobs every 10 seconds, then 50 seconds could be fine.

In my example code, {{certainConditionsAreMet}} was just a placeholder: the conditions were application-specific in that case. In other words, there were conditions under which we would perform the side-effect on the stream, which in our case ((1) above) was saving the contents of the stream to HDFS. So the fix looked something like this:

{code}
// this function decides whether it's time to store the contents of the stream to HDFS
def isTimeToSave: Boolean = ...

def saveData[A](stream: DStream[A]) =
  if (isTimeToSave)
    stream.foreachRDD { ... } // write data to HDFS
  else
    stream.foreachRDD { _.foreachPartition { _ => () } } // just do nothing
{code}

The {{else}} part is what I'm referring to above.
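Note that the snippet above chooses which output operation to register when the DStream graph is built, since {{foreachRDD}} only registers an action. A per-batch variant of the same idea (a sketch only, assuming Spark Streaming's {{DStream}} API; {{isTimeToSave}} and the output path are hypothetical) moves the condition inside {{foreachRDD}}, so the decision is re-evaluated on every batch:

{code}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical save policy; the real condition was application-specific.
def isTimeToSave: Boolean = ...

def saveData(stream: DStream[String], path: String): Unit =
  stream.foreachRDD { (rdd, time) =>
    if (isTimeToSave)
      rdd.saveAsTextFile(s"$path/${time.milliseconds}") // write the batch to HDFS
    else
      rdd.foreachPartition(_ => ()) // no-op action: still materializes the RDD
  }
{code}

Either way, the point is that some action runs on every batch, so the state RDDs get materialized and the checkpoint can truncate the ever-growing lineage.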
> Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-6847
>                 URL: https://issues.apache.org/jira/browse/SPARK-6847
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Jack Hu
>              Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code: it uses {{updateStateByKey}}
> followed by a {{map}} with checkpoint interval 10 seconds
> {code}
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("""checkpoint""")
> val source = streamingContext.socketTextStream("localhost", 9999)
> val updatedResult = source
>   .map((1, _))
>   .updateStateByKey(
>     (newlist: Seq[String], oldstate: Option[String]) =>
>       newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
>   .checkpoint(Seconds(10))
>   .foreachRDD((rdd, t) => {
>     println("Deep: " + rdd.toDebugString.split("\n").length)
>     println(t.toString() + ": " + rdd.collect.length)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over
> time, the {{updateStateByKey}} state never gets check-pointed, and eventually
> a stack overflow occurs.
> Note:
> * The RDD in {{updatedResult.map(_._2)}} does get check-pointed in this case,
> but not the {{updateStateByKey}} state
> * If the {{checkpoint(Seconds(10))}} on the map result
> ({{updatedResult.map(_._2)}}) is removed, the stack overflow does not happen