This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9365049facc95fe5467f6f88c60ef68336ffaae8 Author: Aleksandr Iushmanov <aiushma...@confluent.io> AuthorDate: Wed Jun 25 09:54:52 2025 +0100 [FLINK-37701] Extract LocalRecoveryTest to reduce AdaptiveSchedulerTest length, restore checkstyle file-length limit. --- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 312 +-------------------- .../adaptive/AdaptiveSchedulerTestBase.java | 176 ++++++++++++ .../scheduler/adaptive/LocalRecoveryTest.java | 219 +++++++++++++++ tools/maven/checkstyle.xml | 2 +- 4 files changed, 397 insertions(+), 312 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index 77bcfba43f9..2ad3aff639a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -24,23 +24,19 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.configuration.SchedulerExecutionMode; import org.apache.flink.configuration.TraceOptions; -import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.failure.FailureEnricher; import org.apache.flink.core.failure.TestingFailureEnricher; import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.Gauge; -import org.apache.flink.runtime.OperatorIDPair; import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointIDCounter; import org.apache.flink.runtime.checkpoint.CheckpointStatsListener; import
org.apache.flink.runtime.checkpoint.CheckpointsCleaner; -import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker; -import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter; @@ -59,10 +55,7 @@ import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest; import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition; import org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy; import org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy; @@ -73,7 +66,6 @@ import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGate import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; -import org.apache.flink.runtime.jobgraph.JobGraphBuilder; import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements; @@ -85,8 +77,6 @@ import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; 
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool; import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool; -import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder; -import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint; import org.apache.flink.runtime.metrics.MetricNames; @@ -103,28 +93,20 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler; import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler; import org.apache.flink.runtime.scheduler.SchedulerBase; import org.apache.flink.runtime.scheduler.SchedulerNG; -import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; -import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.VertexParallelismInformation; import org.apache.flink.runtime.scheduler.VertexParallelismStore; -import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation; -import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.slots.ResourceRequirement; -import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import 
org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.util.ResourceCounter; import org.apache.flink.runtime.util.TestingFatalErrorHandler; -import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.traces.Span; import org.apache.flink.traces.SpanBuilder; import org.apache.flink.util.ConfigurationException; @@ -133,10 +115,7 @@ import org.apache.flink.util.IterableUtils; import org.apache.flink.util.Preconditions; import org.apache.flink.util.concurrent.FutureUtils; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.RegisterExtension; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -149,16 +128,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; -import java.util.Optional; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -170,119 +145,24 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collectors; -import java.util.stream.IntStream; import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; -import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import 
static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph; import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements; import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots; -import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing; -import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning; -import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress; -import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint; -import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning; -import static org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatNoException; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertTrue; /** Tests for the {@link AdaptiveScheduler}. 
*/ -public class AdaptiveSchedulerTest { - - private static final Duration DEFAULT_TIMEOUT = Duration.ofHours(1); - private static final int PARALLELISM = 4; - private static final JobVertex JOB_VERTEX = createNoOpVertex("v1", PARALLELISM); +public class AdaptiveSchedulerTest extends AdaptiveSchedulerTestBase { private static final Logger LOG = LoggerFactory.getLogger(AdaptiveSchedulerTest.class); - @RegisterExtension - private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = - TestingUtils.defaultExecutorExtension(); - - @RegisterExtension - private static final TestExecutorExtension<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = - new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor); - - public static final int CHECKPOINT_TIMEOUT_SECONDS = 10; - - private final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = - new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread()); - - private final ComponentMainThreadExecutor singleThreadMainThreadExecutor = - ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( - TEST_EXECUTOR_RESOURCE.getExecutor()); - - private final ClassLoader classLoader = ClassLoader.getSystemClassLoader(); - - private AdaptiveScheduler scheduler; - - @BeforeEach - void before() { - scheduler = null; - } - - @AfterEach - void after() { - closeInExecutorService(scheduler, singleThreadMainThreadExecutor); - } - - private static void closeInExecutorService( - @Nullable AdaptiveScheduler scheduler, Executor executor) { - if (scheduler != null) { - final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); - executor.execute( - () -> { - try { - scheduler.cancel(); - - FutureUtils.forward(scheduler.closeAsync(), closeFuture); - } catch (Throwable t) { - closeFuture.completeExceptionally(t); - } - }); - - // we have to wait for the job termination outside the main thread because the - // cancellation tasks are scheduled on the main thread as 
well. - scheduler - .getJobTerminationFuture() - .whenCompleteAsync( - (jobStatus, error) -> { - assertThat(scheduler.getState().getClass()) - .isEqualTo(Finished.class); - - if (error != null) { - closeFuture.completeExceptionally(error); - } else { - try { - FutureUtils.forward(scheduler.closeAsync(), closeFuture); - } catch (Throwable t) { - closeFuture.completeExceptionally(t); - } - } - }, - executor); - assertThatFuture(closeFuture).eventuallySucceeds(); - } - } - - private void startTestInstanceInMainThread() { - runInMainThread(() -> scheduler.startScheduling()); - } - - private void runInMainThread(Runnable callback) { - CompletableFuture.runAsync(callback, singleThreadMainThreadExecutor).join(); - } - - private <T> T supplyInMainThread(Supplier<T> supplier) throws Exception { - return CompletableFuture.supplyAsync(supplier, singleThreadMainThreadExecutor).get(); - } - @Test void testInitialState() throws Exception { scheduler = @@ -2096,126 +1976,6 @@ public class AdaptiveSchedulerTest { assertThat(assignmentResult.isSuccess()).isFalse(); } - @Test - void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { - final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX); - final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); - final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); - final boolean localRecoveryEnabled = true; - final String executionTarget = "local"; - final boolean minimalTaskManagerPreferred = false; - final SlotAllocator slotAllocator = - getArgumentCapturingDelegatingSlotAllocator( - AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( - slotPool, - localRecoveryEnabled, - executionTarget, - minimalTaskManagerPreferred), - capturedAllocations); - - scheduler = - new AdaptiveSchedulerBuilder( - jobGraph, - singleThreadMainThreadExecutor, - EXECUTOR_RESOURCE.getExecutor()) - .setDeclarativeSlotPool(slotPool) - .setSlotAllocator(slotAllocator) - 
.setStateTransitionManagerFactory( - createAutoAdvanceStateTransitionManagerFactory()) - .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) - .build(); - - // Start scheduler - startTestInstanceInMainThread(); - - // Transition job and all subtasks to RUNNING state. - waitForJobStatusRunning(scheduler); - runInMainThread(() -> setAllExecutionsToRunning(scheduler)); - - // Trigger a checkpoint - CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = - supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); - - // Verify that checkpoint was registered by scheduler. Required to prevent race condition - // when checkpoint is acknowledged before start. - waitForCheckpointInProgress(scheduler); - - // Acknowledge the checkpoint for all tasks with the fake state. - final Map<OperatorID, OperatorSubtaskState> operatorStates = - generateFakeKeyedManagedStateForAllOperators(jobGraph); - runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, operatorStates)); - - // Wait for the checkpoint to complete. - final CompletedCheckpoint completedCheckpoint = completedCheckpointFuture.join(); - - // completedCheckpointStore.getLatestCheckpoint() can return null if called immediately - // after the checkpoint is completed. - waitForCompletedCheckpoint(scheduler); - - // Fail early if the checkpoint is null. - assertThat(completedCheckpoint).withFailMessage("Checkpoint shouldn't be null").isNotNull(); - - // Emulating new graph creation call on job recovery to ensure that the state is considered - // for new allocations. 
- final List<ExecutionAttemptID> executionAttemptIds = - supplyInMainThread( - () -> { - final Optional<ExecutionGraph> maybeExecutionGraph = - scheduler - .getState() - .as(StateWithExecutionGraph.class) - .map(StateWithExecutionGraph::getExecutionGraph); - assertThat(maybeExecutionGraph).isNotEmpty(); - final ExecutionVertex[] taskVertices = - Objects.requireNonNull( - maybeExecutionGraph - .get() - .getJobVertex(JOB_VERTEX.getID())) - .getTaskVertices(); - return Arrays.stream(taskVertices) - .map(ExecutionVertex::getCurrentExecutionAttempt) - .map(Execution::getAttemptId) - .collect(Collectors.toList()); - }); - - assertThat(executionAttemptIds).hasSize(PARALLELISM); - - runInMainThread( - () -> { - // fail one of the vertices - scheduler.updateTaskExecutionState( - new TaskExecutionState( - executionAttemptIds.get(0), - ExecutionState.FAILED, - new Exception("Test exception for local recovery"))); - }); - - runInMainThread( - () -> { - // cancel remaining vertices - for (int idx = 1; idx < executionAttemptIds.size(); idx++) { - scheduler.updateTaskExecutionState( - new TaskExecutionState( - executionAttemptIds.get(idx), ExecutionState.CANCELED)); - } - }); - - waitForJobStatusRunning(scheduler); - - // First allocation during the job start + second allocation after job restart. - assertThat(capturedAllocations).hasSize(2); - // Fist allocation won't use state data. - assertTrue(capturedAllocations.get(0).isEmpty()); - // Second allocation should use data from latest checkpoint. - assertThat( - capturedAllocations - .get(1) - .getAllocations(JOB_VERTEX.getID()) - .get(0) - .stateSizeInBytes) - .isGreaterThan(0); - } - @Test void testComputeVertexParallelismStoreForExecutionInReactiveMode() { JobVertex v1 = createNoOpVertex("v1", 1, 50); @@ -2738,30 +2498,6 @@ public class AdaptiveSchedulerTest { mainThreadExecutor); } - /** - * Creates a testing SlotPool instance that would allow for the scheduler to transition to - * Executing state. 
- */ - private static DeclarativeSlotPool getSlotPoolWithFreeSlots(int freeSlots) { - return new TestingDeclarativeSlotPoolBuilder() - .setContainsFreeSlotFunction(allocationID -> true) - .setReserveFreeSlotFunction( - (allocationId, resourceProfile) -> - TestingPhysicalSlot.builder() - .withAllocationID(allocationId) - .build()) - .setGetFreeSlotTrackerSupplier( - () -> - TestingFreeSlotTracker.newBuilder() - .setGetFreeSlotsInformationSupplier( - () -> - IntStream.range(0, freeSlots) - .mapToObj(v -> new TestingSlot()) - .collect(Collectors.toSet())) - .build()) - .build(); - } - private static JobGraph createJobGraph() { return streamingJobGraph(JOB_VERTEX); } @@ -3066,50 +2802,4 @@ public class AdaptiveSchedulerTest { .containsEntry("canRestart", String.valueOf(canRestart)); } } - - private static JobGraph createJobGraphWithCheckpointing(final JobVertex... jobVertex) { - final JobGraph jobGraph = - JobGraphBuilder.newStreamingJobGraphBuilder() - .addJobVertices(Arrays.asList(jobVertex)) - .build(); - SchedulerTestingUtils.enableCheckpointing( - jobGraph, null, null, Duration.ofHours(1).toMillis(), true); - return jobGraph; - } - - private static AdaptiveScheduler.StateTransitionManagerFactory - createAutoAdvanceStateTransitionManagerFactory() { - return (context, - ignoredClock, - ignoredCooldown, - ignoredResourceStabilizationTimeout, - ignoredMaxTriggerDelay) -> - TestingStateTransitionManager.withOnTriggerEventOnly( - () -> { - if (context instanceof WaitingForResources) { - context.transitionToSubsequentState(); - } - }); - } - - private static Map<OperatorID, OperatorSubtaskState> - generateFakeKeyedManagedStateForAllOperators(final JobGraph jobGraph) - throws IOException { - final Map<OperatorID, OperatorSubtaskState> operatorStates = new HashMap<>(); - for (final JobVertex jobVertex : jobGraph.getVertices()) { - final KeyedStateHandle keyedStateHandle = - generateKeyGroupState( - jobVertex.getID(), - KeyGroupRange.of(0, jobGraph.getMaximumParallelism() - 
1), - false); - for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) { - operatorStates.put( - operatorId.getGeneratedOperatorID(), - OperatorSubtaskState.builder() - .setManagedKeyedState(keyedStateHandle) - .build()); - } - } - return operatorStates; - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java new file mode 100644 index 00000000000..9f856d785a3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphBuilder; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool; +import org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder; +import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker; +import org.apache.flink.runtime.scheduler.SchedulerTestingUtils; +import org.apache.flink.runtime.scheduler.TestingPhysicalSlot; +import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; +import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.executor.TestExecutorExtension; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.extension.RegisterExtension; + +import javax.annotation.Nullable; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture; +import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex; +import static org.assertj.core.api.Assertions.assertThat; + +public class AdaptiveSchedulerTestBase { + protected static final Duration DEFAULT_TIMEOUT = Duration.ofHours(1); + protected static 
final int PARALLELISM = 4; + protected static final JobVertex JOB_VERTEX = createNoOpVertex("v1", PARALLELISM); + + @RegisterExtension + protected static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = + TestingUtils.defaultExecutorExtension(); + + @RegisterExtension + protected static final TestExecutorExtension<ScheduledExecutorService> TEST_EXECUTOR_RESOURCE = + new TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor); + + protected final ManuallyTriggeredComponentMainThreadExecutor mainThreadExecutor = + new ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread()); + + protected final ComponentMainThreadExecutor singleThreadMainThreadExecutor = + ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor( + TEST_EXECUTOR_RESOURCE.getExecutor()); + + protected final ClassLoader classLoader = ClassLoader.getSystemClassLoader(); + + protected AdaptiveScheduler scheduler; + + @BeforeEach + void before() { + scheduler = null; + } + + @AfterEach + void after() { + closeInExecutorService(scheduler, singleThreadMainThreadExecutor); + } + + protected static JobGraph createJobGraphWithCheckpointing(final JobVertex... 
jobVertex) { + final JobGraph jobGraph = + JobGraphBuilder.newStreamingJobGraphBuilder() + .addJobVertices(Arrays.asList(jobVertex)) + .build(); + SchedulerTestingUtils.enableCheckpointing( + jobGraph, null, null, Duration.ofHours(1).toMillis(), true); + return jobGraph; + } + + protected static void closeInExecutorService( + @Nullable AdaptiveScheduler scheduler, Executor executor) { + if (scheduler != null) { + final CompletableFuture<Void> closeFuture = new CompletableFuture<>(); + executor.execute( + () -> { + try { + scheduler.cancel(); + + FutureUtils.forward(scheduler.closeAsync(), closeFuture); + } catch (Throwable t) { + closeFuture.completeExceptionally(t); + } + }); + + // we have to wait for the job termination outside the main thread because the + // cancellation tasks are scheduled on the main thread as well. + scheduler + .getJobTerminationFuture() + .whenCompleteAsync( + (jobStatus, error) -> { + assertThat(scheduler.getState().getClass()) + .isEqualTo(Finished.class); + + if (error != null) { + closeFuture.completeExceptionally(error); + } else { + try { + FutureUtils.forward(scheduler.closeAsync(), closeFuture); + } catch (Throwable t) { + closeFuture.completeExceptionally(t); + } + } + }, + executor); + assertThatFuture(closeFuture).eventuallySucceeds(); + } + } + + /** + * Creates a testing SlotPool instance that would allow for the scheduler to transition to + * Executing state. 
+ */ + protected static DeclarativeSlotPool getSlotPoolWithFreeSlots(int freeSlots) { + return new TestingDeclarativeSlotPoolBuilder() + .setContainsFreeSlotFunction(allocationID -> true) + .setReserveFreeSlotFunction( + (allocationId, resourceProfile) -> + TestingPhysicalSlot.builder() + .withAllocationID(allocationId) + .build()) + .setGetFreeSlotTrackerSupplier( + () -> + TestingFreeSlotTracker.newBuilder() + .setGetFreeSlotsInformationSupplier( + () -> + IntStream.range(0, freeSlots) + .mapToObj(v -> new TestingSlot()) + .collect(Collectors.toSet())) + .build()) + .build(); + } + + protected void startTestInstanceInMainThread() { + runInMainThread(() -> scheduler.startScheduling()); + } + + protected void runInMainThread(final Runnable callback) { + CompletableFuture.runAsync(callback, singleThreadMainThreadExecutor).join(); + } + + protected <T> T supplyInMainThread(final Supplier<T> supplier) throws Exception { + return CompletableFuture.supplyAsync(supplier, singleThreadMainThreadExecutor).get(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java new file mode 100644 index 00000000000..35e2bb54db4 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.flink.runtime.scheduler.adaptive; + +import org.apache.flink.core.execution.CheckpointType; +import org.apache.flink.runtime.OperatorIDPair; +import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; +import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.Execution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool; +import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation; +import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator; +import org.apache.flink.runtime.state.KeyGroupRange; +import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import 
java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; + +import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint; +import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning; +import static org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class LocalRecoveryTest extends AdaptiveSchedulerTestBase { + @Test + void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception { + final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX); + final DeclarativeSlotPool slotPool = getSlotPoolWithFreeSlots(PARALLELISM); + final List<JobAllocationsInformation> capturedAllocations = new ArrayList<>(); + final boolean localRecoveryEnabled = true; + final String executionTarget = "local"; + final boolean minimalTaskManagerPreferred = false; + final SlotAllocator slotAllocator = + getArgumentCapturingDelegatingSlotAllocator( + AdaptiveSchedulerFactory.createSlotSharingSlotAllocator( + slotPool, + localRecoveryEnabled, + executionTarget, + minimalTaskManagerPreferred), + capturedAllocations); + + scheduler = + new AdaptiveSchedulerBuilder( + jobGraph, + singleThreadMainThreadExecutor, + EXECUTOR_RESOURCE.getExecutor()) + .setDeclarativeSlotPool(slotPool) + .setSlotAllocator(slotAllocator) + .setStateTransitionManagerFactory( + 
createAutoAdvanceStateTransitionManagerFactory()) + .setRestartBackoffTimeStrategy(new TestRestartBackoffTimeStrategy(true, 0L)) + .build(); + + // Start scheduler + startTestInstanceInMainThread(); + + // Transition job and all subtasks to RUNNING state. + waitForJobStatusRunning(scheduler); + runInMainThread(() -> setAllExecutionsToRunning(scheduler)); + + // Trigger a checkpoint + CompletableFuture<CompletedCheckpoint> completedCheckpointFuture = + supplyInMainThread(() -> scheduler.triggerCheckpoint(CheckpointType.FULL)); + + // Verify that checkpoint was registered by scheduler. Required to prevent race condition + // when checkpoint is acknowledged before start. + waitForCheckpointInProgress(scheduler); + + // Acknowledge the checkpoint for all tasks with the fake state. + final Map<OperatorID, OperatorSubtaskState> operatorStates = + generateFakeKeyedManagedStateForAllOperators(jobGraph); + runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, operatorStates)); + + // Wait for the checkpoint to complete. + final CompletedCheckpoint completedCheckpoint = completedCheckpointFuture.join(); + + // completedCheckpointStore.getLatestCheckpoint() can return null if called immediately + // after the checkpoint is completed. + waitForCompletedCheckpoint(scheduler); + + // Fail early if the checkpoint is null. + assertThat(completedCheckpoint).withFailMessage("Checkpoint shouldn't be null").isNotNull(); + + // Emulating new graph creation call on job recovery to ensure that the state is considered + // for new allocations. 
+ final List<ExecutionAttemptID> executionAttemptIds = + supplyInMainThread(this::getExecutionAttemptIDS); + + assertThat(executionAttemptIds).hasSize(PARALLELISM); + + runInMainThread( + () -> { + // fail one of the vertices + scheduler.updateTaskExecutionState( + new TaskExecutionState( + executionAttemptIds.get(0), + ExecutionState.FAILED, + new Exception("Test exception for local recovery"))); + }); + + runInMainThread( + () -> { + // cancel remaining vertices + for (int idx = 1; idx < executionAttemptIds.size(); idx++) { + scheduler.updateTaskExecutionState( + new TaskExecutionState( + executionAttemptIds.get(idx), ExecutionState.CANCELED)); + } + }); + + waitForJobStatusRunning(scheduler); + + // First allocation during the job start + second allocation after job restart. + assertThat(capturedAllocations).hasSize(2); + // Fist allocation won't use state data. + assertTrue(capturedAllocations.get(0).isEmpty()); + // Second allocation should use data from latest checkpoint. + assertThat( + capturedAllocations + .get(1) + .getAllocations(JOB_VERTEX.getID()) + .get(0) + .stateSizeInBytes) + .isGreaterThan(0); + } + + private List<ExecutionAttemptID> getExecutionAttemptIDS() { + final Optional<ExecutionGraph> maybeExecutionGraph = + scheduler + .getState() + .as(StateWithExecutionGraph.class) + .map(StateWithExecutionGraph::getExecutionGraph); + assertThat(maybeExecutionGraph).isNotEmpty(); + final ExecutionVertex[] taskVertices = + Objects.requireNonNull(maybeExecutionGraph.get().getJobVertex(JOB_VERTEX.getID())) + .getTaskVertices(); + return Arrays.stream(taskVertices) + .map(ExecutionVertex::getCurrentExecutionAttempt) + .map(Execution::getAttemptId) + .collect(Collectors.toList()); + } + + private static AdaptiveScheduler.StateTransitionManagerFactory + createAutoAdvanceStateTransitionManagerFactory() { + return (context, + ignoredClock, + ignoredCooldown, + ignoredResourceStabilizationTimeout, + ignoredMaxTriggerDelay) -> + 
TestingStateTransitionManager.withOnTriggerEventOnly( + () -> { + if (context instanceof WaitingForResources) { + context.transitionToSubsequentState(); + } + }); + } + + private static Map<OperatorID, OperatorSubtaskState> + generateFakeKeyedManagedStateForAllOperators(final JobGraph jobGraph) + throws IOException { + final Map<OperatorID, OperatorSubtaskState> operatorStates = new HashMap<>(); + for (final JobVertex jobVertex : jobGraph.getVertices()) { + final KeyedStateHandle keyedStateHandle = + generateKeyGroupState( + jobVertex.getID(), + KeyGroupRange.of(0, jobGraph.getMaximumParallelism() - 1), + false); + for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) { + operatorStates.put( + operatorId.getGeneratedOperatorID(), + OperatorSubtaskState.builder() + .setManagedKeyedState(keyedStateHandle) + .build()); + } + } + return operatorStates; + } +} diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml index ebc80a25fe5..ff6c13932b9 100644 --- a/tools/maven/checkstyle.xml +++ b/tools/maven/checkstyle.xml @@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam. --> <module name="FileLength"> - <property name="max" value="3150"/> + <property name="max" value="3100"/> </module> <!-- All Java AST specific tests live under TreeWalker module. -->