This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845 Author: Stefan Richter <s.rich...@data-artisans.com> AuthorDate: Tue May 14 15:33:48 2019 +0200 [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in the mailbox model for stream tasks. This closes #8442. --- .../streaming/runtime/tasks/SourceStreamTask.java | 66 +++++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java index fd50a1a..e604f2c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java @@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException; public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> { + private static final Runnable SOURCE_POISON_LETTER = () -> {}; + private volatile boolean externallyInducedCheckpoints; public SourceStreamTask(Environment env) { @@ -101,12 +103,43 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S protected void performDefaultAction(ActionContext context) throws Exception { // Against the usual contract of this method, this implementation is not step-wise but blocking instead for // compatibility reasons with the current source interface (source functions run as a loop, not in steps). - headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + final LegacySourceFunctionThread sourceThread = new LegacySourceFunctionThread(); + sourceThread.start(); + + // We run an alternative mailbox loop that does not involve default actions and synchronizes around actions. 
+ try { + runAlternativeMailboxLoop(); + } catch (Exception mailboxEx) { + // We cancel the source function if some runtime exception escaped the mailbox. + if (!isCanceled()) { + cancelTask(); + } + throw mailboxEx; + } + + sourceThread.join(); + sourceThread.checkThrowSourceExecutionException(); + context.allActionsCompleted(); } + private void runAlternativeMailboxLoop() throws InterruptedException { + + while (true) { + + Runnable letter = mailbox.takeMail(); + if (letter == SOURCE_POISON_LETTER) { + break; + } + + synchronized (getCheckpointLock()) { + letter.run(); + } + } + } + @Override - protected void cancelTask() throws Exception { + protected void cancelTask() { if (headOperator != null) { headOperator.cancel(); } @@ -133,4 +166,33 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S } } } + + /** + * Runnable that executes the source function in the head operator. + */ + private class LegacySourceFunctionThread extends Thread { + + private Throwable sourceExecutionThrowable; + + LegacySourceFunctionThread() { + this.sourceExecutionThrowable = null; + } + + @Override + public void run() { + try { + headOperator.run(getCheckpointLock(), getStreamStatusMaintainer()); + } catch (Throwable t) { + sourceExecutionThrowable = t; + } finally { + mailbox.clearAndPut(SOURCE_POISON_LETTER); + } + } + + void checkThrowSourceExecutionException() throws Exception { + if (sourceExecutionThrowable != null) { + throw new Exception(sourceExecutionThrowable); + } + } + } }