This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 58c5e8c3cc43ddaee0fa63b8fb209eb2a4006eb7 Author: Stephan Ewen <se...@apache.org> AuthorDate: Sun Apr 11 20:05:47 2021 +0200 [FLINK-18071][coordination] (part 4) Add to Execution a future for states INITIALIZING/RUNNING This doubles as a listener for when the execution has reached the state INITIALIZING or RUNNING. --- .../flink/runtime/executiongraph/Execution.java | 29 +++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index c336d23..6ce4a41 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -149,6 +149,13 @@ public class Execution private final CompletableFuture<TaskManagerLocation> taskManagerLocationFuture; + /** + * Gets completed successfully when the task switched to {@link ExecutionState#INITIALIZING} or + * {@link ExecutionState#RUNNING}. If the task never switches to those states, but fails + * immediately, then this future never completes. + */ + private final CompletableFuture<?> initializingOrRunningFuture; + private volatile ExecutionState state = CREATED; private LogicalSlot assignedResource; @@ -214,6 +221,7 @@ public class Execution this.terminalStateFuture = new CompletableFuture<>(); this.releaseFuture = new CompletableFuture<>(); this.taskManagerLocationFuture = new CompletableFuture<>(); + this.initializingOrRunningFuture = new CompletableFuture<>(); this.assignedResource = null; } @@ -353,6 +361,23 @@ public class Execution } /** + * Gets a future that completes once the task execution reaches one of the states {@link + * ExecutionState#INITIALIZING} or {@link ExecutionState#RUNNING}. 
If this task never reaches + * these states (for example because the task is cancelled before it was properly deployed and + * restored), then this future will never complete. + * + * <p>The future is completed already in the {@link ExecutionState#INITIALIZING} state, because + * various running actions are already possible in that state (the task already accepts and + * sends events and network data for task recovery). (Note that in earlier versions, the + * INITIALIZING state was not separate but part of the RUNNING state). + * + * <p>This future is always completed from the job master's main thread. + */ + public CompletableFuture<?> getInitializingOrRunningFuture() { + return initializingOrRunningFuture; + } + + /** * Gets a future that completes once the task execution reaches a terminal state. The future * will be completed with specific state that the execution reached. This future is always * completed from the job master's main thread. @@ -1428,7 +1453,9 @@ public class Execution } } - if (targetState.isTerminal()) { + if (targetState == INITIALIZING || targetState == RUNNING) { + initializingOrRunningFuture.complete(null); + } else if (targetState.isTerminal()) { // complete the terminal state future terminalStateFuture.complete(targetState); }