This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab3bcce0cfb44ccc402a6f694a760440ac30940b
Author: Yuepeng Pan <[email protected]>
AuthorDate: Thu Feb 26 09:48:12 2026 +0800

    [FLINK-38343][runtime] Support store rescale history with statistics by 
ExecutionGraphInfo.
    
    Co-authored-by: XComp <[email protected]>
    Co-authored-by: WeiZhong94 <[email protected]>
    Co-authored-by: ferenc-csaky <[email protected]>
    
    Signed-off-by: Yuepeng Pan <[email protected]>
---
 .../flink/runtime/minicluster/MiniCluster.java     |  21 +
 .../runtime/scheduler/ExecutionGraphInfo.java      |  33 +-
 .../flink/runtime/scheduler/SchedulerBase.java     |   1 +
 .../scheduler/adaptive/AdaptiveScheduler.java      |   3 +-
 .../adaptive/timeline/DefaultRescaleTimeline.java  |   9 +
 .../adaptive/timeline/RescaleTimeline.java         |   8 +
 .../adaptive/timeline/RescalesStatsSnapshot.java   |  58 ++
 .../adaptive/timeline/RescalesSummary.java         |  10 +
 .../adaptive/timeline/RescalesSummarySnapshot.java |  88 +++
 .../runtime/jobmaster/TestingJobManagerRunner.java |   1 +
 .../rest/handler/job/JobExceptionsHandlerTest.java |   5 +-
 .../job/rescales/JobRescaleConfigHandlerTest.java  |   4 +-
 .../scheduler/adaptive/AdaptiveSchedulerTest.java  |   6 +
 .../adaptive/timeline/RescaleTimelineITCase.java   | 711 +++++++++++++++++++--
 .../adaptive/timeline/RescalesSummaryTest.java     |   6 +
 15 files changed, 902 insertions(+), 62 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 0b834ca5032..96cd052698a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ConfigurationUtils;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.core.execution.CheckpointType;
 import org.apache.flink.core.execution.RecoveryClaimMode;
@@ -70,6 +71,7 @@ import 
org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipCont
 import org.apache.flink.runtime.io.network.partition.ClusterPartitionManager;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmaster.JobResult;
@@ -850,6 +852,25 @@ public class MiniCluster implements AutoCloseableAsync {
                                 
.thenApply(ExecutionGraphInfo::getArchivedExecutionGraph));
     }
 
+    public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(JobID 
jobId) {
+        return runDispatcherCommand(
+                dispatcherGateway ->
+                        dispatcherGateway.requestExecutionGraphInfo(jobId, 
rpcTimeout));
+    }
+
+    public CompletableFuture<Acknowledge> updateJobResourceRequirements(
+            JobID jobId, JobResourceRequirements resourceRequirements) {
+        if 
(miniClusterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER)
+                != JobManagerOptions.SchedulerType.Adaptive) {
+            throw new UnsupportedOperationException(
+                    "updateJobResourceRequirements is only supported for 
adaptive scheduler");
+        }
+        return runDispatcherCommand(
+                dispatcherGateway ->
+                        dispatcherGateway.updateJobResourceRequirements(
+                                jobId, resourceRequirements));
+    }
+
     public CompletableFuture<Collection<JobStatusMessage>> listJobs() {
         return runDispatcherCommand(
                 dispatcherGateway ->
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
index 7e6680f498e..b705a598d77 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java
@@ -24,12 +24,13 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import 
org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo;
+import 
org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -46,35 +47,48 @@ public class ExecutionGraphInfo implements Serializable {
 
     /**
      * The value is null when the job is not enabled {@link
-     * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}.
+     * JobManagerOptions.SchedulerType#Adaptive} scheduler.
      */
     @Nullable private final JobRescaleConfigInfo jobRescaleConfigInfo;
 
+    /**
+     * This field is null when the value of {@link
+     * 
org.apache.flink.configuration.WebOptions#MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE}
 is not
+     * a positive number.
+     */
+    @Nullable private final RescalesStatsSnapshot rescalesStatsSnapshot;
+
     public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) {
         this(
                 executionGraph,
                 executionGraph.getFailureInfo() != null
-                        ? Collections.singleton(
+                        ? List.of(
                                 RootExceptionHistoryEntry.fromGlobalFailure(
                                         executionGraph.getFailureInfo()))
-                        : Collections.emptyList());
+                        : List.of(),
+                null,
+                null,
+                null);
     }
 
     public ExecutionGraphInfo(
             ArchivedExecutionGraph executionGraph,
-            Iterable<RootExceptionHistoryEntry> exceptionHistory) {
-        this(executionGraph, exceptionHistory, null, null);
+            Iterable<RootExceptionHistoryEntry> exceptionHistory,
+            @Nullable JobManagerOptions.SchedulerType schedulerType) {
+        this(executionGraph, exceptionHistory, schedulerType, null, null);
     }
 
     public ExecutionGraphInfo(
             ArchivedExecutionGraph executionGraph,
             Iterable<RootExceptionHistoryEntry> exceptionHistory,
             @Nullable JobManagerOptions.SchedulerType schedulerType,
-            @Nullable JobRescaleConfigInfo jobRescaleConfigInfo) {
+            @Nullable JobRescaleConfigInfo jobRescaleConfigInfo,
+            @Nullable RescalesStatsSnapshot rescalesStatsSnapshot) {
         this.executionGraph = executionGraph;
         this.exceptionHistory = exceptionHistory;
         this.schedulerType = schedulerType;
         this.jobRescaleConfigInfo = jobRescaleConfigInfo;
+        this.rescalesStatsSnapshot = rescalesStatsSnapshot;
     }
 
     public JobID getJobId() {
@@ -108,4 +122,9 @@ public class ExecutionGraphInfo implements Serializable {
     public JobManagerOptions.SchedulerType getSchedulerType() {
         return schedulerType;
     }
+
+    @Nullable
+    public RescalesStatsSnapshot getRescalesStatsSnapshot() {
+        return rescalesStatsSnapshot;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
index 50778dc9478..7d0c21c0470 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
@@ -873,6 +873,7 @@ public abstract class SchedulerBase implements SchedulerNG, 
CheckpointScheduling
                 ArchivedExecutionGraph.createFrom(executionGraph),
                 getExceptionHistory(),
                 getSchedulerType(),
+                null,
                 null);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
index 153d99c8f85..70c5b62ce90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java
@@ -896,7 +896,8 @@ public class AdaptiveScheduler
                 state.getJob(),
                 exceptionHistory.toArrayList(),
                 getSchedulerType(),
-                settings.toJobRescaleConfigInfo());
+                settings.toJobRescaleConfigInfo(),
+                rescaleTimeline.createSnapshot());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
index 1a23e229ea1..e37852db6d0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java
@@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
@@ -122,6 +124,13 @@ public class DefaultRescaleTimeline implements 
RescaleTimeline {
         return currentRescale;
     }
 
+    @Override
+    public RescalesStatsSnapshot createSnapshot() {
+        List<Rescale> rescales = rescaleHistory.toArrayList();
+        Collections.reverse(rescales);
+        return new RescalesStatsSnapshot(List.copyOf(rescales), 
rescalesSummary.createSnapshot());
+    }
+
     private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) {
         if (newRescaleEpoch) {
             rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
index 7c9f1411939..7842ce60509 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java
@@ -67,6 +67,9 @@ public interface RescaleTimeline {
      */
     boolean updateRescale(RescaleUpdater rescaleUpdater);
 
+    /** Create the snapshot of max history of rescales. */
+    RescalesStatsSnapshot createSnapshot();
+
     /** Rescale operation interface. */
     interface RescaleUpdater {
         void update(Rescale rescaleToUpdate);
@@ -98,6 +101,11 @@ public interface RescaleTimeline {
             return null;
         }
 
+        @Override
+        public RescalesStatsSnapshot createSnapshot() {
+            return RescalesStatsSnapshot.emptySnapshot();
+        }
+
         @Override
         public boolean isIdling() {
             return false;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
new file mode 100644
index 00000000000..316d34ef634
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummarySnapshot;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class RescalesStatsSnapshot implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final List<Rescale> rescaleHistory;
+    private final RescalesSummarySnapshot rescalesSummarySnapshot;
+
+    public RescalesStatsSnapshot(
+            List<Rescale> rescaleHistory, RescalesSummarySnapshot 
rescalesSummarySnapshot) {
+        this.rescaleHistory = rescaleHistory;
+        this.rescalesSummarySnapshot = rescalesSummarySnapshot;
+    }
+
+    public List<Rescale> getRescaleHistory() {
+        return rescaleHistory;
+    }
+
+    public RescalesSummarySnapshot getRescalesSummarySnapshot() {
+        return rescalesSummarySnapshot;
+    }
+
+    public static RescalesStatsSnapshot emptySnapshot() {
+        return new RescalesStatsSnapshot(
+                new ArrayList<>(),
+                new RescalesSummarySnapshot(
+                        StatsSummarySnapshot.empty(),
+                        StatsSummarySnapshot.empty(),
+                        StatsSummarySnapshot.empty(),
+                        StatsSummarySnapshot.empty(),
+                        0,
+                        0));
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
index 6d1e2a567cb..93474bf563d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java
@@ -133,4 +133,14 @@ public class RescalesSummary implements Serializable {
     public StatsSummary getFailedRescalesSummary() {
         return failedRescalesSummary;
     }
+
+    public RescalesSummarySnapshot createSnapshot() {
+        return new RescalesSummarySnapshot(
+                allTerminatedSummary.createSnapshot(),
+                completedRescalesSummary.createSnapshot(),
+                ignoredRescalesSummary.createSnapshot(),
+                failedRescalesSummary.createSnapshot(),
+                totalRescalesCount,
+                inProgressRescalesCount);
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummarySnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummarySnapshot.java
new file mode 100644
index 00000000000..e7f14896e5f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummarySnapshot.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.apache.flink.runtime.util.stats.StatsSummarySnapshot;
+
+import java.io.Serializable;
+
+public class RescalesSummarySnapshot implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    private final StatsSummarySnapshot allTerminatedSummarySnapshot;
+
+    private final StatsSummarySnapshot completedRescalesSummarySnapshot;
+    private final StatsSummarySnapshot ignoredRescalesSummarySnapshot;
+    private final StatsSummarySnapshot failedRescalesSummarySnapshot;
+
+    private long totalRescalesCount = 0L;
+    private long inProgressRescaleCount = 0L;
+
+    public RescalesSummarySnapshot(
+            StatsSummarySnapshot allTerminatedSummarySnapshot,
+            StatsSummarySnapshot completedRescalesSummarySnapshot,
+            StatsSummarySnapshot ignoredRescalesSummarySnapshot,
+            StatsSummarySnapshot failedRescalesSummarySnapshot,
+            long totalRescalesCount,
+            long inProgressRescaleCount) {
+        this.allTerminatedSummarySnapshot = allTerminatedSummarySnapshot;
+        this.completedRescalesSummarySnapshot = 
completedRescalesSummarySnapshot;
+        this.ignoredRescalesSummarySnapshot = ignoredRescalesSummarySnapshot;
+        this.failedRescalesSummarySnapshot = failedRescalesSummarySnapshot;
+        this.totalRescalesCount = totalRescalesCount;
+        this.inProgressRescaleCount = inProgressRescaleCount;
+    }
+
+    public StatsSummarySnapshot getAllTerminatedSummarySnapshot() {
+        return allTerminatedSummarySnapshot;
+    }
+
+    public StatsSummarySnapshot getCompletedRescalesSummarySnapshot() {
+        return completedRescalesSummarySnapshot;
+    }
+
+    public StatsSummarySnapshot getIgnoredRescalesSummarySnapshot() {
+        return ignoredRescalesSummarySnapshot;
+    }
+
+    public StatsSummarySnapshot getFailedRescalesSummarySnapshot() {
+        return failedRescalesSummarySnapshot;
+    }
+
+    public long getTotalRescalesCount() {
+        return totalRescalesCount;
+    }
+
+    public long getCompletedRescalesCount() {
+        return completedRescalesSummarySnapshot.getCount();
+    }
+
+    public long getIgnoredRescalesCount() {
+        return ignoredRescalesSummarySnapshot.getCount();
+    }
+
+    public long getFailedRescalesCount() {
+        return failedRescalesSummarySnapshot.getCount();
+    }
+
+    public long getInProgressRescaleCount() {
+        return inProgressRescaleCount;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
index 93091d138b7..abb2c1b5584 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java
@@ -78,6 +78,7 @@ public class TestingJobManagerRunner implements 
JobManagerRunner {
                                 null,
                                 null,
                                 0L),
+                        null,
                         null);
         terminationFuture.whenComplete(
                 (ignored, ignoredThrowable) ->
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
index cd2f47ade1a..73d7c7e4baa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java
@@ -480,7 +480,8 @@ class JobExceptionsHandlerTest {
                         .setFailureCause(new ErrorInfo(failureCause, 
failureTimestamp))
                         .setTasks(tasks)
                         .build(),
-                exceptionHistory);
+                exceptionHistory,
+                null);
     }
 
     private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex(
@@ -555,7 +556,7 @@ class JobExceptionsHandlerTest {
         // we have to reverse it to simulate how the Scheduler collects it
         Collections.reverse(historyEntryCollection);
 
-        return new ExecutionGraphInfo(executionGraphBuilder.build(), 
historyEntryCollection);
+        return new ExecutionGraphInfo(executionGraphBuilder.build(), 
historyEntryCollection, null);
     }
 
     private static HandlerRequest<EmptyRequestBody> createRequest(JobID jobId, 
int size)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
index 3fb7c10f170..8b1edeb9101 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java
@@ -74,6 +74,7 @@ class JobRescaleConfigHandlerTest {
                         new ArchivedExecutionGraphBuilder().build(),
                         Collections.emptyList(),
                         null,
+                        null,
                         null);
         final ExecutionGraphInfo finalExecutionGraphInfo = executionGraphInfo;
         assertThatThrownBy(() -> testInstance.handleRequest(request, 
finalExecutionGraphInfo))
@@ -88,7 +89,8 @@ class JobRescaleConfigHandlerTest {
                         new ArchivedExecutionGraphBuilder().build(),
                         Collections.emptyList(),
                         null,
-                        jobRescaleConfigInfo);
+                        jobRescaleConfigInfo,
+                        null);
         assertThat(testInstance.handleRequest(request, executionGraphInfo))
                 .isEqualTo(jobRescaleConfigInfo);
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index d5c312c4dfe..4feef4ca0f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -100,6 +100,7 @@ import 
org.apache.flink.runtime.scheduler.VertexParallelismStore;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
 import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.runtime.scheduler.adaptive.timeline.Durable;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
 import 
org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
 import org.apache.flink.runtime.slots.ResourceRequirement;
@@ -2699,6 +2700,11 @@ public class AdaptiveSchedulerTest extends 
AdaptiveSchedulerTestBase {
             this.jobStatus = jobStatus;
         }
 
+        @Override
+        public Durable getDurable() {
+            return super.getDurable();
+        }
+
         @Override
         public JobStatus getJobStatus() {
             return jobStatus;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
index 96e61c7e0c9..09d31c00d76 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java
@@ -18,85 +18,694 @@
 
 package org.apache.flink.runtime.scheduler.adaptive.timeline;
 
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
+import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
+import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.runtime.testutils.MiniClusterResource;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+import org.apache.flink.util.function.RunnableWithException;
+import org.apache.flink.util.function.SupplierWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 /**
  * Test for recording rescale history by {@link DefaultRescaleTimeline} or 
{@link
  * RescaleTimeline.NoOpRescaleTimeline}.
  */
+@ExtendWith(ParameterizedTestExtension.class)
 class RescaleTimelineITCase {
-    static final String DISABLED_DESCRIPTION =
-            "TODO: Blocked by FLINK-38343, the ITCases need the 
SchedulerNG#requstJob() to get the rescale history.";
+
+    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
+    private static final int NUMBER_TASK_MANAGERS = 2;
+    private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * 
NUMBER_TASK_MANAGERS;
+    private static final JobVertexID JOB_VERTEX_ID = new JobVertexID();
+
+    @Parameter private Configuration configuration;
+    private MiniClusterResource miniClusterResource;
+
+    @Parameters
+    static Collection<Object[]> getClusterConfigs() {
+        Configuration confNonEnabledRescaleHistory = createConfiguration();
+        Configuration confEnabledRescaleHistory = createConfiguration();
+        
confEnabledRescaleHistory.set(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE,
 3);
+        return Arrays.asList(
+                new Object[] {confNonEnabledRescaleHistory},
+                new Object[] {confEnabledRescaleHistory});
+    }
+
+    private static Configuration createConfiguration() {
+        final Configuration configuration = new Configuration();
+        configuration.set(JobManagerOptions.SCHEDULER, 
JobManagerOptions.SchedulerType.Adaptive);
+        configuration.set(
+                
JobManagerOptions.SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT,
+                Duration.ofMillis(50));
+        configuration.set(
+                JobManagerOptions.SCHEDULER_EXECUTING_COOLDOWN_AFTER_RESCALING,
+                // Use the 0.1 seconds to trigger the long-time non-terminal 
rescale event after a
+                // rescaling.
+                Duration.ofMillis(100));
+        configuration.set(
+                
JobManagerOptions.SCHEDULER_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT,
+                Duration.ofMillis(50));
+        configuration.set(
+                JobManagerOptions.SCHEDULER_SUBMISSION_RESOURCE_WAIT_TIMEOUT,
+                Duration.ofSeconds(2));
+        return configuration;
+    }
+
+    @BeforeEach
+    void setUp() throws Exception {
+        OnceBlockingNoOpInvokable.reset();
+        this.miniClusterResource =
+                new MiniClusterResource(
+                        new MiniClusterResourceConfiguration.Builder()
+                                .setConfiguration(configuration)
+                                
.setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER)
+                                .setNumberTaskManagers(NUMBER_TASK_MANAGERS)
+                                .build());
+        miniClusterResource.before();
+    }
+
+    @AfterEach
+    void tearDown() {
+        miniClusterResource.after();
+    }
 
     // Tests for rescale trigger causes.
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordRescaleForInitialScheduling() {}
+    @TestTemplate
+    void testRecordRescaleForInitialScheduling() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordRescaleForNewResourcesRequirements() {}
+        miniCluster.submitJob(jobGraph).join();
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordRescaleForNewAvailableResource() {}
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+        OnceBlockingNoOpInvokable.unblock();
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordRescaleForRecoverableFailover() {}
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(1);
+                    Rescale rescale = rescaleHistory.get(0);
+                    assertTriggerCause(rescale, TriggerCause.INITIAL_SCHEDULE);
+                    assertTerminalRelatedFields(rescale, 
TerminatedReason.SUCCEEDED);
+                    assertThat(
+                                    rescale.getSchedulerStates().stream()
+                                            .map(SchedulerStateSpan::getState)
+                                            .collect(Collectors.toList()))
+                            .containsExactly(
+                                    "Created",
+                                    "WaitingForResources",
+                                    "CreatingExecutionGraph",
+                                    "Executing");
+                });
+    }
 
-    // End of tests for rescale trigger causes.
+    @TestTemplate
+    void testRecordRescaleForNewResourcesRequirements() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, 
NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        waitForVertexParallelismReachedAndJobRunning(
+                jobGraph, JOB_VERTEX_ID, NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        OnceBlockingNoOpInvokable.unblock();
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(2);
+                    Rescale rescale = rescaleHistory.get(0);
+                    assertTriggerCause(rescale, 
TriggerCause.UPDATE_REQUIREMENT);
+                    assertTerminalRelatedFields(rescale, 
TerminatedReason.SUCCEEDED);
+                });
+    }
+
+    @TestTemplate
+    void testRecordRescaleForNewAvailableResource() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph =
+                createBlockingJobGraph(PARALLELISM + 
NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        miniCluster.startTaskManager();
+
+        waitForVertexParallelismReachedAndJobRunning(
+                jobGraph,
+                JOB_VERTEX_ID,
+                NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS + 1));
+
+        OnceBlockingNoOpInvokable.unblock();
 
-    // Start of tests for rescale terminated reasons and terminal state.
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(2);
+                    Rescale rescale = rescaleHistory.get(0);
+                    assertTriggerCause(rescale, 
TriggerCause.NEW_RESOURCE_AVAILABLE);
+                    assertTerminalRelatedFields(rescale, 
TerminatedReason.SUCCEEDED);
+                });
+    }
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
+    @TestTemplate
+    void testRecordRescaleForRecoverableFailover() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        miniCluster.terminateTaskManager(0);
+        waitForVertexParallelismReachedAndJobRunning(
+                jobGraph, JOB_VERTEX_ID, NUMBER_SLOTS_PER_TASK_MANAGER);
+
+        OnceBlockingNoOpInvokable.unblock();
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(2);
+                    Rescale rescale = rescaleHistory.get(0);
+                    assertTriggerCause(rescale, 
TriggerCause.RECOVERABLE_FAILOVER);
+                    assertTerminalRelatedFields(rescale, 
TerminatedReason.SUCCEEDED);
+                });
+    }
+
+    // End of tests for rescale trigger causes.
+
+    // Tests for rescale terminated reasons and terminal state.
+    /** Already tested in {@link #testRecordRescaleForInitialScheduling()}, 
etc. */
+    @Disabled
+    @TestTemplate
     void testRescaleTerminatedBySucceeded() {}
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRescaleTerminatedByJobFinished() {}
+    @TestTemplate
+    void testRescaleTerminatedByJobFinished() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
+
+        OnceBlockingNoOpInvokable.unblock();
+
+        assumeThat(enabledRescaleHistory(configuration)).isTrue();
+        waitUntilConditionWithTimeout(
+                () -> {
+                    List<Rescale> rescaleHistory = 
getRescaleHistory(miniCluster, jobGraph);
+                    return hasRescaleHistoryMetCondition(
+                            rescaleHistory, 2, TerminatedReason.JOB_FINISHED);
+                },
+                10000);
+    }
+
+    @TestTemplate
+    void testRescaleTerminatedByJobCancelled() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
+
+        miniCluster.cancelJob(jobGraph.getJobID());
+
+        assumeThat(enabledRescaleHistory(configuration)).isTrue();
+        waitUntilConditionWithTimeout(
+                () -> {
+                    List<Rescale> rescaleHistory = 
getRescaleHistory(miniCluster, jobGraph);
+                    return hasRescaleHistoryMetCondition(
+                            rescaleHistory, 2, TerminatedReason.JOB_CANCELED);
+                },
+                10000);
+    }
+
+    @TestTemplate
+    void testRescaleTerminatedByJobFailed() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRescaleTerminatedByJobCancelled() {}
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRescaleTerminatedByJobFailed() {}
+        miniCluster.terminateTaskManager(1);
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRescaleTerminatedByNoResourcesOrNoParallelismsChange() {}
+        waitForVertexParallelismReachedAndJobRunning(
+                jobGraph, JOB_VERTEX_ID, NUMBER_SLOTS_PER_TASK_MANAGER);
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRescaleTerminatedByResourcesNotEnoughException() {}
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRescaleTerminatedByResourceRequirementsUpdated() {}
+        miniCluster.terminateTaskManager(0);
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
+        assumeThat(enabledRescaleHistory(configuration)).isTrue();
+        waitUntilConditionWithTimeout(
+                () -> {
+                    List<Rescale> rescaleHistory = 
getRescaleHistory(miniCluster, jobGraph);
+                    return hasRescaleHistoryMetCondition(
+                            rescaleHistory, 3, TerminatedReason.JOB_FAILED);
+                },
+                10000);
+    }
+
+    @TestTemplate
+    void testRescaleTerminatedByNoResourcesOrNoParallelismsChange() throws 
Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+        miniCluster.submitJob(jobGraph).join();
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
+
+        assumeThat(enabledRescaleHistory(configuration)).isTrue();
+        waitUntilConditionWithTimeout(
+                () -> {
+                    List<Rescale> rescaleHistory = 
getRescaleHistory(miniCluster, jobGraph);
+                    return hasRescaleHistoryMetCondition(
+                            rescaleHistory,
+                            2,
+                            
TerminatedReason.NO_RESOURCES_OR_PARALLELISMS_CHANGE);
+                },
+                20000);
+    }
+
+    @TestTemplate
+    void testRescaleTerminatedByResourcesNotEnoughException() throws Exception 
{
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+        miniCluster.submitJob(jobGraph).join();
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        miniCluster.terminateTaskManager(1);
+        miniCluster.terminateTaskManager(0);
+
+        assumeThat(enabledRescaleHistory(configuration)).isTrue();
+        waitUntilConditionWithTimeout(
+                () -> {
+                    List<Rescale> rescaleHistory = 
getRescaleHistory(miniCluster, jobGraph);
+                    return hasRescaleHistoryMetCondition(
+                            rescaleHistory, 2, 
TerminatedReason.EXCEPTION_OCCURRED);
+                },
+                100000);
+    }
+
+    @TestTemplate
+    void testRescaleTerminatedByResourceRequirementsUpdated() throws Exception 
{
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+        miniCluster.submitJob(jobGraph).join();
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
3);
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(3);
+                    Rescale rescale = rescaleHistory.get(1);
+                    assertTerminalRelatedFields(
+                            rescale, 
TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED);
+                });
+    }
+
+    /**
+     * The test case is disabled after processing by
+     * https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h. Merge 
the current
+     * non-terminated rescale and the new rescale triggered by recoverable 
failover into the current
+     * rescale.
+     */
+    @Disabled
+    @TestTemplate
     void testRescaleTerminatedByJobRestarting() {}
 
     // End of tests for rescale terminated reasons and terminal state.
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordRescaleWithoutPreRescaleInfo() {}
+    @TestTemplate
+    void testRecordRescaleWithoutPreRescaleInfo() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        OnceBlockingNoOpInvokable.unblock();
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    Rescale rescale =
+                            executionGraphInfo
+                                    .getRescalesStatsSnapshot()
+                                    .getRescaleHistory()
+                                    .get(0);
+                    assertThat(rescale).isNotNull();
+                    Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = 
rescale.getSlots();
+                    assertThat(slots.values())
+                            .allSatisfy(
+                                    (Consumer<SlotSharingGroupRescale>)
+                                            slotSharingGroupRescale -> {
+                                                assertThat(
+                                                                
slotSharingGroupRescale
+                                                                        
.getPreRescaleSlots())
+                                                        .isNull();
+                                                
assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
+                                                        
slotSharingGroupRescale);
+                                            });
+                    Map<JobVertexID, VertexParallelismRescale> vertices = 
rescale.getVertices();
+                    assertThat(vertices.values())
+                            .allSatisfy(
+                                    (Consumer<VertexParallelismRescale>)
+                                            vpr -> {
+                                                
assertThat(vpr.getPreRescaleParallelism()).isNull();
+                                                
assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
+                                                        vpr);
+                                            });
+                });
+    }
+
+    /** Tested in {@link #testRecordRescaleForNewAvailableResource()} already. 
*/
+    @TestTemplate
+    void testRecordRescaleWithPreRescaleInfo() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM + 1);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        miniCluster.startTaskManager();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM + 1);
+
+        OnceBlockingNoOpInvokable.unblock();
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    Rescale rescale =
+                            executionGraphInfo
+                                    .getRescalesStatsSnapshot()
+                                    .getRescaleHistory()
+                                    .get(0);
+                    assertThat(rescale).isNotNull();
+                    Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = 
rescale.getSlots();
+                    assertThat(slots.values())
+                            .allSatisfy(
+                                    (Consumer<SlotSharingGroupRescale>)
+                                            slotSharingGroupRescale -> {
+                                                assertThat(
+                                                                
slotSharingGroupRescale
+                                                                        
.getPreRescaleSlots())
+                                                        .isNotNull();
+                                                
assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
+                                                        
slotSharingGroupRescale);
+                                            });
+                    Map<JobVertexID, VertexParallelismRescale> vertices = 
rescale.getVertices();
+                    assertThat(vertices.values())
+                            .allSatisfy(
+                                    (Consumer<VertexParallelismRescale>)
+                                            vpr -> {
+                                                
assertThat(vpr.getPreRescaleParallelism())
+                                                        .isNotNull();
+                                                
assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
+                                                        vpr);
+                                            });
+                });
+    }
+
+    /**
+     * Test for 'Merge the current non-terminated rescale and the new rescale 
triggered by
+     * recoverable failover into the current rescale' anyone case.
+     */
+    @TestTemplate
+    void 
testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause()
+            throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
+
+        miniCluster.terminateTaskManager(0);
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(2);
+                    assertThat(rescaleHistory.get(0).getTriggerCause())
+                            .isEqualTo(TriggerCause.RECOVERABLE_FAILOVER);
+                });
+    }
+
+    @TestTemplate
+    void testRecordInProgressRescale() throws Exception {
+        final MiniCluster miniCluster = miniClusterResource.getMiniCluster();
+        final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM);
+
+        miniCluster.submitJob(jobGraph).join();
+
+        waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, 
PARALLELISM);
+
+        updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 
2);
+
+        final ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join();
+        runAdaptedParameterizedAssertion(
+                executionGraphInfo,
+                () -> {
+                    
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+                    List<Rescale> rescaleHistory =
+                            
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+                    assertThat(rescaleHistory).hasSize(2);
+                    Rescale rescale = rescaleHistory.get(0);
+                    assertThat(rescale.getTerminatedReason()).isNull();
+                    assertThat(rescale.getTerminalState()).isNull();
+                });
+    }
+
+    // Private methods.
+    private JobGraph createBlockingJobGraph(int parallelism) {
+        final JobVertex blockingOperator = new JobVertex("Blocking operator", 
JOB_VERTEX_ID);
+        SlotSharingGroup sharingGroup = new SlotSharingGroup();
+        sharingGroup.setSlotSharingGroupName("slot-sharing-group-A");
+        blockingOperator.setSlotSharingGroup(sharingGroup);
+        blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class);
+        blockingOperator.setParallelism(parallelism);
+        final JobGraph jobGraph = 
JobGraphTestUtils.streamingJobGraph(blockingOperator);
+        RestartStrategyUtils.configureFixedDelayRestartStrategy(jobGraph, 1, 
0L);
+        return jobGraph;
+    }
+
+    private void runAdaptedParameterizedAssertion(
+            ExecutionGraphInfo executionGraphInfo,
+            RunnableWithException assertionForEnabledRescaleHistory)
+            throws Exception {
+        if (enabledRescaleHistory(configuration)) {
+            assertionForEnabledRescaleHistory.run();
+        } else {
+            
assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull();
+            
assertThat(executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory()).isEmpty();
+        }
+    }
+
+    private static boolean enabledRescaleHistory(Configuration configuration) {
+        return 
configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE) > 0;
+    }
+
+    private static void assertTerminalRelatedFields(
+            Rescale rescale, TerminatedReason expectedTerminatedReason) {
+        TerminatedReason terminatedReason = rescale.getTerminatedReason();
+        assertThat(terminatedReason).isEqualTo(expectedTerminatedReason);
+        assertThat(terminatedReason.getTerminalState())
+                .isEqualTo(rescale.getTerminalState())
+                .isEqualTo(expectedTerminatedReason.getTerminalState());
+    }
+
+    private static void assertTriggerCause(Rescale rescale, TriggerCause 
expectedTriggerCause) {
+        assertThat(rescale).isNotNull();
+        assertThat(rescale.getTriggerCause()).isEqualTo(expectedTriggerCause);
+    }
+
+    private void assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields(
+            SlotSharingGroupRescale slotSharingGroupRescale) {
+        assertThat(slotSharingGroupRescale.getDesiredSlots()).isNotNull();
+        assertThat(slotSharingGroupRescale.getPostRescaleSlots()).isNotNull();
+        
assertThat(slotSharingGroupRescale.getSlotSharingGroupName()).isNotNull();
+        
assertThat(slotSharingGroupRescale.getMinimalRequiredSlots()).isNotNull();
+        
assertThat(slotSharingGroupRescale.getAcquiredResourceProfile()).isNotNull();
+        
assertThat(slotSharingGroupRescale.getRequiredResourceProfile()).isNotNull();
+        
assertThat(slotSharingGroupRescale.getSlotSharingGroupId()).isNotNull();
+    }
+
+    private void assertVertexParallelismRescaleNotNullBesidesPreRelatedFields(
+            VertexParallelismRescale vpr) {
+        assertThat(vpr.getDesiredParallelism()).isNotNull();
+        assertThat(vpr.getPostRescaleParallelism()).isNotNull();
+        assertThat(vpr.getSlotSharingGroupName()).isNotNull();
+        assertThat(vpr.getSufficientParallelism()).isNotNull();
+        assertThat(vpr.getJobVertexId()).isNotNull();
+        assertThat(vpr.getSlotSharingGroupId()).isNotNull();
+        assertThat(vpr.getSlotSharingGroupName()).isNotNull();
+    }
+
+    private void waitUntilConditionWithTimeout(
+            SupplierWithException<Boolean, Exception> condition, long 
timeoutMillis)
+            throws Exception {
+        CompletableFuture.runAsync(
+                        () -> {
+                            try {
+                                CommonTestUtils.waitUntilCondition(condition);
+                            } catch (Exception e) {
+                                throw new RuntimeException(e);
+                            }
+                        })
+                .get(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    private void waitForVertexParallelismReachedAndJobRunning(
+            JobGraph jobGraph, JobVertexID jobVertexId, int targetParallelism) 
throws Exception {
+        CommonTestUtils.waitUntilCondition(
+                () -> {
+                    final ArchivedExecutionGraph archivedExecutionGraph =
+                            miniClusterResource
+                                    .getMiniCluster()
+                                    
.getArchivedExecutionGraph(jobGraph.getJobID())
+                                    .get();
+                    final AccessExecutionJobVertex executionJobVertex =
+                            
archivedExecutionGraph.getAllVertices().get(jobVertexId);
+                    if (executionJobVertex == null) {
+                        // parallelism was not yet determined
+                        return false;
+                    }
+                    return executionJobVertex.getParallelism() == 
targetParallelism;
+                });
+        CommonTestUtils.waitUntilCondition(
+                () ->
+                        
miniClusterResource.getMiniCluster().getJobStatus(jobGraph.getJobID()).get()
+                                == JobStatus.RUNNING);
+    }
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordRescaleWithPreRescaleInfo() {}
+    private void updateJobResourceRequirements(
+            MiniCluster miniCluster, JobGraph jobGraph, int lowerBound, int 
upperBound)
+            throws ExecutionException, InterruptedException {
+        miniCluster
+                .updateJobResourceRequirements(
+                        jobGraph.getJobID(),
+                        JobResourceRequirements.newBuilder()
+                                .setParallelismForJobVertex(
+                                        
jobGraph.getVertices().iterator().next().getID(),
+                                        lowerBound,
+                                        upperBound)
+                                .build())
+                .get();
+    }
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void 
testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause() {}
+    private boolean hasRescaleHistoryMetCondition(
+            List<Rescale> rescales, int expectedSize, TerminatedReason 
terminatedReasonOfLatest) {
+        return rescales.size() == expectedSize
+                && terminatedReasonOfLatest == 
rescales.get(0).getTerminatedReason();
+    }
 
-    @Disabled(DISABLED_DESCRIPTION)
-    @Test
-    void testRecordInProgressRescale() {}
+    private static List<Rescale> getRescaleHistory(MiniCluster miniCluster, 
JobGraph jobGraph)
+            throws InterruptedException, ExecutionException {
+        ExecutionGraphInfo executionGraphInfo =
+                miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).get();
+        return 
executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory();
+    }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
index 4e717aee344..d773f0ac165 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java
@@ -85,5 +85,11 @@ class RescalesSummaryTest {
         assertThat(rescalesSummary.getInProgressRescalesCount()).isZero();
         assertThat(rescalesSummary.getFailedRescalesCount()).isZero();
         assertSummary(rescalesSummary.getCompletedRescalesSummary(), 1L, 1L, 
1L, 1L, 1L);
+        assertThat(
+                        rescalesSummary
+                                .createSnapshot()
+                                .getCompletedRescalesSummarySnapshot()
+                                .getQuantile(1))
+                .isOne();
     }
 }

Reply via email to