wilmerdooley opened a new pull request, #28483:
URL: https://github.com/apache/flink/pull/28483

   ##### Was generative AI tooling used to co-author this PR?
   
   - [x] Yes (please specify the tool below)
   
   Generated-by: MiniMax-M3
   
   ---
   
   ### Purpose
   
   When unaligned checkpoints are enabled,
`TaskMailboxImpl.moveUrgentMailsToBatchIfNeeded` is invoked with
`onlyMoveUrgentMails=true` whenever the first mail in the batch is urgent
(which is always the case once an unaligned checkpoint barrier is pending). In
that situation, the drain loop put every non-urgent mail back into the queue and
exited early, so downstream callers such as `tryTakeFromBatch()` skipped
non-urgent mails until the urgent batch was fully drained. For mini-batch sinks
that rely on `ProcessingTimeService` timer callbacks, this delayed those
callbacks until the next checkpoint, a regression compared to Flink 1.20.
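
To make the starvation concrete, here is a minimal, hypothetical Java model of the pre-fix drain rule. It is not Flink's `TaskMailboxImpl`: `Mail`, `queue`, `batch`, `moveToBatch` and `simulate` are illustrative stand-ins. The sketch also assumes the urgent-only flag stays set for as long as the simulated barrier is pending, instead of deriving it from the batch contents.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of the pre-fix behavior; not Flink code. */
public class UrgentOnlyDrainSketch {

    /** A mail is just a label plus an urgency flag in this sketch. */
    record Mail(String name, boolean urgent) {}

    final Deque<Mail> queue = new ArrayDeque<>();
    final Deque<Mail> batch = new ArrayDeque<>();

    /** Pre-fix rule: with onlyMoveUrgentMails set, stop at the first non-urgent mail. */
    void moveToBatch(boolean onlyMoveUrgentMails) {
        while (!queue.isEmpty()) {
            if (onlyMoveUrgentMails && !queue.peekFirst().urgent()) {
                break; // the non-urgent mail (and everything behind it) stays queued
            }
            batch.addLast(queue.pollFirst());
        }
    }

    /** Refills the batch per the rule above, then takes its head (null if none). */
    Mail tryTakeFromBatch(boolean onlyMoveUrgentMails) {
        moveToBatch(onlyMoveUrgentMails);
        return batch.pollFirst();
    }

    /**
     * Takes a mail `attempts` times while the urgent-only flag is set (barrier
     * pending), then once more after it is cleared (checkpoint completed).
     */
    static List<String> simulate(int attempts) {
        UrgentOnlyDrainSketch mailbox = new UrgentOnlyDrainSketch();
        mailbox.queue.addLast(new Mail("barrier", true));
        mailbox.queue.addLast(new Mail("timer", false));
        List<String> taken = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            Mail m = mailbox.tryTakeFromBatch(true);
            taken.add(m == null ? "<none>" : m.name());
        }
        Mail afterCheckpoint = mailbox.tryTakeFromBatch(false);
        taken.add(afterCheckpoint == null ? "<none>" : afterCheckpoint.name());
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(simulate(3));
    }
}
```

Running `main` prints `[barrier, <none>, <none>, timer]`: in this toy model the timer mail becomes reachable only once the flag is cleared, mirroring the "delayed until the next checkpoint" symptom.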
   
   This PR gates the urgent-only restriction with an `isBatchEmpty` check: if
the batch is empty, the next non-urgent mail is still moved into the batch
instead of being left stuck in the queue, while urgent mails keep the FIFO
priority semantics introduced in FLINK-35796.
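
Under the same kind of simplification, the gated rule can be sketched as follows. The class and method names are hypothetical, not Flink's API. The sketch captures `isBatchEmpty` once per drain, so an empty batch lets the whole queue drain; the actual change may move fewer mails at a time.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of the gated drain rule; not Flink code. */
public class GatedDrainSketch {

    /** A mail is just a label plus an urgency flag in this sketch. */
    record Mail(String name, boolean urgent) {}

    final Deque<Mail> queue = new ArrayDeque<>();
    final Deque<Mail> batch = new ArrayDeque<>();

    /**
     * The urgent-only restriction applies only when the batch already holds
     * mails; with an empty batch, non-urgent mails are moved as well.
     */
    void moveToBatch(boolean onlyMoveUrgentMails) {
        boolean isBatchEmpty = batch.isEmpty();
        boolean restrictToUrgent = onlyMoveUrgentMails && !isBatchEmpty;
        while (!queue.isEmpty()) {
            if (restrictToUrgent && !queue.peekFirst().urgent()) {
                break; // non-empty batch: keep non-urgent mails queued
            }
            batch.addLast(queue.pollFirst()); // queue order (FIFO) is preserved
        }
    }

    /** Refills the batch per the rule above, then takes its head (null if none). */
    Mail tryTakeFromBatch(boolean onlyMoveUrgentMails) {
        moveToBatch(onlyMoveUrgentMails);
        return batch.pollFirst();
    }

    /** Takes a mail `attempts` times with the urgent-only flag set throughout. */
    static List<String> simulate(int attempts) {
        GatedDrainSketch mailbox = new GatedDrainSketch();
        mailbox.queue.addLast(new Mail("barrier", true));
        mailbox.queue.addLast(new Mail("timer", false));
        List<String> taken = new ArrayList<>();
        for (int i = 0; i < attempts; i++) {
            Mail m = mailbox.tryTakeFromBatch(true);
            taken.add(m == null ? "<none>" : m.name());
        }
        return taken;
    }

    public static void main(String[] args) {
        System.out.println(simulate(3));
    }
}
```

`simulate(3)` returns `[barrier, timer, <none>]`: the urgent mail is still taken first, and the non-urgent mail follows on the next call even though the urgent-only flag never clears.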
   
   ### Brief change log
   
   - `flink-runtime/.../mailbox/TaskMailboxImpl.java`: in 
`moveUrgentMailsToBatchIfNeeded`, capture whether the batch is empty and only 
restrict the drain to urgent mails when the batch is non-empty.
   - `flink-runtime/.../mailbox/TaskMailboxImplTest.java`: add 
`testNonUrgentMailQueuedBehindUrgentMailIsTakenFromBatch` covering the case 
where a non-urgent mail queued behind an urgent mail is taken from the batch on 
a subsequent `tryTakeFromBatch()` call.
   
   ### Verifying this change
   
   - Added a unit test that enqueues an urgent mail followed by a normal mail 
and asserts both can be taken from the batch in order.
   - Ran the existing `TaskMailboxImplTest` suite (including the new test)
locally; all tests passed.
   
   ### Affected matrix
   
   - Runtime / mailbox processing for any operator whose timer or mail handling 
interacts with `TaskMailboxImpl.tryTakeFromBatch()` while urgent mails are 
present.
   - Specifically observed to affect `ProcessingTimeService` timer callbacks 
when `table.exec.mini-batch.enabled=true` and 
`execution.checkpointing.unaligned.enabled=true` (Flink 2.1.x).
   
   ### Documentation
   
   - No public documentation changes required; this restores intended runtime 
behavior.
   
   JIRA: https://issues.apache.org/jira/browse/FLINK-39898

