This is an automated email from the ASF dual-hosted git repository.

wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f79c09a [runtime] Lock null-store symmetry invariant in DurableExecutionManager (#666)
8f79c09a is described below

commit 8f79c09a3971caaeef164668e4148fee67d09cba
Author: Weiqing Yang <[email protected]>
AuthorDate: Thu May 14 23:42:03 2026 -0700

    [runtime] Lock null-store symmetry invariant in DurableExecutionManager (#666)
---
 .../runtime/operator/DurableExecutionManager.java  | 12 +++++++++++
 .../operator/DurableExecutionManagerTest.java      | 25 ++++++++++++++++++++++
 2 files changed, 37 insertions(+)

diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index 03bfe264..85c5df23 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -333,6 +333,12 @@ class DurableExecutionManager implements ActionStatePersister, AutoCloseable {
      * via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that
      * checkpoint is removed. No-op when durable execution is disabled.
      *
+     * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below and the {@code put} in
+     * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore !=
+     * null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the
+     * unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
+     * issue #645</a>.
+     *
      * @param checkpointId the id of the completed checkpoint.
      */
     void notifyCheckpointComplete(long checkpointId) {
@@ -365,6 +371,12 @@ class DurableExecutionManager implements ActionStatePersister, AutoCloseable {
      * strictly up to the sequence number that was committed by that checkpoint. No-op when durable
      * execution is disabled.
      *
+     * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below and the {@code remove} in
+     * {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null}
+     * guard. Dropping the guard on either side breaks the symmetry and reintroduces the
+     * unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
+     * issue #645</a>.
+     *
      * @param keyedStateBackend the keyed state backend to scan.
      * @param checkpointId the id of the checkpoint being snapshotted.
      */
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
index 6c737d47..f5fc2a17 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
 import static org.mockito.Mockito.when;
 
 /** Contract tests for {@link DurableExecutionManager}. */
@@ -69,6 +70,30 @@ class DurableExecutionManagerTest {
         dem.close();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    void noStoreModeSnapshotAndNotifyKeepCheckpointMapEmpty() throws Exception {
+        DurableExecutionManager dem = new DurableExecutionManager(null);
+        KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
+
+        // Cycle 1: snapshot + notify with null store. The snapshot-side guard must short-circuit
+        // before any backend access, and the cleanup-side guard must leave the map untouched.
+        dem.snapshotLastCompletedSequenceNumbers(backend, 1L);
+        assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+        verifyNoInteractions(backend);
+        dem.notifyCheckpointComplete(1L);
+        assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+
+        // Cycle 2: confirm the invariant holds across multiple checkpoints.
+        dem.snapshotLastCompletedSequenceNumbers(backend, 2L);
+        assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+        verifyNoInteractions(backend);
+        dem.notifyCheckpointComplete(2L);
+        assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+
+        dem.close();
+    }
+
     @Test
     void withInjectedStorePersistsTaskResult() throws Exception {
         InMemoryActionStateStore store = new InMemoryActionStateStore(false);

Reply via email to