weiqingy commented on issue #645:
URL: https://github.com/apache/flink-agents/issues/645#issuecomment-4423556838

   ## Findings after investigation
   
   The unguarded `put` / guarded `remove` asymmetry described in the issue body 
**no longer exists on `main`**. PR #546 (`DurableExecutionManager` extraction) 
moved the put into 
`DurableExecutionManager.snapshotLastCompletedSequenceNumbers()` and added an 
early-return guard at the top of that method:
   
   
https://github.com/apache/flink-agents/blob/main/runtime/src/main/java/org/apache/flink/agents/runtime/operator/DurableExecutionManager.java#L364-L384
   
   ```java
   void snapshotLastCompletedSequenceNumbers(
           KeyedStateBackend<?> keyedStateBackend, long checkpointId) throws Exception {
       if (actionStateStore == null) {
           return;
       }
       HashMap<Object, Long> keyToSeqNum = new HashMap<>();
       // ... applyToAllKeys ...
       checkpointIdToSeqNums.put(checkpointId, keyToSeqNum);
   }
   ```
   
   That early return is exactly the second remediation alternative this issue 
proposed ("guard `recordCheckpointSequenceNumbers` with an early return when 
`actionStateStore == null`"). The bug was therefore inadvertently fixed during 
the refactor.
   
   In other words: the symptom is already gone, but the invariant ("snapshot 
put and notifyCheckpointComplete remove must be symmetrically guarded") is 
currently held by a single conditional and isn't documented or test-locked.
   
   ## Rescoping this issue
   
   Keeping the issue open, but the deliverable changes:
   
   - **Add a regression test** verifying `checkpointIdToSeqNums` stays empty 
across `snapshot` + `notifyCheckpointComplete` cycles when `actionStateStore == 
null` — so any future refactor that drops the snapshot-side guard fails CI 
loudly.
   - **Strengthen javadoc** on `snapshotLastCompletedSequenceNumbers` and 
`notifyCheckpointComplete` to make the symmetry invariant explicit.
   - Single small commit.
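
A minimal sketch of what the regression test could assert, using a simplified stand-in rather than the real `DurableExecutionManager`. The names `NullStoreGuardSketch`, `ManagerSketch`, `trackedCheckpoints()` and `entriesAfterCycles()` are hypothetical, and the keyed-state iteration is left out; only the two guards mirror the behavior described above. The real test would presumably drive the actual manager through its existing test setup.

```java
import java.util.HashMap;
import java.util.Map;

public class NullStoreGuardSketch {

    /** Hypothetical stand-in for the manager; only the guard logic is modeled. */
    static class ManagerSketch {
        // null models a job running without an action state store
        private final Object actionStateStore;
        private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();

        ManagerSketch(Object actionStateStore) {
            this.actionStateStore = actionStateStore;
        }

        void snapshotLastCompletedSequenceNumbers(long checkpointId) {
            if (actionStateStore == null) {
                return; // snapshot-side guard: the one the test is meant to lock in
            }
            checkpointIdToSeqNums.put(checkpointId, new HashMap<>());
        }

        void notifyCheckpointComplete(long checkpointId) {
            if (actionStateStore == null) {
                return; // remove-side guard
            }
            checkpointIdToSeqNums.remove(checkpointId);
        }

        int trackedCheckpoints() {
            return checkpointIdToSeqNums.size();
        }
    }

    /** Runs snapshot + notifyCheckpointComplete cycles and returns how many entries remain. */
    static int entriesAfterCycles(Object store, int cycles) {
        ManagerSketch manager = new ManagerSketch(store);
        for (long id = 1; id <= cycles; id++) {
            manager.snapshotLastCompletedSequenceNumbers(id);
            manager.notifyCheckpointComplete(id);
        }
        return manager.trackedCheckpoints();
    }

    public static void main(String[] args) {
        // Both cases should leave the map empty while the guards stay symmetric.
        System.out.println(entriesAfterCycles(null, 100));
        System.out.println(entriesAfterCycles(new Object(), 100));
    }
}
```

If the snapshot-side guard were dropped from this stand-in, the null-store case would retain one entry per cycle, which is the failure the regression test is meant to surface.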
   
   ## Out of scope (separate follow-up)
   
   Aborted/failed checkpoints are an adjacent leak class — Flink calls 
`notifyCheckpointAborted` instead of `notifyCheckpointComplete`, and the 
manager does not override it. Entries for aborted checkpoints would leak even 
when the store is enabled. This is structurally different from the null-store 
leak described here. Will file a separate issue rather than expand scope.
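
To illustrate why this is a separate leak class, here is a rough sketch with the store assumed enabled. `AbortedCheckpointLeakSketch`, `TrackerSketch` and the `removeOnAbort` flag are hypothetical and do not reflect the manager's actual code. Removing the entry on abort is just one possible remediation, shown for contrast; it is not a decided fix.

```java
import java.util.HashMap;
import java.util.Map;

public class AbortedCheckpointLeakSketch {

    /** Hypothetical tracker with the store enabled; only the map bookkeeping is modeled. */
    static class TrackerSketch {
        private final Map<Long, Map<Object, Long>> checkpointIdToSeqNums = new HashMap<>();
        // false models the current situation: aborts are not handled at all
        private final boolean removeOnAbort;

        TrackerSketch(boolean removeOnAbort) {
            this.removeOnAbort = removeOnAbort;
        }

        void snapshot(long checkpointId) {
            checkpointIdToSeqNums.put(checkpointId, new HashMap<>());
        }

        void notifyCheckpointComplete(long checkpointId) {
            checkpointIdToSeqNums.remove(checkpointId);
        }

        void notifyCheckpointAborted(long checkpointId) {
            if (removeOnAbort) {
                checkpointIdToSeqNums.remove(checkpointId); // assumed remediation
            }
        }

        int trackedCheckpoints() {
            return checkpointIdToSeqNums.size();
        }
    }

    /** Snapshots and then aborts each checkpoint; returns how many entries remain. */
    static int entriesAfterAbortedCheckpoints(boolean removeOnAbort, int abortedCount) {
        TrackerSketch tracker = new TrackerSketch(removeOnAbort);
        for (long id = 1; id <= abortedCount; id++) {
            tracker.snapshot(id);
            tracker.notifyCheckpointAborted(id);
        }
        return tracker.trackedCheckpoints();
    }

    public static void main(String[] args) {
        System.out.println(entriesAfterAbortedCheckpoints(false, 5)); // entries retained
        System.out.println(entriesAfterAbortedCheckpoints(true, 5));  // entries cleaned up
    }
}
```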

