[ https://issues.apache.org/jira/browse/SPARK-6847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14971004#comment-14971004 ]

Glyton Camilleri commented on SPARK-6847:
-----------------------------------------

Hi,
we managed to get rid of the overflow issues by setting checkpoints on more 
streams than we thought we needed to, in addition to implementing a small 
change following your suggestion; before the fix, the setup was similar to 
what you describe:

{code}
val dStream1 = // create kafka stream and do some preprocessing
val dStream2 = dStream1.updateStateByKey { func }.checkpoint(timeWindow * 2)
val dStream3 = dStream2.map { ... }

// (1) perform some side-effect on the state
if (certainConditionsAreMet) dStream2.foreachRDD { 
  _.foreachPartition { ... }
}

// (2) publish final results to a set of Kafka topics
dStream3.transform { ... }.foreachRDD {
  _.foreachPartition { ... }
}
{code}

There were two things we did:
a) set checkpoints for both {{dStream2}} and {{dStream3}}, whereas before we 
were only setting the checkpoint for {{dStream2}}
b) changed (1) above so that when {{!certainConditionsAreMet}}, we still 
consume the stream as you describe in your suggestion (see the sketch below)

I honestly think that b) was the more influential change in removing the 
StackOverflowError, but we decided to leave the checkpoint settings from a) in 
place anyway.
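
For what it's worth, here is a minimal, self-contained sketch of the shape we 
ended up with. It is only illustrative: the object name, the socket source in 
place of Kafka, the counting update function, the {{--side-effect}} flag and 
{{println}} standing in for the Kafka producer are placeholders, not our 
actual code.

{code}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object CheckpointedSetupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("checkpointed-setup-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint("checkpoint")

    val timeWindow = Seconds(10)
    val certainConditionsAreMet = args.contains("--side-effect")

    // stand-in for the Kafka stream plus preprocessing
    val dStream1 = ssc.socketTextStream("localhost", 9999).map(line => (line, 1L))

    // a) checkpoint the stateful stream AND the stream derived from it
    val dStream2 = dStream1
      .updateStateByKey((values: Seq[Long], state: Option[Long]) => Some(state.getOrElse(0L) + values.sum))
      .checkpoint(timeWindow * 2)
    val dStream3 = dStream2.map { case (key, total) => s"$key=$total" }.checkpoint(timeWindow * 2)

    // b) the side-effecting output (1) is registered unconditionally; when the
    //    side-effect is not wanted, the stream is still consumed (a cheap count)
    dStream2.foreachRDD { rdd =>
      if (certainConditionsAreMet) rdd.foreachPartition(_.foreach(kv => println(s"side-effect: $kv")))
      else rdd.count()
    }

    // (2) publish final results (println stands in for the Kafka producer)
    dStream3.foreachRDD(rdd => rdd.foreachPartition(_.foreach(println)))

    ssc.start()
    ssc.awaitTermination()
  }
}
{code}
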
Apologies for the late follow-up, but we needed to make sure the issue had 
actually been resolved.

> Stack overflow on updateStateByKey which followed by a dstream with 
> checkpoint set
> ----------------------------------------------------------------------------------
>
>                 Key: SPARK-6847
>                 URL: https://issues.apache.org/jira/browse/SPARK-6847
>             Project: Spark
>          Issue Type: Bug
>          Components: Streaming
>    Affects Versions: 1.3.0
>            Reporter: Jack Hu
>              Labels: StackOverflowError, Streaming
>
> The issue happens with the following sample code, which uses 
> {{updateStateByKey}} followed by a {{map}} with a checkpoint interval of 10 
> seconds:
> {code}
>     val sparkConf = new SparkConf().setAppName("test")
>     val streamingContext = new StreamingContext(sparkConf, Seconds(10))
>     streamingContext.checkpoint("""checkpoint""")
>     val source = streamingContext.socketTextStream("localhost", 9999)
>     val updatedResult = source.map((1, _))
>       .updateStateByKey((newlist: Seq[String], oldstate: Option[String]) =>
>         newlist.headOption.orElse(oldstate))
>     updatedResult.map(_._2)
>     .checkpoint(Seconds(10))
>     .foreachRDD((rdd, t) => {
>       println("Deep: " + rdd.toDebugString.split("\n").length)
>       println(t.toString() + ": " + rdd.collect.length)
>     })
>     streamingContext.start()
>     streamingContext.awaitTermination()
> {code}
> From the output, we can see that the dependency chain keeps growing over 
> time, the {{updateStateByKey}} stream never gets checkpointed, and finally 
> the stack overflow happens.
> Note:
> * The RDD in {{updatedResult.map(_._2)}} gets checkpointed in this case, but 
> not the {{updateStateByKey}} stream
> * If the {{checkpoint(Seconds(10))}} is removed from the map result 
> ({{updatedResult.map(_._2)}}), the stack overflow does not happen
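
As an aside, applying a) to the snippet above would just mean checkpointing 
the {{updateStateByKey}} stream itself as well, e.g. (same names and interval 
as in the snippet):

{code}
val updatedResult = source.map((1, _))
  .updateStateByKey((newlist: Seq[String], oldstate: Option[String]) => newlist.headOption.orElse(oldstate))
  .checkpoint(Seconds(10))  // checkpoint the stateful stream, not only the mapped result

updatedResult.map(_._2)
  .checkpoint(Seconds(10))
  .foreachRDD((rdd, t) => println(t.toString() + ": " + rdd.collect.length))
{code}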


