This is an automated email from the ASF dual-hosted git repository.
wenjin272 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-agents.git
The following commit(s) were added to refs/heads/main by this push:
new 8f79c09a [runtime] Lock null-store symmetry invariant in DurableExecutionManager (#666)
8f79c09a is described below
commit 8f79c09a3971caaeef164668e4148fee67d09cba
Author: Weiqing Yang <[email protected]>
AuthorDate: Thu May 14 23:42:03 2026 -0700
[runtime] Lock null-store symmetry invariant in DurableExecutionManager (#666)
---
.../runtime/operator/DurableExecutionManager.java | 12 +++++++++++
.../operator/DurableExecutionManagerTest.java | 25 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
index 03bfe264..85c5df23 100644
--- a/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
+++ b/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java
@@ -333,6 +333,12 @@ class DurableExecutionManager implements ActionStatePersister, AutoCloseable {
* via {@link #snapshotLastCompletedSequenceNumbers}. After pruning, the entry for that
* checkpoint is removed. No-op when durable execution is disabled.
*
+ * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.remove} below and the {@code put} in
+ * {@link #snapshotLastCompletedSequenceNumbers} MUST share the same {@code actionStateStore !=
+ * null} guard. Dropping the guard on either side breaks the symmetry and reintroduces the
+ * unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
+ * issue #645</a>.
+ *
* @param checkpointId the id of the completed checkpoint.
*/
void notifyCheckpointComplete(long checkpointId) {
@@ -365,6 +371,12 @@ class DurableExecutionManager implements ActionStatePersister, AutoCloseable {
* strictly up to the sequence number that was committed by that checkpoint. No-op when durable
* execution is disabled.
*
+ * <p><b>Invariant:</b> the {@code checkpointIdToSeqNums.put} below and the {@code remove} in
+ * {@link #notifyCheckpointComplete(long)} MUST share the same {@code actionStateStore != null}
+ * guard. Dropping the guard on either side breaks the symmetry and reintroduces the
+ * unbounded-map leak tracked by <a href="https://github.com/apache/flink-agents/issues/645">
+ * issue #645</a>.
+ *
* @param keyedStateBackend the keyed state backend to scan.
* @param checkpointId the id of the checkpoint being snapshotted.
*/
diff --git a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
index 6c737d47..f5fc2a17 100644
--- a/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
+++ b/runtime/src/test/java/org/apache/flink/agents/runtime/operator/DurableExecutionManagerTest.java
@@ -43,6 +43,7 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.when;
/** Contract tests for {@link DurableExecutionManager}. */
@@ -69,6 +70,30 @@ class DurableExecutionManagerTest {
dem.close();
}
+ @Test
+ @SuppressWarnings("unchecked")
+ void noStoreModeSnapshotAndNotifyKeepCheckpointMapEmpty() throws Exception {
+ DurableExecutionManager dem = new DurableExecutionManager(null);
+ KeyedStateBackend<Object> backend = mock(KeyedStateBackend.class);
+
+ // Cycle 1: snapshot + notify with null store. The snapshot-side guard must short-circuit
+ // before any backend access, and the cleanup-side guard must leave the map untouched.
+ dem.snapshotLastCompletedSequenceNumbers(backend, 1L);
+ assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+ verifyNoInteractions(backend);
+ dem.notifyCheckpointComplete(1L);
+ assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+
+ // Cycle 2: confirm the invariant holds across multiple checkpoints.
+ dem.snapshotLastCompletedSequenceNumbers(backend, 2L);
+ assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+ verifyNoInteractions(backend);
+ dem.notifyCheckpointComplete(2L);
+ assertThat(dem.getCheckpointIdToSeqNums()).isEmpty();
+
+ dem.close();
+ }
+
@Test
void withInjectedStorePersistsTaskResult() throws Exception {
InMemoryActionStateStore store = new InMemoryActionStateStore(false);