This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 58c5e8c3cc43ddaee0fa63b8fb209eb2a4006eb7
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Sun Apr 11 20:05:47 2021 +0200

    [FLINK-18071][coordination] (part 4) Add to Execution a future for states 
INITIALIZING/RUNNING
    
    This doubles as a listener for when the execution has reached the state 
INITIALIZING or RUNNING.
---
 .../flink/runtime/executiongraph/Execution.java    | 29 +++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c336d23..6ce4a41 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -149,6 +149,13 @@ public class Execution
 
     private final CompletableFuture<TaskManagerLocation> 
taskManagerLocationFuture;
 
+    /**
+     * Gets completed successfully when the task switched to {@link 
ExecutionState#INITIALIZING} or
+     * {@link ExecutionState#RUNNING}. If the task never switches to those 
state, but fails
+     * immediately, then this future never completes.
+     */
+    private final CompletableFuture<?> initializingOrRunningFuture;
+
     private volatile ExecutionState state = CREATED;
 
     private LogicalSlot assignedResource;
@@ -214,6 +221,7 @@ public class Execution
         this.terminalStateFuture = new CompletableFuture<>();
         this.releaseFuture = new CompletableFuture<>();
         this.taskManagerLocationFuture = new CompletableFuture<>();
+        this.initializingOrRunningFuture = new CompletableFuture<>();
 
         this.assignedResource = null;
     }
@@ -353,6 +361,23 @@ public class Execution
     }
 
     /**
+     * Gets a future that completes once the task execution reaches one of the 
states {@link
+     * ExecutionState#INITIALIZING} or {@link ExecutionState#RUNNING}. If this 
task never reaches
+     * these states (for example because the task is cancelled before it was 
properly deployed and
+     * restored), then this future will never complete.
+     *
+     * <p>The future is completed already in the {@link 
ExecutionState#INITIALIZING} state, because
+     * various running actions are already possible in that state (the task 
already accepts and
+     * sends events and network data for task recovery). (Note that in earlier 
versions, the
+     * INITIALIZING state was not separate but part of the RUNNING state).
+     *
+     * <p>This future is always completed from the job master's main thread.
+     */
+    public CompletableFuture<?> getInitializingOrRunningFuture() {
+        return initializingOrRunningFuture;
+    }
+
+    /**
      * Gets a future that completes once the task execution reaches a terminal 
state. The future
      * will be completed with specific state that the execution reached. This 
future is always
      * completed from the job master's main thread.
@@ -1428,7 +1453,9 @@ public class Execution
                 }
             }
 
-            if (targetState.isTerminal()) {
+            if (targetState == INITIALIZING || targetState == RUNNING) {
+                initializingOrRunningFuture.complete(null);
+            } else if (targetState.isTerminal()) {
                 // complete the terminal state future
                 terminalStateFuture.complete(targetState);
             }

Reply via email to