[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948693#comment-14948693 ]

Sean Owen commented on SPARK-10995:
-----------------------------------

TD's the expert, but I don't really get that -- if you're processing the last hour of data each minute, then I'd expect shutdown to process the current minute, not keep running for another hour. Here, however, your batch interval and window are the same. In that case, do you need a window at all?

> Graceful shutdown drops processing in Spark Streaming
> -----------------------------------------------------
>
>                 Key: SPARK-10995
>                 URL: https://issues.apache.org/jira/browse/SPARK-10995
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Michal Cizmazia
>
> After triggering a graceful shutdown of the following application, the
> application stops before the windowed stream reaches its slide duration.
> As a result, the data is not completely processed (i.e. saveToMyStorage
> is not called) before shutdown.
> According to the documentation, a graceful shutdown should ensure that
> the data which has been received is completely processed before shutdown:
> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>
> Spark version: 1.4.1
>
> Code snippet:
> {code:java}
> Function0<JavaStreamingContext> factory = () -> {
>     JavaStreamingContext context =
>         new JavaStreamingContext(sparkConf, Durations.minutes(1));
>     context.checkpoint("/test");
>     JavaDStream records =
>         context.receiverStream(myReliableReceiver).flatMap(...);
>     records.persist(StorageLevel.MEMORY_AND_DISK());
>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>     records
>         .window(Durations.minutes(15), Durations.minutes(15))
>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>     return context;
> };
>
> try (JavaStreamingContext context =
>          JavaStreamingContext.getOrCreate("/test", factory)) {
>     context.start();
>     waitForShutdownSignal();
>     Boolean stopSparkContext = true;
>     Boolean stopGracefully = true;
>     context.stop(stopSparkContext, stopGracefully);
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14948667#comment-14948667 ]

Michal Cizmazia commented on SPARK-10995:
-----------------------------------------

[On 7 October 2015 at 21:24, Tathagata Das:|http://search-hadoop.com/m/q3RTtftj6Y1Mu9z1]
{quote}
Aaah, interesting, you are doing a 15-minute slide duration. Yeah, internally the streaming scheduler waits for the last "batch" interval which has data to be processed, but if there is a sliding interval (i.e. 15 mins) that is higher than the batch interval, then that might not be run. This is indeed a bug and should be fixed. Mind setting up a JIRA and assigning it to me?
{quote}
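The effect Tathagata describes can be illustrated with plain arithmetic (a hypothetical timeline sketch, not Spark API code): graceful shutdown waits only until the end of the last one-minute batch that has data, while the windowed {{foreachRDD}} would only fire at the next multiple of the 15-minute slide duration -- potentially up to 14 minutes later.

```java
// Hypothetical timeline for the reported bug (illustration only, not Spark code).
// Batch interval: 1 min; window = slide = 15 min. Graceful shutdown completes
// after the last batch with data; the windowed output fires only at multiples
// of the slide duration (assumed here to align with multiples of SLIDE).
public class GracefulShutdownTimeline {
    static final long MINUTE = 60_000L;
    static final long BATCH = 1 * MINUTE;
    static final long SLIDE = 15 * MINUTE;

    // End of the batch containing the last received record: when graceful
    // shutdown stops the scheduler, per the behavior described above.
    static long shutdownCompletesAt(long lastDataMs) {
        return (lastDataMs / BATCH + 1) * BATCH;
    }

    // When the window containing the last record would emit its output.
    static long windowEmitsAt(long lastDataMs) {
        return (lastDataMs / SLIDE + 1) * SLIDE;
    }

    public static void main(String[] args) {
        long lastData = 10 * MINUTE + 30_000L;     // record arrives at t = 10.5 min
        long stop = shutdownCompletesAt(lastData); // t = 11 min
        long emit = windowEmitsAt(lastData);       // t = 15 min
        // The context stops 4 minutes before the window batch would have been
        // generated, so saveToMyStorage never sees the final window of data.
        System.out.println("shutdown at " + stop + " ms; window due at " + emit + " ms");
    }
}
```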
[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14949169#comment-14949169 ]

Sean Owen commented on SPARK-10995:
-----------------------------------

Ah, right -- what I meant is that your _slide duration_ is equal to your window size. Isn't this the same as using a 15-minute batch duration with no windows? That is probably orthogonal anyway. I understand the point now: the scheduler only keeps running for one more batch interval, not one more slide duration. A 15-minute batch interval may be a workaround here, but it would not be in a slightly different situation. I suppose I'm also asking what {{waitForShutdownSignal()}} waits for, but that may be inconsequential. As long as it isn't killing threads or initiating some other shutdown in parallel -- nothing else that might interfere -- and you see shutdown starting normally, that seems OK.
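Sean's equivalence point can be sketched the same way (plain Java, not Spark code): when the slide duration equals the window length, the windows partition the batch timeline into non-overlapping 15-minute chunks, so each one-minute batch lands in exactly one window -- the same grouping a 15-minute batch interval would produce.

```java
import java.util.ArrayList;
import java.util.List;

// Illustration (not Spark API code): window length == slide duration partitions
// the stream into non-overlapping chunks, like a larger batch interval would.
public class WindowChunks {
    // Index of the 15-minute chunk containing the batch that ends at batchEndMin.
    static int chunkIndex(int batchEndMin, int slideMin) {
        return (batchEndMin - 1) / slideMin;
    }

    // Chunk assignment for the first n one-minute batches.
    static List<Integer> assign(int n, int slideMin) {
        List<Integer> chunks = new ArrayList<>();
        for (int batchEnd = 1; batchEnd <= n; batchEnd++) {
            chunks.add(chunkIndex(batchEnd, slideMin));
        }
        return chunks;
    }

    public static void main(String[] args) {
        // Batches ending at minutes 1..15 all fall in chunk 0; minute 16 opens chunk 1.
        System.out.println(assign(16, 15));
    }
}
```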
[jira] [Commented] (SPARK-10995) Graceful shutdown drops processing in Spark Streaming
[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=14949151#comment-14949151 ]

Michal Cizmazia commented on SPARK-10995:
-----------------------------------------

Sean, thanks for your questions. The application processes data in *1 minute* intervals. In order to store the data in larger chunks, it is batched into *15 minute* intervals before being saved to my storage. This is achieved with the equally long, *15 minute* {{windowDuration}} and {{slideDuration}}. As for stopping, the stop call on the StreamingContext is blocking and finishes successfully, which is apparent from the logs as well. The code snippet is in the issue description.
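Until the scheduler fix lands, one possible application-side mitigation (a sketch only, under the assumption that window slides fall on multiples of the slide duration on the batch clock) is to delay the {{stop()}} call until just past the next slide boundary, so the final window batch is generated while the context is still running.

```java
// Hypothetical workaround sketch (not a confirmed fix): compute how long to
// wait so the next windowed batch fires before graceful shutdown begins.
// Assumes slide boundaries fall on multiples of slideMs on the same clock.
public class StopAfterSlideBoundary {
    // Milliseconds from nowMs until just after the next slide boundary,
    // plus a little slack for the window batch to be scheduled.
    static long msUntilNextSlide(long nowMs, long slideMs, long slackMs) {
        long nextBoundary = (nowMs / slideMs + 1) * slideMs;
        return (nextBoundary - nowMs) + slackMs;
    }

    public static void main(String[] args) {
        long slideMs = 15 * 60_000L;
        long wait = msUntilNextSlide(System.currentTimeMillis(), slideMs, 5_000L);
        System.out.println("would sleep " + wait + " ms before context.stop(true, true)");
        // In the real application, after waitForShutdownSignal():
        //   Thread.sleep(wait);
        //   context.stop(stopSparkContext, stopGracefully);
    }
}
```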