weiqingy opened a new pull request, #667: URL: https://github.com/apache/flink-agents/pull/667
Closes #665. ## What this PR does `DurableExecutionManager.checkpointIdToSeqNums` leaked entries on aborted checkpoints. Flink calls `notifyCheckpointAborted(...)` when a checkpoint is aborted (timeout, alignment failure, backend pressure); the existing code only handled the complete path. Under sustained abort pressure the map grew unboundedly. ### Production changes 1. **`DurableExecutionManager.notifyCheckpointAborted(long)`** (new, package-private) — removes the entry from `checkpointIdToSeqNums`. **No `pruneState` call** — durable writes for an aborted checkpoint were never committed, so the prior committed checkpoint's recovery state is still load-bearing and must not be pruned. Guarded by `actionStateStore != null` to mirror the symmetric guard from #645. 2. **`ActionExecutionOperator.notifyCheckpointAborted(long)`** (new `@Override`) — thin delegate to the manager, then `super.notifyCheckpointAborted(...)`. Mirrors the existing `notifyCheckpointComplete` override exactly. 3. **Javadoc invariant statement** on `snapshotLastCompletedSequenceNumbers` and `notifyCheckpointComplete` strengthened to name BOTH release paths (complete OR abort). The `actionStateStore != null` guard now lives on three methods; the javadoc makes the three-way symmetry explicit and cross-links #645 + #665. 4. **`@VisibleForTesting getCheckpointIdToSeqNums()`** accessor — mirrors `getActionStateStore()` precedent. (Same addition as in #666; the second PR to land drops the duplicate on rebase.) ### Tests (DEM-level, three new) - `notifyAbortedRemovesEntryWithoutPruning` — entry released, durable state untouched. Uses a real `InMemoryActionStateStore` (not a mock) so wrongful pruning would be observable. - `completedAndAbortedInterleavedKeepsInFlightEntries` — three in-flight checkpoints; one completes (state pruned), one aborts (state preserved), one remains. - `noStoreModeNotifyCheckpointAbortedIsNoOp` — symmetric null-store no-op coverage. Sanity-mutation verified locally: - Emptying the new method's body → 2 of 3 new tests fail. - Adding a wrongful `actionStateStore.pruneState(...)` call → 2 of 3 new tests fail (state was incorrectly pruned). Operator-level harness test deferred to #646 — the new operator override is a one-line delegate; the logic is in the manager. ## Test plan - [x] `mvn test -Dtest=DurableExecutionManagerTest -pl runtime` — 5/5 pass (2 existing + 3 new) - [x] `mvn test -Dtest=ActionExecutionOperatorTest -pl runtime` — 28/28 pass - [x] `mvn test -pl runtime` — 307/307 pass (no regressions) - [x] `./tools/lint.sh -c` — 0 violations - [x] `./tools/check-license.sh` — clean (no new tracked files) - [x] Sanity mutation: empty new method body → expected tests fail - [x] Sanity mutation: wrongful `pruneState` call on abort → expected tests fail -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
