xishuaidelin commented on PR #2510:
URL: https://github.com/apache/fluss/pull/2510#issuecomment-3996356631

   > Thank you @xishuaidelin for your contribution. I have left the following comments for your consideration:
   >
   > 1. Introduce a flag named `enableBacklogReporting` in the Source to control backlog behavior. This flag should be disabled when processing log tables, operating in batch mode, or when explicitly turned off via configuration (e.g., `scan.backlog.report.enable`).
   > 2. Currently, the system sends a backlog event whenever the source starts, restores, or upgrades from a legacy version. To optimize this, please store a boolean flag `isBacklogProcessed` within the `SourceEnumeratorState`. The backlog should only be processed if this flag is false, regardless of whether the source is newly started or restored. For legacy `SourceEnumeratorState` instances, initialize this flag to true to skip backlog processing.
   > 3. Do not record the backlog offset within the split itself. Instead, use a separate event to transmit this information.
   > 4. Add an Integration Test (IT) case to verify the backlog processing logic. This test should ensure that events are correctly sent to the downstream operator. You can mock an operator to collect these backlog events, similar to the approach used in `org.apache.flink.test.streaming.runtime.RecordAttributesPropagationITCase.TwoInputOperator`.
   
   Hi @wuchong,
   Thank you for the detailed review and guidance. I have updated the PR to address all your comments:
   
   Backlog Flag: Introduced the `enableBacklogReporting` flag in `FlinkSource`. In `FlinkTableSource`, it is disabled for log tables, in batch mode, or when explicitly turned off via the `scan.backlog.report.enable` configuration. In `FlussSourceBuilder`, it is disabled for non-primary-key tables or when the configuration is off.
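
   A minimal sketch of the enable/disable rule on the `FlinkTableSource` path, assuming the decision reduces to three booleans. The class `BacklogReportingFlagSketch`, its method, and its parameter names are hypothetical; only the option name `scan.backlog.report.enable` comes from the discussion above. Per the description, the `FlussSourceBuilder` path checks only the primary-key and configuration conditions.

```java
/**
 * Hypothetical helper illustrating the enable/disable rule described above.
 * Class, method, and parameter names are assumptions, not the PR's code.
 */
public final class BacklogReportingFlagSketch {

    private BacklogReportingFlagSketch() {}

    /**
     * @param hasPrimaryKey true for primary-key tables, false for log tables
     * @param streamingMode true for unbounded (streaming) execution, false for batch
     * @param configEnabled value of the {@code scan.backlog.report.enable} option
     */
    static boolean enableBacklogReporting(
            boolean hasPrimaryKey, boolean streamingMode, boolean configEnabled) {
        // Disabled for log tables, in batch mode, or when switched off via configuration.
        return hasPrimaryKey && streamingMode && configEnabled;
    }

    public static void main(String[] args) {
        System.out.println(enableBacklogReporting(true, true, true));   // true
        System.out.println(enableBacklogReporting(false, true, true));  // false: log table
        System.out.println(enableBacklogReporting(true, false, true));  // false: batch mode
        System.out.println(enableBacklogReporting(true, true, false));  // false: disabled by config
    }
}
```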
   
   State Management: Added the `isBacklogProcessed` boolean to `SourceEnumeratorState`. The enumerator initializes backlog processing only when `enableBacklogReporting` is true and `isBacklogProcessed` is false. The serializer (version 3) persists this flag; for legacy states (versions 0–2), it is initialized to true, which keeps restores backward compatible and skips redundant backlog processing.
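
   A sketch of the restore rule, assuming a serializer that appends the flag as a single boolean in version 3. `EnumeratorStateSketch`, `State`, and `shouldProcessBacklog` are illustrative names, not the PR's classes, and the real `SourceEnumeratorState` serializer writes many more fields.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical sketch of versioned state handling; only the backlog flag is modeled. */
public final class EnumeratorStateSketch {

    static final int CURRENT_VERSION = 3;

    /** Stand-in for SourceEnumeratorState; the real state has more fields. */
    static final class State {
        final boolean isBacklogProcessed;

        State(boolean isBacklogProcessed) {
            this.isBacklogProcessed = isBacklogProcessed;
        }
    }

    static byte[] serialize(State state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            // A real serializer would write the other enumerator fields as well.
            out.writeBoolean(state.isBacklogProcessed);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static State deserialize(int version, byte[] serialized) {
        if (version > CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported state version: " + version);
        }
        if (version < 3) {
            // Legacy state (versions 0-2) carries no flag: treat the backlog as already
            // processed so a restored job does not re-enter backlog processing.
            return new State(true);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            return new State(in.readBoolean());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Backlog processing starts only if reporting is enabled and not yet done. */
    static boolean shouldProcessBacklog(boolean enableBacklogReporting, State state) {
        return enableBacklogReporting && !state.isBacklogProcessed;
    }

    public static void main(String[] args) {
        State restored = deserialize(CURRENT_VERSION, serialize(new State(false)));
        System.out.println(shouldProcessBacklog(true, restored)); // true
        State legacy = deserialize(2, new byte[0]);
        System.out.println(shouldProcessBacklog(true, legacy)); // false
    }
}
```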
   
   Offset Handling: Removed backlog offset recording from the split itself. The enumerator now tracks backlog boundary offsets in a separate `bucketsWithBacklogOffset` map and sends them to readers via `MarkedBacklogOffsetEvent` when splits are assigned. Readers report completion back via `FinishedBacklogEvent`.
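
   A simplified sketch of the enumerator-side bookkeeping, assuming buckets are identified by an `int` (the real code presumably uses a richer bucket type) and that the backlog counts as finished once every tracked bucket has reported. The event payload fields and method names are hypothetical; the list of sent events stands in for Flink's `SplitEnumeratorContext#sendEventToSourceReader`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified enumerator-side bookkeeping; not the PR's actual classes. */
public final class BacklogOffsetTrackingSketch {

    /** Enumerator -> reader: the offset at which the bucket's backlog ends. */
    static final class MarkedBacklogOffsetEvent {
        final int bucket;
        final long backlogEndOffset;

        MarkedBacklogOffsetEvent(int bucket, long backlogEndOffset) {
            this.bucket = bucket;
            this.backlogEndOffset = backlogEndOffset;
        }
    }

    /** Reader -> enumerator: the bucket has been consumed up to its backlog offset. */
    static final class FinishedBacklogEvent {
        final int bucket;

        FinishedBacklogEvent(int bucket) {
            this.bucket = bucket;
        }
    }

    // Bucket id -> backlog boundary offset, kept outside the splits themselves.
    private final Map<Integer, Long> bucketsWithBacklogOffset = new HashMap<>();
    // Stands in for events sent to source readers in this sketch.
    private final List<MarkedBacklogOffsetEvent> sentEvents = new ArrayList<>();
    private boolean backlogProcessed = false;

    void recordBacklogOffset(int bucket, long offset) {
        bucketsWithBacklogOffset.put(bucket, offset);
    }

    /** On split assignment, ship the boundary as a separate event if one is tracked. */
    void onSplitAssigned(int bucket) {
        Long offset = bucketsWithBacklogOffset.get(bucket);
        if (offset != null) {
            sentEvents.add(new MarkedBacklogOffsetEvent(bucket, offset));
        }
    }

    /** Assumption: once every tracked bucket has reported, the backlog is done. */
    void onFinishedBacklog(FinishedBacklogEvent event) {
        bucketsWithBacklogOffset.remove(event.bucket);
        if (bucketsWithBacklogOffset.isEmpty()) {
            // A real enumerator would also signal isBacklog=false and persist the flag here.
            backlogProcessed = true;
        }
    }

    List<MarkedBacklogOffsetEvent> sentEvents() {
        return sentEvents;
    }

    boolean isBacklogProcessed() {
        return backlogProcessed;
    }

    public static void main(String[] args) {
        BacklogOffsetTrackingSketch enumerator = new BacklogOffsetTrackingSketch();
        enumerator.recordBacklogOffset(0, 100L);
        enumerator.recordBacklogOffset(1, 42L);
        enumerator.onSplitAssigned(0);
        enumerator.onSplitAssigned(1);
        enumerator.onSplitAssigned(2); // no tracked offset, so no event is sent
        System.out.println(enumerator.sentEvents().size()); // 2
        enumerator.onFinishedBacklog(new FinishedBacklogEvent(0));
        System.out.println(enumerator.isBacklogProcessed()); // false
        enumerator.onFinishedBacklog(new FinishedBacklogEvent(1));
        System.out.println(enumerator.isBacklogProcessed()); // true
    }
}
```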
   
   Integration Test: Added `BacklogProcessingITCase` with three test cases:
   1. `testBacklogEventsPropagation`: verifies the `isBacklog=true` → `false` transition.
   2. `testBacklogReportingDisabled`: verifies that no backlog events are emitted when reporting is disabled.
   3. `testBacklogStateRestoredFromSavepoint`: verifies that backlog processing is not re-entered after restoring from a savepoint.
   
   Could you please take another look when you have time? 
