[ https://issues.apache.org/jira/browse/SPARK-10995?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michal Cizmazia updated SPARK-10995:
------------------------------------
    Description: 
After a graceful shutdown is triggered on the following application, the application stops before the windowed stream reaches the end of its 15-minute slide duration. As a result, the received data is not completely processed (i.e., saveToMyStorage is not called) before shutdown.

According to the documentation, a graceful shutdown should ensure that all data that has been received is completely processed before shutdown.
https://spark.apache.org/docs/1.4.0/streaming-programming-guide.html#upgrading-application-code

Spark version: 1.4.1

Code snippet:

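{code:java}
import org.apache.spark.api.java.function.Function0;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// sparkConf, myReliableReceiver, saveToMyStorage and waitForShutdownSignal are assumed to be defined elsewhere.
Function0<JavaStreamingContext> factory = () -> {
    // 1-minute batch interval
    JavaStreamingContext context = new JavaStreamingContext(sparkConf, Durations.minutes(1));
    // Checkpoint directory, also used by getOrCreate below to recover the context
    context.checkpoint("/test");
    JavaDStream<String> records = context.receiverStream(myReliableReceiver).flatMap(...);
    records.persist(StorageLevel.MEMORY_AND_DISK());
    // Output operation that runs on every 1-minute batch
    // (Spark 1.4 foreachRDD takes a Function<..., Void>, hence "return null")
    records.foreachRDD(rdd -> { rdd.count(); return null; });
    // 15-minute window sliding every 15 minutes: saveToMyStorage should run once per window
    records
        .window(Durations.minutes(15), Durations.minutes(15))
        .foreachRDD(rdd -> { saveToMyStorage(rdd); return null; });
    return context;
};

try (JavaStreamingContext context = JavaStreamingContext.getOrCreate("/test", factory)) {
    context.start();
    waitForShutdownSignal();
    boolean stopSparkContext = true;
    boolean stopGracefully = true;
    // Graceful stop: expected to finish processing all received data before shutting down
    context.stop(stopSparkContext, stopGracefully);
}
{code}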
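To make the expected timing concrete, the following is a simplified, hypothetical model rather than Spark's implementation. It assumes that, with the 1-minute batch interval and the 15-minute window and slide used in the snippet above, the window output (and therefore saveToMyStorage) only runs at batch times that are multiples of 15 minutes from the stream's start. It ignores Spark's internal zero-time offset and checkpoint recovery. WindowFiringModel and windowFiringTimes are illustrative names, not Spark APIs.

{code:java}
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model (not Spark code) of when a windowed DStream's
// output operation runs. Assumption: with a batch interval B and a slide S that is
// a multiple of B, the window output fires only at batch times t (minutes from the
// stream's start) where t is a multiple of S.
class WindowFiringModel {

    // Returns the batch times in minutes, up to and including stopMinute,
    // at which the window's foreachRDD (e.g. saveToMyStorage) would run.
    static List<Integer> windowFiringTimes(int batchMinutes, int slideMinutes, int stopMinute) {
        if (batchMinutes <= 0 || slideMinutes <= 0 || slideMinutes % batchMinutes != 0) {
            throw new IllegalArgumentException("slide must be a positive multiple of the batch interval");
        }
        List<Integer> firings = new ArrayList<>();
        // Batches complete at t = B, 2B, 3B, ... until the stop time.
        for (int t = batchMinutes; t <= stopMinute; t += batchMinutes) {
            if (t % slideMinutes == 0) {
                firings.add(t);
            }
        }
        return firings;
    }

    public static void main(String[] args) {
        // Settings from the report: 1-minute batches, 15-minute window and slide.
        int batch = 1;
        int slide = 15;
        for (int stop : new int[] {7, 15, 22, 30}) {
            System.out.println("stop at minute " + stop + " -> window output at "
                    + windowFiringTimes(batch, slide, stop));
        }
    }
}
{code}

Under this model, main prints an empty list for a stop at minute 7 and [15] for a stop at minute 22: a stop before the first slide boundary never reaches saveToMyStorage, and batches received after the last boundary are never saved. That is consistent with the behavior reported here, assuming the graceful stop does not wait for the pending window to complete.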





> Graceful shutdown drops processing in Spark Streaming
> -----------------------------------------------------
>
>                 Key: SPARK-10995
>                 URL: https://issues.apache.org/jira/browse/SPARK-10995
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 1.4.1
>            Reporter: Michal Cizmazia
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
