This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e2f67e  [FLINK-13124] Don't forward exceptions when finishing 
SourceStreamTask
2e2f67e is described below

commit 2e2f67ed348c334402a5d0af76b0fd47cedcf5a7
Author: Aljoscha Krettek <aljos...@apache.org>
AuthorDate: Thu Jul 11 15:40:06 2019 +0200

    [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask
    
    Before, exceptions that occurred after cancelling a source (as the
    KafkaConsumer did, for example) would make a job fail when attempting a
    "stop-with-savepoint". Now we ignore those exceptions.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  |  4 ++
 .../runtime/tasks/SourceStreamTaskTest.java        | 54 ++++++++++++++++++++++
 2 files changed, 58 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 50fdca1..608d972 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -127,6 +127,10 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
 
        @Override
        protected void finishTask() throws Exception {
+               // We tell the mailbox to finish, to prevent any exceptions 
that might occur during
+               // finishing from leading to a FAILED state. This could happen, 
for example, when cancelling
+               // sources as part of a "stop-with-savepoint".
+               mailboxProcessor.allActionsCompleted();
                cancelTask();
        }
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 9019d88..a07fdad 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -231,6 +231,27 @@ public class SourceStreamTaskTest {
                        testHarness.getOutput());
        }
 
+       /**
+        * If finishing a task doesn't swallow exceptions this test would fail 
with an exception.
+        */
+       @Test
+       public void finishingIgnoresExceptions() throws Exception {
+               final StreamTaskTestHarness<String> testHarness = new 
StreamTaskTestHarness<>(
+                               SourceStreamTask::new,
+                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setupOutputForSingletonOperatorChain();
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               streamConfig.setStreamOperator(new StreamSource<>(new 
ExceptionThrowingSource()));
+               streamConfig.setOperatorID(new OperatorID());
+
+               testHarness.invoke();
+               ExceptionThrowingSource.isInRunLoop.get();
+               testHarness.getTask().finishTask();
+
+               testHarness.waitForTaskCompletion();
+       }
+
        private static class MockSource implements SourceFunction<Tuple2<Long, 
Integer>>, ListCheckpointed<Serializable> {
                private static final long serialVersionUID = 1;
 
@@ -406,5 +427,38 @@ public class SourceStreamTaskTest {
                        return dataProcessing;
                }
        }
+
+       /**
+        * A {@link SourceFunction} that throws an exception from {@link 
#run(SourceContext)} when it is
+        * cancelled via {@link #cancel()}.
+        */
+       private static class ExceptionThrowingSource implements 
SourceFunction<String> {
+
+               private volatile boolean running = true;
+               static CompletableFuture<Void> isInRunLoop = new 
CompletableFuture<>();
+
+               public static class TestException extends RuntimeException {
+                       public TestException(String message) {
+                               super(message);
+                       }
+               }
+
+               @Override
+               public void run(SourceContext<String> ctx) throws TestException 
{
+                       while (running) {
+                               if (!isInRunLoop.isDone()) {
+                                       isInRunLoop.complete(null);
+                               }
+                               ctx.collect("hello");
+                       }
+
+                       throw new TestException("Oh no, we're failing.");
+               }
+
+               @Override
+               public void cancel() {
+                       running = false;
+               }
+       }
 }
 

Reply via email to