[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-02-01 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15126928#comment-15126928
 ] 

Shixiong Zhu commented on SPARK-6847:
-

As my PR changed internal semantics, it was only merged to the master branch 
(2.0.0). 

For pre-2.0.0 versions, you may need to trigger the checkpoint yourself. E.g., 
for {{updateStateByKey().filter().updateStateByKey()}}, you can change it to 
{{dstream.updateStateByKey().count(); 
dstream.updateStateByKey().filter().updateStateByKey()}} to trigger the 
checkpoint for the first {{updateStateByKey}}.
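
A minimal sketch of that workaround (my illustration; {{updateFunc}}, {{updateFunc2}}, and {{pred}} are placeholders, and the extra {{count()}} needs an output operation such as {{print()}} to actually run):

{code}
val first = dstream.updateStateByKey(updateFunc)
// Extra output on the first state stream: it forces the state RDDs to be
// materialized and checkpointed every batch, truncating the growing lineage.
first.count().print()
first.filter(pred).updateStateByKey(updateFunc2).print()
{code}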

> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> --
>
> Key: SPARK-6847
> URL: https://issues.apache.org/jira/browse/SPARK-6847
> Project: Spark
>  Issue Type: Bug
>  Components: Streaming
>Affects Versions: 1.3.0, 1.4.1, 1.5.2, 1.6.0
>Reporter: Jack Hu
>Assignee: Shixiong Zhu
>Priority: Critical
>  Labels: StackOverflowError, Streaming
> Fix For: 2.0.0
>
>
> The issue happens with the following sample code, which uses 
> {{updateStateByKey}} followed by a {{map}} with a checkpoint interval of 10 
> seconds:
> {code}
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming.{Seconds, StreamingContext}
>
> val sparkConf = new SparkConf().setAppName("test")
> val streamingContext = new StreamingContext(sparkConf, Seconds(10))
> streamingContext.checkpoint("checkpoint")
> // the port number was elided in the original report; 9999 is a placeholder
> val source = streamingContext.socketTextStream("localhost", 9999)
> val updatedResult = source.map((1, _)).updateStateByKey(
>   (newlist: Seq[String], oldstate: Option[String]) =>
>     newlist.headOption.orElse(oldstate))
> updatedResult.map(_._2)
>   .checkpoint(Seconds(10))
>   .foreachRDD((rdd, t) => {
>     println("Deep: " + rdd.toDebugString.split("\n").length)
>     println(t.toString() + ": " + rdd.collect.length)
>   })
> streamingContext.start()
> streamingContext.awaitTermination()
> {code}
> From the output, we can see that the RDD dependency chain keeps growing over 
> time, the {{updateStateByKey}} state never gets checkpointed, and finally 
> the stack overflow happens. 
> Note:
> * The RDD in {{updatedResult.map(_._2)}} gets checkpointed in this case, but 
> the {{updateStateByKey}} state does not
> * If the {{checkpoint(Seconds(10))}} on the map result 
> ({{updatedResult.map(_._2)}}) is removed, the stack overflow does not happen




[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-26 Thread Apache Spark (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15118335#comment-15118335
 ] 

Apache Spark commented on SPARK-6847:
-

User 'zsxwing' has created a pull request for this issue:
https://github.com/apache/spark/pull/10934


[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-25 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116044#comment-15116044
 ] 

Shixiong Zhu commented on SPARK-6847:
-

I found the issue. The problem is in the following line:

{code}
updatedResult.map(_._2)
  .checkpoint(Seconds(10))
{code}

Because {{checkpoint}} is called after {{map}}, only the {{MapPartitionsRDD}} 
is checkpointed and the state RDD inside {{updateStateByKey}} is skipped: when 
an RDD executes checkpointing, its dependencies (parent RDDs) are skipped. You 
can just switch {{map}} and {{checkpoint}} like this:

{code}
updatedResult.checkpoint(Seconds(10)).map(_._2)
{code}
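
Applied to the repro in the description, the reordered pipeline would look roughly like this (my sketch, reusing the description's update function as {{updateFunc}}):

{code}
val updatedResult = source.map((1, _)).updateStateByKey(updateFunc)
updatedResult
  .checkpoint(Seconds(10)) // checkpoint the state RDDs themselves
  .map(_._2)
  .foreachRDD((rdd, t) => println(t.toString() + ": " + rdd.collect.length))
{code}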


[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-25 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15116545#comment-15116545
 ] 

Jack Hu commented on SPARK-6847:


Hi [~zsxwing]

Even if the user does not explicitly checkpoint after the {{updateStateByKey}}, 
this issue will still happen in the following cases:
# {{updateStateByKey().filter().updateStateByKey()}}
# {{updateStateByKey().filter().reduceByKeyAndWindow(reduce, inreduce, ...)}}
# {{reduceByKeyAndWindow(reduce, inreduce, ...).filter().updateStateByKey()}}

If there is no plan to fix this issue, maybe an explicit workaround/warning 
should be given to users about such usage. 
It will be very hard to find the real cause if the application is complicated. 


[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-24 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15114722#comment-15114722
 ] 

Jack Hu commented on SPARK-6847:


Tested on the latest 1.6 branch (f913f7e, [SPARK-12120][PYSPARK] Improve 
exception message when failing to init); the issue still exists.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-22 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15113490#comment-15113490
 ] 

Shixiong Zhu commented on SPARK-6847:
-

It has not yet been released. You need to use the master branch or 1.6 branch 
to test it.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-21 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111849#comment-15111849
 ] 

Jack Hu commented on SPARK-6847:


Hi [~zsxwing]

I just tested a simple case with 1.6; the issue still exists:
{code}
// batch interval = 2 seconds
source.updateStateByKey(func).map(f).checkpoint(Seconds(2))
{code}
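
Expanded into a full pipeline, that failing condition (checkpoint interval equal to the batch interval) would look roughly like this; {{func}} and {{f}} stand in for the elided functions, and the port is a placeholder:

{code}
val ssc = new StreamingContext(sparkConf, Seconds(2)) // 2-second batches
ssc.checkpoint("checkpoint")
ssc.socketTextStream("localhost", 9999)
  .map((1, _))
  .updateStateByKey(func)
  .map(f)
  .checkpoint(Seconds(2)) // equals the batch interval, so the state RDD
                          // upstream never gets to checkpoint
  .foreachRDD(rdd => println(rdd.count()))
{code}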

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2016-01-20 Thread Shixiong Zhu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15109955#comment-15109955
 ] 

Shixiong Zhu commented on SPARK-6847:
-

I think this one has been fixed by https://github.com/apache/spark/pull/10623

Could you try the latest code on master or the 1.6 branch?

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-11-17 Thread Yunjie Qiu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008909#comment-15008909
 ] 

Yunjie Qiu commented on SPARK-6847:
---

Hi Glyton Camilleri & Jack Hu,

We also ran into the same StackOverflowError in our application, where we wrote 
something like:

{code}
val dStream1 = 
context.union(kafkaStreams).updateStateByKey(updateFunc).checkpoint(Seconds(50))
{code}

I've read your comments; does that mean I can get rid of this issue by simply 
adding an extra no-op map() step to dStream1? Or should I do something like:

{code}
val workaroundStream = 
dStream1.map(...).checkpoint(Seconds(some_value_other_than_50))
{code}

I was also confused about what certainConditionsAreMet refers to, and what kind 
of content should fill _.foreachPartition { ... }, so I am asking here for the 
details.

Best Regards.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-11-17 Thread Glyton Camilleri (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008993#comment-15008993
 ] 

Glyton Camilleri commented on SPARK-6847:
-

Hi Yunjie,

Whether 50 seconds is a good checkpoint interval depends largely on the time 
window the stream is acting on; so if the stream is set to execute jobs every 
10 seconds, then 50 seconds could be fine.

In my example code, {{certainConditionsAreMet}} was just a placeholder: the 
conditions were application-specific in that case. In other words, there were 
conditions under which we would perform the side effect on the stream, which in 
our case ((1) above) was saving the contents of the stream to HDFS. So the fix 
looked something like this:

{code}
def isTimeToSave: Boolean = ... // decides whether it's time to store the
                                // contents of the stream to HDFS

def saveData[A](stream: DStream[A]) =
  if (isTimeToSave) stream.foreachRDD {
    ... // write data to HDFS
  } else stream.foreachRDD {
    _.foreachPartition { _ => () } // do nothing, but still consume the stream
  }
{code}

The {{else}} part is what I'm referring to above.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-10-23 Thread Glyton Camilleri (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14971004#comment-14971004
 ] 

Glyton Camilleri commented on SPARK-6847:
-

Hi,
we managed to get rid of the overflow issues by setting checkpoints on more 
streams than we thought we needed to, in addition to implementing a small 
change following your suggestion; before the fix, the setup was similar to 
what you describe:

{code}
val dStream1 = // create kafka stream and do some preprocessing
val dStream2 = dStream1.updateStateByKey { func }.checkpoint(timeWindow * 2)
val dStream3 = dStream2.map { ... }

// (1) perform some side-effect on the state
if (certainConditionsAreMet) dStream2.foreachRDD { 
  _.foreachPartition { ... }
}

// (2) publish final results to a set of Kafka topics
dStream3.transform { ... }.foreachRDD {
  _.foreachPartition { ... }
}
{code}

There were two things we did:
a) set different checkpoint intervals for {{dStream2}} and {{dStream3}}, 
whereas before we were only setting the checkpoint for {{dStream2}}
b) changed (1) above such that when {{!certainConditionsAreMet}}, we still 
consume the stream as you describe in your suggestion

I honestly think b) was the more influential change in removing the 
StackOverflowError, but we decided to leave the checkpoint settings from a) in 
place anyway.
Apologies for the late follow-up, but we needed to make sure the issue had 
actually been resolved.
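
A sketch of what the fixed setup might look like after those two changes (my reconstruction; the interval values and the {{...}} bodies are placeholders):

{code}
val dStream2 = dStream1.updateStateByKey { func }.checkpoint(timeWindow * 2)
val dStream3 = dStream2.map { ... }.checkpoint(timeWindow * 3) // (a) different interval

// (b) always consume dStream2, even when there is nothing to save, so its
// state RDDs keep being materialized and checkpointed
if (certainConditionsAreMet) dStream2.foreachRDD {
  _.foreachPartition { ... }
} else dStream2.foreachRDD {
  _.foreachPartition { _ => () } // no-op consumption
}
{code}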

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-10-09 Thread Glyton Camilleri (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14950156#comment-14950156
 ] 

Glyton Camilleri commented on SPARK-6847:
-

Hi, 

I've also bumped into this very same issue but couldn't find a good value for 
the {{checkpoint}} interval; our setup consists of a Kafka stream with a 10s 
time window, and we tried various values for the checkpoint interval (default, 
10s, and 15s). 

It always takes a long time for the exception to appear, often in the range of 
10 hours or so, making the problem relatively painful to debug. We'll keep 
investigating, but it would be great if someone could shed some more light on 
the issue.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-10-09 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14951593#comment-14951593
 ] 

Jack Hu commented on SPARK-6847:


Hi [~glyton.camilleri]
You can check whether there are two dstreams in the DAG that need to be 
checkpointed ({{updateStateByKey}}, {{reduceByKeyAndWindow}}); if yes, you can 
work around this by adding an output operation on the earlier DStream that 
needs to be checkpointed: 

{code}
val d1 = input.updateStateByKey(func)
val d2 = d1.map(...).updateStateByKey(func)
d2.foreachRDD(rdd => print(rdd.count))
// workaround for the stack overflow described in this JIRA:
// consume d1 so its state RDDs get materialized and checkpointed
d1.foreachRDD(rdd => rdd.foreach(_ => ()))
{code}


[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-15 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495914#comment-14495914
 ] 

Jack Hu commented on SPARK-6847:


I did a little more investigation into this issue. It appears to be a problem 
with operations ({{updateStateByKey}}, {{reduceByKeyAndWindow}} with an 
inverse-reduce function) that must be checkpointed and that are followed by an 
operation with a checkpoint (either manually added, as in the code in this 
JIRA's description, or another operation that must be checkpointed), where the 
checkpoint intervals of the two operations are the same (or the following 
operation's checkpoint interval equals the batch interval).
The following code has this issue (assume the batch interval is 2 seconds and 
the default checkpoint interval is 10 seconds):
# {{source.updateStateByKey(func).map(f).checkpoint(10 seconds)}} 
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2)}}
# {{source.updateStateByKey(func).map(f).checkpoint(2 seconds)}} 

These DO NOT have this issue:
# {{source.updateStateByKey(func).map(f).checkpoint(4 seconds)}} 
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2).checkpoint(4 
seconds)}}

Each of these samples generates an RDD graph that contains two RDDs needing 
checkpointing. 

If the child RDD(s) need to checkpoint at the same time the parent does, the 
parent will not checkpoint, per {{rdd.doCheckpoint}}. In that case the RDD 
produced by {{updateStateByKey}} is never checkpointed in the failing samples, 
which leads to the stack overflow ({{updateStateByKey}} needs the checkpoint 
to break the dependency chain this operation builds up). 

If the child RDD(s) are not always checkpointed at the same time as the 
parent, the parent RDD (from {{updateStateByKey}}) gets a chance to complete 
some checkpoints and break the dependency chain, although the checkpoints may 
be delayed. So no stack overflow happens.

So, currently, we work around this issue by setting the checkpoint intervals 
to different values whenever we use operations that must be checkpointed in a 
streaming project. Maybe this is not an easy fix; I hope we can at least add 
some validation.
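
To make that workaround concrete, here is a minimal sketch (my illustration; {{func}} and {{f}} are placeholders) of staggering the intervals so the state RDD still gets its turn to checkpoint:

{code}
// batch interval = 2s; the updateStateByKey stream checkpoints every 10s by default
val state = source.updateStateByKey(func) // must be checkpointed
state.map(f)
  .checkpoint(Seconds(4)) // neither 2s nor 10s: on most batches the child is
                          // not checkpointing, so rdd.doCheckpoint can reach
                          // the parent state RDD and truncate its lineage
  .foreachRDD(rdd => println(rdd.count()))
{code}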

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-14 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493749#comment-14493749
 ] 

Sean Owen commented on SPARK-6847:
--

Yeah, that doesn't quite help since it is not clear where the trace starts; the 
top may be lost.

Given the observations, the problem may be putting all of the input data into 
one key, effectively making each RDD a single record, then checkpointing that 
infrequently, which means serializing a large object. "Large" isn't the problem 
per se, but whatever it is seems to have a long object dependency graph, maybe 
a linked list of blocks for example. This would explain why no checkpointing, 
or smaller intervals, could make the difference. How about also turning down 
the checkpoint interval?

Ideally it shouldn't occur, but this might be pushing the intended usage a bit 
far by having as skewed a data distribution as possible. Does this come up in 
real usage? You'd generally expect the data per key per interval to be smallish.
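
As a hedged illustration of that failure mode (plain Scala, not Spark code): Java serialization walks object graphs recursively, so a sufficiently deep chain of references overflows the stack, just like an RDD lineage that is never truncated by checkpointing:

{code}
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

case class Node(parent: Node) // case classes are Serializable by default

var chain: Node = Node(null)
for (_ <- 1 to 100000) chain = Node(chain) // build a 100k-deep reference chain

val out = new ObjectOutputStream(new ByteArrayOutputStream())
out.writeObject(chain) // throws java.lang.StackOverflowError
{code}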

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-14 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493804#comment-14493804
 ] 

Jack Hu commented on SPARK-6847:


Hi, [~sowen]

The checkpoint interval cannot be turned down (made smaller than 10 seconds) 
since it must be greater than or equal to the batch interval. I will try larger 
checkpoint intervals like 20 seconds, 30 seconds...

We have a real case with the same problem; it only updates a small set of 
values per key per interval (one event per key per interval).

One observation: the {{updateStateByKey}} stream is automatically checkpointed.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-13 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14493517#comment-14493517
 ] 

Jack Hu commented on SPARK-6847:


Here is the part of the stack (full stack at: 
https://gist.github.com/jhu-chang/38a6c052aff1d666b785)
{quote}
15/04/14 11:28:20 [Executor task launch worker-1] ERROR org.apache.spark.executor.Executor: Exception in task 1.0 in stage 27554.0 (TID 3801)
java.lang.StackOverflowError
    at java.io.ObjectStreamClass.setPrimFieldValues(ObjectStreamClass.java:1243)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1984)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:366)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
{quote}

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-13 Thread Sean Owen (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14492126#comment-14492126
 ] 

Sean Owen commented on SPARK-6847:
--

Can you provide (the top part of) the stack overflow trace, so we can see where 
it's occurring? I think something is building a very long object graph, but 
that is the first step to confirm.

[jira] [Commented] (SPARK-6847) Stack overflow on updateStateByKey which followed by a dstream with checkpoint set

2015-04-12 Thread Jack Hu (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14491873#comment-14491873
 ] 

Jack Hu commented on SPARK-6847:


Hi, [~sowen]

I tested more cases:
# Only changing {{newlist.headOption.orElse(oldstate)}} to {{Some("a")}}: the 
issue still exists.
# Only changing the streaming batch interval to {{2 seconds}}, keeping 
{{newlist.headOption.orElse(oldstate)}} and the 10-second checkpoint interval: 
the issue does not exist. 

So this issue may be related to the checkpoint interval and the batch interval. 
