guyinyou opened a new issue, #10079:
URL: https://github.com/apache/rocketmq/issues/10079

   ### Before Creating the Enhancement Request
   
   - [x] I have confirmed that this should be classified as an enhancement 
rather than a bug/feature.
   
   
   ### Summary
   
   In FlushConsumeQueueService.doFlush(), stop gating the store checkpoint 
update and flush on flushConsumeQueueLeastPages == 0. Always persist the 
checkpoint (including logicsMsgTimestamp when applicable) after the consume 
queue flush pass, so checkpoint semantics are correct and not tied to the CQ 
flush return value or least-pages condition.
   
   ### Motivation
   
   Today, the code updates and flushes the store checkpoint only when flushConsumeQueueLeastPages == 0 (a thorough flush). That condition is tied to "we did a full CQ flush this round," but a false return from flush(cq, flushConsumeQueueLeastPages) does not mean "flush failed": it can simply mean that new data keeps arriving and the least-pages threshold was not met. Using the same thorough-flush condition to decide whether to persist the checkpoint can therefore:

   - Delay or skip checkpoint updates when we still want to record progress.
   - Couple checkpoint persistence to the CQ flush policy in a way that is easy to misunderstand (e.g. treating "no flush done" as "flush failed").

   Treating checkpoint persistence as a separate concern and always flushing it after the CQ flush pass avoids this and keeps checkpoint behavior predictable (see the sketch below).
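   For reference, the gating described above has roughly the following shape. This is a simplified sketch paraphrased from the description, not the exact upstream code: the allConsumeQueues() helper stands in for the iteration over the consume queue table, and the retry, thorough-flush-interval and compaction logic of doFlush() is omitted.

   ```java
   // Current shape (simplified sketch): the checkpoint timestamp update and
   // the checkpoint flush are both inside the thorough-flush guard.
   for (ConsumeQueueInterface cq : allConsumeQueues()) {
       // A false return here only means the least-pages threshold was not
       // reached; it does not mean the flush failed.
       consumeQueueStore.flush(cq, flushConsumeQueueLeastPages);
   }

   if (0 == flushConsumeQueueLeastPages) {
       if (logicsMsgTimestamp > 0) {
           messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
       }
       // The checkpoint file is persisted only on thorough-flush rounds.
       messageStore.getStoreCheckpoint().flush();
   }
   ```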
   
   ### Describe the Solution You'd Like
   
   In FlushConsumeQueueService.doFlush(), after the loop that calls flush(cq, flushConsumeQueueLeastPages) for each consume queue (and after the compaction flush, if enabled), always:

   - When a thorough flush was intended this round (e.g. flushConsumeQueueLeastPages == 0), set logicsMsgTimestamp from the captured temp value if logicsMsgTimestamp > 0.
   - Call messageStore.getStoreCheckpoint().flush() unconditionally, i.e. with no outer if (0 == flushConsumeQueueLeastPages) guarding the whole checkpoint update and flush.

   In other words: remove the conditional that wraps both "set logicsMsgTimestamp" and "checkpoint.flush()" so that at least the checkpoint file flush happens every time after the CQ flush pass. The update of logicsMsgTimestamp can still be limited to thorough-flush rounds if desired, but the decision to flush the checkpoint itself should not depend on the CQ flush return value, or on "we did a full CQ flush" in a way that could be confused with "flush failed."

   (If the intended behavior is "flush the checkpoint on every run; update logicsMsgTimestamp only when we did a thorough CQ flush," the change is: remove only the condition around storeCheckpoint.flush(), and keep the condition for setLogicsMsgTimestamp so that it runs only when flushConsumeQueueLeastPages == 0 and logicsMsgTimestamp > 0. A sketch of this variant follows.)
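   Under that reading, the tail of doFlush() would end up roughly like the sketch below (illustrative only, not a tested patch):

   ```java
   // Proposed shape (simplified sketch), after the CQ flush loop and the
   // optional compaction flush.

   // Still update logicsMsgTimestamp only on thorough-flush rounds.
   if (0 == flushConsumeQueueLeastPages && logicsMsgTimestamp > 0) {
       messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
   }

   // But persist the checkpoint file on every pass, independent of the CQ
   // flush policy and of the flush(cq, ...) return values.
   messageStore.getStoreCheckpoint().flush();
   ```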
   
   
   ### Describe Alternatives You've Considered
   
   - Keep the current behavior and document it. The condition is easy to misinterpret (e.g. as "only flush the checkpoint when the CQ flush succeeds"), and documenting it does not remove the risk of future logic depending on the checkpoint being flushed only when that condition holds.
   - Use the return value of flush(cq, ...) to decide whether to flush the checkpoint. Rejected because false does not mean "flush failed," only that sufficient pages were not flushed (e.g. due to continuous writes). Basing checkpoint persistence on that would be semantically wrong and could delay checkpoint updates unnecessarily (see the sketch below).
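   For illustration, that rejected variant would look something like the following sketch; the allFlushed flag and the allConsumeQueues() helper are hypothetical names, not code from the repository:

   ```java
   // Rejected alternative (sketch): gate the checkpoint flush on the CQ
   // flush return values.
   boolean allFlushed = true;
   for (ConsumeQueueInterface cq : allConsumeQueues()) {
       allFlushed &= consumeQueueStore.flush(cq, flushConsumeQueueLeastPages);
   }
   if (allFlushed) {
       // Wrong semantics: a false return above only means the least-pages
       // threshold was not met, so under continuous writes this could delay
       // the checkpoint flush indefinitely.
       messageStore.getStoreCheckpoint().flush();
   }
   ```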
   
   ### Additional Context
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
