wilmerdooley opened a new pull request, #28483: URL: https://github.com/apache/flink/pull/28483
##### Was generative AI tooling used to co-author this PR?

- [x] Yes (please specify the tool below)

Generated-by: MiniMax-M3

---

### Purpose

When unaligned checkpoints are enabled, `TaskMailboxImpl.moveUrgentMailsToBatchIfNeeded` is invoked with `onlyMoveUrgentMails=true` whenever the first mail in the batch is urgent (which is always the case once an unaligned checkpoint barrier is pending). In that situation, every non-urgent mail in the queue was put back and the drain loop exited early, so downstream callers such as `tryTakeFromBatch()` would skip non-urgent mails until the urgent batch was fully drained. For mini-batch sinks that rely on `ProcessingTimeService` timer callbacks, this delayed those callbacks until the next checkpoint, a regression from the Flink 1.20 behavior.

This PR gates the urgent-only restriction on an `isBatchEmpty` check: if the batch is empty, the next non-urgent mail is still moved into the batch instead of being left stuck in the queue, while urgent mails keep the FIFO priority semantics introduced in FLINK-35796.

### Brief change log

- `flink-runtime/.../mailbox/TaskMailboxImpl.java`: in `moveUrgentMailsToBatchIfNeeded`, capture whether the batch is empty and restrict the drain to urgent mails only when the batch is non-empty.
- `flink-runtime/.../mailbox/TaskMailboxImplTest.java`: add `testNonUrgentMailQueuedBehindUrgentMailIsTakenFromBatch`, covering the case where a non-urgent mail queued behind an urgent mail is taken from the batch on a subsequent `tryTakeFromBatch()` call.

### Verifying this change

- Added a unit test that enqueues an urgent mail followed by a normal mail and asserts that both are taken from the batch in order.
- Ran the existing `TaskMailboxImplTest` suite (including the new test) locally, and it passed.

### Affected matrix

- Runtime / mailbox processing for any operator whose timer or mail handling interacts with `TaskMailboxImpl.tryTakeFromBatch()` while urgent mails are present.
- Specifically observed to affect `ProcessingTimeService` timer callbacks when `table.exec.mini-batch.enabled=true` and `execution.checkpointing.unaligned.enabled=true` (Flink 2.1.x).

### Documentation

- No public documentation changes are required; this change restores the intended runtime behavior.

JIRA: https://issues.apache.org/jira/browse/FLINK-39898
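To make the gating described in the Purpose section concrete, here is a toy Java model. Everything in it (`ToyMailbox`, `ToyMail`, `drainQueueIntoBatch`, `takeFromBatch`, and the `gateOnEmptyBatch` flag) is hypothetical and only mirrors the behavior as this PR describes it; it is not Flink's `TaskMailboxImpl` code. One simplification to keep in mind: when the drain is unrestricted, the toy moves every queued mail into the batch, which may differ from what the real method does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

/** Hypothetical toy model of the urgent-only drain; NOT Flink's TaskMailboxImpl. */
public class ToyMailboxDemo {

    /** Hypothetical mail: a name plus an urgency flag. */
    record ToyMail(String name, boolean urgent) {}

    static final class ToyMailbox {
        private final Deque<ToyMail> queue = new ArrayDeque<>();
        private final Deque<ToyMail> batch = new ArrayDeque<>();
        // true models the patched behavior, false the pre-patch behavior
        private final boolean gateOnEmptyBatch;

        ToyMailbox(boolean gateOnEmptyBatch) {
            this.gateOnEmptyBatch = gateOnEmptyBatch;
        }

        void put(ToyMail mail) {
            queue.addLast(mail);
        }

        /** Toy stand-in for moveUrgentMailsToBatchIfNeeded. */
        void drainQueueIntoBatch(boolean onlyMoveUrgentMails) {
            boolean batchWasEmpty = batch.isEmpty();
            // Pre-patch: always restrict when asked. Patched: restrict only if the batch is non-empty.
            boolean restrictToUrgent =
                    onlyMoveUrgentMails && (!gateOnEmptyBatch || !batchWasEmpty);
            while (!queue.isEmpty()) {
                if (restrictToUrgent && !queue.peekFirst().urgent()) {
                    break; // the non-urgent mail stays in the queue
                }
                batch.addLast(queue.pollFirst());
            }
        }

        /** Toy stand-in for tryTakeFromBatch: only looks at the batch, never the queue. */
        Optional<ToyMail> takeFromBatch() {
            return Optional.ofNullable(batch.pollFirst());
        }
    }

    /** Enqueues an urgent mail then a normal one, drains once, and returns what the batch yields. */
    static List<String> run(boolean gateOnEmptyBatch) {
        ToyMailbox mailbox = new ToyMailbox(gateOnEmptyBatch);
        mailbox.put(new ToyMail("urgent", true));
        mailbox.put(new ToyMail("normal", false));
        // The head of the queue is urgent, so the caller requests an urgent-only drain.
        mailbox.drainQueueIntoBatch(true);
        List<String> taken = new ArrayList<>();
        Optional<ToyMail> next;
        while ((next = mailbox.takeFromBatch()).isPresent()) {
            taken.add(next.get().name());
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println("pre-patch: " + run(false)); // prints "pre-patch: [urgent]"
        System.out.println("patched:   " + run(true)); // prints "patched:   [urgent, normal]"
    }
}
```

In the pre-patch variant the normal mail is left in the queue and only surfaces on a later drain, which is the delay the PR describes for timer callbacks. When the batch is already non-empty, the patched toy still stops at the first non-urgent mail, just as before the patch.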
