[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming

2015-10-08 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948693#comment-14948693 ]

Sean Owen commented on SPARK-10995:
-----------------------------------

TD's the expert, but I don't really get that -- if you're processing the last 
hour of data each minute, then I'd expect shutdown to process the current 
minute, not to keep running for another hour.

Here, however, your batch interval and window are the same. In that case, do 
you need a window at all?
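
For reference, a minimal sketch of the two window shapes being contrasted here 
({{records}} and {{saveToMyStorage}} are from the issue description; 
{{process}} is a hypothetical stand-in -- illustrative only, not code from 
this thread):

{code:java}
// "Last hour of data, computed every minute": an overlapping (sliding) window.
records.window(Durations.minutes(60), Durations.minutes(1))
       .foreachRDD(rdd -> { process(rdd); return null; });

// The reporter's setup: window == slide == 15 min, i.e. a non-overlapping
// (tumbling) window over fifteen 1-minute batches.
records.window(Durations.minutes(15), Durations.minutes(15))
       .foreachRDD(rdd -> { saveToMyStorage(rdd); return null; });
{code}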

> Graceful shutdown drops processing in Spark Streaming
> -----------------------------------------------------
>
>                 Key: SPARK-10995
>                 URL: https://issues.apache.org/jira/browse/SPARK-10995
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Michal Cizmazia
>
> After triggering a graceful shutdown of the following application, the 
> application stops before the windowed stream reaches its slide duration. As a 
> result, the data is not completely processed (i.e. saveToMyStorage is not 
> called) before shutdown.
> According to the documentation, graceful shutdown should ensure that all 
> received data is completely processed before shutdown.
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
> Spark version: 1.4.1
> Code snippet:
> {code:java}
> Function0<JavaStreamingContext> factory = () -> {
>     JavaStreamingContext context =
>         new JavaStreamingContext(sparkConf, Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream records = context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     // Force each 1-minute batch to be materialized.
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     // Batch the data into non-overlapping 15-minute chunks for storage.
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>     return context;
> };
> try (JavaStreamingContext context =
>         JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     boolean stopSparkContext = true;
>     boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }
> {code}





[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming

2015-10-08 Thread Michal Cizmazia (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948667#comment-14948667 ]

Michal Cizmazia commented on SPARK-10995:
-----------------------------------------

[On 7 October 2015 at 21:24, Tathagata Das:|http://search-hadoop.com/m/q3RTtftj6Y1Mu9z1]
{quote}
Aaah, interesting, you are using a 15-minute slide duration. Yeah, internally 
the streaming scheduler waits for the last "batch" interval which has data to 
be processed, but if there is a sliding interval (i.e. 15 mins) that is longer 
than the batch interval, then that final window batch might not be run. This 
is indeed a bug and should be fixed. Mind setting up a JIRA and assigning it 
to me?
{quote}
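
To make that concrete, a rough timeline of the behaviour described above, 
under the setup from the issue description (1-minute batch interval, 
15-minute window and slide) -- a sketch, not output from this thread:

{code:java}
// t =  0..14 min : fourteen 1-minute batches arrive and are processed.
// t = 14.5 min   : graceful stop is requested; the scheduler drains the
//                  last 1-minute batch that has data.
// t = 15 min     : the 15-minute window batch would have fired here, but
//                  the context is already stopped, so the windowed
//                  foreachRDD (saveToMyStorage) never runs.
context.stop(true /* stopSparkContext */, true /* stopGracefully */);
{code}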




[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming

2015-10-08 Thread Sean Owen (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14949169#comment-14949169 ]

Sean Owen commented on SPARK-10995:
-----------------------------------

Ah, right -- what I mean is that your _slide duration_ is equal to your window 
size. Isn't this the same as using a 15-minute batch duration with no windows? 
This is probably orthogonal anyway.

I understand the point now -- it only keeps running for one more batch 
interval, not one more slide duration. Maybe a 15-minute batch interval is a 
workaround here, but it would not be in a slightly different situation.

I suppose I'm asking what {{waitForShutdownSignal()}} waits for, but that may 
also be inconsequential. Presumably it's not killing threads, initiating 
another shutdown in parallel, or doing anything else that might interfere. If 
you see shutdown starting normally, then that seems OK.
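
For what it's worth, a sketch of the batch-interval workaround mentioned 
above, reusing the {{sparkConf}}, {{myReliableReceiver}}, {{saveToMyStorage}}, 
and {{waitForShutdownSignal}} placeholders from the issue description 
(untested -- just the shape of the idea):

{code:java}
// Make the batch interval itself 15 minutes and drop the window, so a
// graceful stop only has plain 15-minute batches left to drain.
JavaStreamingContext context =
    new JavaStreamingContext(sparkConf, Durations.minutes(15));
context.receiverStream(myReliableReceiver)
       .foreachRDD(rdd -> { saveToMyStorage(rdd); return null; });
context.start();
waitForShutdownSignal();
context.stop(true, true);  // graceful: drains the in-flight 15-minute batch
{code}

As noted, this only works because the window and slide happen to equal the 
batch interval; with overlapping windows the underlying scheduling gap would 
remain.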





[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming

2015-10-08 Thread Michal Cizmazia (JIRA)

[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14949151#comment-14949151 ]

Michal Cizmazia commented on SPARK-10995:
-----------------------------------------

Sean, thanks for your questions.

The application processes data in *1 minute* intervals. In order to store the 
data in larger chunks, it is batched into *15 minute* intervals before being 
written to my storage. This is achieved by setting both {{windowDuration}} and 
{{slideDuration}} to *15 minutes*.

As for stopping, the stop call on the Streaming Context is blocking and 
finishes successfully. This is apparent from the logs as well. The code snippet 
is in the story description.





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org