This is an automated email from the ASF dual-hosted git repository. gaoyunhaii pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 44c00dbb4a083ff197442e2ce6440558be252787 Author: Gen Luo <luogen...@gmail.com> AuthorDate: Tue Jul 26 16:39:40 2022 +0800 [FLINK-28588][rest] Archive all current executions in ArchivedExecutionVertex. --- .../executiongraph/AccessExecutionVertex.java | 9 + .../executiongraph/ArchivedExecutionVertex.java | 26 ++- .../ArchivedSpeculativeExecutionVertex.java | 52 ----- .../executiongraph/SpeculativeExecutionVertex.java | 4 +- .../ArchivedExecutionGraphTestUtils.java | 21 ++ ...xecutionVertexWithSpeculativeExecutionTest.java | 223 +++++++++++++++++++++ 6 files changed, 280 insertions(+), 55 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java index 6775424ba4b..f8d4581c7e6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionVertex.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import java.util.Collection; import java.util.Optional; /** Common interface for the runtime {@link ExecutionVertex} and {@link ArchivedExecutionVertex}. */ @@ -46,6 +47,14 @@ public interface AccessExecutionVertex { */ AccessExecution getCurrentExecutionAttempt(); + /** + * Returns the current executions for this execution vertex. The returned collection must + * contain the current execution attempt. + * + * @return current executions + */ + <T extends AccessExecution> Collection<T> getCurrentExecutions(); + /** * Returns the current {@link ExecutionState} for this execution vertex. 
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java index e5feba5445a..d9f8448a702 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java @@ -18,10 +18,14 @@ package org.apache.flink.runtime.executiongraph; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.io.Serializable; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.Optional; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -40,15 +44,29 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa private final ArchivedExecution currentExecution; // this field must never be null + private final Collection<AccessExecution> currentExecutions; + // ------------------------------------------------------------------------ public ArchivedExecutionVertex(ExecutionVertex vertex) { this.subTaskIndex = vertex.getParallelSubtaskIndex(); this.executionHistory = getCopyOfExecutionHistory(vertex); this.taskNameWithSubtask = vertex.getTaskNameWithSubtaskIndex(); - this.currentExecution = vertex.getCurrentExecutionAttempt().archive(); + + Execution vertexCurrentExecution = vertex.getCurrentExecutionAttempt(); + ArrayList<AccessExecution> currentExecutionList = + new ArrayList<>(vertex.getCurrentExecutions().size()); + currentExecution = vertexCurrentExecution.archive(); + currentExecutionList.add(currentExecution); + for (Execution execution : vertex.getCurrentExecutions()) { + if (execution != vertexCurrentExecution) { + currentExecutionList.add(execution.archive()); + } + } + 
currentExecutions = Collections.unmodifiableList(currentExecutionList); } + @VisibleForTesting public ArchivedExecutionVertex( int subTaskIndex, String taskNameWithSubtask, @@ -58,6 +76,7 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa this.taskNameWithSubtask = checkNotNull(taskNameWithSubtask); this.currentExecution = checkNotNull(currentExecution); this.executionHistory = checkNotNull(executionHistory); + this.currentExecutions = Collections.singletonList(currentExecution); } // -------------------------------------------------------------------------------------------- @@ -79,6 +98,11 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa return currentExecution; } + @Override + public Collection<AccessExecution> getCurrentExecutions() { + return currentExecutions; + } + @Override public ExecutionState getExecutionState() { return currentExecution.getState(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java deleted file mode 100644 index 263049414c5..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedSpeculativeExecutionVertex.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.executiongraph; - -/** - * {@link ArchivedSpeculativeExecutionVertex} is a readonly representation of {@link - * SpeculativeExecutionVertex}. - */ -public class ArchivedSpeculativeExecutionVertex extends ArchivedExecutionVertex { - - private static final long serialVersionUID = 1L; - - public ArchivedSpeculativeExecutionVertex(SpeculativeExecutionVertex vertex) { - super( - vertex.getParallelSubtaskIndex(), - vertex.getTaskNameWithSubtaskIndex(), - vertex.getCurrentExecutionAttempt().archive(), - getCopyOfExecutionHistory(vertex)); - } - - private static ExecutionHistory getCopyOfExecutionHistory(SpeculativeExecutionVertex vertex) { - final ExecutionHistory executionHistory = - ArchivedExecutionVertex.getCopyOfExecutionHistory(vertex); - - // add all the executions to the execution history except for the only admitted current - // execution - final Execution currentAttempt = vertex.getCurrentExecutionAttempt(); - for (Execution execution : vertex.getCurrentExecutions()) { - if (execution.getAttemptNumber() != currentAttempt.getAttemptNumber()) { - executionHistory.add(execution.archive()); - } - } - - return executionHistory; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java index 41ef1c0ac2f..71e28779637 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java @@ -285,8 +285,8 @@ public class SpeculativeExecutionVertex extends ExecutionVertex { } @Override - public ArchivedSpeculativeExecutionVertex archive() { - return new ArchivedSpeculativeExecutionVertex(this); + public ArchivedExecutionVertex archive() { + return new ArchivedExecutionVertex(this); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java index 55a10cd5396..fc269901b34 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTestUtils.java @@ -24,6 +24,10 @@ import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedValue; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -68,6 +72,23 @@ class ArchivedExecutionGraphTestUtils { compareExecution( runtimeVertex.getCurrentExecutionAttempt(), archivedVertex.getCurrentExecutionAttempt()); + + compareExecutions( + runtimeVertex.getCurrentExecutions(), archivedVertex.getCurrentExecutions()); + } + + private static <RT extends AccessExecution, AT extends AccessExecution> void compareExecutions( + Collection<RT> runtimeExecutions, Collection<AT> archivedExecutions) { + assertThat(runtimeExecutions).hasSameSizeAs(archivedExecutions); + + List<RT> sortedRuntimeExecutions = new ArrayList<>(runtimeExecutions); + List<AT> sortedArchivedExecutions = new ArrayList<>(archivedExecutions); + sortedRuntimeExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber)); + 
sortedArchivedExecutions.sort(Comparator.comparingInt(AccessExecution::getAttemptNumber)); + + for (int i = 0; i < runtimeExecutions.size(); i++) { + compareExecution(sortedRuntimeExecutions.get(i), sortedArchivedExecutions.get(i)); + } } private static void compareExecution( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java new file mode 100644 index 00000000000..c9e83882b1c --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertexWithSpeculativeExecutionTest.java @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.scheduler.TestingInternalFailuresListener; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for the {@link ArchivedExecutionVertex} created from a {@link SpeculativeExecutionVertex}. + */ +class ArchivedExecutionVertexWithSpeculativeExecutionTest { + + @RegisterExtension + private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + private TestingInternalFailuresListener internalFailuresListener; + + @BeforeEach + void setUp() { + internalFailuresListener = new TestingInternalFailuresListener(); + } + + @Test + void testCreateSpeculativeExecution() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testResetExecutionVertex() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + 
e1.transitionState(ExecutionState.RUNNING); + e1.markFinished(); + e2.cancel(); + ev.resetForNewExecution(); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testCancel() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + ev.createNewSpeculativeExecution(System.currentTimeMillis()); + ev.cancel(); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testSuspend() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + ev.createNewSpeculativeExecution(System.currentTimeMillis()); + ev.suspend(); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testFail() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + ev.createNewSpeculativeExecution(System.currentTimeMillis()); + ev.fail(new Exception("Forced test failure.")); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testMarkFailed() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + ev.createNewSpeculativeExecution(System.currentTimeMillis()); + ev.markFailed(new Exception("Forced test failure.")); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testVertexTerminationAndJobTermination() throws Exception { + final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex); + final ExecutionGraph eg = createExecutionGraph(jobGraph); + eg.transitionToRunning(); + + ExecutionJobVertex jv = eg.getJobVertex(jobVertex.getID()); + 
assertThat(jv).isNotNull(); + final SpeculativeExecutionVertex ev = (SpeculativeExecutionVertex) jv.getTaskVertices()[0]; + final Execution e1 = ev.getCurrentExecutionAttempt(); + final Execution e2 = ev.createNewSpeculativeExecution(System.currentTimeMillis()); + + e1.transitionState(ExecutionState.RUNNING); + e1.markFinished(); + e2.cancel(); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testArchiveFailedExecutions() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.RUNNING); + + final Execution e2 = ev.createNewSpeculativeExecution(0); + e2.transitionState(ExecutionState.FAILED); + ev.archiveFailedExecution(e2.getAttemptId()); + + final Execution e3 = ev.createNewSpeculativeExecution(0); + e3.transitionState(ExecutionState.RUNNING); + e1.transitionState(ExecutionState.FAILED); + ev.archiveFailedExecution(e1.getAttemptId()); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testArchiveTheOnlyCurrentExecution() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.FAILED); + + ev.archiveFailedExecution(e1.getAttemptId()); + + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + + @Test + void testGetExecutionState() throws Exception { + final SpeculativeExecutionVertex ev = createSpeculativeExecutionVertex(); + + final Execution e1 = ev.getCurrentExecutionAttempt(); + e1.transitionState(ExecutionState.CANCELED); + + // states added later are more likely to reach the FINISHED state + final List<ExecutionState> statesSortedByPriority = new ArrayList<>(); +
statesSortedByPriority.add(ExecutionState.FAILED); + statesSortedByPriority.add(ExecutionState.CANCELING); + statesSortedByPriority.add(ExecutionState.CREATED); + statesSortedByPriority.add(ExecutionState.SCHEDULED); + statesSortedByPriority.add(ExecutionState.DEPLOYING); + statesSortedByPriority.add(ExecutionState.INITIALIZING); + statesSortedByPriority.add(ExecutionState.RUNNING); + statesSortedByPriority.add(ExecutionState.FINISHED); + + for (ExecutionState state : statesSortedByPriority) { + final Execution execution = ev.createNewSpeculativeExecution(0); + execution.transitionState(state); + + // Check the ArchivedExecutionVertex in each state. + ArchivedExecutionVertex aev = ev.archive(); + ArchivedExecutionGraphTestUtils.compareExecutionVertex(ev, aev); + } + } + + private SpeculativeExecutionVertex createSpeculativeExecutionVertex() throws Exception { + final JobVertex jobVertex = ExecutionGraphTestUtils.createNoOpVertex(1); + final JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(jobVertex); + final ExecutionGraph executionGraph = createExecutionGraph(jobGraph); + ExecutionJobVertex jv = executionGraph.getJobVertex(jobVertex.getID()); + assertThat(jv).isNotNull(); + return (SpeculativeExecutionVertex) jv.getTaskVertices()[0]; + } + + private ExecutionGraph createExecutionGraph(final JobGraph jobGraph) throws Exception { + final ExecutionGraph executionGraph = + TestingDefaultExecutionGraphBuilder.newBuilder() + .setJobGraph(jobGraph) + .setExecutionJobVertexFactory(new SpeculativeExecutionJobVertex.Factory()) + .build(EXECUTOR_RESOURCE.getExecutor()); + + executionGraph.setInternalTaskFailuresListener(internalFailuresListener); + executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + return executionGraph; + } +}