[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14495914#comment-14495914 ]

Jack Hu commented on SPARK-6847:
--------------------------------

I did a little more investigation into this issue. It appears to be a problem 
with operations that must be check-pointed ({{updateStateByKey}}, 
{{reduceByKeyAndWindow}} with an inverse reduce function) when they are 
followed by an operation with a checkpoint (either added manually, as in the 
code in this JIRA description, or another operation that must be check-pointed) 
and the checkpoint intervals of the two operations are the same (or the 
following operation's checkpoint interval equals the batch interval).
The following code has this issue (assume the batch interval is 2 seconds and 
the default checkpoint interval is 10 seconds):
# {{source.updateStateByKey(func).map(f).checkpoint(10 seconds)}} 
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2)}}
# {{source.updateStateByKey(func).map(f).checkpoint(2 seconds)}} 

The following DO NOT have this issue:
# {{source.updateStateByKey(func).map(f).checkpoint(4 seconds)}} 
# {{source.updateStateByKey(func).map(f).updateStateByKey(func2).checkpoint(4 
seconds)}}

Each of these samples generates an RDD graph that contains two RDDs which need 
to be check-pointed.

If the child RDD(s) also need to checkpoint at the same time as the parent, 
then the parent will not checkpoint, per {{rdd.doCheckpoint}}. In this case 
the RDD produced by {{updateStateByKey}} is never check-pointed in the issue's 
sample code, which leads to the stack overflow. ({{updateStateByKey}} needs 
the checkpoint to break the growing dependency chain.)
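To illustrate the skipping behaviour, here is a hypothetical, simplified model of the {{doCheckpoint}} traversal (not the actual Spark source): the walk stops at the first RDD marked for checkpointing, so a marked parent behind a marked child is never reached.

```scala
// Hypothetical, simplified model of RDD.doCheckpoint traversal:
// an RDD marked for checkpointing checkpoints itself and does NOT
// recurse into its parents; an unmarked RDD recurses.
case class Node(name: String, marked: Boolean, parents: List[Node] = Nil)

def doCheckpoint(n: Node, done: scala.collection.mutable.Set[String]): Unit = {
  if (n.marked) {
    done += n.name                                 // checkpoint here, stop
  } else {
    n.parents.foreach(p => doCheckpoint(p, done))  // walk up the lineage
  }
}

// Graph from the issue: the updateStateByKey RDD (must be check-pointed)
// sits behind a map RDD whose result is also marked for checkpointing.
val state  = Node("updateStateByKey", marked = true)
val mapped = Node("map", marked = true, parents = List(state))

val done = scala.collection.mutable.Set.empty[String]
doCheckpoint(mapped, done)
// "updateStateByKey" is never reached, so its lineage keeps growing.
```

In this toy model {{done}} ends up containing only "map", matching the behaviour described above.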

If the child RDD(s) are not always check-pointed at the same time the parent 
needs to be, there is a chance for the parent RDD (from {{updateStateByKey}}) 
to complete some checkpoints and break the dependency chain, although the 
checkpoints may be delayed. So no stack overflow happens.
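The timing interaction can be sketched the same way (again a hypothetical model, not the Spark source): checkpointing is attempted every batch, an RDD is "due" when the batch time is a multiple of its checkpoint interval, and the parent only gets a turn on batches where the child is not due.

```scala
// Hypothetical timing model: batch interval 2s, parent (updateStateByKey)
// due every 10s. A child that is due on the same batch shadows the parent.
def parentCheckpoints(childInterval: Int): Seq[Int] =
  (2 to 60 by 2).filter { t =>
    t % 10 == 0 &&             // parent is due on this batch
    t % childInterval != 0     // child is not due, so traversal reaches parent
  }

// Child at 10s (or at the 2s batch interval): parent is never reached.
println(parentCheckpoints(10))
println(parentCheckpoints(2))
// Child at 4s: parent checkpoints on some batches, breaking the chain.
println(parentCheckpoints(4))
```

With a child interval of 10s or 2s the result is empty, matching the failing samples; with 4s the parent checkpoints at 10, 30, 50, matching the working samples.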

So, currently, we work around this issue in our streaming project by setting 
the checkpoint intervals to different values when we use operations that must 
be check-pointed. This may not be an easy fix here; I hope we can at least add 
some validation.

> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-6847
>                 URL: https://issues.apache.org/jira/browse/SPARK-6847
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Jack Hu
>              Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code: it uses 
> {{updateStateByKey}} followed by a {{map}} with a checkpoint interval of 10 
> seconds
> {code}
>     val sparkConf = new SparkConf().setAppName("test")
>     val streamingContext = new StreamingContext(sparkConf, Seconds(10))
>     streamingContext.checkpoint("""checkpoint""")
>     val source = streamingContext.socketTextStream("localhost", 9999)
>     val updatedResult = source.map(
>         (1,_)).updateStateByKey(
>             (newlist : Seq[String], oldstate : Option[String]) =>     
> newlist.headOption.orElse(oldstate))
>     updatedResult.map(_._2)
>     .checkpoint(Seconds(10))
>     .foreachRDD((rdd, t) => {
>       println("Deep: " + rdd.toDebugString.split("\n").length)
>       println(t.toString() + ": " + rdd.collect.length)
>     })
>     streamingContext.start()
>     streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain grows over time, the 
> {{updateStateByKey}} RDD never gets check-pointed, and finally the stack 
> overflow happens.
> Note:
> * The RDD in {{updatedResult.map(_._2)}} gets check-pointed in this case, but 
> not the one from {{updateStateByKey}}
> * If the {{checkpoint(Seconds(10))}} is removed from the map result 
> ({{updatedResult.map(_._2)}}), the stack overflow does not happen



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
