Copilot commented on code in PR #27539:
URL: https://github.com/apache/flink/pull/27539#discussion_r2872310223
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java:
##########
@@ -31,8 +32,17 @@ public interface StateTransitionManager {
/**
* Is called if the environment changed in a way that a state transition could be considered.
+ *
+ * @param newResourceDriven Whether the onchange is driven by new available resources.
Review Comment:
JavaDoc typo/wording: `onchange` should be `onChange`, and the sentence can
be clarified (e.g., “Whether the onChange is driven by newly available
resources.”).
```suggestion
* @param newResourceDriven Whether the onChange is driven by newly available resources.
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java:
##########
@@ -2699,6 +2700,22 @@ public DummyState(StateWithoutExecutionGraph.Context context, JobStatus jobStatu
this.jobStatus = jobStatus;
}
+ @Override
+ public Durable getDurable() {
+ return null;
+ }
+
+ @Override
+ public void cancel() {}
+
+ @Override
+ public void suspend(Throwable cause) {}
+
+ @Override
+ public JobID getJobId() {
+ return null;
Review Comment:
These overrides in `DummyState` return `null` / no-op for methods that are
already correctly implemented by `StateWithoutExecutionGraph` (e.g.,
`getDurable()` is used by `State#onLeave` and must not be null). This can cause
NPEs and makes the test state behave unlike real states. Remove these overrides
or delegate to `super`/`context` instead of returning null.
```suggestion
return super.getDurable();
}
@Override
public void cancel() {
super.cancel();
}
@Override
public void suspend(Throwable cause) {
super.suspend(cause);
}
@Override
public JobID getJobId() {
return super.getJobId();
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1101,6 +1130,27 @@ public void updateJobResourceRequirements(JobResourceRequirements jobResourceReq
}
}
+ private void recordRescaleForNewResourceRequirements() {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.addSchedulerState(state)
+ .setTerminatedReason(TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .log());
Review Comment:
In `recordRescaleForNewResourceRequirements()`, the first
`updateRescale(...)` will emit WARN logs and be ignored if the timeline is
idling/terminated. If this is expected, please guard the update with
`!rescaleTimeline.isIdling()` to keep logs clean (similar to the
`goToCanceling` guard).
```suggestion
if (!rescaleTimeline.isIdling()) {
rescaleTimeline.updateRescale(
rescale ->
rescale.addSchedulerState(state)
.setTerminatedReason(
TerminatedReason.RESOURCE_REQUIREMENTS_UPDATED)
.setEndTimestamp(Instant.now().toEpochMilli())
.log());
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1349,6 +1452,12 @@ public CompletableFuture<String> goToStopWithSavepoint(
@Override
public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.addSchedulerState(state)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .setTerminatedReason(TerminatedReason.JOB_FINISHED)
+ .log());
Review Comment:
`goToFinished(...)` updates the rescale timeline unconditionally. If there
is no active rescale (idling/terminated),
`DefaultRescaleTimeline.updateRescale()` logs WARNs and ignores the update;
this can lead to noisy logs on every normal job completion. Consider guarding
with `if (!rescaleTimeline.isIdling())` (or otherwise ensuring a rescale
exists) before updating.
```suggestion
if (!rescaleTimeline.isIdling()) {
rescaleTimeline.updateRescale(
rescale ->
rescale.addSchedulerState(state)
.setEndTimestamp(Instant.now().toEpochMilli())
.setTerminatedReason(TerminatedReason.JOB_FINISHED)
.log());
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1304,13 +1365,55 @@ public void goToRestarting(
}
}
+ private void recordRescaleForJobRestarting(VertexParallelism restartWithParallelism) {
+ if (restartWithParallelism == null) {
+ // For the failover restarting.
+ if (!rescaleTimeline.isIdling()) {
+ // Process by https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h.
+ LOG.info(
+ "Merge the current non-terminated rescale and the new rescale triggered by recoverable failover into the current rescale.");
+ rescaleTimeline.updateRescale(Rescale::clearSchedulerStates);
+ } else if (rescaleTimeline.isIdling()) {
+ rescaleTimeline.newRescale(false);
+ }
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setStartTimestamp(Instant.now().toEpochMilli())
+ .setTriggerCause(TriggerCause.RECOVERABLE_FAILOVER)
+ .setMinimalRequiredSlots(jobInformation)
+ .setPreRescaleSlotsAndParallelisms(
+ jobInformation,
+ rescaleTimeline.getLatestRescale(
+ TerminalState.COMPLETED))
+ .setDesiredVertexParallelism(jobInformation)
+ .setDesiredSlots(jobInformation)
+ .log());
+ } else {
+ // For the normal rescaling restarting.
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setMinimalRequiredSlots(jobInformation)
+ .setDesiredVertexParallelism(jobInformation)
+ .setDesiredSlots(jobInformation)
+ .log());
+ }
+ }
+
@Override
public void goToFailing(
ExecutionGraph executionGraph,
ExecutionGraphHandler executionGraphHandler,
OperatorCoordinatorHandler operatorCoordinatorHandler,
Throwable failureCause,
List<ExceptionHistoryEntry> failureCollection) {
+ rescaleTimeline.updateRescale(
+ rescale ->
+ rescale.setEndTimestamp(Instant.now().toEpochMilli())
+ .addSchedulerState(state, failureCause)
+ .setTerminatedReason(TerminatedReason.JOB_FAILED)
+ .setStringifiedException(
+ ExceptionUtils.stringifyException(failureCause))
+ .log());
Review Comment:
`goToFailing(...)` updates the rescale timeline unconditionally. If there is
no active rescale (idling/terminated), `DefaultRescaleTimeline.updateRescale()`
logs WARNs and ignores the update; this can create noisy logs during normal job
failures unrelated to rescaling. Consider guarding with `if
(!rescaleTimeline.isIdling())` (like `goToCanceling`).
```suggestion
if (!rescaleTimeline.isIdling()) {
rescaleTimeline.updateRescale(
rescale ->
rescale.setEndTimestamp(Instant.now().toEpochMilli())
.addSchedulerState(state, failureCause)
.setTerminatedReason(TerminatedReason.JOB_FAILED)
.setStringifiedException(
ExceptionUtils.stringifyException(failureCause))
.log());
}
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1618,6 +1727,9 @@ <T extends State> T transitionToState(StateFactory<T> targetState) {
final JobStatus previousJobStatus = state.getJobStatus();
state.onLeave(targetState.getStateClass());
+
+ rescaleTimeline.updateRescale(rescale -> rescale.addSchedulerState(state));
Review Comment:
`transitionToState(...)` always calls `rescaleTimeline.updateRescale(...)`
on state transitions. When there is no active rescale,
`DefaultRescaleTimeline.updateRescale()` logs WARNs and ignores the update,
potentially producing warnings on many (or all) state transitions. Consider
guarding this call with `if (!rescaleTimeline.isIdling())` or making the
timeline implementation tolerate idling updates without WARNs.
```suggestion
if (!rescaleTimeline.isIdling()) {
rescaleTimeline.updateRescale(rescale -> rescale.addSchedulerState(state));
}
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/timeline/RescaleTimelineITCase.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptive.timeline;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Test for recording rescale history by {@link DefaultRescaleTimeline} or {@link
+ * RescaleTimeline.NoOpRescaleTimeline}.
+ */
+class RescaleTimelineITCase {
+ static final String DISABLED_DESCRIPTION =
+ "TODO: Blocked by FLINK-38343, the ITCases need the SchedulerNG#requstJob() to get the rescale history.";
Review Comment:
The disabled-test description contains a typo: `SchedulerNG#requstJob()`
should be `SchedulerNG#requestJob()` (and consider fixing the sentence grammar
while you are here).
```suggestion
"TODO: Blocked by FLINK-38343. These IT cases require SchedulerNG#requestJob() to get the rescale history.";
```
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManagerTest.java:
##########
@@ -511,6 +512,16 @@ public TestingStateTransitionManagerContext withSufficientResources() {
// StateTransitionManager.Context interface methods
// ///////////////////////////////////////////////
+ @Override
+ public State schedulerstate() {
+ throw new UnsupportedOperationException();
Review Comment:
`schedulerstate()` currently throws `UnsupportedOperationException`. Some
new production paths call this method (e.g., rescale-recording logic in
`DefaultStateTransitionManager.onChange(boolean)`), which can make these tests
fail unexpectedly once `onChange(true)` is exercised. Prefer returning a stable
stub `State` (or `null`) that won’t trigger rescale-recording branches, instead
of throwing.
```suggestion
return null;
```
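For context on why `return null` is a workable stub here: Java's `instanceof` evaluates to `false` for a null reference, so a check shaped like `schedulerstate() instanceof Executing` skips the rescale-recording branch instead of throwing. A minimal, self-contained sketch follows; the `State` and `Executing` types in it are hypothetical stand-ins for illustration, not the Flink classes.

```java
// Hypothetical stand-ins for illustration only; these are not the Flink types.
interface State {}

final class Executing implements State {}

final class NullStateSketch {
    // Mirrors the shape of the production check: record only while executing.
    static boolean wouldRecordRescale(State schedulerState) {
        // instanceof is false for a null reference, so a null stub skips the branch.
        return schedulerState instanceof Executing;
    }

    public static void main(String[] args) {
        System.out.println(wouldRecordRescale(null));
        System.out.println(wouldRecordRescale(new Executing()));
    }
}
```

Note that this only covers `instanceof`-style checks; any call site that dereferences the returned state directly would still need a real stub `State`.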
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java:
##########
@@ -292,14 +307,30 @@ static final class Idling extends Phase {
private Idling(Supplier<Temporal> clock, DefaultStateTransitionManager context) {
super(clock, context);
+ recordRescaleForNoResourcesOrNoParallelismChange();
}
@Override
- void onChange() {
+ void onChange(boolean newResourceDriven) {
if (hasSufficientResources()) {
+ recordRescaleForNewAvailableResources(context(), newResourceDriven);
context().progressToStabilizing(now());
}
}
+
+ private void recordRescaleForNoResourcesOrNoParallelismChange() {
+ if (context().transitionContext.schedulerstate() instanceof Executing) {
+ context()
+ .getRescaleTimeline()
+ .updateRescale(
+ rescale ->
+ rescale.setTerminatedReason(
+ TerminatedReason
+ .NO_RESOURCES_OR_PARALLELISMS_CHANGE)
+ .setEndTimestamp(Instant.now().toEpochMilli())
+ .addSchedulerState(context().schedulerState()));
+ }
Review Comment:
`recordRescaleForNoResourcesOrNoParallelismChange()` updates the rescale
unconditionally once the scheduler state is `Executing`, but
`RescaleTimeline.updateRescale()` logs WARNs when there is no active rescale
(idling/terminated) and ignores the update. Consider guarding this with
`!getRescaleTimeline().isIdling()` (or otherwise ensuring a rescale exists) to
avoid noisy logs and no-op updates.
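As a rough illustration of the guard, the sketch below uses a hypothetical `TimelineSketch` class (not the Flink `RescaleTimeline`) that counts a warning whenever `updateRescale` is called with no active rescale, which is the behavior this comment assumes. The record is modeled as a plain `StringBuilder` purely to keep the example self-contained.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for a rescale timeline; not the Flink implementation.
final class TimelineSketch {
    private StringBuilder current; // null means "idling": no active rescale
    int warnings = 0;

    boolean isIdling() {
        return current == null;
    }

    void newRescale() {
        current = new StringBuilder();
    }

    // Assumed behavior, per the review comment: warn and ignore when idling.
    void updateRescale(Consumer<StringBuilder> updater) {
        if (isIdling()) {
            warnings++;
            return;
        }
        updater.accept(current);
    }

    String currentRecord() {
        return current == null ? "" : current.toString();
    }
}

final class IdleGuardSketch {
    // Only touch the timeline when a rescale is actually in progress.
    static void recordTermination(TimelineSketch timeline) {
        if (!timeline.isIdling()) {
            timeline.updateRescale(r -> r.append("NO_RESOURCES_OR_PARALLELISMS_CHANGE"));
        }
    }

    public static void main(String[] args) {
        TimelineSketch timeline = new TimelineSketch();
        recordTermination(timeline); // idling: skipped, no warning counted
        timeline.newRescale();
        recordTermination(timeline); // active: update applied
        System.out.println(timeline.warnings + " " + timeline.currentRecord());
    }
}
```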
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java:
##########
@@ -1304,13 +1365,55 @@ public void goToRestarting(
}
}
+ private void recordRescaleForJobRestarting(VertexParallelism restartWithParallelism) {
+ if (restartWithParallelism == null) {
+ // For the failover restarting.
+ if (!rescaleTimeline.isIdling()) {
+ // Process by https://lists.apache.org/thread/hh7w2p6lnmbo1q6d9ngkttdyrw4lp74h.
+ LOG.info(
+ "Merge the current non-terminated rescale and the new rescale triggered by recoverable failover into the current rescale.");
+ rescaleTimeline.updateRescale(Rescale::clearSchedulerStates);
+ } else if (rescaleTimeline.isIdling()) {
Review Comment:
This `else if (rescaleTimeline.isIdling())` branch is redundant because it’s
the negation of the preceding `if (!rescaleTimeline.isIdling())`. Simplifying
this improves readability and avoids suggesting different semantics between the
two checks.
```suggestion
} else {
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java:
##########
@@ -49,6 +59,10 @@ default void close() {}
*/
interface Context {
+ State schedulerstate();
Review Comment:
Method name `schedulerstate()` doesn’t follow the usual camelCase convention
(should be `schedulerState()`), and the misspelling propagates into call sites.
Renaming this now will improve readability and avoid the appearance of a typo
in a core runtime interface.
```suggestion
State schedulerState();
```
##########
flink-core/src/main/java/org/apache/flink/configuration/WebOptions.java:
##########
@@ -141,6 +141,20 @@ public class WebOptions {
.withDescription(
"The maximum number of failures collected by the exception history per job.");
+ /** The maximum number of the adaptive scheduler rescale history. */
+ @Documentation.Section(Documentation.Sections.ALL_JOB_MANAGER)
+ public static final ConfigOption<Integer> MAX_ADAPTIVE_SCHEDULER_RESCALE_HISTORY_SIZE =
+ key("web.adaptive-scheduler.rescale-history.size")
+ .intType()
+ .defaultValue(0)
+ .withDescription(
+ Description.builder()
+ .text(
+ "The maximum number of the rescale records per job whose scheduler is %s. "
+ + "The feature will be disabled when the configuration value is smaller or equals to 0.",
Review Comment:
Grammar in the config description: “smaller or equals to 0” should be
“smaller than or equal to 0”.
```suggestion
+ "The feature will be disabled when the configuration value is smaller than or equal to 0.",
```
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -149,19 +175,31 @@ public ScheduledFuture<?> scheduleOperation(Runnable callback, Duration delay) {
@Override
public void transitionToSubsequentState() {
+ Optional<VertexParallelism> availableVertexParallelism =
+ context.getAvailableVertexParallelism();
+ if (!availableVertexParallelism.isPresent()) {
+ IllegalStateException exception =
+ new IllegalStateException("Resources must be available when rescaling.");
+ recordRescaleForNoResourcesEnough(exception);
+ throw exception;
+ }
context.goToRestarting(
getExecutionGraph(),
getExecutionGraphHandler(),
getOperatorCoordinatorHandler(),
Duration.ofMillis(0L),
- context.getAvailableVertexParallelism()
- .orElseThrow(
- () ->
- new IllegalStateException(
- "Resources must be available when rescaling.")),
+ availableVertexParallelism.get(),
getFailures());
}
+ private void recordRescaleForNoResourcesEnough(IllegalStateException exception) {
+ context.getRescaleTimeline()
+ .updateRescale(
+ rescale ->
+ rescale.setStringifiedException(
+ ExceptionUtils.stringifyException(exception)));
Review Comment:
`recordRescaleForNoResourcesEnough` only stores the stringified exception,
but it doesn’t terminate the rescale (no `TerminatedReason.EXCEPTION_OCCURRED`
/ end timestamp). Since the method throws immediately afterward, the current
rescale will remain in-progress and may never be sealed, which can make rescale
history inaccurate (and makes it hard to test the “resources not enough
exception” termination case).
```suggestion
rescale -> {
rescale.setStringifiedException(
ExceptionUtils.stringifyException(exception));
rescale.setTerminatedReason(TerminatedReason.EXCEPTION_OCCURRED);
rescale.setEndTimestamp(Instant.now().toEpochMilli());
});
```
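To make the concern concrete, here is a minimal sketch of sealing a record before rethrowing. `RescaleRecordSketch` and its fields are hypothetical and only approximate the idea of a terminated rescale (reason plus end timestamp); they are not the Flink `Rescale` API.

```java
// Hypothetical record type for illustration; not the Flink Rescale class.
final class RescaleRecordSketch {
    String stringifiedException;
    String terminatedReason; // null while the rescale is still in progress
    Long endTimestamp;       // null while the rescale is still in progress

    boolean isTerminated() {
        return terminatedReason != null && endTimestamp != null;
    }
}

final class SealBeforeThrowSketch {
    // Records the failure AND terminates the record before propagating the exception.
    static void failRescale(RescaleRecordSketch record) {
        IllegalStateException exception =
                new IllegalStateException("Resources must be available when rescaling.");
        record.stringifiedException = exception.toString();
        record.terminatedReason = "EXCEPTION_OCCURRED";
        record.endTimestamp = System.currentTimeMillis();
        throw exception;
    }

    public static void main(String[] args) {
        RescaleRecordSketch record = new RescaleRecordSketch();
        try {
            failRescale(record);
        } catch (IllegalStateException expected) {
            // The record is already sealed when the exception reaches the caller.
            System.out.println(record.isTerminated());
        }
    }
}
```

With only the stringified exception set, `isTerminated()` in this sketch would stay `false` after the throw, which is the in-progress state described above.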