XComp commented on code in PR #27921:
URL: https://github.com/apache/flink/pull/27921#discussion_r3218138993
##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -745,6 +745,25 @@ public InlineElement getDescription() {
.key()))
.build());
+ @Documentation.Section({
+ Documentation.Sections.EXPERT_SCHEDULING,
+ Documentation.Sections.ALL_JOB_MANAGER
+ })
+ public static final ConfigOption<Boolean> SCHEDULER_RESCALE_TRIGGER_ACTIVE_CHECKPOINT_ENABLED =
+ key("jobmanager.adaptive-scheduler.rescale-trigger.active-checkpoint.enabled")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ Description.builder()
+ .text(
+ "When enabled, the Adaptive Scheduler actively triggers a checkpoint when resources change and rescaling is desired, "
+ + "rather than waiting for the next periodic checkpoint. "
+ + "This reduces rescaling latency, especially when checkpoint intervals are large. "
+ + "The active trigger respects %s and "
+ + "will not fire if a checkpoint is already in progress or being triggered.",
+ code("execution.checkpointing.min-pause"))
Review Comment:
```suggestion
code(CheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS.key()))
```
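For illustration only, a minimal sketch of why referencing the option's key is preferable to repeating the literal: the description stays correct if the key is ever renamed. `OptionSketch` and `DescriptionSketch` are hypothetical stand-ins, not Flink's `ConfigOption` or `Description` API.

```java
// Hypothetical, simplified stand-in for a config option; not the Flink API.
final class OptionSketch {
    private final String key;

    OptionSketch(String key) {
        this.key = key;
    }

    String key() {
        return key;
    }
}

final class DescriptionSketch {
    // Single source of truth for the key string.
    static final OptionSketch MIN_PAUSE =
            new OptionSketch("execution.checkpointing.min-pause");

    // The description derives the key from the option instead of duplicating the literal.
    static String describe() {
        return String.format("The active trigger respects %s.", MIN_PAUSE.key());
    }
}
```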
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/DefaultStateTransitionManager.java:
##########
@@ -164,6 +164,10 @@ private void progressToStabilized(Temporal
firstChangeEventTimestamp) {
progressToPhase(new Stabilized(clock, this, firstChangeEventTimestamp,
maxTriggerDelay));
}
+ void requestActiveCheckpointTrigger() {
Review Comment:
```suggestion
private void requestActiveCheckpointTrigger() {
```
We can reduce the access level because `DefaultStateTransitionManager`
keeps its state implementations as nested classes of the same class. There is
no need to expose the method at package level.
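A minimal sketch of the Java rule this relies on: a nested class can call a `private` method of its enclosing class. `ManagerSketch` and its nested `Stabilizing` are hypothetical stand-ins that only mirror the shape of the real classes.

```java
// Hypothetical stand-in for the manager; not Flink code.
class ManagerSketch {
    private int triggerRequests = 0;

    // private is sufficient: nested classes of ManagerSketch can still call it.
    private void requestActiveCheckpointTrigger() {
        triggerRequests++;
    }

    int getTriggerRequests() {
        return triggerRequests;
    }

    // Hypothetical phase, modeled as a nested class of the manager.
    static final class Stabilizing {
        private final ManagerSketch manager;

        Stabilizing(ManagerSketch manager) {
            this.manager = manager;
        }

        void onChange() {
            // Compiles because Stabilizing is nested inside ManagerSketch.
            manager.requestActiveCheckpointTrigger();
        }
    }
}
```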
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +189,103 @@ public ScheduledFuture<?> scheduleOperation(Runnable
callback, Duration delay) {
return context.runIfState(this, callback, delay);
}
+ @Override
+ public void requestActiveCheckpointTrigger() {
+ if (!activeCheckpointTriggerEnabled) {
+ return;
+ }
+
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+ return;
+ }
+
+ final long triggerDelayMillis = checkpointCoordinator.getActiveCheckpointTriggerDelay();
+ if (triggerDelayMillis < 0L) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale: checkpoint already in progress.");
+ return;
+ }
+ if (triggerDelayMillis > 0L) {
Review Comment:
Do we gain anything from this condition? We could just schedule the trigger
in both cases. WDYT?
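A rough sketch of the single-path idea, using a plain `ScheduledExecutorService` rather than the scheduler's `context.runIfState`. Whether `runIfState` treats a zero delay the same way is an assumption that would need checking. `UnifiedTriggerSketch` and its methods are hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not Flink code: one scheduling path for every non-negative delay.
class UnifiedTriggerSketch {
    private final ScheduledExecutorService executor;
    private final AtomicInteger fired = new AtomicInteger();

    UnifiedTriggerSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    // A delay of 0 is scheduled like any other value and runs once a thread is free.
    ScheduledFuture<?> requestTrigger(long delayMillis) {
        Runnable trigger = fired::incrementAndGet;
        return executor.schedule(trigger, delayMillis, TimeUnit.MILLISECONDS);
    }

    // Runs one immediate and one delayed request and returns how many triggers fired.
    static int runDemo() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            UnifiedTriggerSketch sketch = new UnifiedTriggerSketch(executor);
            sketch.requestTrigger(0L).get(5, TimeUnit.SECONDS);
            sketch.requestTrigger(20L).get(5, TimeUnit.SECONDS);
            return sketch.fired.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One trade-off to keep in mind: with a single path, the zero-delay trigger no longer fires synchronously inside the calling method but as a separate scheduled task.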
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -182,6 +189,103 @@ public ScheduledFuture<?> scheduleOperation(Runnable
callback, Duration delay) {
return context.runIfState(this, callback, delay);
}
+ @Override
+ public void requestActiveCheckpointTrigger() {
+ if (!activeCheckpointTriggerEnabled) {
+ return;
+ }
+
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+ return;
+ }
+
+ final long triggerDelayMillis = checkpointCoordinator.getActiveCheckpointTriggerDelay();
+ if (triggerDelayMillis < 0L) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale: checkpoint already in progress.");
+ return;
+ }
+ if (triggerDelayMillis > 0L) {
+ scheduleActiveCheckpointTriggerRetry(triggerDelayMillis);
+ return;
+ }
+ fireActiveCheckpointTrigger(checkpointCoordinator);
+ }
+
+ private boolean shouldTriggerActiveCheckpoint(
+ @Nullable CheckpointCoordinator checkpointCoordinator) {
+ if (checkpointCoordinator == null
+ || !checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale: checkpointing not configured.");
+ return false;
+ }
+ if (!parallelismChanged()) {
+ getLogger()
+ .debug(
+ "Skipping active checkpoint trigger for rescale: parallelism unchanged.");
+ return false;
+ }
+ return true;
+ }
+
+ private void scheduleActiveCheckpointTriggerRetry(long delayMillis) {
+ if (activeCheckpointTriggerScheduled) {
+ return;
+ }
+ activeCheckpointTriggerScheduled = true;
+ getLogger()
+ .debug(
+ "Min pause not satisfied, scheduling active checkpoint trigger retry in {} ms.",
+ delayMillis);
+ context.runIfState(
+ this, this::tryFireActiveCheckpointAfterRetry, Duration.ofMillis(delayMillis));
+ }
+
+ private void tryFireActiveCheckpointAfterRetry() {
+ activeCheckpointTriggerScheduled = false;
+ final CheckpointCoordinator checkpointCoordinator =
+ getExecutionGraph().getCheckpointCoordinator();
+ if (!shouldTriggerActiveCheckpoint(checkpointCoordinator)) {
+ return;
+ }
+ if (checkpointCoordinator.getActiveCheckpointTriggerDelay() == 0L) {
+ fireActiveCheckpointTrigger(checkpointCoordinator);
+ }
Review Comment:
Should we add an else branch with a log message? Otherwise, the checkpoint
trigger is silently dropped if another periodic checkpoint completes while
the trigger is scheduled. WDYT?
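A minimal sketch of the else branch, assuming the delay contract from the PR's JavaDoc (negative: checkpoint in flight, zero: trigger now, positive: min pause pending). `RetrySketch` is hypothetical, and the list stands in for the debug logger so the example stays self-contained.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not Flink code.
class RetrySketch {
    // Stand-in for the debug logger.
    final List<String> debugLog = new ArrayList<>();
    int fired = 0;

    // delayMillis follows the contract described in the PR:
    // < 0 checkpoint in flight, 0 trigger now, > 0 min pause not yet satisfied.
    void tryFireAfterRetry(long delayMillis) {
        if (delayMillis == 0L) {
            fired++;
        } else {
            // Make the drop visible instead of returning silently.
            debugLog.add(
                    "Dropping active checkpoint trigger retry: trigger delay is "
                            + delayMillis
                            + " ms.");
        }
    }
}
```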
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java:
##########
@@ -89,5 +89,24 @@ interface Context extends RescaleContext {
* @return the {@link JobID} of the job
*/
JobID getJobId();
+
+ /**
+ * Requests the context to actively trigger a checkpoint to expedite
rescaling. Called by
+ * the {@link DefaultStateTransitionManager} from within phase
lifecycle methods:
Review Comment:
Although it's an internal interface, we might not want to document
implementation behavior in the interface itself. How it's done in the end is an
implementation detail, no?
That documentation is valid but might be better suited to the newly added
`DefaultStateTransitionManager#requestActiveCheckpointTrigger`. WDYT?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/Executing.java:
##########
@@ -64,6 +67,8 @@ class Executing extends StateWithExecutionGraph
private final StateTransitionManager stateTransitionManager;
private final int rescaleOnFailedCheckpointCount;
+ private final boolean activeCheckpointTriggerEnabled;
+ private boolean activeCheckpointTriggerScheduled;
Review Comment:
```suggestion
// only modifiable from the main thread
private boolean activeCheckpointTriggerScheduled;
```
to make the synchronization logic more explicit
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,452 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
+ // MockExecutionGraph.getCheckpointCoordinator() returns null
Review Comment:
That's an obvious one - the comment doesn't add much value, does it?
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,452 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenDisabled() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ final AtomicBoolean coordinatorAccessed = new AtomicBoolean(false);
+ CheckpointCoordinator coordinator =
+ new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .build(EXECUTOR_EXTENSION.getExecutor());
+ StateTrackingMockExecutionGraph graph =
+ new StateTrackingMockExecutionGraph() {
+ @Nullable
+ @Override
+ public CheckpointCoordinator
getCheckpointCoordinator() {
+ coordinatorAccessed.set(true);
+ return coordinator;
+ }
+ };
+ Executing exec =
+ new ExecutingStateBuilder()
+ .setExecutionGraph(graph)
+ .setActiveCheckpointTriggerEnabled(false)
+ .build(ctx);
+
+ exec.requestActiveCheckpointTrigger();
+ assertThat(coordinatorAccessed.get()).isFalse();
+ }
+ }
+
+ @Test
+ void testActiveCheckpointTriggerSkipsWhenNoCoordinator() throws Exception {
+ try (MockExecutingContext ctx = new MockExecutingContext()) {
+ MockExecutionJobVertex mejv = new
MockExecutionJobVertex(MockExecutionVertex::new);
Review Comment:
Why do we have this variable if we never verify its usage in the test?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/StateTransitionManager.java:
##########
@@ -89,5 +89,24 @@ interface Context extends RescaleContext {
* @return the {@link JobID} of the job
*/
JobID getJobId();
+
+ /**
+ * Requests the context to actively trigger a checkpoint to expedite
rescaling. Called by
+ * the {@link DefaultStateTransitionManager} from within phase
lifecycle methods:
+ *
+ * <ul>
+ * <li>On entering {@link DefaultStateTransitionManager.Stabilizing}
(to overlap
+ * checkpoint with the stabilization wait)
+ * <li>On each {@link
DefaultStateTransitionManager.Stabilizing#onChange} event (retry if
+ * a previous trigger was skipped)
+ * <li>On entering {@link DefaultStateTransitionManager.Stabilized}
(fallback if no
+ * checkpoint completed during stabilization)
+ * </ul>
+ *
+ * <p>The implementation decides whether to actually trigger based on
its own guard
+ * conditions (e.g., checkpointing enabled, no checkpoint in progress,
config flag).
+ * Multiple calls are safe; guards prevent redundant triggers.
Review Comment:
Isn't that an implementation detail that belongs to the `AdaptiveScheduler`?
The `StateTransitionManager` only forwards the event to the
`AdaptiveScheduler`. It doesn't control the checkpoint-related guardrails.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1999,6 +1999,21 @@ public long getCheckpointTimeout() {
return checkpointTimeout;
}
+ /**
+ * Returns {@code -1} if a checkpoint is already in flight, otherwise the
remaining time (in ms)
+ * until {@code minPauseBetweenCheckpoints} is satisfied ({@code 0} =
trigger now). All checks
+ * are made under the coordinator lock.
+ */
+ public long getActiveCheckpointTriggerDelay() {
Review Comment:
Wouldn't `Optional<Duration>` be a more straightforward return type, giving us
a cleaner interface without having to define the contract in JavaDoc? Or why did
we decide to use `long` here?
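A sketch of how the `Optional<Duration>` variant could look, assuming an empty result means "a checkpoint is already in flight" and `Duration.ZERO` means "trigger now". The method parameters are hypothetical inputs standing in for the coordinator's internal state.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical sketch, not Flink code.
class TriggerDelaySketch {
    // Empty: a checkpoint is in flight, so no active trigger should be issued.
    // Present: remaining wait until the min pause is satisfied (zero = trigger now).
    static Optional<Duration> activeCheckpointTriggerDelay(
            boolean checkpointInFlight, long elapsedMillis, long minPauseMillis) {
        if (checkpointInFlight) {
            return Optional.empty();
        }
        final long remainingMillis = Math.max(0L, minPauseMillis - elapsedMillis);
        return Optional.of(Duration.ofMillis(remainingMillis));
    }
}
```

The caller then branches on `isPresent()` and `isZero()` rather than on the sign of a `long`.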
##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -1999,6 +1999,21 @@ public long getCheckpointTimeout() {
return checkpointTimeout;
}
+ /**
+ * Returns {@code -1} if a checkpoint is already in flight, otherwise the
remaining time (in ms)
+ * until {@code minPauseBetweenCheckpoints} is satisfied ({@code 0} =
trigger now). All checks
+ * are made under the coordinator lock.
+ */
+ public long getActiveCheckpointTriggerDelay() {
+ synchronized (lock) {
+ if (isTriggering || !pendingCheckpoints.isEmpty()) {
+ return -1L;
+ }
+ final long elapsed = clock.relativeTimeMillis() -
lastCheckpointCompletionRelativeTime;
Review Comment:
Should we explicitly handle the case where no checkpoint has completed yet
(`lastCheckpointCompletionRelativeTime=0`) and mention it in the JavaDoc?
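One possible shape for the explicit handling, as a sketch. It assumes that "no checkpoint completed yet" should mean "trigger now"; that policy is an assumption, not something the PR states. The boolean flag and the other parameters are hypothetical.

```java
// Hypothetical sketch, not Flink code.
class MinPauseSketch {
    // Returns the remaining wait in ms until the min pause is satisfied (0 = trigger now).
    static long remainingMinPauseMillis(
            boolean anyCheckpointCompleted,
            long nowRelativeMillis,
            long lastCompletionRelativeMillis,
            long minPauseMillis) {
        if (!anyCheckpointCompleted) {
            // Assumption: without a completed checkpoint there is nothing to pause after.
            return 0L;
        }
        final long elapsed = nowRelativeMillis - lastCompletionRelativeMillis;
        return Math.max(0L, minPauseMillis - elapsed);
    }
}
```

With the explicit branch, the result no longer depends on how the initial field value compares to a relative clock reading.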
##########
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/ExecutingTest.java:
##########
@@ -556,6 +562,452 @@ public CheckpointCoordinator getCheckpointCoordinator() {
}
}
+ @Test
Review Comment:
Can we add a test case to check what happens if `stopWithSavepoint` is
called concurrently?