[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126928#comment-15126928 ]

Shixiong Zhu commented on SPARK-6847:
-

Because my PR changed internal semantics, it was merged only into the master branch (2.0.0). Before 2.0.0, you may need to trigger the checkpoint yourself. For example, for {{updateStateByKey().filter().updateStateByKey()}}, you can change the code to {{dstream.updateStateByKey().count(); dstream.updateStateByKey().filter().updateStateByKey()}} to trigger the checkpoint for the first {{updateStateByKey}}.

> Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
> --
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
> Issue Type: Bug
> Components: Streaming
> Affects Versions: 1.3.0, 1.4.1, 1.5.2, 1.6.0
> Reporter: Jack Hu
> Assignee: Shixiong Zhu
> Priority: Critical
> Labels: StackOverflowError, Streaming
> Fix For: 2.0.0
>
>
> The issue happens with the following sample code, which uses {{updateStateByKey}} followed by a {{map}} with a checkpoint interval of 10 seconds:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("checkpoint")
> val source = streamingContext.socketTextStream("localhost", )
> // Keep one value per key: the first new value of the batch, else the previous state
> val updatedResult = source.map((1, _)).updateStateByKey(
>   (newlist: Seq[String], oldstate: Option[String]) =>
>     newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
>   .checkpoint(Seconds(10))
>   .foreachRDD((rdd, t) => {
>     // Number of lines in the lineage description, a rough measure of lineage depth
>     println("Deep: " + rdd.toDebugString.split("\n").length)
>     println(t.toString() + ": " + rdd.collect.length)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over time, the {{updateStateByKey}} never gets checkpointed, and eventually a stack overflow happens.
> Note:
> * The RDD in {{updatedResult.map(_._2)}} gets checkpointed in this case, but the one from {{updateStateByKey}} does not.
> * If the {{checkpoint(Seconds(10))}} is removed from the map result ({{updatedResult.map(_._2)}}), the stack overflow does not happen.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
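To make the failure mode in the description concrete, here is a toy model in plain Scala with no Spark dependency; the Node type and the lastStateDepth helper are hypothetical names, not Spark's API. Each batch creates a new state node that depends on the previous state and on that batch's input, roughly the way successive updateStateByKey RDDs chain together. Unless a state node is checkpointed, which this model simplifies to "its lineage becomes a single node", the depth grows by one per batch; walking such an ever-deeper chain recursively is presumably what eventually overflows the stack.

{code}
// Toy model of a lineage chain (not Spark's API). A checkpointed node is
// treated as a leaf, because checkpointing replaces its lineage with saved data.
final case class Node(name: String, parents: List[Node], checkpointed: Boolean = false) {
  def depth: Int =
    if (checkpointed || parents.isEmpty) 1
    else 1 + parents.map(_.depth).max
}

object LineageGrowth {
  /** Builds `batches` state nodes; state-t depends on state-(t-1) and input-t.
    * With `checkpointEvery = Some(k)`, every k-th state node is checkpointed. */
  def lastStateDepth(batches: Int, checkpointEvery: Option[Int]): Int = {
    var state = Node("state-0", Nil)
    for (t <- 1 to batches) {
      val input = Node(s"input-$t", Nil)
      val cp = checkpointEvery.exists(k => t % k == 0)
      state = Node(s"state-$t", List(state, input), checkpointed = cp)
    }
    state.depth
  }

  def main(args: Array[String]): Unit = {
    println(lastStateDepth(103, None))    // 104: one extra level per batch, unbounded
    println(lastStateDepth(103, Some(5))) // 4: the chain is cut back at every 5th batch
  }
}
{code}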
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118335#comment-15118335 ]

Apache Spark commented on SPARK-6847:
-

User 'zsxwing' has created a pull request for this issue:
https://github.com/apache/spark/pull/10934
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116545#comment-15116545 ]

Jack Hu commented on SPARK-6847:

Hi [~zsxwing],

Even if the user does not explicitly checkpoint after {{updateStateByKey}}, this issue still happens in the following cases:
# {{updateStateByKey().filter().updateStateByKey()}}
# {{updateStateByKey().filter().reduceByKeyAndWindow(reduce, inreduce, ...)}}
# {{reduceByKeyAndWindow(reduce, inreduce, ...).filter().updateStateByKey()}}

If there is no plan to fix this issue, at least a workaround or a warning should be given to users for this kind of usage. Otherwise it is very hard to find the real cause when the application is complicated.
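Jack Hu's three cases share one shape: two operators that must be checkpointed (updateStateByKey, or reduceByKeyAndWindow with an inverse reduce function) sit on the same linear path. The warning he asks for could look roughly like the hypothetical check below. It is not part of Spark; the Op type and shadowed function are illustrative names, the pipeline is modeled as a plain list from source to sink, and it assumes the pre-2.0 behavior discussed in this thread, where an RDD that is checkpointed stops the traversal so its ancestors are skipped.

{code}
// Hypothetical lint, not part of Spark. A pipeline is a list of operators from
// source to sink; `checkpointed` marks operators whose RDDs get checkpointed,
// whether explicitly or implicitly (as stateful operators are).
final case class Op(name: String, checkpointed: Boolean)

object ShadowedCheckpointLint {
  /** Returns the checkpointed operators that have another checkpointed operator
    * downstream, i.e. those whose checkpoint the pre-2.0 traversal would skip. */
  def shadowed(pipeline: List[Op]): List[String] = {
    // Walk from the sink back to the source, remembering whether a
    // checkpointed operator has already been seen further downstream.
    val (found, _) = pipeline.reverse.foldLeft((List.empty[String], false)) {
      case ((acc, seenDownstream), op) =>
        if (op.checkpointed && seenDownstream) (op.name :: acc, true)
        else (acc, seenDownstream || op.checkpointed)
    }
    found // prepending during the reverse walk restores source-to-sink order
  }

  def main(args: Array[String]): Unit = {
    // Case 1 from the comment above: updateStateByKey().filter().updateStateByKey()
    val case1 = List(
      Op("updateStateByKey#1", checkpointed = true),
      Op("filter", checkpointed = false),
      Op("updateStateByKey#2", checkpointed = true))
    println(shadowed(case1)) // List(updateStateByKey#1)
  }
}
{code}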
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116044#comment-15116044 ]

Shixiong Zhu commented on SPARK-6847:
-

I found the issue. The problem is in the following lines:
{code}
updatedResult.map(_._2)
  .checkpoint(Seconds(10))
{code}
Because {{checkpoint}} is called after {{map}}, only the MapPartitionsRDD is checkpointed, and the state RDD of {{updateStateByKey}} is skipped: when an RDD is checkpointed, its dependencies (parent RDDs) are not visited. You can simply swap {{map}} and {{checkpoint}}:
{code}
updatedResult.checkpoint(Seconds(10)).map(_._2)
{code}
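The traversal rule in Shixiong's explanation can be reproduced with a small plain-Scala model. ToyRdd and checkpointed are hypothetical names, not Spark classes; the model assumes the pre-2.0 behavior described in the comment, where checkpointing starts from a job's final RDD, saves the first RDD on each path that is marked for checkpointing, and does not visit that RDD's parents. It only tracks which RDDs end up saved.

{code}
import scala.collection.mutable.ListBuffer

// Toy model of the pre-2.0 checkpoint traversal (not Spark's classes).
final class ToyRdd(val name: String, val parents: List[ToyRdd], val marked: Boolean)

object CheckpointTraversal {
  /** Names of the RDDs actually saved when a job on `finalRdd` completes. */
  def checkpointed(finalRdd: ToyRdd): List[String] = {
    val saved = ListBuffer.empty[String]
    def visit(rdd: ToyRdd): Unit =
      if (rdd.marked) saved += rdd.name // save it; its parents are not visited
      else rdd.parents.foreach(visit)
    visit(finalRdd)
    saved.toList
  }

  def main(args: Array[String]): Unit = {
    // The state RDD of updateStateByKey is marked for checkpointing.
    val state = new ToyRdd("stateRDD", Nil, marked = true)

    // updatedResult.map(_._2).checkpoint(Seconds(10)): the mapped RDD is marked too.
    val mapThenCheckpoint = new ToyRdd("MapPartitionsRDD", List(state), marked = true)
    println(checkpointed(mapThenCheckpoint)) // List(MapPartitionsRDD)

    // updatedResult.checkpoint(Seconds(10)).map(_._2): only the state RDD is marked.
    val checkpointThenMap = new ToyRdd("MapPartitionsRDD", List(state), marked = false)
    println(checkpointed(checkpointThenMap)) // List(stateRDD)
  }
}
{code}

In the first case the state RDD is marked but never reached, which is why its lineage keeps growing; swapping the two calls lets the walk reach it.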
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114722#comment-15114722 ]

Jack Hu commented on SPARK-6847:

Tested on the latest 1.6 branch (f913f7e [SPARK-12120][PYSPARK] Improve exception message when failing to init); the issue still exists.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15113490#comment-15113490 ]

Shixiong Zhu commented on SPARK-6847:
-

It has not been released yet. You need to use the master branch or the 1.6 branch to test it.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111849#comment-15111849 ]

Jack Hu commented on SPARK-6847:

Hi [~zsxwing],

I just tested a simple case with 1.6, and the issue still exists:
{code}
// batch interval = 2 seconds
source.updateStateByKey(func).map(f).checkpoint(Seconds(2))
{code}
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15109955#comment-15109955 ]

Shixiong Zhu commented on SPARK-6847:
-

I think this one has been fixed by https://github.com/apache/spark/pull/10623. Could you try the latest code from master or the 1.6 branch?
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008993#comment-15008993 ]

Glyton Camilleri commented on SPARK-6847:
-

Hi Yunjie,

Whether 50 seconds is a good checkpoint interval depends largely on the time window the stream is acting on; if the stream executes jobs every 10 seconds, then 50 seconds could be fine.

In my example code, {{certainConditionsAreMet}} was just a placeholder: the conditions were application-specific in our case. In other words, there were conditions under which we would perform the side effect on the stream, which in our case ((1) above) was saving the contents of the stream to HDFS. So the fix looked something like this:
{code}
// decides whether it's time to store the contents of the stream to HDFS
def isTimeToSave: Boolean = ...

def saveData[A](stream: DStream[A]): Unit =
  if (isTimeToSave) stream.foreachRDD {
    ... // write data to HDFS
  }
  else stream.foreachRDD { _.foreachPartition { _ => () } } // just consume the stream, do nothing
{code}
The {{else}} branch is what I was referring to above.
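Glyton's rule of thumb about the interval can be written down as a small plain-Scala helper. This is a sketch, not Spark code, and the object and method names are hypothetical. It assumes two things: that Spark Streaming requires a DStream's checkpoint interval to be a whole multiple of its slide (batch) interval, and that the Spark Streaming programming guide's suggestion of 5 to 10 slide intervals is a reasonable starting point. Durations are in milliseconds.

{code}
// Sketch of two checkpoint-interval checks (assumptions, not Spark code).
// All durations are in milliseconds.
object CheckpointIntervalCheck {
  /** True if the checkpoint interval is a positive whole multiple of the batch interval. */
  def isMultipleOfBatch(checkpointMs: Long, batchMs: Long): Boolean =
    batchMs > 0 && checkpointMs > 0 && checkpointMs % batchMs == 0

  /** True if the interval is a multiple of the batch and falls within 5 to 10 batches. */
  def isInSuggestedRange(checkpointMs: Long, batchMs: Long): Boolean =
    isMultipleOfBatch(checkpointMs, batchMs) &&
      checkpointMs >= 5 * batchMs && checkpointMs <= 10 * batchMs

  def main(args: Array[String]): Unit = {
    println(isInSuggestedRange(50000L, 10000L)) // true: 50 s with 10 s batches
    println(isInSuggestedRange(15000L, 10000L)) // false: not a multiple of 10 s
  }
}
{code}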
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008908#comment-15008908 ]

Yunjie Qiu commented on SPARK-6847:
---

Hi Glyton Camilleri & Jack Hu,

We also ran into the same StackOverflowError in our application, where we wrote something like {{val dStream1 = context.union(kafkaStreams).updateStateByKey(updateFunc).checkpoint(Seconds(50))}}.

Having read your comments, does this mean I can get rid of the issue by simply adding an extra, otherwise meaningless {{map()}} step to {{dStream1}}? Or should I do something like {{val workaroundStream = dStream1.map(...).checkpoint(Seconds(some_value_other_than_50))}}?

I was confused about what {{certainConditionsAreMet}} refers to, and about what should go inside the braces of {{_.foreachPartition}}, so I had to ask here for details.

Best Regards.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971004#comment-14971004 ]

Glyton Camilleri commented on SPARK-6847:
-

Hi,

we actually managed to get rid of the overflow issues by setting checkpoints on more streams than we thought we needed, in addition to implementing a small change following your suggestion. Before the fix, the setup was similar to what you describe:
{code}
val dStream1 = ... // create kafka stream and do some preprocessing
val dStream2 = dStream1.updateStateByKey { func }.checkpoint(timeWindow * 2)
val dStream3 = dStream2.map { ... }

// (1) perform some side effect on the state
if (certainConditionsAreMet) dStream2.foreachRDD { _.foreachPartition { ... } }

// (2) publish final results to a set of Kafka topics
dStream3.transform { ... }.foreachRDD { _.foreachPartition { ... } }
{code}
We did two things:
a) set separate checkpoints on {{dStream2}} and {{dStream3}}, whereas before we only set the checkpoint on {{dStream2}};
b) changed (1) above so that when {{!certainConditionsAreMet}}, we just consume the stream as you describe in your suggestion.

I honestly think that b) was the more likely reason the StackOverflowError went away, but we decided to leave the checkpoint settings from a) in place anyway. Apologies for the late follow-up, but we needed to make sure the issue had actually been resolved.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14951593#comment-14951593 ]

Jack Hu commented on SPARK-6847:

Hi [~glyton.camilleri],

Check whether there are two DStreams in the DAG that need to be checkpointed (e.g. {{updateStateByKey}}, {{reduceByKeyAndWindow}}). If so, you can work around the problem by adding an output operation on the upstream DStream that needs to be checkpointed:
{code}
val d1 = input.updateStateByKey(func)
val d2 = d1.map(...).updateStateByKey(func)
d2.foreachRDD(rdd => println(rdd.count()))

// Work around the stack overflow described in this JIRA: a no-op output
// operation on d1 makes each of d1's RDDs the final RDD of its own job.
d1.foreachRDD(rdd => rdd.foreach(_ => ()))
{code}
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14950156#comment-14950156 ] Glyton Camilleri commented on SPARK-6847: Hi, I've also bumped into this very same issue but couldn't find a good value for {{checkpoint}}. Our setup consists of a Kafka stream with a 10s time window, and we tried various values for the checkpoint interval (the default, 10s, and 15s). It always takes a long time for the exception to appear, often around 10 hours, which makes the problem quite painful to debug. We'll keep investigating, but it would be great if someone could shed more light on the issue.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495914#comment-14495914 ] Jack Hu commented on SPARK-6847: I did a little more investigation into this issue. It appears to be a problem with operations that must be checkpointed ({{updateStateByKey}}, and {{reduceByKeyAndWindow}} with an inverse reduce function) when they are followed by an operation that is also checkpointed (either added manually, like the code in this JIRA's description, or an operation that must itself be checkpointed), and the two operations have the same checkpoint interval (or the following operation's checkpoint interval equals the batch interval).
The following code has this issue (assume the batch interval is 2 seconds and the default checkpoint interval is 10 seconds):
# {{source.updateStateByKey(func).map(f).checkpoint(10 seconds)}}
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2)}}
# {{source.updateStateByKey(func).map(f).checkpoint(2 seconds)}}
These do NOT have this issue:
# {{source.updateStateByKey(func).map(f).checkpoint(4 seconds)}}
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2).checkpoint(4 seconds)}}
These samples generate an RDD graph containing two RDDs that need to be checkpointed. If the child RDD(s) also need to be checkpointed at the same time as the parent, then, according to {{rdd.doCheckpoint}}, the parent is not checkpointed. In the sample code from the description, the RDD produced by {{updateStateByKey}} is therefore never checkpointed, which leads to the stack overflow ({{updateStateByKey}} relies on checkpointing to break the dependency chain it builds up). If the child RDD(s) are not always checkpointed at the same time as the parent, the parent RDD (from {{updateStateByKey}}) has a chance to complete some checkpoints and break the dependency chain, although the checkpoint may be delayed.
So no stack overflow happens. For now, we work around this issue by setting different checkpoint intervals whenever our streaming project uses operations that must be checkpointed. This may not be an easy fix, but we hope at least some validation can be added.
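The recursion rule described in the comment above can be illustrated with a small toy model. This is a sketch, not Spark code: `ToyCheckpoint`, `Node` and `stateLineageLength` are hypothetical names, and the model assumes only the behavior stated in that comment, namely that an RDD marked for checkpointing checkpoints itself without visiting its parents, while an unmarked RDD passes the call on to its parents (each RDD handling the call at most once). Data, caching and the actual checkpoint write are ignored, and intervals are counted in batches.

```scala
// Toy model (hypothetical, NOT Spark's implementation) of the doCheckpoint recursion:
// a marked node checkpoints itself and stops; an unmarked node recurses into its parents.
object ToyCheckpoint {
  final class Node(val parents: Seq[Node], val marked: Boolean) {
    var doCheckpointCalled = false // each node handles the call at most once
    var checkpointed = false

    def doCheckpoint(): Unit =
      if (!doCheckpointCalled) {
        doCheckpointCalled = true
        if (marked) checkpointed = true        // checkpoint this node and stop here
        else parents.foreach(_.doCheckpoint()) // otherwise try the parents
      }
  }

  /** Simulates `batches` batches of state(k) = update(state(k - 1)) and
    * child(k) = map(state(k)). State nodes are marked every `stateEvery` batches,
    * child nodes every `childEvery` batches (0 = never). If `outputOnState` is true,
    * the state stream also gets its own output job (the workaround in this thread).
    * Returns how many state nodes, counting back from the newest, are passed before
    * reaching a checkpointed one, i.e. the uncheckpointed state lineage length.
    */
  def stateLineageLength(batches: Int, stateEvery: Int, childEvery: Int,
                         outputOnState: Boolean = false): Int = {
    require(batches >= 1 && stateEvery >= 1 && childEvery >= 0)
    var states = Vector.empty[Node]
    for (k <- 1 to batches) {
      val state = new Node(states.lastOption.toSeq, k % stateEvery == 0)
      val child = new Node(Seq(state), childEvery > 0 && k % childEvery == 0)
      states :+= state
      child.doCheckpoint()                    // job on the child stream's output
      if (outputOnState) state.doCheckpoint() // extra job on the state stream
    }
    states.reverseIterator.takeWhile(!_.checkpointed).size
  }
}
```

With both streams marked in every batch (as in the issue description, where the batch interval and the manual checkpoint interval are both 10 seconds, assuming the state stream is also checkpointed every batch), `stateLineageLength(100, 1, 1)` returns 100: the state lineage grows by one node per batch. Without the manual checkpoint on the child, `stateLineageLength(100, 1, 0)` returns 0, and with an extra output on the state stream, `stateLineageLength(100, 1, 1, outputOnState = true)` also returns 0. With both marked every fifth batch, `stateLineageLength(100, 5, 5)` stays at 5, because the state node marked in batch k is picked up one batch late, which corresponds to the delayed checkpoint mentioned above. The model leaves out everything else Spark does (caching, cleanup of old RDDs, job ordering), so it should not be read as predicting which of the configurations listed above fail.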
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493804#comment-14493804 ] Jack Hu commented on SPARK-6847: Hi [~sowen], the checkpoint interval cannot be turned down (below 10 seconds) because it must be greater than or equal to the batch interval. I will try larger checkpoint intervals such as 20 seconds, 30 seconds, and so on. We have a real case with the same problem; it only updates a small set of values per key per interval (one event per key per interval). One observation: the {{updateStateByKey}} is automatically checkpointed.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493749#comment-14493749 ] Sean Owen commented on SPARK-6847: Yeah, that doesn't quite help, since it isn't clear where it starts; the top of the trace may be lost. Given the observations, the problem may be that all of the input data goes into one key, effectively making every RDD a single record, which is then checkpointed infrequently, so a large object has to be serialized. "Large" itself isn't the problem, but whatever it is seems to have a long object dependency graph, for example a linked list of blocks. This would explain why no checkpointing, or smaller intervals, could make the difference. How about also turning down the checkpoint interval? Ideally this shouldn't happen, but having as skewed a data distribution as possible may be pushing the intended usage a bit far. Does this come up in real usage? You'd generally expect the data per key per interval to be smallish.
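The "long object dependency graph" hypothesis can be illustrated outside Spark. The sketch below is a hypothetical example (`DeepGraph`, `chain` and `serializes` are made-up names): default Java serialization descends into each referenced object through nested calls, so a long enough chain of objects exhausts the thread stack no matter how little data each object holds. It serializes rather than deserializes, since the chain has to be written before it can be read; the trace quoted in this thread shows the corresponding recursion on the read side (repeated `readObject0`, `readOrdinaryObject`, `readSerialData`, `defaultReadFields` frames).

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Hypothetical illustration: object-graph depth, not data size, drives the
// recursion depth of default Java serialization.
object DeepGraph {
  /** Builds Some(Some(...Some(None)...)) nested `depth` levels deep. */
  def chain(depth: Int): AnyRef = {
    var head: AnyRef = None
    for (_ <- 1 to depth) head = Some(head) // each level references the previous one
    head
  }

  /** True if `obj` serializes; false if serialization overflows the thread stack. */
  def serializes(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: StackOverflowError => false
    }
}
```

`DeepGraph.serializes(DeepGraph.chain(10))` returns true, while a chain of one million levels is expected to return false with typical default thread stack sizes; the exact depth at which it fails depends on the JVM's stack size (`-Xss`).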
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493517#comment-14493517 ] Jack Hu commented on SPARK-6847: Here is part of the stack trace (full trace at: https://gist.github.com/jhu-chang/38a6c052aff1d666b785)
{quote}
15/04/14 11:28:20 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 27554.0 (TID 3801)
java.lang.StackOverflowError
	at java.io.ObjectStreamClass.setPrimFieldValues(ObjectStreamClass.java:1243)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1984)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
	at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
{quote}
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492126#comment-14492126 ] Sean Owen commented on SPARK-6847: Can you provide (the top part of) the stack trace from the stack overflow, so we can see where it's occurring? I think something is building a very long object graph, but confirming that is the first step.
[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set
[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491873#comment-14491873 ] Jack Hu commented on SPARK-6847: Hi [~sowen], I tested more cases:
# Changing only {{newlist.headOption.orElse(oldstate)}} to {{Some("a")}}: the issue still exists.
# Changing only the streaming batch interval to {{2 seconds}}, keeping {{newlist.headOption.orElse(oldstate)}} and the 10-second checkpoint interval: the issue does not occur.
So this issue may be related to the checkpoint interval and the batch interval.