This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 273b159c [runtime] Handle notifyCheckpointAborted to stop leaking
checkpoint entries (#667)
273b159c is described below
commit 273b159cb41908da541b6dfee8bee55157ff69ef
Author: Weiqing Yang <[email protected]>
AuthorDate: Fri May 15 20:33:10 2026 -0700
[runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries
(#667)
Issue #665. When Flink aborts a checkpoint, it calls notifyCheckpointAborted
instead of notifyCheckpointComplete. The DurableExecutionManager only handled
the complete path, so the per-checkpoint sequence-number entry recorded by
snapshotLastCompletedSequenceNumbers was never released for aborted
checkpoints. Under sustained abort pressure (timeouts, alignment failures,
backend pressure), checkpointIdToSeqNums grew unboundedly.
Changes:
- Add DurableExecutionManager.notifyCheckpointAborted(long): removes the
entry from checkpointIdToSeqNums, guarded by the same
actionStateStore != null check as notifyCheckpointComplete. Does NOT prune
durable action state — the aborted checkpoint's writes were never committed,
so the prior committed checkpoint's recovery state is still load-bearing and
must not be pruned.
- Add ActionExecutionOperator.notifyCheckpointAborted(long): thin override
that delegates to the manager and then calls super, mirroring the
existing notifyCheckpointComplete override.
- Extend the symmetric-guard invariant javadoc on
snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete to
name both release paths (complete OR abort). The
actionStateStore != null guard now lives on three methods; the cross-linked
javadoc makes that explicit and cites issues #645 and #665.
- Three new DurableExecutionManagerTest cases (using the existing
getCheckpointIdToSeqNums() @VisibleForTesting accessor introduced in
#659):
* notifyAbortedRemovesEntryWithoutPruning — entry released, durable
state untouched (verified against a real InMemoryActionStateStore so
wrongful pruning would be observable).
* completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight
checkpoints, one completes (state pruned), one aborts (state preserved),
one remains.
* noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op
coverage matching the existing notifyCheckpointComplete null-store case.
---
.../runtime/operator/ActionExecutionOperator.java | 6 +
.../runtime/operator/DurableExecutionManager.java | 53 +++++++--
.../operator/DurableExecutionManagerTest.java | 129 +++++++++++++++++++++
3 files changed, 179 insertions(+), 9 deletions(-)
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 957798f8..279d4652 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -511,6 +511,12 @@ public class ActionExecutionOperator<IN, OUT> extends
AbstractStreamOperator<OUT
super.notifyCheckpointComplete(checkpointId);
}
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws Exception {
+ durableExecManager.notifyCheckpointAborted(checkpointId);
+ super.notifyCheckpointAborted(checkpointId);
+ }
+
private MailboxProcessor getMailboxProcessor() throws Exception {
Field field =
MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor");
field.setAccessible(true);
diff --git
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index 85c5df23..295b36c9 100644
---
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -333,11 +333,15 @@ class DurableExecutionManager implements
ActionStatePersister, AutoCloseable {
* via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the
entry for that
* checkpoint is removed. No-op when durable execution is disabled.
*
- * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below and
the {@code put} in
- * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same
{@code actionStateStore !=
- * null} guard. Dropping the guard on either side breaks the symmetry and
reintroduces the
+ * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below,
the {@code put} in
+ * {@link #snapshotLastCompletedSequenceNumbers}, and the {@code remove}
in {@link
+ * #notifyCheckpointAborted} MUST all share the same {@code
actionStateStore != null} guard.
+ * Every snapshotted entry is released by exactly one of the two paths —
Flink notifies either
+ * {@code notifyCheckpointComplete} OR {@code notifyCheckpointAborted} for
each checkpoint,
+ * never both. Dropping the guard on any side breaks the symmetry and
reintroduces the
* unbounded-map leak tracked by <a
href="https://github.com/apache/flink-agents/issues/645">
- * issue #645</a>.
+ * issue #645</a> (complete path) or <a
+ * href="https://github.com/apache/flink-agents/issues/665">issue #665</a>
(abort path).
*
* @param checkpointId the id of the completed checkpoint.
*/
@@ -352,6 +356,35 @@ class DurableExecutionManager implements
ActionStatePersister, AutoCloseable {
}
}
+ /**
+ * Releases the per-checkpoint sequence-number snapshot recorded by {@link
+ * #snapshotLastCompletedSequenceNumbers} when Flink aborts the checkpoint
instead of completing
+ * it. Unlike {@link #notifyCheckpointComplete}, this method does NOT
prune durable action
+ * state: the aborted checkpoint's writes were never committed, so the
previously-pruned-up-to
+ * point is still the {@code lastCompletedSequenceNumber} from the last
successful checkpoint,
+ * and any state recorded since is still load-bearing for recovery from
that prior checkpoint.
+ * We only release the in-memory tracking entry to prevent unbounded
growth of {@code
+ * checkpointIdToSeqNums} when checkpoints abort under sustained pressure
(issue #665).
+ *
+ * <p>Safe when no entry exists for {@code checkpointId} (e.g., abort
fires for a checkpoint
+ * this task never snapshotted): {@link Map#remove} returns {@code null}.
No-op when durable
+ * execution is disabled.
+ *
+ * <p><b>Invariant:</b> see {@link #snapshotLastCompletedSequenceNumbers}
— together with {@link
+ * #notifyCheckpointComplete}, this method shares the same {@code
actionStateStore != null}
+ * guard that releases entries recorded by the snapshot side. Dropping the
guard on any side
+ * breaks the symmetry and reintroduces the unbounded-map leak tracked by
<a
+ * href="https://github.com/apache/flink-agents/issues/645">issue #645</a>
(complete path) or <a
+ * href="https://github.com/apache/flink-agents/issues/665">issue #665</a>
(abort path).
+ *
+ * @param checkpointId the id of the aborted checkpoint.
+ */
+ void notifyCheckpointAborted(long checkpointId) {
+ if (actionStateStore != null) {
+ checkpointIdToSeqNums.remove(checkpointId);
+ }
+ }
+
void snapshotRecoveryMarker() throws Exception {
if (actionStateStore != null) {
Object recoveryMarker = actionStateStore.getRecoveryMarker();
@@ -371,11 +404,13 @@ class DurableExecutionManager implements
ActionStatePersister, AutoCloseable {
* strictly up to the sequence number that was committed by that
checkpoint. No-op when durable
* execution is disabled.
*
- * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below and
the {@code remove} in
- * {@link #notifyCheckpointComplete(long)} MUST share the same {@code
actionStateStore != null}
- * guard. Dropping the guard on either side breaks the symmetry and
reintroduces the
- * unbounded-map leak tracked by <a
href="https://github.com/apache/flink-agents/issues/645">
- * issue #645</a>.
+ * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below, the
{@code remove} in
+ * {@link #notifyCheckpointComplete(long)}, and the {@code remove} in
{@link
+ * #notifyCheckpointAborted(long)} MUST all share the same {@code
actionStateStore != null}
+ * guard. Dropping the guard on any side breaks the symmetry and
reintroduces the unbounded-map
+ * leak tracked by <a
href="https://github.com/apache/flink-agents/issues/645">issue #645</a>
+ * (complete path) or <a
href="https://github.com/apache/flink-agents/issues/665">issue #665</a>
+ * (abort path).
*
* @param keyedStateBackend the keyed state backend to scan.
* @param checkpointId the id of the checkpoint being snapshotted.
diff --git
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
index f5fc2a17..4d62dc16 100644
---
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
+++
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
@@ -28,6 +28,7 @@ import
org.apache.flink.agents.runtime.context.RunnerContextImpl;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateFunction;
import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -240,4 +241,132 @@ class DurableExecutionManagerTest {
dem.close();
}
+
+ @Test
+ void notifyAbortedRemovesEntryWithoutPruning() throws Exception {
+ // Use a cleanup-enabled in-memory store so pruning would be
observable if it
+ // (incorrectly) fired on abort. doCleanup=true makes pruneState
remove the keyed entry
+ // from getKeyedActionStates().
+ InMemoryActionStateStore store = new InMemoryActionStateStore(true);
+ DurableExecutionManager dem = new DurableExecutionManager(store);
+
+ Action action = TestActions.noopAction();
+ Event event = new InputEvent(1L);
+ String key = "key-a";
+ long seq = 7L;
+ long checkpointId = 1L;
+
+ // Seed durable state for the key/seq pair.
+ dem.maybeInitActionState(key, seq, action, event);
+ assertThat(store.getKeyedActionStates()).containsKey(key);
+
+ // Snapshot the per-key sequence number for this checkpoint. Stub the
backend so that
+ // applyToAllKeys invokes the provided KeyedStateFunction once with
our seeded key and a
+ // ValueState that returns the seeded sequence number.
+ @SuppressWarnings("unchecked")
+ KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
+ stubApplyToAllKeysSingle(backend, key, seq);
+ dem.snapshotLastCompletedSequenceNumbers(backend, checkpointId);
+ assertThat(dem.getCheckpointIdToSeqNums()).containsKey(checkpointId);
+
+ // Abort the checkpoint.
+ dem.notifyCheckpointAborted(checkpointId);
+
+ // The in-memory tracking entry is released.
+
assertThat(dem.getCheckpointIdToSeqNums()).doesNotContainKey(checkpointId);
+ // The durable state was NOT pruned — the key is still present in the
store.
+ assertThat(store.getKeyedActionStates()).containsKey(key);
+
+ dem.close();
+ }
+
+ @Test
+ void completedAndAbortedInterleavedKeepsInFlightEntries() throws Exception
{
+ // doCleanup=true so a completed checkpoint's pruneState calls are
observable as a
+ // disappearance from getKeyedActionStates().
+ InMemoryActionStateStore store = new InMemoryActionStateStore(true);
+ DurableExecutionManager dem = new DurableExecutionManager(store);
+
+ Action action = TestActions.noopAction();
+ Event eventA = new InputEvent(1L);
+ Event eventB = new InputEvent(2L);
+ Event eventC = new InputEvent(3L);
+ String keyA = "key-a";
+ String keyB = "key-b";
+ String keyC = "key-c";
+
+ // Seed durable state for three distinct keys, one per upcoming
checkpoint.
+ dem.maybeInitActionState(keyA, 10L, action, eventA);
+ dem.maybeInitActionState(keyB, 20L, action, eventB);
+ dem.maybeInitActionState(keyC, 30L, action, eventC);
+ assertThat(store.getKeyedActionStates()).containsKeys(keyA, keyB,
keyC);
+
+ // Snapshot three checkpoints. Each snapshot is stubbed to record
exactly one key with its
+ // seeded sequence number, so the per-checkpoint map is fully
deterministic.
+ @SuppressWarnings("unchecked")
+ KeyedStateBackend<Object> backendA = mock(KeyedStateBackend.class);
+ stubApplyToAllKeysSingle(backendA, keyA, 10L);
+ dem.snapshotLastCompletedSequenceNumbers(backendA, 1L);
+
+ @SuppressWarnings("unchecked")
+ KeyedStateBackend<Object> backendB = mock(KeyedStateBackend.class);
+ stubApplyToAllKeysSingle(backendB, keyB, 20L);
+ dem.snapshotLastCompletedSequenceNumbers(backendB, 2L);
+
+ @SuppressWarnings("unchecked")
+ KeyedStateBackend<Object> backendC = mock(KeyedStateBackend.class);
+ stubApplyToAllKeysSingle(backendC, keyC, 30L);
+ dem.snapshotLastCompletedSequenceNumbers(backendC, 3L);
+
+ assertThat(dem.getCheckpointIdToSeqNums()).containsKeys(1L, 2L, 3L);
+
+ // Complete checkpoint 1 (prunes keyA) and abort checkpoint 2
(releases entry without
+ // pruning keyB). Checkpoint 3 stays in-flight.
+ dem.notifyCheckpointComplete(1L);
+ dem.notifyCheckpointAborted(2L);
+
+ // Only the in-flight checkpoint entry remains.
+ assertThat(dem.getCheckpointIdToSeqNums()).containsOnlyKeys(3L);
+
+ // Completed checkpoint's durable state was pruned away.
+ assertThat(store.getKeyedActionStates()).doesNotContainKey(keyA);
+ // Aborted checkpoint's durable state is untouched.
+ assertThat(store.getKeyedActionStates()).containsKey(keyB);
+ // In-flight checkpoint's durable state is untouched.
+ assertThat(store.getKeyedActionStates()).containsKey(keyC);
+
+ dem.close();
+ }
+
+ @Test
+ void noStoreModeNotifyCheckpointAbortedIsNoOp() {
+ DurableExecutionManager dem = new DurableExecutionManager(null);
+
+ // Should not throw and should not populate any per-checkpoint map
entries.
+ dem.notifyCheckpointAborted(42L);
+
+ assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+ }
+
+ /**
+ * Stubs {@code backend.applyToAllKeys(...)} to invoke the supplied {@link
KeyedStateFunction}
+ * exactly once with the given key and a {@link ValueState} mock that
returns the given sequence
+ * number. This mirrors the per-key iteration shape used by {@link
+ * DurableExecutionManager#snapshotLastCompletedSequenceNumbers}.
+ */
+ @SuppressWarnings("unchecked")
+ private static void stubApplyToAllKeysSingle(
+ KeyedStateBackend<Object> backend, Object key, long
sequenceNumber) throws Exception {
+ ValueState<Long> valueState = mock(ValueState.class);
+ when(valueState.value()).thenReturn(sequenceNumber);
+ doAnswer(
+ invocation -> {
+ KeyedStateFunction<Object, ValueState<Long>>
function =
+ invocation.getArgument(3);
+ function.process(key, valueState);
+ return null;
+ })
+ .when(backend)
+ .applyToAllKeys(any(), any(), any(ValueStateDescriptor.class),
any());
+ }
}