This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9365049facc95fe5467f6f88c60ef68336ffaae8
Author: Aleksandr Iushmanov <aiushma...@confluent.io>
AuthorDate: Wed Jun 25 09:54:52 2025 +0100

    [FLINK-37701] Extract LocalRecoveryTest to reduce AdaptiveSchedulerTest 
length, restore checkstyle line-length limit.
---
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  | 312 +--------------------
 .../adaptive/AdaptiveSchedulerTestBase.java        | 176 ++++++++++++
 .../scheduler/adaptive/LocalRecoveryTest.java      | 219 +++++++++++++++
 tools/maven/checkstyle.xml                         |   2 +-
 4 files changed, 397 insertions(+), 312 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index 77bcfba43f9..2ad3aff639a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -24,23 +24,19 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.configuration.SchedulerExecutionMode;
 import org.apache.flink.configuration.TraceOptions;
-import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.failure.FailureEnricher;
 import org.apache.flink.core.failure.TestingFailureEnricher;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.runtime.OperatorIDPair;
 import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsListener;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.NoOpCheckpointStatsTracker;
-import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
@@ -59,10 +55,7 @@ import 
org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraphTest;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
-import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionGraph;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
 import 
org.apache.flink.runtime.executiongraph.failover.FixedDelayRestartBackoffTimeStrategy;
 import 
org.apache.flink.runtime.executiongraph.failover.NoRestartBackoffTimeStrategy;
@@ -73,7 +66,6 @@ import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGate
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
@@ -85,8 +77,6 @@ import 
org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultAllocatedSlotPool;
 import org.apache.flink.runtime.jobmaster.slotpool.DefaultDeclarativeSlotPool;
-import 
org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
-import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.metrics.MetricNames;
@@ -103,28 +93,20 @@ import 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.SchedulerBase;
 import org.apache.flink.runtime.scheduler.SchedulerNG;
-import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
-import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
 import org.apache.flink.runtime.scheduler.VertexParallelismStore;
-import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
-import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.slots.ResourceRequirement;
-import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.util.ResourceCounter;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
-import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.testutils.executor.TestExecutorExtension;
 import org.apache.flink.traces.Span;
 import org.apache.flink.traces.SpanBuilder;
 import org.apache.flink.util.ConfigurationException;
@@ -133,10 +115,7 @@ import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -149,16 +128,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -170,119 +145,24 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
-import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph;
 import static 
org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
 import static 
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
-import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
 import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.enableCheckpointing;
-import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning;
-import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress;
-import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint;
-import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning;
-import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatNoException;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /** Tests for the {@link AdaptiveScheduler}. */
-public class AdaptiveSchedulerTest {
-
-    private static final Duration DEFAULT_TIMEOUT = Duration.ofHours(1);
-    private static final int PARALLELISM = 4;
-    private static final JobVertex JOB_VERTEX = createNoOpVertex("v1", 
PARALLELISM);
+public class AdaptiveSchedulerTest extends AdaptiveSchedulerTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(AdaptiveSchedulerTest.class);
 
-    @RegisterExtension
-    private static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
-            TestingUtils.defaultExecutorExtension();
-
-    @RegisterExtension
-    private static final TestExecutorExtension<ScheduledExecutorService> 
TEST_EXECUTOR_RESOURCE =
-            new 
TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);
-
-    public static final int CHECKPOINT_TIMEOUT_SECONDS = 10;
-
-    private final ManuallyTriggeredComponentMainThreadExecutor 
mainThreadExecutor =
-            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
-
-    private final ComponentMainThreadExecutor singleThreadMainThreadExecutor =
-            ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
-                    TEST_EXECUTOR_RESOURCE.getExecutor());
-
-    private final ClassLoader classLoader = ClassLoader.getSystemClassLoader();
-
-    private AdaptiveScheduler scheduler;
-
-    @BeforeEach
-    void before() {
-        scheduler = null;
-    }
-
-    @AfterEach
-    void after() {
-        closeInExecutorService(scheduler, singleThreadMainThreadExecutor);
-    }
-
-    private static void closeInExecutorService(
-            @Nullable AdaptiveScheduler scheduler, Executor executor) {
-        if (scheduler != null) {
-            final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
-            executor.execute(
-                    () -> {
-                        try {
-                            scheduler.cancel();
-
-                            FutureUtils.forward(scheduler.closeAsync(), 
closeFuture);
-                        } catch (Throwable t) {
-                            closeFuture.completeExceptionally(t);
-                        }
-                    });
-
-            // we have to wait for the job termination outside the main thread 
because the
-            // cancellation tasks are scheduled on the main thread as well.
-            scheduler
-                    .getJobTerminationFuture()
-                    .whenCompleteAsync(
-                            (jobStatus, error) -> {
-                                assertThat(scheduler.getState().getClass())
-                                        .isEqualTo(Finished.class);
-
-                                if (error != null) {
-                                    closeFuture.completeExceptionally(error);
-                                } else {
-                                    try {
-                                        
FutureUtils.forward(scheduler.closeAsync(), closeFuture);
-                                    } catch (Throwable t) {
-                                        closeFuture.completeExceptionally(t);
-                                    }
-                                }
-                            },
-                            executor);
-            assertThatFuture(closeFuture).eventuallySucceeds();
-        }
-    }
-
-    private void startTestInstanceInMainThread() {
-        runInMainThread(() -> scheduler.startScheduling());
-    }
-
-    private void runInMainThread(Runnable callback) {
-        CompletableFuture.runAsync(callback, 
singleThreadMainThreadExecutor).join();
-    }
-
-    private <T> T supplyInMainThread(Supplier<T> supplier) throws Exception {
-        return CompletableFuture.supplyAsync(supplier, 
singleThreadMainThreadExecutor).get();
-    }
-
     @Test
     void testInitialState() throws Exception {
         scheduler =
@@ -2096,126 +1976,6 @@ public class AdaptiveSchedulerTest {
         assertThat(assignmentResult.isSuccess()).isFalse();
     }
 
-    @Test
-    void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception 
{
-        final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX);
-        final DeclarativeSlotPool slotPool = 
getSlotPoolWithFreeSlots(PARALLELISM);
-        final List<JobAllocationsInformation> capturedAllocations = new 
ArrayList<>();
-        final boolean localRecoveryEnabled = true;
-        final String executionTarget = "local";
-        final boolean minimalTaskManagerPreferred = false;
-        final SlotAllocator slotAllocator =
-                getArgumentCapturingDelegatingSlotAllocator(
-                        
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
-                                slotPool,
-                                localRecoveryEnabled,
-                                executionTarget,
-                                minimalTaskManagerPreferred),
-                        capturedAllocations);
-
-        scheduler =
-                new AdaptiveSchedulerBuilder(
-                                jobGraph,
-                                singleThreadMainThreadExecutor,
-                                EXECUTOR_RESOURCE.getExecutor())
-                        .setDeclarativeSlotPool(slotPool)
-                        .setSlotAllocator(slotAllocator)
-                        .setStateTransitionManagerFactory(
-                                
createAutoAdvanceStateTransitionManagerFactory())
-                        .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0L))
-                        .build();
-
-        // Start scheduler
-        startTestInstanceInMainThread();
-
-        // Transition job and all subtasks to RUNNING state.
-        waitForJobStatusRunning(scheduler);
-        runInMainThread(() -> setAllExecutionsToRunning(scheduler));
-
-        // Trigger a checkpoint
-        CompletableFuture<CompletedCheckpoint> completedCheckpointFuture =
-                supplyInMainThread(() -> 
scheduler.triggerCheckpoint(CheckpointType.FULL));
-
-        // Verify that checkpoint was registered by scheduler. Required to 
prevent race condition
-        // when checkpoint is acknowledged before start.
-        waitForCheckpointInProgress(scheduler);
-
-        // Acknowledge the checkpoint for all tasks with the fake state.
-        final Map<OperatorID, OperatorSubtaskState> operatorStates =
-                generateFakeKeyedManagedStateForAllOperators(jobGraph);
-        runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, 
operatorStates));
-
-        // Wait for the checkpoint to complete.
-        final CompletedCheckpoint completedCheckpoint = 
completedCheckpointFuture.join();
-
-        // completedCheckpointStore.getLatestCheckpoint() can return null if 
called immediately
-        // after the checkpoint is completed.
-        waitForCompletedCheckpoint(scheduler);
-
-        // Fail early if the checkpoint is null.
-        assertThat(completedCheckpoint).withFailMessage("Checkpoint shouldn't 
be null").isNotNull();
-
-        // Emulating new graph creation call on job recovery to ensure that 
the state is considered
-        // for new allocations.
-        final List<ExecutionAttemptID> executionAttemptIds =
-                supplyInMainThread(
-                        () -> {
-                            final Optional<ExecutionGraph> maybeExecutionGraph 
=
-                                    scheduler
-                                            .getState()
-                                            .as(StateWithExecutionGraph.class)
-                                            
.map(StateWithExecutionGraph::getExecutionGraph);
-                            assertThat(maybeExecutionGraph).isNotEmpty();
-                            final ExecutionVertex[] taskVertices =
-                                    Objects.requireNonNull(
-                                                    maybeExecutionGraph
-                                                            .get()
-                                                            
.getJobVertex(JOB_VERTEX.getID()))
-                                            .getTaskVertices();
-                            return Arrays.stream(taskVertices)
-                                    
.map(ExecutionVertex::getCurrentExecutionAttempt)
-                                    .map(Execution::getAttemptId)
-                                    .collect(Collectors.toList());
-                        });
-
-        assertThat(executionAttemptIds).hasSize(PARALLELISM);
-
-        runInMainThread(
-                () -> {
-                    // fail one of the vertices
-                    scheduler.updateTaskExecutionState(
-                            new TaskExecutionState(
-                                    executionAttemptIds.get(0),
-                                    ExecutionState.FAILED,
-                                    new Exception("Test exception for local 
recovery")));
-                });
-
-        runInMainThread(
-                () -> {
-                    // cancel remaining vertices
-                    for (int idx = 1; idx < executionAttemptIds.size(); idx++) 
{
-                        scheduler.updateTaskExecutionState(
-                                new TaskExecutionState(
-                                        executionAttemptIds.get(idx), 
ExecutionState.CANCELED));
-                    }
-                });
-
-        waitForJobStatusRunning(scheduler);
-
-        // First allocation during the job start + second allocation after job 
restart.
-        assertThat(capturedAllocations).hasSize(2);
-        // Fist allocation won't use state data.
-        assertTrue(capturedAllocations.get(0).isEmpty());
-        // Second allocation should use data from latest checkpoint.
-        assertThat(
-                        capturedAllocations
-                                .get(1)
-                                .getAllocations(JOB_VERTEX.getID())
-                                .get(0)
-                                .stateSizeInBytes)
-                .isGreaterThan(0);
-    }
-
     @Test
     void testComputeVertexParallelismStoreForExecutionInReactiveMode() {
         JobVertex v1 = createNoOpVertex("v1", 1, 50);
@@ -2738,30 +2498,6 @@ public class AdaptiveSchedulerTest {
                 mainThreadExecutor);
     }
 
-    /**
-     * Creates a testing SlotPool instance that would allow for the scheduler 
to transition to
-     * Executing state.
-     */
-    private static DeclarativeSlotPool getSlotPoolWithFreeSlots(int freeSlots) 
{
-        return new TestingDeclarativeSlotPoolBuilder()
-                .setContainsFreeSlotFunction(allocationID -> true)
-                .setReserveFreeSlotFunction(
-                        (allocationId, resourceProfile) ->
-                                TestingPhysicalSlot.builder()
-                                        .withAllocationID(allocationId)
-                                        .build())
-                .setGetFreeSlotTrackerSupplier(
-                        () ->
-                                TestingFreeSlotTracker.newBuilder()
-                                        .setGetFreeSlotsInformationSupplier(
-                                                () ->
-                                                        IntStream.range(0, 
freeSlots)
-                                                                .mapToObj(v -> 
new TestingSlot())
-                                                                
.collect(Collectors.toSet()))
-                                        .build())
-                .build();
-    }
-
     private static JobGraph createJobGraph() {
         return streamingJobGraph(JOB_VERTEX);
     }
@@ -3066,50 +2802,4 @@ public class AdaptiveSchedulerTest {
                     .containsEntry("canRestart", String.valueOf(canRestart));
         }
     }
-
-    private static JobGraph createJobGraphWithCheckpointing(final JobVertex... 
jobVertex) {
-        final JobGraph jobGraph =
-                JobGraphBuilder.newStreamingJobGraphBuilder()
-                        .addJobVertices(Arrays.asList(jobVertex))
-                        .build();
-        SchedulerTestingUtils.enableCheckpointing(
-                jobGraph, null, null, Duration.ofHours(1).toMillis(), true);
-        return jobGraph;
-    }
-
-    private static AdaptiveScheduler.StateTransitionManagerFactory
-            createAutoAdvanceStateTransitionManagerFactory() {
-        return (context,
-                ignoredClock,
-                ignoredCooldown,
-                ignoredResourceStabilizationTimeout,
-                ignoredMaxTriggerDelay) ->
-                TestingStateTransitionManager.withOnTriggerEventOnly(
-                        () -> {
-                            if (context instanceof WaitingForResources) {
-                                context.transitionToSubsequentState();
-                            }
-                        });
-    }
-
-    private static Map<OperatorID, OperatorSubtaskState>
-            generateFakeKeyedManagedStateForAllOperators(final JobGraph 
jobGraph)
-                    throws IOException {
-        final Map<OperatorID, OperatorSubtaskState> operatorStates = new 
HashMap<>();
-        for (final JobVertex jobVertex : jobGraph.getVertices()) {
-            final KeyedStateHandle keyedStateHandle =
-                    generateKeyGroupState(
-                            jobVertex.getID(),
-                            KeyGroupRange.of(0, 
jobGraph.getMaximumParallelism() - 1),
-                            false);
-            for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) {
-                operatorStates.put(
-                        operatorId.getGeneratedOperatorID(),
-                        OperatorSubtaskState.builder()
-                                .setManagedKeyedState(keyedStateHandle)
-                                .build());
-            }
-        }
-        return operatorStates;
-    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
new file mode 100644
index 00000000000..9f856d785a3
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTestBase.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import 
org.apache.flink.runtime.jobmaster.slotpool.TestingDeclarativeSlotPoolBuilder;
+import org.apache.flink.runtime.jobmaster.slotpool.TestingFreeSlotTracker;
+import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
+import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.executor.TestExecutorExtension;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
+import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class AdaptiveSchedulerTestBase {
+    protected static final Duration DEFAULT_TIMEOUT = Duration.ofHours(1);
+    protected static final int PARALLELISM = 4;
+    protected static final JobVertex JOB_VERTEX = createNoOpVertex("v1", 
PARALLELISM);
+
+    @RegisterExtension
+    protected static final TestExecutorExtension<ScheduledExecutorService> 
EXECUTOR_RESOURCE =
+            TestingUtils.defaultExecutorExtension();
+
+    @RegisterExtension
+    protected static final TestExecutorExtension<ScheduledExecutorService> 
TEST_EXECUTOR_RESOURCE =
+            new 
TestExecutorExtension<>(Executors::newSingleThreadScheduledExecutor);
+
+    protected final ManuallyTriggeredComponentMainThreadExecutor 
mainThreadExecutor =
+            new 
ManuallyTriggeredComponentMainThreadExecutor(Thread.currentThread());
+
+    protected final ComponentMainThreadExecutor singleThreadMainThreadExecutor 
=
+            ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
+                    TEST_EXECUTOR_RESOURCE.getExecutor());
+
+    protected final ClassLoader classLoader = 
ClassLoader.getSystemClassLoader();
+
+    protected AdaptiveScheduler scheduler;
+
+    @BeforeEach
+    void before() {
+        scheduler = null;
+    }
+
+    @AfterEach
+    void after() {
+        closeInExecutorService(scheduler, singleThreadMainThreadExecutor);
+    }
+
+    protected static JobGraph createJobGraphWithCheckpointing(final 
JobVertex... jobVertex) {
+        final JobGraph jobGraph =
+                JobGraphBuilder.newStreamingJobGraphBuilder()
+                        .addJobVertices(Arrays.asList(jobVertex))
+                        .build();
+        SchedulerTestingUtils.enableCheckpointing(
+                jobGraph, null, null, Duration.ofHours(1).toMillis(), true);
+        return jobGraph;
+    }
+
+    protected static void closeInExecutorService(
+            @Nullable AdaptiveScheduler scheduler, Executor executor) {
+        if (scheduler != null) {
+            final CompletableFuture<Void> closeFuture = new 
CompletableFuture<>();
+            executor.execute(
+                    () -> {
+                        try {
+                            scheduler.cancel();
+
+                            FutureUtils.forward(scheduler.closeAsync(), 
closeFuture);
+                        } catch (Throwable t) {
+                            closeFuture.completeExceptionally(t);
+                        }
+                    });
+
+            // we have to wait for the job termination outside the main thread 
because the
+            // cancellation tasks are scheduled on the main thread as well.
+            scheduler
+                    .getJobTerminationFuture()
+                    .whenCompleteAsync(
+                            (jobStatus, error) -> {
+                                assertThat(scheduler.getState().getClass())
+                                        .isEqualTo(Finished.class);
+
+                                if (error != null) {
+                                    closeFuture.completeExceptionally(error);
+                                } else {
+                                    try {
+                                        
FutureUtils.forward(scheduler.closeAsync(), closeFuture);
+                                    } catch (Throwable t) {
+                                        closeFuture.completeExceptionally(t);
+                                    }
+                                }
+                            },
+                            executor);
+            assertThatFuture(closeFuture).eventuallySucceeds();
+        }
+    }
+
+    /**
+     * Creates a testing SlotPool instance that would allow for the scheduler 
to transition to
+     * Executing state.
+     */
+    protected static DeclarativeSlotPool getSlotPoolWithFreeSlots(int 
freeSlots) {
+        return new TestingDeclarativeSlotPoolBuilder()
+                .setContainsFreeSlotFunction(allocationID -> true)
+                .setReserveFreeSlotFunction(
+                        (allocationId, resourceProfile) ->
+                                TestingPhysicalSlot.builder()
+                                        .withAllocationID(allocationId)
+                                        .build())
+                .setGetFreeSlotTrackerSupplier(
+                        () ->
+                                TestingFreeSlotTracker.newBuilder()
+                                        .setGetFreeSlotsInformationSupplier(
+                                                () ->
+                                                        IntStream.range(0, 
freeSlots)
+                                                                .mapToObj(v -> 
new TestingSlot())
+                                                                
.collect(Collectors.toSet()))
+                                        .build())
+                .build();
+    }
+
+    protected void startTestInstanceInMainThread() {
+        runInMainThread(() -> scheduler.startScheduling());
+    }
+
+    protected void runInMainThread(final Runnable callback) {
+        CompletableFuture.runAsync(callback, 
singleThreadMainThreadExecutor).join();
+    }
+
+    protected <T> T supplyInMainThread(final Supplier<T> supplier) throws 
Exception {
+        return CompletableFuture.supplyAsync(supplier, 
singleThreadMainThreadExecutor).get();
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
new file mode 100644
index 00000000000..35e2bb54db4
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/LocalRecoveryTest.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive;
+
+import org.apache.flink.core.execution.CheckpointType;
+import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import 
org.apache.flink.runtime.executiongraph.failover.TestRestartBackoffTimeStrategy;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.generateKeyGroupState;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.acknowledgePendingCheckpoint;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.setAllExecutionsToRunning;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCheckpointInProgress;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForCompletedCheckpoint;
+import static 
org.apache.flink.runtime.scheduler.SchedulerTestingUtils.waitForJobStatusRunning;
+import static 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator.getArgumentCapturingDelegatingSlotAllocator;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class LocalRecoveryTest extends AdaptiveSchedulerTestBase {
+    @Test
+    void testStateSizeIsConsideredForLocalRecoveryOnRestart() throws Exception 
{
+        final JobGraph jobGraph = createJobGraphWithCheckpointing(JOB_VERTEX);
+        final DeclarativeSlotPool slotPool = 
getSlotPoolWithFreeSlots(PARALLELISM);
+        final List<JobAllocationsInformation> capturedAllocations = new 
ArrayList<>();
+        final boolean localRecoveryEnabled = true;
+        final String executionTarget = "local";
+        final boolean minimalTaskManagerPreferred = false;
+        final SlotAllocator slotAllocator =
+                getArgumentCapturingDelegatingSlotAllocator(
+                        
AdaptiveSchedulerFactory.createSlotSharingSlotAllocator(
+                                slotPool,
+                                localRecoveryEnabled,
+                                executionTarget,
+                                minimalTaskManagerPreferred),
+                        capturedAllocations);
+
+        scheduler =
+                new AdaptiveSchedulerBuilder(
+                                jobGraph,
+                                singleThreadMainThreadExecutor,
+                                EXECUTOR_RESOURCE.getExecutor())
+                        .setDeclarativeSlotPool(slotPool)
+                        .setSlotAllocator(slotAllocator)
+                        .setStateTransitionManagerFactory(
+                                
createAutoAdvanceStateTransitionManagerFactory())
+                        .setRestartBackoffTimeStrategy(new 
TestRestartBackoffTimeStrategy(true, 0L))
+                        .build();
+
+        // Start scheduler
+        startTestInstanceInMainThread();
+
+        // Transition job and all subtasks to RUNNING state.
+        waitForJobStatusRunning(scheduler);
+        runInMainThread(() -> setAllExecutionsToRunning(scheduler));
+
+        // Trigger a checkpoint
+        CompletableFuture<CompletedCheckpoint> completedCheckpointFuture =
+                supplyInMainThread(() -> 
scheduler.triggerCheckpoint(CheckpointType.FULL));
+
+        // Verify that checkpoint was registered by scheduler. Required to prevent race condition
+        // when checkpoint is acknowledged before start.
+        waitForCheckpointInProgress(scheduler);
+
+        // Acknowledge the checkpoint for all tasks with the fake state.
+        final Map<OperatorID, OperatorSubtaskState> operatorStates =
+                generateFakeKeyedManagedStateForAllOperators(jobGraph);
+        runInMainThread(() -> acknowledgePendingCheckpoint(scheduler, 1, 
operatorStates));
+
+        // Wait for the checkpoint to complete.
+        final CompletedCheckpoint completedCheckpoint = 
completedCheckpointFuture.join();
+
+        // completedCheckpointStore.getLatestCheckpoint() can return null if called immediately
+        // after the checkpoint is completed.
+        waitForCompletedCheckpoint(scheduler);
+
+        // Fail early if the checkpoint is null.
+        assertThat(completedCheckpoint).withFailMessage("Checkpoint shouldn't 
be null").isNotNull();
+
+        // Emulating new graph creation call on job recovery to ensure that the state is considered
+        // for new allocations.
+        final List<ExecutionAttemptID> executionAttemptIds =
+                supplyInMainThread(this::getExecutionAttemptIDS);
+
+        assertThat(executionAttemptIds).hasSize(PARALLELISM);
+
+        runInMainThread(
+                () -> {
+                    // fail one of the vertices
+                    scheduler.updateTaskExecutionState(
+                            new TaskExecutionState(
+                                    executionAttemptIds.get(0),
+                                    ExecutionState.FAILED,
+                                    new Exception("Test exception for local 
recovery")));
+                });
+
+        runInMainThread(
+                () -> {
+                    // cancel remaining vertices
+                    for (int idx = 1; idx < executionAttemptIds.size(); idx++) 
{
+                        scheduler.updateTaskExecutionState(
+                                new TaskExecutionState(
+                                        executionAttemptIds.get(idx), 
ExecutionState.CANCELED));
+                    }
+                });
+
+        waitForJobStatusRunning(scheduler);
+
+        // First allocation during the job start + second allocation after job restart.
+        assertThat(capturedAllocations).hasSize(2);
+        // First allocation won't use state data.
+        assertTrue(capturedAllocations.get(0).isEmpty());
+        // Second allocation should use data from latest checkpoint.
+        assertThat(
+                        capturedAllocations
+                                .get(1)
+                                .getAllocations(JOB_VERTEX.getID())
+                                .get(0)
+                                .stateSizeInBytes)
+                .isGreaterThan(0);
+    }
+
+    private List<ExecutionAttemptID> getExecutionAttemptIDS() {
+        final Optional<ExecutionGraph> maybeExecutionGraph =
+                scheduler
+                        .getState()
+                        .as(StateWithExecutionGraph.class)
+                        .map(StateWithExecutionGraph::getExecutionGraph);
+        assertThat(maybeExecutionGraph).isNotEmpty();
+        final ExecutionVertex[] taskVertices =
+                
Objects.requireNonNull(maybeExecutionGraph.get().getJobVertex(JOB_VERTEX.getID()))
+                        .getTaskVertices();
+        return Arrays.stream(taskVertices)
+                .map(ExecutionVertex::getCurrentExecutionAttempt)
+                .map(Execution::getAttemptId)
+                .collect(Collectors.toList());
+    }
+
+    private static AdaptiveScheduler.StateTransitionManagerFactory
+            createAutoAdvanceStateTransitionManagerFactory() {
+        return (context,
+                ignoredClock,
+                ignoredCooldown,
+                ignoredResourceStabilizationTimeout,
+                ignoredMaxTriggerDelay) ->
+                TestingStateTransitionManager.withOnTriggerEventOnly(
+                        () -> {
+                            if (context instanceof WaitingForResources) {
+                                context.transitionToSubsequentState();
+                            }
+                        });
+    }
+
+    private static Map<OperatorID, OperatorSubtaskState>
+            generateFakeKeyedManagedStateForAllOperators(final JobGraph 
jobGraph)
+                    throws IOException {
+        final Map<OperatorID, OperatorSubtaskState> operatorStates = new 
HashMap<>();
+        for (final JobVertex jobVertex : jobGraph.getVertices()) {
+            final KeyedStateHandle keyedStateHandle =
+                    generateKeyGroupState(
+                            jobVertex.getID(),
+                            KeyGroupRange.of(0, 
jobGraph.getMaximumParallelism() - 1),
+                            false);
+            for (OperatorIDPair operatorId : jobVertex.getOperatorIDs()) {
+                operatorStates.put(
+                        operatorId.getGeneratedOperatorID(),
+                        OperatorSubtaskState.builder()
+                                .setManagedKeyedState(keyedStateHandle)
+                                .build());
+            }
+        }
+        return operatorStates;
+    }
+}
diff --git a/tools/maven/checkstyle.xml b/tools/maven/checkstyle.xml
index ebc80a25fe5..ff6c13932b9 100644
--- a/tools/maven/checkstyle.xml
+++ b/tools/maven/checkstyle.xml
@@ -61,7 +61,7 @@ This file is based on the checkstyle file of Apache Beam.
        -->
 
        <module name="FileLength">
-               <property name="max" value="3150"/>
+               <property name="max" value="3100"/>
        </module>
 
        <!-- All Java AST specific tests live under TreeWalker module. -->


Reply via email to