This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ead9139680ea82c4fdfd1e9d19baf4d4a08ec845
Author: Stefan Richter <s.rich...@data-artisans.com>
AuthorDate: Tue May 14 15:33:48 2019 +0200

    [FLINK-12483][runtime] Support (legacy) SourceFunction as special case in 
the mailbox model for stream tasks.
    
    This closes #8442.
---
 .../streaming/runtime/tasks/SourceStreamTask.java  | 66 +++++++++++++++++++++-
 1 file changed, 64 insertions(+), 2 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index fd50a1a..e604f2c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -45,6 +45,8 @@ import org.apache.flink.util.FlinkException;
 public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends 
StreamSource<OUT, SRC>>
        extends StreamTask<OUT, OP> {
 
+       private static final Runnable SOURCE_POISON_LETTER = () -> {};
+
        private volatile boolean externallyInducedCheckpoints;
 
        public SourceStreamTask(Environment env) {
@@ -101,12 +103,43 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
        protected void performDefaultAction(ActionContext context) throws 
Exception {
                // Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
                // compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
-               headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
+               final LegacySourceFunctionThread sourceThread = new 
LegacySourceFunctionThread();
+               sourceThread.start();
+
+               // We run an alternative mailbox loop that does not involve 
default actions and synchronizes around actions.
+               try {
+                       runAlternativeMailboxLoop();
+               } catch (Exception mailboxEx) {
+                       // We cancel the source function if some runtime 
exception escaped the mailbox.
+                       if (!isCanceled()) {
+                               cancelTask();
+                       }
+                       throw mailboxEx;
+               }
+
+               sourceThread.join();
+               sourceThread.checkThrowSourceExecutionException();
+
                context.allActionsCompleted();
        }
 
+       private void runAlternativeMailboxLoop() throws InterruptedException {
+
+               while (true) {
+
+                       Runnable letter = mailbox.takeMail();
+                       if (letter == SOURCE_POISON_LETTER) {
+                               break;
+                       }
+
+                       synchronized (getCheckpointLock()) {
+                               letter.run();
+                       }
+               }
+       }
+
        @Override
-       protected void cancelTask() throws Exception {
+       protected void cancelTask() {
                if (headOperator != null) {
                        headOperator.cancel();
                }
@@ -133,4 +166,33 @@ public class SourceStreamTask<OUT, SRC extends 
SourceFunction<OUT>, OP extends S
                        }
                }
        }
+
+       /**
+        * Runnable that executes the the source function in the head operator.
+        */
+       private class LegacySourceFunctionThread extends Thread {
+
+               private Throwable sourceExecutionThrowable;
+
+               LegacySourceFunctionThread() {
+                       this.sourceExecutionThrowable = null;
+               }
+
+               @Override
+               public void run() {
+                       try {
+                               headOperator.run(getCheckpointLock(), 
getStreamStatusMaintainer());
+                       } catch (Throwable t) {
+                               sourceExecutionThrowable = t;
+                       } finally {
+                               mailbox.clearAndPut(SOURCE_POISON_LETTER);
+                       }
+               }
+
+               void checkThrowSourceExecutionException() throws Exception {
+                       if (sourceExecutionThrowable != null) {
+                               throw new Exception(sourceExecutionThrowable);
+                       }
+               }
+       }
 }

Reply via email to