Thanks! Done. https://issues.apache.org/jira/browse/SPARK-10995
On 7 October 2015 at 21:24, Tathagata Das <t...@databricks.com> wrote:

> Aaah, interesting, you are doing a 15-minute slide duration. Yeah,
> internally the streaming scheduler waits for the last "batch" interval
> which has data to be processed, but if there is a sliding interval (i.e.
> 15 mins) that is higher than the batch interval, then that might not be
> run. This is indeed a bug and should be fixed. Mind setting up a JIRA
> and assigning it to me?
>
> On Wed, Oct 7, 2015 at 8:33 AM, Michal Čizmazia <mici...@gmail.com> wrote:
>
>> After triggering the graceful shutdown on the following application,
>> the application stops before the windowed stream reaches its slide
>> duration. As a result, the data is not completely processed (i.e.
>> saveToMyStorage is not called) before shutdown.
>>
>> According to the documentation, graceful shutdown should ensure that
>> the data which has been received is completely processed before
>> shutdown.
>>
>> https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code
>>
>> Spark version: 1.4.1
>>
>> Code snippet:
>>
>> Function0<JavaStreamingContext> factory = () -> {
>>     JavaStreamingContext context =
>>         new JavaStreamingContext(sparkConf, Durations.minutes(1));
>>     context.checkpoint("/test");
>>     JavaDStream<String> records =
>>         context.receiverStream(myReliableReceiver).flatMap(...);
>>     records.persist(StorageLevel.MEMORY_AND_DISK());
>>     records.foreachRDD(rdd -> { rdd.count(); return null; });
>>     records
>>         .window(Durations.minutes(15), Durations.minutes(15))
>>         .foreachRDD(rdd -> saveToMyStorage(rdd));
>>     return context;
>> };
>>
>> try (JavaStreamingContext context =
>>         JavaStreamingContext.getOrCreate("/test", factory)) {
>>     context.start();
>>     waitForShutdownSignal();
>>     Boolean stopSparkContext = true;
>>     Boolean stopGracefully = true;
>>     context.stop(stopSparkContext, stopGracefully);
>> }
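[Editor's note] Until the bug above is fixed, one possible mitigation for the application in the thread is to delay the graceful stop until the current 15-minute window has closed, so its foreachRDD gets a chance to fire. The sketch below only shows the boundary arithmetic; `WindowDrain` and `millisToNextWindowEnd` are hypothetical names, and the assumption that windows close at whole multiples of the slide duration is an approximation of how Spark Streaming aligns windows to the batch clock:

```java
// Minimal sketch, assuming windows close at multiples of the slide duration.
// WindowDrain / millisToNextWindowEnd are hypothetical helper names, not
// part of any Spark API.
public final class WindowDrain {

    /** Milliseconds until the next window boundary, assuming windows
     *  close at whole multiples of the slide duration. */
    public static long millisToNextWindowEnd(long nowMs, long slideMs) {
        long rem = nowMs % slideMs;
        return rem == 0 ? 0 : slideMs - rem;
    }

    public static void main(String[] args) {
        long slideMs = 15 * 60 * 1000L; // 15-minute slide, as in the snippet
        long waitMs = millisToNextWindowEnd(System.currentTimeMillis(), slideMs);
        System.out.println("Would wait " + waitMs
                + " ms before context.stop(stopSparkContext, stopGracefully)");
        // In the real application one would sleep roughly waitMs (plus some
        // slack for the batch to be scheduled) after waitForShutdownSignal()
        // and only then call context.stop(true, true).
    }
}
```

This is a timing-based workaround, not a guarantee: the proper fix is for the graceful-shutdown path to account for pending windowed batches, which is what SPARK-10995 tracks.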