This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 273b159c [runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries (#667)
273b159c is described below

commit 273b159cb41908da541b6dfee8bee55157ff69ef
Author: Weiqing Yang <[email protected]>
AuthorDate: Fri May 15 20:33:10 2026 -0700

    [runtime] Handle notifyCheckpointAborted to stop leaking checkpoint entries (#667)
    
    Issue #665. When Flink aborts a checkpoint, it calls notifyCheckpointAborted
    instead of notifyCheckpointComplete. The DurableExecutionManager only handled
    the complete path, so the per-checkpoint sequence-number entry recorded by
    snapshotLastCompletedSequenceNumbers was never released for aborted
    checkpoints. Under sustained abort pressure (timeouts, alignment failures,
    backend pressure), checkpointIdToSeqNums grew unboundedly.
    
    Changes:
    
    - Add DurableExecutionManager.notifyCheckpointAborted(long): removes the
      entry from checkpointIdToSeqNums, guarded by the same actionStateStore
      != null check as notifyCheckpointComplete. Does NOT prune durable action
      state — the aborted checkpoint's writes were never committed, so the
      prior committed checkpoint's recovery state is still load-bearing and
      must not be pruned.
    
    - Add ActionExecutionOperator.notifyCheckpointAborted(long): thin override
      that delegates to the manager and then calls super, mirroring the
      existing notifyCheckpointComplete override.
    
    - Extend the symmetric-guard invariant javadoc on
      snapshotLastCompletedSequenceNumbers and notifyCheckpointComplete to
      name both release paths (complete OR abort). The actionStateStore !=
      null guard now lives on three methods; the cross-linked javadoc makes
      that explicit and cites issues #645 and #665.
    
    - Three new DurableExecutionManagerTest cases (using the existing
      getCheckpointIdToSeqNums() @VisibleForTesting accessor introduced in
      #659):
      * notifyAbortedRemovesEntryWithoutPruning — entry released, durable
        state untouched (verified against a real InMemoryActionStateStore so
        wrongful pruning would be observable).
      * completedAndAbortedInterleavedKeepsInFlightEntries — three in-flight
        checkpoints, one completes (state pruned), one aborts (state preserved),
        one remains.
      * noStoreModeNotifyCheckpointAbortedIsNoOp — symmetric null-store no-op
        coverage matching the existing notifyCheckpointComplete null-store case.
---
 .../runtime/operator/ActionExecutionOperator.java  |   6 +
 .../runtime/operator/DurableExecutionManager.java  |  53 +++++++--
 .../operator/DurableExecutionManagerTest.java      | 129 +++++++++++++++++++++
 3 files changed, 179 insertions(+), 9 deletions(-)

diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
index 957798f8..279d4652 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/ActionExecutionOperator.java
@@ -511,6 +511,12 @@ public class ActionExecutionOperator<IN, OUT> extends 
AbstractStreamOperator<OUT
         super.notifyCheckpointComplete(checkpointId);
     }
 
+    @Override
+    public void notifyCheckpointAborted(long checkpointId) throws Exception {
+        durableExecManager.notifyCheckpointAborted(checkpointId);
+        super.notifyCheckpointAborted(checkpointId);
+    }
+
     private MailboxProcessor getMailboxProcessor() throws Exception {
         Field field = 
MailboxExecutorImpl.class.getDeclaredField("mailboxProcessor");
         field.setAccessible(true);
diff --git 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index 85c5df23..295b36c9 100644
--- 
a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++ 
b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -333,11 +333,15 @@ class DurableExecutionManager implements 
ActionStatePersister, AutoCloseable {
      * via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the 
entry for that
      * checkpoint is removed. No-op when durable execution is disabled.
      *
-     * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below and 
the {@code put} in
-     * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same 
{@code actionStateStore !=
-     * null} guard. Dropping the guard on either side breaks the symmetry and 
reintroduces the
+     * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below, 
the {@code put} in
+     * {@link #snapshotLastCompletedSequenceNumbers}, and the {@code remove} 
in {@link
+     * #notifyCheckpointAborted} MUST all share the same {@code 
actionStateStore != null} guard.
+     * Every snapshotted entry is released by exactly one of the two paths — 
Flink notifies either
+     * {@code notifyCheckpointComplete} OR {@code notifyCheckpointAborted} for 
each checkpoint,
+     * never both. Dropping the guard on any side breaks the symmetry and 
reintroduces the
      * unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
-     * issue #645</a>.
+     * issue #645</a> (complete path) or <a
+     * href="https://github.com/apache/flink-agents/issues/665">issue #665</a> (abort path).
      *
      * @param checkpointId the id of the completed checkpoint.
      */
@@ -352,6 +356,35 @@ class DurableExecutionManager implements 
ActionStatePersister, AutoCloseable {
         }
     }
 
+    /**
+     * Releases the per-checkpoint sequence-number snapshot recorded by {@link
+     * #snapshotLastCompletedSequenceNumbers} when Flink aborts the checkpoint 
instead of completing
+     * it. Unlike {@link #notifyCheckpointComplete}, this method does NOT 
prune durable action
+     * state: the aborted checkpoint's writes were never committed, so the 
previously-pruned-up-to
+     * point is still the {@code lastCompletedSequenceNumber} from the last 
successful checkpoint,
+     * and any state recorded since is still load-bearing for recovery from 
that prior checkpoint.
+     * We only release the in-memory tracking entry to prevent unbounded 
growth of {@code
+     * checkpointIdToSeqNums} when checkpoints abort under sustained pressure 
(issue #665).
+     *
+     * <p>Safe when no entry exists for {@code checkpointId} (e.g., abort 
fires for a checkpoint
+     * this task never snapshotted): {@link Map#remove} returns {@code null}. 
No-op when durable
+     * execution is disabled.
+     *
+     * <p><b>Invariant:</b> see {@link #snapshotLastCompletedSequenceNumbers} 
— together with {@link
+     * #notifyCheckpointComplete}, this method shares the same {@code 
actionStateStore != null}
+     * guard that releases entries recorded by the snapshot side. Dropping the 
guard on any side
+     * breaks the symmetry and reintroduces the unbounded-map leak tracked by <a
+     * href="https://github.com/apache/flink-agents/issues/645">issue #645</a> (complete path) or <a
+     * href="https://github.com/apache/flink-agents/issues/665">issue #665</a> (abort path).
+     *
+     * @param checkpointId the id of the aborted checkpoint.
+     */
+    void notifyCheckpointAborted(long checkpointId) {
+        if (actionStateStore != null) {
+            checkpointIdToSeqNums.remove(checkpointId);
+        }
+    }
+
     void snapshotRecoveryMarker() throws Exception {
         if (actionStateStore != null) {
             Object recoveryMarker = actionStateStore.getRecoveryMarker();
@@ -371,11 +404,13 @@ class DurableExecutionManager implements 
ActionStatePersister, AutoCloseable {
      * strictly up to the sequence number that was committed by that 
checkpoint. No-op when durable
      * execution is disabled.
      *
-     * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below and 
the {@code remove} in
-     * {@link #notifyCheckpointComplete(long)} MUST share the same {@code 
actionStateStore != null}
-     * guard. Dropping the guard on either side breaks the symmetry and 
reintroduces the
-     * unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
-     * issue #645</a>.
+     * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below, the 
{@code remove} in
+     * {@link #notifyCheckpointComplete(long)}, and the {@code remove} in 
{@link
+     * #notifyCheckpointAborted(long)} MUST all share the same {@code 
actionStateStore != null}
+     * guard. Dropping the guard on any side breaks the symmetry and 
reintroduces the unbounded-map
+     * leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">issue #645</a>
+     * (complete path) or <a href="https://github.com/apache/flink-agents/issues/665">issue #665</a>
+     * (abort path).
      *
      * @param keyedStateBackend the keyed state backend to scan.
      * @param checkpointId the id of the checkpoint being snapshotted.
diff --git 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
index f5fc2a17..4d62dc16 100644
--- 
a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
+++ 
b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.agents.runtime.context.RunnerContextImpl;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.runtime.state.KeyedStateBackend;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -240,4 +241,132 @@ class DurableExecutionManagerTest {
 
         dem.close();
     }
+
+    @Test
+    void notifyAbortedRemovesEntryWithoutPruning() throws Exception {
+        // Use a cleanup-enabled in-memory store so pruning would be 
observable if it
+        // (incorrectly) fired on abort. doCleanup=true makes pruneState 
remove the keyed entry
+        // from getKeyedActionStates().
+        InMemoryActionStateStore store = new InMemoryActionStateStore(true);
+        DurableExecutionManager dem = new DurableExecutionManager(store);
+
+        Action action = TestActions.noopAction();
+        Event event = new InputEvent(1L);
+        String key = "key-a";
+        long seq = 7L;
+        long checkpointId = 1L;
+
+        // Seed durable state for the key/seq pair.
+        dem.maybeInitActionState(key, seq, action, event);
+        assertThat(store.getKeyedActionStates()).containsKey(key);
+
+        // Snapshot the per-key sequence number for this checkpoint. Stub the 
backend so that
+        // applyToAllKeys invokes the provided KeyedStateFunction once with 
our seeded key and a
+        // ValueState that returns the seeded sequence number.
+        @SuppressWarnings("unchecked")
+        KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
+        stubApplyToAllKeysSingle(backend, key, seq);
+        dem.snapshotLastCompletedSequenceNumbers(backend, checkpointId);
+        assertThat(dem.getCheckpointIdToSeqNums()).containsKey(checkpointId);
+
+        // Abort the checkpoint.
+        dem.notifyCheckpointAborted(checkpointId);
+
+        // The in-memory tracking entry is released.
+        
assertThat(dem.getCheckpointIdToSeqNums()).doesNotContainKey(checkpointId);
+        // The durable state was NOT pruned — the key is still present in the 
store.
+        assertThat(store.getKeyedActionStates()).containsKey(key);
+
+        dem.close();
+    }
+
+    @Test
+    void completedAndAbortedInterleavedKeepsInFlightEntries() throws Exception 
{
+        // doCleanup=true so a completed checkpoint's pruneState calls are 
observable as a
+        // disappearance from getKeyedActionStates().
+        InMemoryActionStateStore store = new InMemoryActionStateStore(true);
+        DurableExecutionManager dem = new DurableExecutionManager(store);
+
+        Action action = TestActions.noopAction();
+        Event eventA = new InputEvent(1L);
+        Event eventB = new InputEvent(2L);
+        Event eventC = new InputEvent(3L);
+        String keyA = "key-a";
+        String keyB = "key-b";
+        String keyC = "key-c";
+
+        // Seed durable state for three distinct keys, one per upcoming 
checkpoint.
+        dem.maybeInitActionState(keyA, 10L, action, eventA);
+        dem.maybeInitActionState(keyB, 20L, action, eventB);
+        dem.maybeInitActionState(keyC, 30L, action, eventC);
+        assertThat(store.getKeyedActionStates()).containsKeys(keyA, keyB, 
keyC);
+
+        // Snapshot three checkpoints. Each snapshot is stubbed to record 
exactly one key with its
+        // seeded sequence number, so the per-checkpoint map is fully 
deterministic.
+        @SuppressWarnings("unchecked")
+        KeyedStateBackend<Object> backendA = mock(KeyedStateBackend.class);
+        stubApplyToAllKeysSingle(backendA, keyA, 10L);
+        dem.snapshotLastCompletedSequenceNumbers(backendA, 1L);
+
+        @SuppressWarnings("unchecked")
+        KeyedStateBackend<Object> backendB = mock(KeyedStateBackend.class);
+        stubApplyToAllKeysSingle(backendB, keyB, 20L);
+        dem.snapshotLastCompletedSequenceNumbers(backendB, 2L);
+
+        @SuppressWarnings("unchecked")
+        KeyedStateBackend<Object> backendC = mock(KeyedStateBackend.class);
+        stubApplyToAllKeysSingle(backendC, keyC, 30L);
+        dem.snapshotLastCompletedSequenceNumbers(backendC, 3L);
+
+        assertThat(dem.getCheckpointIdToSeqNums()).containsKeys(1L, 2L, 3L);
+
+        // Complete checkpoint 1 (prunes keyA) and abort checkpoint 2 
(releases entry without
+        // pruning keyB). Checkpoint 3 stays in-flight.
+        dem.notifyCheckpointComplete(1L);
+        dem.notifyCheckpointAborted(2L);
+
+        // Only the in-flight checkpoint entry remains.
+        assertThat(dem.getCheckpointIdToSeqNums()).containsOnlyKeys(3L);
+
+        // Completed checkpoint's durable state was pruned away.
+        assertThat(store.getKeyedActionStates()).doesNotContainKey(keyA);
+        // Aborted checkpoint's durable state is untouched.
+        assertThat(store.getKeyedActionStates()).containsKey(keyB);
+        // In-flight checkpoint's durable state is untouched.
+        assertThat(store.getKeyedActionStates()).containsKey(keyC);
+
+        dem.close();
+    }
+
+    @Test
+    void noStoreModeNotifyCheckpointAbortedIsNoOp() {
+        DurableExecutionManager dem = new DurableExecutionManager(null);
+
+        // Should not throw and should not populate any per-checkpoint map 
entries.
+        dem.notifyCheckpointAborted(42L);
+
+        assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+    }
+
+    /**
+     * Stubs {@code backend.applyToAllKeys(...)} to invoke the supplied {@link 
KeyedStateFunction}
+     * exactly once with the given key and a {@link ValueState} mock that 
returns the given sequence
+     * number. This mirrors the per-key iteration shape used by {@link
+     * DurableExecutionManager#snapshotLastCompletedSequenceNumbers}.
+     */
+    @SuppressWarnings("unchecked")
+    private static void stubApplyToAllKeysSingle(
+            KeyedStateBackend<Object> backend, Object key, long 
sequenceNumber) throws Exception {
+        ValueState<Long> valueState = mock(ValueState.class);
+        when(valueState.value()).thenReturn(sequenceNumber);
+        doAnswer(
+                        invocation -> {
+                            KeyedStateFunction<Object, ValueState<Long>> 
function =
+                                    invocation.getArgument(3);
+                            function.process(key, valueState);
+                            return null;
+                        })
+                .when(backend)
+                .applyToAllKeys(any(), any(), any(ValueStateDescriptor.class), 
any());
+    }
 }

Reply via email to