[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14949169#comment-14949169 ]

Sean Owen commented on SPARK-10995:
-----------------------------------

Ah, right. What I mean is that your _slide duration_ is equal to your window 
size. Isn't this the same as using a 15-minute batch duration with no window? 
This is probably orthogonal anyway.

I understand the point now: it only keeps running for one more batch 
interval, not one more slide duration. Maybe a 15-minute batch interval is a 
workaround here, but it would not be in a slightly different situation.
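To make the timing concrete, here is a toy model in plain Java (not Spark code; the class and method names are hypothetical). It assumes, per the observation above, that a graceful stop finishes only the batch at the next batch boundary and then stops, and that the windowed output fires only at multiples of the slide duration. All times are whole minutes since the stream's reference time.

{code:java}
// Toy timeline model (hypothetical, not Spark's scheduler): it only compares
// "next batch boundary" with "next slide boundary" after a stop request.
public class GracefulStopTimeline {

    // First multiple of `period` strictly after `t` (t >= 0, period > 0).
    public static long nextBoundaryAfter(long t, long period) {
        return (t / period + 1) * period;
    }

    // Assumption: a graceful stop requested at minute `stopAt` completes the
    // batch at the next batch boundary and then stops, while the windowed
    // output is only produced at the next slide boundary.
    public static boolean windowFiresBeforeStop(long stopAt, long batch, long slide) {
        long lastProcessedBatch = nextBoundaryAfter(stopAt, batch);
        long nextWindowOutput = nextBoundaryAfter(stopAt, slide);
        return nextWindowOutput <= lastProcessedBatch;
    }

    public static void main(String[] args) {
        // 1-minute batches, 15-minute window/slide (the reporter's setup).
        System.out.println(windowFiresBeforeStop(7, 1, 15));  // false
        // 15-minute batches, no window (slide equals the batch interval).
        System.out.println(windowFiresBeforeStop(7, 15, 15)); // true
        // 5-minute batches with a 15-minute slide: the workaround no longer helps.
        System.out.println(windowFiresBeforeStop(7, 5, 15));  // false
    }
}
{code}

With a stop requested at minute 7 and 1-minute batches, the last processed batch is at minute 8 while the window would only fire at minute 15, so in this model {{saveToMyStorage}} never runs. With a 15-minute batch interval the two boundaries coincide, which is why it may work as a workaround here but not in general.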

I suppose I'm asking what {{waitForShutdownSignal()}} waits for, but that may 
also be inconsequential, as long as it isn't killing threads, initiating another 
shutdown in parallel, or doing anything else that might interfere. If you see 
shutdown starting normally, then that seems OK.
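For illustration only: the issue does not show {{waitForShutdownSignal()}}, so the class below is a hypothetical sketch of a helper that would not interfere with the graceful stop. It just parks the calling thread on a {{java.util.concurrent.CountDownLatch}} until some external trigger (also hypothetical) calls {{signalShutdown()}}; it does not interrupt other threads or start a stop of its own.

{code:java}
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not from the issue: only blocks the caller until signalled.
public final class ShutdownSignal {
    private final CountDownLatch latch = new CountDownLatch(1);

    // Called by whatever external trigger the application uses (hypothetical).
    public void signalShutdown() {
        latch.countDown();
    }

    // Blocks until signalShutdown() has been called.
    public void waitForShutdownSignal() {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
    }

    // Variant with a timeout; returns true only if the signal arrived in time.
    public boolean waitForShutdownSignal(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ShutdownSignal signal = new ShutdownSignal();
        new Thread(signal::signalShutdown).start();
        signal.waitForShutdownSignal();
        // Here the application would call context.stop(true, true).
        System.out.println("shutdown signal received");
    }
}
{code}

A helper shaped like this only decides *when* {{context.stop(true, true)}} is called, so it should not affect how much of the windowed stream gets processed.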


> Graceful shutdown drops processing in Spark Streaming
> -----------------------------------------------------
>
>                 Key: SPARK-10995
>                 URL: https://issues.apache.org/jira/browse/SPARK-10995
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Michal Cizmazia
>
> After triggering the graceful shutdown on the following application, the 
> application stops before the windowed stream reaches its slide duration. As a 
> result, the data is not completely processed (i.e. saveToMyStorage is not 
> called) before shutdown.
> According to the documentation, graceful shutdown should ensure that data 
> that has been received is completely processed before shutdown.
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
> Spark version: 1.4.1
> Code snippet:
> {code:java}
> Function0<JavaStreamingContext> factory = () -> {
>     JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream<String> records = context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>     return context;
> };
> try (JavaStreamingContext context = JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     Boolean stopSparkContext = true;
>     Boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
