This is an automated email from the ASF dual-hosted git repository. panyuepeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ab3bcce0cfb44ccc402a6f694a760440ac30940b Author: Yuepeng Pan <[email protected]> AuthorDate: Thu Feb 26 09:48:12 2026 +0800 [FLINK-38343][runtime] Support store rescale history with statistics by ExecutionGraphInfo. Co-authored-by: XComp <[email protected]> Co-authored-by: WeiZhong94 <[email protected]> Co-authored-by: ferenc-csaky <[email protected]> Signed-off-by: Yuepeng Pan <[email protected]> --- .../flink/runtime/minicluster/MiniCluster.java | 21 + .../runtime/scheduler/ExecutionGraphInfo.java | 33 +- .../flink/runtime/scheduler/SchedulerBase.java | 1 + .../scheduler/adaptive/AdaptiveScheduler.java | 3 +- .../adaptive/timeline/DefaultRescaleTimeline.java | 9 + .../adaptive/timeline/RescaleTimeline.java | 8 + .../adaptive/timeline/RescalesStatsSnapshot.java | 58 ++ .../adaptive/timeline/RescalesSummary.java | 10 + .../adaptive/timeline/RescalesSummarySnapshot.java | 88 +++ .../runtime/jobmaster/TestingJobManagerRunner.java | 1 + .../rest/handler/job/JobExceptionsHandlerTest.java | 5 +- .../job/rescales/JobRescaleConfigHandlerTest.java | 4 +- .../scheduler/adaptive/AdaptiveSchedulerTest.java | 6 + .../adaptive/timeline/RescaleTimelineITCase.java | 711 +++++++++++++++++++-- .../adaptive/timeline/RescalesSummaryTest.java | 6 + 15 files changed, 902 insertions(+), 62 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 0b834ca5032..96cd052698a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -32,6 +32,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; +import 
org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.StateRecoveryOptions; import org.apache.flink.core.execution.CheckpointType; import org.apache.flink.core.execution.RecoveryClaimMode; @@ -70,6 +71,7 @@ import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipCont import org.apache.flink.runtime.io.network.partition.ClusterPartitionManager; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobmanager.HighAvailabilityMode; import org.apache.flink.runtime.jobmaster.JobResult; @@ -850,6 +852,25 @@ public class MiniCluster implements AutoCloseableAsync { .thenApply(ExecutionGraphInfo::getArchivedExecutionGraph)); } + public CompletableFuture<ExecutionGraphInfo> getExecutionGraphInfo(JobID jobId) { + return runDispatcherCommand( + dispatcherGateway -> + dispatcherGateway.requestExecutionGraphInfo(jobId, rpcTimeout)); + } + + public CompletableFuture<Acknowledge> updateJobResourceRequirements( + JobID jobId, JobResourceRequirements resourceRequirements) { + if (miniClusterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER) + != JobManagerOptions.SchedulerType.Adaptive) { + throw new UnsupportedOperationException( + "updateJobResourceRequirements is only supported for adaptive scheduler"); + } + return runDispatcherCommand( + dispatcherGateway -> + dispatcherGateway.updateJobResourceRequirements( + jobId, resourceRequirements)); + } + public CompletableFuture<Collection<JobStatusMessage>> listJobs() { return runDispatcherCommand( dispatcherGateway -> diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java index 7e6680f498e..b705a598d77 100644 
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionGraphInfo.java @@ -24,12 +24,13 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.rest.messages.job.rescales.JobRescaleConfigInfo; +import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import javax.annotation.Nullable; import java.io.Serializable; -import java.util.Collections; +import java.util.List; import java.util.Optional; /** @@ -46,35 +47,48 @@ public class ExecutionGraphInfo implements Serializable { /** * The value is null when the job is not enabled {@link - * org.apache.flink.runtime.scheduler.adaptive.AdaptiveScheduler}. + * JobManagerOptions.SchedulerType#Adaptive} scheduler. */ @Nullable private final JobRescaleConfigInfo jobRescaleConfigInfo; + /** + * This field is null when the value of {@link + * org.apache.flink.configuration.WebOptions#MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE} is not + * a positive number. + */ + @Nullable private final RescalesStatsSnapshot rescalesStatsSnapshot; + public ExecutionGraphInfo(ArchivedExecutionGraph executionGraph) { this( executionGraph, executionGraph.getFailureInfo() != null - ? Collections.singleton( + ? 
List.of( RootExceptionHistoryEntry.fromGlobalFailure( executionGraph.getFailureInfo())) - : Collections.emptyList()); + : List.of(), + null, + null, + null); } public ExecutionGraphInfo( ArchivedExecutionGraph executionGraph, - Iterable<RootExceptionHistoryEntry> exceptionHistory) { - this(executionGraph, exceptionHistory, null, null); + Iterable<RootExceptionHistoryEntry> exceptionHistory, + @Nullable JobManagerOptions.SchedulerType schedulerType) { + this(executionGraph, exceptionHistory, schedulerType, null, null); } public ExecutionGraphInfo( ArchivedExecutionGraph executionGraph, Iterable<RootExceptionHistoryEntry> exceptionHistory, @Nullable JobManagerOptions.SchedulerType schedulerType, - @Nullable JobRescaleConfigInfo jobRescaleConfigInfo) { + @Nullable JobRescaleConfigInfo jobRescaleConfigInfo, + @Nullable RescalesStatsSnapshot rescalesStatsSnapshot) { this.executionGraph = executionGraph; this.exceptionHistory = exceptionHistory; this.schedulerType = schedulerType; this.jobRescaleConfigInfo = jobRescaleConfigInfo; + this.rescalesStatsSnapshot = rescalesStatsSnapshot; } public JobID getJobId() { @@ -108,4 +122,9 @@ public class ExecutionGraphInfo implements Serializable { public JobManagerOptions.SchedulerType getSchedulerType() { return schedulerType; } + + @Nullable + public RescalesStatsSnapshot getRescalesStatsSnapshot() { + return rescalesStatsSnapshot; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 50778dc9478..7d0c21c0470 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -873,6 +873,7 @@ public abstract class SchedulerBase implements SchedulerNG, CheckpointScheduling ArchivedExecutionGraph.createFrom(executionGraph), getExceptionHistory(), getSchedulerType(), + null, null); 
} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 153d99c8f85..70c5b62ce90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -896,7 +896,8 @@ public class AdaptiveScheduler state.getJob(), exceptionHistory.toArrayList(), getSchedulerType(), - settings.toJobRescaleConfigInfo()); + settings.toJobRescaleConfigInfo(), + rescaleTimeline.createSnapshot()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java index 1a23e229ea1..e37852db6d0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/DefaultRescaleTimeline.java @@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -122,6 +124,13 @@ public class DefaultRescaleTimeline implements RescaleTimeline { return currentRescale; } + @Override + public RescalesStatsSnapshot createSnapshot() { + List<Rescale> rescales = rescaleHistory.toArrayList(); + Collections.reverse(rescales); + return new RescalesStatsSnapshot(List.copyOf(rescales), rescalesSummary.createSnapshot()); + } + private RescaleIdInfo nextRescaleId(boolean newRescaleEpoch) { if (newRescaleEpoch) { rescaleIdInfo = new RescaleIdInfo(new AbstractID(), 1L); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java index 7c9f1411939..7842ce60509 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimeline.java @@ -67,6 +67,9 @@ public interface RescaleTimeline { */ boolean updateRescale(RescaleUpdater rescaleUpdater); + /** Creates a snapshot of the recorded rescale history and its summary statistics. */ + RescalesStatsSnapshot createSnapshot(); + /** Rescale operation interface. */ interface RescaleUpdater { void update(Rescale rescaleToUpdate); @@ -98,6 +101,11 @@ public interface RescaleTimeline { return null; } + @Override + public RescalesStatsSnapshot createSnapshot() { + return RescalesStatsSnapshot.emptySnapshot(); + } + @Override public boolean isIdling() { return false; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java new file mode 100644 index 00000000000..316d34ef634 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesStatsSnapshot.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.adaptive.timeline; + +import org.apache.flink.runtime.util.stats.StatsSummarySnapshot; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; + +public class RescalesStatsSnapshot implements Serializable { + private static final long serialVersionUID = 1L; + + private final List<Rescale> rescaleHistory; + private final RescalesSummarySnapshot rescalesSummarySnapshot; + + public RescalesStatsSnapshot( + List<Rescale> rescaleHistory, RescalesSummarySnapshot rescalesSummarySnapshot) { + this.rescaleHistory = rescaleHistory; + this.rescalesSummarySnapshot = rescalesSummarySnapshot; + } + + public List<Rescale> getRescaleHistory() { + return rescaleHistory; + } + + public RescalesSummarySnapshot getRescalesSummarySnapshot() { + return rescalesSummarySnapshot; + } + + public static RescalesStatsSnapshot emptySnapshot() { + return new RescalesStatsSnapshot( + new ArrayList<>(), + new RescalesSummarySnapshot( + StatsSummarySnapshot.empty(), + StatsSummarySnapshot.empty(), + StatsSummarySnapshot.empty(), + StatsSummarySnapshot.empty(), + 0, + 0)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java index 6d1e2a567cb..93474bf563d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummary.java @@ -133,4 +133,14 @@ public class RescalesSummary implements Serializable { public StatsSummary getFailedRescalesSummary() { return failedRescalesSummary; } + + public RescalesSummarySnapshot createSnapshot() { + return new RescalesSummarySnapshot( + allTerminatedSummary.createSnapshot(), + completedRescalesSummary.createSnapshot(), + ignoredRescalesSummary.createSnapshot(), + failedRescalesSummary.createSnapshot(), + totalRescalesCount, + inProgressRescalesCount); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummarySnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummarySnapshot.java new file mode 100644 index 00000000000..e7f14896e5f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummarySnapshot.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.adaptive.timeline; + +import org.apache.flink.runtime.util.stats.StatsSummarySnapshot; + +import java.io.Serializable; + +public class RescalesSummarySnapshot implements Serializable { + + private static final long serialVersionUID = 1L; + + private final StatsSummarySnapshot allTerminatedSummarySnapshot; + + private final StatsSummarySnapshot completedRescalesSummarySnapshot; + private final StatsSummarySnapshot ignoredRescalesSummarySnapshot; + private final StatsSummarySnapshot failedRescalesSummarySnapshot; + + private final long totalRescalesCount; + private final long inProgressRescaleCount; + + public RescalesSummarySnapshot( + StatsSummarySnapshot allTerminatedSummarySnapshot, + StatsSummarySnapshot completedRescalesSummarySnapshot, + StatsSummarySnapshot ignoredRescalesSummarySnapshot, + StatsSummarySnapshot failedRescalesSummarySnapshot, + long totalRescalesCount, + long inProgressRescaleCount) { + this.allTerminatedSummarySnapshot = allTerminatedSummarySnapshot; + this.completedRescalesSummarySnapshot = completedRescalesSummarySnapshot; + this.ignoredRescalesSummarySnapshot = ignoredRescalesSummarySnapshot; + this.failedRescalesSummarySnapshot = failedRescalesSummarySnapshot; + this.totalRescalesCount = totalRescalesCount; + this.inProgressRescaleCount = inProgressRescaleCount; + } + + public StatsSummarySnapshot getAllTerminatedSummarySnapshot() { + return allTerminatedSummarySnapshot; + } + + public StatsSummarySnapshot getCompletedRescalesSummarySnapshot() { + return completedRescalesSummarySnapshot; + } + + public StatsSummarySnapshot getIgnoredRescalesSummarySnapshot() { + return ignoredRescalesSummarySnapshot; + } + + public StatsSummarySnapshot getFailedRescalesSummarySnapshot() { + return failedRescalesSummarySnapshot; + } + + public long getTotalRescalesCount() { + return totalRescalesCount; + } + + public long getCompletedRescalesCount() { + return completedRescalesSummarySnapshot.getCount(); + }
+ + public long getIgnoredRescalesCount() { + return ignoredRescalesSummarySnapshot.getCount(); + } + + public long getFailedRescalesCount() { + return failedRescalesSummarySnapshot.getCount(); + } + + public long getInProgressRescaleCount() { + return inProgressRescaleCount; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java index 93091d138b7..abb2c1b5584 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestingJobManagerRunner.java @@ -78,6 +78,7 @@ public class TestingJobManagerRunner implements JobManagerRunner { null, null, 0L), + null, null); terminationFuture.whenComplete( (ignored, ignoredThrowable) -> diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index cd2f47ade1a..73d7c7e4baa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -480,7 +480,8 @@ class JobExceptionsHandlerTest { .setFailureCause(new ErrorInfo(failureCause, failureTimestamp)) .setTasks(tasks) .build(), - exceptionHistory); + exceptionHistory, + null); } private static ArchivedExecutionJobVertex createArchivedExecutionJobVertex( @@ -555,7 +556,7 @@ class JobExceptionsHandlerTest { // we have to reverse it to simulate how the Scheduler collects it Collections.reverse(historyEntryCollection); - return new ExecutionGraphInfo(executionGraphBuilder.build(), historyEntryCollection); + return new ExecutionGraphInfo(executionGraphBuilder.build(), historyEntryCollection, null); } private static
HandlerRequest<EmptyRequestBody> createRequest(JobID jobId, int size) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java index 3fb7c10f170..8b1edeb9101 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/rescales/JobRescaleConfigHandlerTest.java @@ -74,6 +74,7 @@ class JobRescaleConfigHandlerTest { new ArchivedExecutionGraphBuilder().build(), Collections.emptyList(), null, + null, null); final ExecutionGraphInfo finalExecutionGraphInfo = executionGraphInfo; assertThatThrownBy(() -> testInstance.handleRequest(request, finalExecutionGraphInfo)) @@ -88,7 +89,8 @@ class JobRescaleConfigHandlerTest { new ArchivedExecutionGraphBuilder().build(), Collections.emptyList(), null, - jobRescaleConfigInfo); + jobRescaleConfigInfo, + null); assertThat(testInstance.handleRequest(request, executionGraphInfo)) .isEqualTo(jobRescaleConfigInfo); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java index d5c312c4dfe..4feef4ca0f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java @@ -100,6 +100,7 @@ import org.apache.flink.runtime.scheduler.VertexParallelismStore; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot; import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlotAllocator; import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism; +import 
org.apache.flink.runtime.scheduler.adaptive.timeline.Durable; import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry; import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry; import org.apache.flink.runtime.slots.ResourceRequirement; @@ -2699,6 +2700,11 @@ public class AdaptiveSchedulerTest extends AdaptiveSchedulerTestBase { this.jobStatus = jobStatus; } + @Override + public Durable getDurable() { + return super.getDurable(); + } + @Override public JobStatus getJobStatus() { return jobStatus; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java index 96e61c7e0c9..09d31c00d76 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java @@ -18,85 +18,694 @@ package org.apache.flink.runtime.scheduler.adaptive.timeline; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobGraphTestUtils; +import org.apache.flink.runtime.jobgraph.JobResourceRequirements; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.minicluster.MiniCluster; +import 
org.apache.flink.runtime.scheduler.ExecutionGraphInfo; +import org.apache.flink.runtime.testtasks.OnceBlockingNoOpInvokable; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.runtime.testutils.MiniClusterResource; +import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.util.RestartStrategyUtils; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; +import org.apache.flink.util.function.RunnableWithException; +import org.apache.flink.util.function.SupplierWithException; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; /** * Test for recording rescale history by {@link DefaultRescaleTimeline} or {@link * RescaleTimeline.NoOpRescaleTimeline}. 
*/ +@ExtendWith(ParameterizedTestExtension.class) class RescaleTimelineITCase { - static final String DISABLED_DESCRIPTION = - "TODO: Blocked by FLINK-38343, the ITCases need the SchedulerNG#requstJob() to get the rescale history."; + + private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2; + private static final int NUMBER_TASK_MANAGERS = 2; + private static final int PARALLELISM = NUMBER_SLOTS_PER_TASK_MANAGER * NUMBER_TASK_MANAGERS; + private static final JobVertexID JOB_VERTEX_ID = new JobVertexID(); + + @Parameter private Configuration configuration; + private MiniClusterResource miniClusterResource; + + @Parameters + static Collection<Object[]> getClusterConfigs() { + Configuration confNonEnabledRescaleHistory = createConfiguration(); + Configuration confEnabledRescaleHistory = createConfiguration(); + confEnabledRescaleHistory.set(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE, 3); + return Arrays.asList( + new Object[] {confNonEnabledRescaleHistory}, + new Object[] {confEnabledRescaleHistory}); + } + + private static Configuration createConfiguration() { + final Configuration configuration = new Configuration(); + configuration.set(JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.Adaptive); + configuration.set( + JobManagerOptions.SCHEDULER_SUBMISSION_RESOURCE_STABILIZATION_TIMEOUT, + Duration.ofMillis(50)); + configuration.set( + JobManagerOptions.SCHEDULER_EXECUTING_COOLDOWN_AFTER_RESCALING, + // Use the 0.1 seconds to trigger the long-time non-terminal rescale event after a + // rescaling. 
+ Duration.ofMillis(100)); + configuration.set( + JobManagerOptions.SCHEDULER_EXECUTING_RESOURCE_STABILIZATION_TIMEOUT, + Duration.ofMillis(50)); + configuration.set( + JobManagerOptions.SCHEDULER_SUBMISSION_RESOURCE_WAIT_TIMEOUT, + Duration.ofSeconds(2)); + return configuration; + } + + @BeforeEach + void setUp() throws Exception { + OnceBlockingNoOpInvokable.reset(); + this.miniClusterResource = + new MiniClusterResource( + new MiniClusterResourceConfiguration.Builder() + .setConfiguration(configuration) + .setNumberSlotsPerTaskManager(NUMBER_SLOTS_PER_TASK_MANAGER) + .setNumberTaskManagers(NUMBER_TASK_MANAGERS) + .build()); + miniClusterResource.before(); + } + + @AfterEach + void tearDown() { + miniClusterResource.after(); + } // Tests for rescale trigger causes. - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordRescaleForInitialScheduling() {} + @TestTemplate + void testRecordRescaleForInitialScheduling() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordRescaleForNewResourcesRequirements() {} + miniCluster.submitJob(jobGraph).join(); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordRescaleForNewAvailableResource() {} + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + OnceBlockingNoOpInvokable.unblock(); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordRescaleForRecoverableFailover() {} + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(1); + Rescale rescale = rescaleHistory.get(0); + 
assertTriggerCause(rescale, TriggerCause.INITIAL_SCHEDULE); + assertTerminalRelatedFields(rescale, TerminatedReason.SUCCEEDED); + assertThat( + rescale.getSchedulerStates().stream() + .map(SchedulerStateSpan::getState) + .collect(Collectors.toList())) + .containsExactly( + "Created", + "WaitingForResources", + "CreatingExecutionGraph", + "Executing"); + }); + } - // End of tests for rescale trigger causes. + @TestTemplate + void testRecordRescaleForNewResourcesRequirements() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, NUMBER_SLOTS_PER_TASK_MANAGER); + + waitForVertexParallelismReachedAndJobRunning( + jobGraph, JOB_VERTEX_ID, NUMBER_SLOTS_PER_TASK_MANAGER); + + OnceBlockingNoOpInvokable.unblock(); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(2); + Rescale rescale = rescaleHistory.get(0); + assertTriggerCause(rescale, TriggerCause.UPDATE_REQUIREMENT); + assertTerminalRelatedFields(rescale, TerminatedReason.SUCCEEDED); + }); + } + + @TestTemplate + void testRecordRescaleForNewAvailableResource() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = + createBlockingJobGraph(PARALLELISM + NUMBER_SLOTS_PER_TASK_MANAGER); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + 
miniCluster.startTaskManager(); + + waitForVertexParallelismReachedAndJobRunning( + jobGraph, + JOB_VERTEX_ID, + NUMBER_SLOTS_PER_TASK_MANAGER * (NUMBER_TASK_MANAGERS + 1)); + + OnceBlockingNoOpInvokable.unblock(); - // Start of tests for rescale terminated reasons and terminal state. + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(2); + Rescale rescale = rescaleHistory.get(0); + assertTriggerCause(rescale, TriggerCause.NEW_RESOURCE_AVAILABLE); + assertTerminalRelatedFields(rescale, TerminatedReason.SUCCEEDED); + }); + } - @Disabled(DISABLED_DESCRIPTION) - @Test + @TestTemplate + void testRecordRescaleForRecoverableFailover() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + miniCluster.terminateTaskManager(0); + waitForVertexParallelismReachedAndJobRunning( + jobGraph, JOB_VERTEX_ID, NUMBER_SLOTS_PER_TASK_MANAGER); + + OnceBlockingNoOpInvokable.unblock(); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(2); + Rescale rescale = rescaleHistory.get(0); + assertTriggerCause(rescale, TriggerCause.RECOVERABLE_FAILOVER); + 
assertTerminalRelatedFields(rescale, TerminatedReason.SUCCEEDED); + }); + } + + // End of tests for rescale trigger causes. + + // Tests for rescale terminated reasons and terminal state. + /** Already tested in {@link #testRecordRescaleForInitialScheduling()}, etc. */ + @Disabled + @TestTemplate void testRescaleTerminatedBySucceeded() {} - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRescaleTerminatedByJobFinished() {} + @TestTemplate + void testRescaleTerminatedByJobFinished() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); + + OnceBlockingNoOpInvokable.unblock(); + + assumeThat(enabledRescaleHistory(configuration)).isTrue(); + waitUntilConditionWithTimeout( + () -> { + List<Rescale> rescaleHistory = getRescaleHistory(miniCluster, jobGraph); + return hasRescaleHistoryMetCondition( + rescaleHistory, 2, TerminatedReason.JOB_FINISHED); + }, + 10000); + } + + @TestTemplate + void testRescaleTerminatedByJobCancelled() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); + + miniCluster.cancelJob(jobGraph.getJobID()); + + assumeThat(enabledRescaleHistory(configuration)).isTrue(); + waitUntilConditionWithTimeout( + () -> { + List<Rescale> rescaleHistory = getRescaleHistory(miniCluster, jobGraph); + return hasRescaleHistoryMetCondition( + rescaleHistory, 2, TerminatedReason.JOB_CANCELED); + }, + 10000); + } + + @TestTemplate + void 
testRescaleTerminatedByJobFailed() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRescaleTerminatedByJobCancelled() {} + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRescaleTerminatedByJobFailed() {} + miniCluster.terminateTaskManager(1); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRescaleTerminatedByNoResourcesOrNoParallelismsChange() {} + waitForVertexParallelismReachedAndJobRunning( + jobGraph, JOB_VERTEX_ID, NUMBER_SLOTS_PER_TASK_MANAGER); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRescaleTerminatedByResourcesNotEnoughException() {} + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRescaleTerminatedByResourceRequirementsUpdated() {} + miniCluster.terminateTaskManager(0); - @Disabled(DISABLED_DESCRIPTION) - @Test + assumeThat(enabledRescaleHistory(configuration)).isTrue(); + waitUntilConditionWithTimeout( + () -> { + List<Rescale> rescaleHistory = getRescaleHistory(miniCluster, jobGraph); + return hasRescaleHistoryMetCondition( + rescaleHistory, 3, TerminatedReason.JOB_FAILED); + }, + 10000); + } + + @TestTemplate + void testRescaleTerminatedByNoResourcesOrNoParallelismsChange() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + miniCluster.submitJob(jobGraph).join(); + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); + + assumeThat(enabledRescaleHistory(configuration)).isTrue(); + waitUntilConditionWithTimeout( + () -> { + List<Rescale> rescaleHistory = 
getRescaleHistory(miniCluster, jobGraph); + return hasRescaleHistoryMetCondition( + rescaleHistory, + 2, + TerminatedReason.NO_RESOURCES_OR_PARALLELISMS_CHANGE); + }, + 20000); + } + + @TestTemplate + void testRescaleTerminatedByResourcesNotEnoughException() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + miniCluster.submitJob(jobGraph).join(); + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + miniCluster.terminateTaskManager(1); + miniCluster.terminateTaskManager(0); + + assumeThat(enabledRescaleHistory(configuration)).isTrue(); + waitUntilConditionWithTimeout( + () -> { + List<Rescale> rescaleHistory = getRescaleHistory(miniCluster, jobGraph); + return hasRescaleHistoryMetCondition( + rescaleHistory, 2, TerminatedReason.EXCEPTION_OCCURRED); + }, + 100000); + } + + @TestTemplate + void testRescaleTerminatedByResourceRequirementsUpdated() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + miniCluster.submitJob(jobGraph).join(); + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 3); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(3); + Rescale rescale = rescaleHistory.get(1); + assertTerminalRelatedFields( 
+ rescale, TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED); + }); + } + + /** + * This test case is disabled following the mailing list thread + * https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h: the current + * non-terminated rescale and the new rescale triggered by recoverable failover are merged into + * the current rescale. + */ + @Disabled + @TestTemplate void testRescaleTerminatedByJobRestarting() {} // End of tests for rescale terminated reasons and terminal state. - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordRescaleWithoutPreRescaleInfo() {} + @TestTemplate + void testRecordRescaleWithoutPreRescaleInfo() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + OnceBlockingNoOpInvokable.unblock(); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + Rescale rescale = + executionGraphInfo + .getRescalesStatsSnapshot() + .getRescaleHistory() + .get(0); + assertThat(rescale).isNotNull(); + Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = rescale.getSlots(); + assertThat(slots.values()) + .allSatisfy( + (Consumer<SlotSharingGroupRescale>) + slotSharingGroupRescale -> { + assertThat( + slotSharingGroupRescale + .getPreRescaleSlots()) + .isNull(); + assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields( + slotSharingGroupRescale); + }); + Map<JobVertexID, VertexParallelismRescale> vertices = rescale.getVertices(); + assertThat(vertices.values()) + .allSatisfy( + (Consumer<VertexParallelismRescale>) + vpr -> { + assertThat(vpr.getPreRescaleParallelism()).isNull(); + 
assertVertexParallelismRescaleNotNullBesidesPreRelatedFields( + vpr); + }); + }); + } + + /** Verifies that a follow-up rescale records the pre-rescale slots and parallelism. */ + @TestTemplate + void testRecordRescaleWithPreRescaleInfo() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM + 1); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + miniCluster.startTaskManager(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM + 1); + + OnceBlockingNoOpInvokable.unblock(); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + Rescale rescale = + executionGraphInfo + .getRescalesStatsSnapshot() + .getRescaleHistory() + .get(0); + assertThat(rescale).isNotNull(); + Map<SlotSharingGroupId, SlotSharingGroupRescale> slots = rescale.getSlots(); + assertThat(slots.values()) + .allSatisfy( + (Consumer<SlotSharingGroupRescale>) + slotSharingGroupRescale -> { + assertThat( + slotSharingGroupRescale + .getPreRescaleSlots()) + .isNotNull(); + assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields( + slotSharingGroupRescale); + }); + Map<JobVertexID, VertexParallelismRescale> vertices = rescale.getVertices(); + assertThat(vertices.values()) + .allSatisfy( + (Consumer<VertexParallelismRescale>) + vpr -> { + assertThat(vpr.getPreRescaleParallelism()) + .isNotNull(); + assertVertexParallelismRescaleNotNullBesidesPreRelatedFields( + vpr); + }); + }); + } + + /** + * Tests one case of 'Merge the current non-terminated rescale and the new rescale triggered + * by recoverable failover into the current rescale'. 
+ */ + @TestTemplate + void testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause() + throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); + + miniCluster.terminateTaskManager(0); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(2); + assertThat(rescaleHistory.get(0).getTriggerCause()) + .isEqualTo(TriggerCause.RECOVERABLE_FAILOVER); + }); + } + + @TestTemplate + void testRecordInProgressRescale() throws Exception { + final MiniCluster miniCluster = miniClusterResource.getMiniCluster(); + final JobGraph jobGraph = createBlockingJobGraph(PARALLELISM); + + miniCluster.submitJob(jobGraph).join(); + + waitForVertexParallelismReachedAndJobRunning(jobGraph, JOB_VERTEX_ID, PARALLELISM); + + updateJobResourceRequirements(miniCluster, jobGraph, 1, PARALLELISM * 2); + + final ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).join(); + runAdaptedParameterizedAssertion( + executionGraphInfo, + () -> { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + List<Rescale> rescaleHistory = + executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + assertThat(rescaleHistory).hasSize(2); + Rescale rescale = rescaleHistory.get(0); + 
assertThat(rescale.getTerminatedReason()).isNull(); + assertThat(rescale.getTerminalState()).isNull(); + }); + } + + // Private methods. + private JobGraph createBlockingJobGraph(int parallelism) { + final JobVertex blockingOperator = new JobVertex("Blocking operator", JOB_VERTEX_ID); + SlotSharingGroup sharingGroup = new SlotSharingGroup(); + sharingGroup.setSlotSharingGroupName("slot-sharing-group-A"); + blockingOperator.setSlotSharingGroup(sharingGroup); + blockingOperator.setInvokableClass(OnceBlockingNoOpInvokable.class); + blockingOperator.setParallelism(parallelism); + final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(blockingOperator); + RestartStrategyUtils.configureFixedDelayRestartStrategy(jobGraph, 1, 0L); + return jobGraph; + } + + private void runAdaptedParameterizedAssertion( + ExecutionGraphInfo executionGraphInfo, + RunnableWithException assertionForEnabledRescaleHistory) + throws Exception { + if (enabledRescaleHistory(configuration)) { + assertionForEnabledRescaleHistory.run(); + } else { + assertThat(executionGraphInfo.getRescalesStatsSnapshot()).isNotNull(); + assertThat(executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory()).isEmpty(); + } + } + + private static boolean enabledRescaleHistory(Configuration configuration) { + return configuration.get(WebOptions.MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE) > 0; + } + + private static void assertTerminalRelatedFields( + Rescale rescale, TerminatedReason expectedTerminatedReason) { + TerminatedReason terminatedReason = rescale.getTerminatedReason(); + assertThat(terminatedReason).isEqualTo(expectedTerminatedReason); + assertThat(terminatedReason.getTerminalState()) + .isEqualTo(rescale.getTerminalState()) + .isEqualTo(expectedTerminatedReason.getTerminalState()); + } + + private static void assertTriggerCause(Rescale rescale, TriggerCause expectedTriggerCause) { + assertThat(rescale).isNotNull(); + assertThat(rescale.getTriggerCause()).isEqualTo(expectedTriggerCause); + } + + 
private void assertSlotSharingGroupRescaleNotNullBesidesPreRelatedFields( + SlotSharingGroupRescale slotSharingGroupRescale) { + assertThat(slotSharingGroupRescale.getDesiredSlots()).isNotNull(); + assertThat(slotSharingGroupRescale.getPostRescaleSlots()).isNotNull(); + assertThat(slotSharingGroupRescale.getSlotSharingGroupName()).isNotNull(); + assertThat(slotSharingGroupRescale.getMinimalRequiredSlots()).isNotNull(); + assertThat(slotSharingGroupRescale.getAcquiredResourceProfile()).isNotNull(); + assertThat(slotSharingGroupRescale.getRequiredResourceProfile()).isNotNull(); + assertThat(slotSharingGroupRescale.getSlotSharingGroupId()).isNotNull(); + } + + private void assertVertexParallelismRescaleNotNullBesidesPreRelatedFields( + VertexParallelismRescale vpr) { + assertThat(vpr.getDesiredParallelism()).isNotNull(); + assertThat(vpr.getPostRescaleParallelism()).isNotNull(); + assertThat(vpr.getSlotSharingGroupName()).isNotNull(); + assertThat(vpr.getSufficientParallelism()).isNotNull(); + assertThat(vpr.getJobVertexId()).isNotNull(); + assertThat(vpr.getSlotSharingGroupId()).isNotNull(); + assertThat(vpr.getSlotSharingGroupName()).isNotNull(); + } + + private void waitUntilConditionWithTimeout( + SupplierWithException<Boolean, Exception> condition, long timeoutMillis) + throws Exception { + CompletableFuture.runAsync( + () -> { + try { + CommonTestUtils.waitUntilCondition(condition); + } catch (Exception e) { + throw new RuntimeException(e); + } + }) + .get(timeoutMillis, TimeUnit.MILLISECONDS); + } + + private void waitForVertexParallelismReachedAndJobRunning( + JobGraph jobGraph, JobVertexID jobVertexId, int targetParallelism) throws Exception { + CommonTestUtils.waitUntilCondition( + () -> { + final ArchivedExecutionGraph archivedExecutionGraph = + miniClusterResource + .getMiniCluster() + .getArchivedExecutionGraph(jobGraph.getJobID()) + .get(); + final AccessExecutionJobVertex executionJobVertex = + archivedExecutionGraph.getAllVertices().get(jobVertexId); + 
if (executionJobVertex == null) { + // parallelism was not yet determined + return false; + } + return executionJobVertex.getParallelism() == targetParallelism; + }); + CommonTestUtils.waitUntilCondition( + () -> + miniClusterResource.getMiniCluster().getJobStatus(jobGraph.getJobID()).get() + == JobStatus.RUNNING); + } - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordRescaleWithPreRescaleInfo() {} + private void updateJobResourceRequirements( + MiniCluster miniCluster, JobGraph jobGraph, int lowerBound, int upperBound) + throws ExecutionException, InterruptedException { + miniCluster + .updateJobResourceRequirements( + jobGraph.getJobID(), + JobResourceRequirements.newBuilder() + .setParallelismForJobVertex( + jobGraph.getVertices().iterator().next().getID(), + lowerBound, + upperBound) + .build()) + .get(); + } - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordNonTerminatedRescaleMergingWithNewRecoverableFailureTriggerCause() {} + private boolean hasRescaleHistoryMetCondition( + List<Rescale> rescales, int expectedSize, TerminatedReason terminatedReasonOfLatest) { + return rescales.size() == expectedSize + && terminatedReasonOfLatest == rescales.get(0).getTerminatedReason(); + } - @Disabled(DISABLED_DESCRIPTION) - @Test - void testRecordInProgressRescale() {} + private static List<Rescale> getRescaleHistory(MiniCluster miniCluster, JobGraph jobGraph) + throws InterruptedException, ExecutionException { + ExecutionGraphInfo executionGraphInfo = + miniCluster.getExecutionGraphInfo(jobGraph.getJobID()).get(); + return executionGraphInfo.getRescalesStatsSnapshot().getRescaleHistory(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java index 4e717aee344..d773f0ac165 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescalesSummaryTest.java @@ -85,5 +85,11 @@ class RescalesSummaryTest { assertThat(rescalesSummary.getInProgressRescalesCount()).isZero(); assertThat(rescalesSummary.getFailedRescalesCount()).isZero(); assertSummary(rescalesSummary.getCompletedRescalesSummary(), 1L, 1L, 1L, 1L, 1L); + assertThat( + rescalesSummary + .createSnapshot() + .getCompletedRescalesSummarySnapshot() + .getQuantile(1)) + .isOne(); } }
