This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
     new 87ed9ccc210 [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
87ed9ccc210 is described below

commit 87ed9ccc2103457ba91f6ca45adfd2bfcc75c9ac
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Thu Apr 18 19:10:42 2024 +0200

    [FLINK-35159] Transition ExecutionGraph to RUNNING after slot assignment
---
 .../flink/runtime/scheduler/adaptive/AdaptiveScheduler.java | 1 -
 .../flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java | 5 ++++-
 .../runtime/scheduler/adaptive/CreatingExecutionGraphTest.java | 8 ++++++--
 3 files changed, 10 insertions(+), 4 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 5f6438ce181..238c594fd55 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -1187,7 +1187,6 @@ public class AdaptiveScheduler
                 executionGraphWithVertexParallelism.getExecutionGraph();
 
         executionGraph.start(componentMainThreadExecutor);
-        executionGraph.transitionToRunning();
 
         executionGraph.setInternalTaskFailuresListener(
                 new UpdateSchedulerNgOnInternalFailuresListener(this));
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index da90ef1468d..e9b1317e46e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -123,7 +123,6 @@ public class CreatingExecutionGraph extends StateWithoutExecutionGraph {
                                 operatorCoordinatorHandlerFactory.create(executionGraph, context);
                 operatorCoordinatorHandler.initializeOperatorCoordinators(
                         context.getMainThreadExecutor());
-                operatorCoordinatorHandler.startAllOperatorCoordinators();
                 final String updatedPlan =
                         JsonPlanGenerator.generatePlan(
                                 executionGraph.getJobID(),
@@ -137,6 +136,10 @@ public class CreatingExecutionGraph extends StateWithoutExecutionGraph {
                                         .iterator(),
                                 executionGraphWithVertexParallelism.getVertexParallelism());
                 executionGraph.setJsonPlan(updatedPlan);
+
+                executionGraph.transitionToRunning();
+                operatorCoordinatorHandler.startAllOperatorCoordinators();
+
                 context.goToExecuting(
                         result.getExecutionGraph(),
                         executionGraphHandler,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
index b831b3bb62f..0f89cdf7e12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraphTest.java
@@ -93,8 +93,12 @@ class CreatingExecutionGraphTest {
                         ignored -> CreatingExecutionGraph.AssignmentResult.notPossible());
         context.setExpectWaitingForResources();
 
-        executionGraphWithVertexParallelismFuture.complete(
-                getGraph(new StateTrackingMockExecutionGraph()));
+        final StateTrackingMockExecutionGraph executionGraph =
+                new StateTrackingMockExecutionGraph();
+
+        executionGraphWithVertexParallelismFuture.complete(getGraph(executionGraph));
+
+        assertThat(executionGraph.getState()).isEqualTo(JobStatus.INITIALIZING);
     }
 
     @Test