This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 2e2f67e [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask 2e2f67e is described below commit 2e2f67ed348c334402a5d0af76b0fd47cedcf5a7 Author: Aljoscha Krettek <aljos...@apache.org> AuthorDate: Thu Jul 11 15:40:06 2019 +0200 [FLINK-13124] Don't forward exceptions when finishing SourceStreamTask Before, exceptions that occurred after cancelling a source (as the KafkaConsumer did, for example) would make a job fail when attempting a "stop-with-savepoint". Now we ignore those exceptions. --- .../streaming/runtime/tasks/SourceStreamTask.java | 4 ++ .../runtime/tasks/SourceStreamTaskTest.java | 54 ++++++++++++++++++++++ 2 files changed, 58 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index 50fdca1..608d972 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -127,6 +127,10 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S @Override protected void finishTask() throws Exception { + // We tell the mailbox to finish, to prevent any exceptions that might occur during + // finishing from leading to a FAILED state. This could happen, for example, when cancelling + // sources as part of a "stop-with-savepoint". + mailboxProcessor.allActionsCompleted(); cancelTask(); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 9019d88..a07fdad 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -231,6 +231,27 @@ public class SourceStreamTaskTest { testHarness.getOutput()); } + /** + * If finishing a task doesn't swallow exceptions this test would fail with an exception. + */ + @Test + public void finishingIgnoresExceptions() throws Exception { + final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<>( + SourceStreamTask::new, + BasicTypeInfo.STRING_TYPE_INFO); + + testHarness.setupOutputForSingletonOperatorChain(); + StreamConfig streamConfig = testHarness.getStreamConfig(); + streamConfig.setStreamOperator(new StreamSource<>(new ExceptionThrowingSource())); + streamConfig.setOperatorID(new OperatorID()); + + testHarness.invoke(); + ExceptionThrowingSource.isInRunLoop.get(); + testHarness.getTask().finishTask(); + + testHarness.waitForTaskCompletion(); + } + private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, ListCheckpointed<Serializable> { private static final long serialVersionUID = 1; @@ -406,5 +427,38 @@ public class SourceStreamTaskTest { return dataProcessing; } } + + /** + * A {@link SourceFunction} that throws an exception from {@link #run(SourceContext)} when it is + * cancelled via {@link #cancel()}. + */ + private static class ExceptionThrowingSource implements SourceFunction<String> { + + private volatile boolean running = true; + static CompletableFuture<Void> isInRunLoop = new CompletableFuture<>(); + + public static class TestException extends RuntimeException { + public TestException(String message) { + super(message); + } + } + + @Override + public void run(SourceContext<String> ctx) throws TestException { + while (running) { + if (!isInRunLoop.isDone()) { + isInRunLoop.complete(null); + } + ctx.collect("hello"); + } + + throw new TestException("Oh no, we're failing."); + } + + @Override + public void cancel() { + running = false; + } + } }