This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 87ed9ccc210 [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
87ed9ccc210 is described below

commit 87ed9ccc2103457ba91f6ca45adfd2bfcc75c9ac
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Apr 18 19:10:42 2024 +0200

    [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java       | 1 -
 .../flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java  | 5 ++++-
 .../runtime/scheduler/adaptive/CreatingExecutionGraphTest.java    | 8 ++++++--
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 5f6438ce181..238c594fd55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1187,7 +1187,6 @@ public class AdaptiveScheduler
                 executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
-        executionGraph.transitionToRunning();
 
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index da90ef1468d..e9b1317e46e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -123,7 +123,6 @@ public class CreatingExecutionGraph extends StateWithoutExecutionGraph {
                         operatorCoordinatorHandlerFactory.create(executionGraph, context);
                 operatorCoordinatorHandler.initializeOperatorCoordinators(
                         context.getMainThreadExecutor());
-                operatorCoordinatorHandler.startAllOperatorCoordinators();
                 final String updatedPlan =
                         JsonPlanGenerator.generatePlan(
                                 executionGraph.getJobID(),
@@ -137,6 +136,10 @@ public class CreatingExecutionGraph extends StateWithoutExecutionGraph {
                                                 .iterator(),
                                 executionGraphWithVertexParallelism.getVertexParallelism());
                 executionGraph.setJsonPlan(updatedPlan);
+
+                executionGraph.transitionToRunning();
+                operatorCoordinatorHandler.startAllOperatorCoordinators();
+
                 context.goToExecuting(
                         result.getExecutionGraph(),
                         executionGraphHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index b831b3bb62f..0f89cdf7e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -93,8 +93,12 @@ class CreatingExecutionGraphTest {
                 ignored -> CreatingExecutionGraph.AssignmentResult.notPossible());
         context.setExpectWaitingForResources();
 
-        executionGraphWithVertexParallelismFuture.complete(
-                getGraph(new StateTrackingMockExecutionGraph()));
+        final StateTrackingMockExecutionGraph executionGraph =
+                new StateTrackingMockExecutionGraph();
+
+        executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
+
+        assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);
     }
 
     @Test

Reply via email to