weiqingy opened a new issue, #665:
URL: https://github.com/apache/flink-agents/issues/665

   ### Search before asking
   
   - [x] I searched in the [issues](https://github.com/apache/flink-agents/issues) and found nothing similar.
   
   ### Description
   
   ## Summary
   
   `DurableExecutionManager` records per-key sequence numbers under a checkpoint ID in `snapshotLastCompletedSequenceNumbers(...)`, and `notifyCheckpointComplete(...)` later removes that entry while pruning durable state. However, Flink can also *abort* a checkpoint, in which case the framework calls `notifyCheckpointAborted(...)` instead of `notifyCheckpointComplete(...)`. `DurableExecutionManager` currently has no `notifyCheckpointAborted` method, so the entry recorded for an aborted checkpoint is never removed from `checkpointIdToSeqNums`. When durable execution is enabled, the map therefore gains one permanent entry per aborted checkpoint (aborts can be caused by, e.g., timeouts, alignment failures, or backend pressure).
   
   This is distinct from the null-store case tracked in #645, which is already fixed by the symmetric `actionStateStore == null` guard: when no store is configured, nothing is recorded, so nothing needs to be removed. The aborted-checkpoint leak only manifests when `actionStateStore != null`.
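To make the mechanism concrete, here is a minimal, self-contained Java sketch of the current bookkeeping. It is a simplified model under stated assumptions, not the actual `DurableExecutionManager`: the `CurrentManager` class, the `Map<Long, Map<String, Long>>` map type, the plain `Object` standing in for `ActionStateStore`, and passing the per-key sequence numbers directly (instead of reading them from a `KeyedStateBackend`) are all hypothetical. With a null store nothing is recorded; with a non-null store, the entry for an aborted checkpoint stays in the map.

```java
import java.util.HashMap;
import java.util.Map;

public class LeakDemo {

    // Hypothetical, simplified model of the current bookkeeping; NOT the real class.
    static class CurrentManager {
        // Stand-in for ActionStateStore: null means durable execution is disabled.
        final Object actionStateStore;
        // Assumed shape: checkpoint ID -> (key -> last completed sequence number).
        final Map<Long, Map<String, Long>> checkpointIdToSeqNums = new HashMap<>();

        CurrentManager(Object actionStateStore) {
            this.actionStateStore = actionStateStore;
        }

        void snapshotLastCompletedSequenceNumbers(Map<String, Long> keyToSeqNum, long checkpointId) {
            if (actionStateStore == null) return; // null-store guard: nothing is recorded
            checkpointIdToSeqNums.put(checkpointId, new HashMap<>(keyToSeqNum));
        }

        void notifyCheckpointComplete(long checkpointId) {
            if (actionStateStore != null) {
                // Pruning of durable state is omitted in this sketch.
                checkpointIdToSeqNums.remove(checkpointId);
            }
        }
        // No notifyCheckpointAborted: an aborted checkpoint's entry is never removed.
    }

    // Checkpoint 1 completes, checkpoint 2 aborts; returns how many entries remain.
    static int remainingEntries(Object store) {
        CurrentManager manager = new CurrentManager(store);
        Map<String, Long> seqNums = Map.of("key-a", 3L);
        manager.snapshotLastCompletedSequenceNumbers(seqNums, 1L);
        manager.notifyCheckpointComplete(1L);
        manager.snapshotLastCompletedSequenceNumbers(seqNums, 2L);
        // Flink would now call notifyCheckpointAborted(2L), but nothing handles it.
        return manager.checkpointIdToSeqNums.size();
    }

    public static void main(String[] args) {
        System.out.println("null store:     " + remainingEntries(null));         // 0
        System.out.println("non-null store: " + remainingEntries(new Object())); // 1
    }
}
```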
   
   ## Root Cause
   
   ```java
   // Records an entry on snapshot; works fine for completed checkpoints
   void snapshotLastCompletedSequenceNumbers(KeyedStateBackend<?> backend, long checkpointId) {
       if (actionStateStore == null) return;
       ...
       checkpointIdToSeqNums.put(checkpointId, keyToSeqNum);
   }
   
   // Removes the entry only on successful completion
   void notifyCheckpointComplete(long checkpointId) {
       if (actionStateStore != null) {
           ...
           checkpointIdToSeqNums.remove(checkpointId);
       }
   }
   
   // MISSING: no notifyCheckpointAborted handler → aborted checkpoints leak their entry
   ```
   
   In `ActionExecutionOperator`, only `notifyCheckpointComplete` is wired through to the manager; there is no corresponding `notifyCheckpointAborted` override that forwards the call to the manager.
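The wiring gap can be illustrated with a self-contained Java sketch. The local `CheckpointListener` interface mirrors the shape of Flink's `org.apache.flink.api.common.state.CheckpointListener` (an abstract `notifyCheckpointComplete` plus a default no-op `notifyCheckpointAborted`), while `RecordingManager` and `SketchOperator` are hypothetical. If the operator does not override the abort callback, the default no-op runs and the manager never hears about the abort. In the real `ActionExecutionOperator`, which presumably builds on Flink's operator base classes, an override would likely also need to call `super.notifyCheckpointAborted(checkpointId)` so the base class's own abort handling still runs.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardingSketch {

    // Local stand-in with the same shape as Flink's CheckpointListener.
    interface CheckpointListener {
        void notifyCheckpointComplete(long checkpointId) throws Exception;

        default void notifyCheckpointAborted(long checkpointId) throws Exception {}
    }

    // Hypothetical manager that just records which callbacks reached it.
    static class RecordingManager {
        final List<String> calls = new ArrayList<>();

        void notifyCheckpointComplete(long checkpointId) {
            calls.add("complete:" + checkpointId);
        }

        void notifyCheckpointAborted(long checkpointId) {
            calls.add("aborted:" + checkpointId);
        }
    }

    // Hypothetical operator: overrides BOTH callbacks and forwards each one.
    static class SketchOperator implements CheckpointListener {
        final RecordingManager manager = new RecordingManager();

        @Override
        public void notifyCheckpointComplete(long checkpointId) {
            manager.notifyCheckpointComplete(checkpointId);
        }

        @Override
        public void notifyCheckpointAborted(long checkpointId) {
            // Without this override, the interface's default no-op would run
            // and the manager would never learn about the abort.
            manager.notifyCheckpointAborted(checkpointId);
        }
    }

    public static void main(String[] args) {
        SketchOperator operator = new SketchOperator();
        operator.notifyCheckpointComplete(1L);
        operator.notifyCheckpointAborted(2L);
        System.out.println(operator.manager.calls); // [complete:1, aborted:2]
    }
}
```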
   
   ## Fix (proposed)
   
   1. Add `notifyCheckpointAborted(long checkpointId)` to `DurableExecutionManager` that removes the corresponding entry from `checkpointIdToSeqNums`. No pruning is needed, because the durable state for an aborted checkpoint was never committed.
   2. Override `notifyCheckpointAborted` on `ActionExecutionOperator` and forward the call to the manager.
   3. Add a regression test in `DurableExecutionManagerTest` covering both:
      - An aborted checkpoint removes its entry.
      - Interleaved completed and aborted checkpoints leave only the in-flight entries in the map.
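A minimal sketch of steps 1 and 3, again against a hypothetical, simplified manager rather than the real class (`FixedManager`, the map type, and the `Object` stand-in for `ActionStateStore` are assumptions). The abort handler only removes the entry and does no pruning; the two static methods mirror the two proposed regression scenarios and return the checkpoint IDs still tracked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class AbortFixSketch {

    // Hypothetical, simplified manager with the proposed abort handler added.
    static class FixedManager {
        final Object actionStateStore; // stand-in for ActionStateStore
        final Map<Long, Map<String, Long>> checkpointIdToSeqNums = new HashMap<>();

        FixedManager(Object actionStateStore) {
            this.actionStateStore = actionStateStore;
        }

        void snapshotLastCompletedSequenceNumbers(Map<String, Long> keyToSeqNum, long checkpointId) {
            if (actionStateStore == null) return;
            checkpointIdToSeqNums.put(checkpointId, new HashMap<>(keyToSeqNum));
        }

        void notifyCheckpointComplete(long checkpointId) {
            if (actionStateStore != null) {
                // Pruning of committed durable state would happen here (omitted).
                checkpointIdToSeqNums.remove(checkpointId);
            }
        }

        // Proposed: remove the entry WITHOUT pruning, because the durable state
        // for an aborted checkpoint was never committed.
        void notifyCheckpointAborted(long checkpointId) {
            if (actionStateStore != null) {
                checkpointIdToSeqNums.remove(checkpointId);
            }
        }

        Set<Long> trackedCheckpoints() {
            return new TreeSet<>(checkpointIdToSeqNums.keySet());
        }
    }

    // Regression scenario 1: an aborted checkpoint removes its entry.
    static Set<Long> abortedCheckpointRemovesEntry() {
        FixedManager manager = new FixedManager(new Object());
        manager.snapshotLastCompletedSequenceNumbers(Map.of("key-a", 1L), 7L);
        manager.notifyCheckpointAborted(7L);
        return manager.trackedCheckpoints();
    }

    // Regression scenario 2: interleaved completions and aborts leave only in-flight IDs.
    static Set<Long> interleavedLeavesOnlyInFlight() {
        FixedManager manager = new FixedManager(new Object());
        for (long id = 1; id <= 5; id++) {
            manager.snapshotLastCompletedSequenceNumbers(Map.of("key-a", id), id);
        }
        manager.notifyCheckpointComplete(1L);
        manager.notifyCheckpointAborted(2L);
        manager.notifyCheckpointComplete(3L);
        manager.notifyCheckpointAborted(4L);
        // Checkpoint 5 has neither completed nor aborted yet.
        return manager.trackedCheckpoints();
    }

    public static void main(String[] args) {
        System.out.println(abortedCheckpointRemovesEntry()); // []
        System.out.println(interleavedLeavesOnlyInFlight()); // [5]
    }
}
```

The symmetric `actionStateStore != null` guard in the abort handler is not strictly required (removing from an empty map is harmless), but it keeps the new method consistent with `notifyCheckpointComplete`.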
   
   
   
   
   ### How to reproduce
   
   This is a structural code bug — no specific input data is needed.
   
   1. Configure an agent job with durable execution **enabled** (i.e., with an `ActionStateStore` configured).
   2. Enable Flink checkpointing under conditions that cause some checkpoints to abort (e.g., a short checkpoint timeout or backend pressure).
   3. Run the job and observe the size of `checkpointIdToSeqNums`.
   
   **Expected**: only entries for in-flight checkpoints (neither completed nor aborted yet) remain.
   
   **Actual**: entries for aborted checkpoints persist for the lifetime of the job, growing without bound under sustained abort conditions.
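Since reproducing aborts in a live cluster can be timing-dependent, the expected-versus-actual contrast can also be shown with a deterministic simulation. This sketch uses a hypothetical toy manager (one sequence number per checkpoint is enough here) and an arbitrary, assumed abort pattern of every third checkpoint; `handlesAborts` toggles between the current behavior and the proposed fix.

```java
import java.util.HashMap;
import java.util.Map;

public class SustainedAbortSim {

    // Hypothetical toy model: checkpoint ID -> a single sequence number.
    static class Manager {
        final boolean handlesAborts; // false = current behavior, true = proposed fix
        final Map<Long, Long> checkpointIdToSeqNums = new HashMap<>();

        Manager(boolean handlesAborts) {
            this.handlesAborts = handlesAborts;
        }

        void snapshot(long checkpointId, long seqNum) {
            checkpointIdToSeqNums.put(checkpointId, seqNum);
        }

        void notifyCheckpointComplete(long checkpointId) {
            checkpointIdToSeqNums.remove(checkpointId);
        }

        void notifyCheckpointAborted(long checkpointId) {
            if (handlesAborts) {
                checkpointIdToSeqNums.remove(checkpointId);
            }
            // Otherwise the abort is effectively ignored, as in the current code.
        }
    }

    // Runs checkpoints 1..n in order; every third one aborts, the rest complete.
    static int mapSizeAfter(int checkpoints, boolean handlesAborts) {
        Manager manager = new Manager(handlesAborts);
        for (long id = 1; id <= checkpoints; id++) {
            manager.snapshot(id, id);
            if (id % 3 == 0) {
                manager.notifyCheckpointAborted(id);
            } else {
                manager.notifyCheckpointComplete(id);
            }
        }
        return manager.checkpointIdToSeqNums.size();
    }

    public static void main(String[] args) {
        for (int n : new int[] {300, 3000}) {
            System.out.printf("%d checkpoints: current=%d, fixed=%d%n",
                    n, mapSizeAfter(n, false), mapSizeAfter(n, true));
        }
        // 300 checkpoints: current=100, fixed=0
        // 3000 checkpoints: current=1000, fixed=0
    }
}
```

The leaked count scales linearly with the number of aborts, which is the unbounded growth described above; with abort handling in place, the map is empty once every checkpoint has either completed or aborted.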
   
   
   ### Version and environment
   
   - **Flink Agents**: 0.3-SNAPSHOT
   - **Apache Flink**: 2.2.0
   - **Java**: 11
   - **Deployment mode**: any (bug is in operator logic)
   
   ### Are you willing to submit a PR?
   
   - [x] I'm willing to submit a PR!

