xishuaidelin commented on PR #2510: URL: https://github.com/apache/fluss/pull/2510#issuecomment-3996356631
> Thank you @xishuaidelin for your contribution. I have left the following comments for your consideration:
>
> 1. Introduce a flag named `enableBacklogReporting` in the Source to control backlog behavior. This flag should be disabled when processing log tables, operating in batch mode, or when explicitly turned off via configuration (e.g., `scan.backlog.report.enable`).
> 2. Currently, the system sends a backlog event whenever the source starts, restores, or upgrades from a legacy version. To optimize this, please store a boolean flag `isBacklogProcessed` within the `SourceEnumeratorState`. The backlog should only be processed if this flag is false, regardless of whether the source is newly started or restored. For legacy `SourceEnumeratorState` instances, initialize this flag to true to skip backlog processing.
> 3. Do not record the backlog offset within the split itself. Instead, use a separate event to transmit this information.
> 4. Add an Integration Test (IT) case to verify the backlog processing logic. This test should ensure that events are correctly sent to the downstream operator. You can mock an operator to collect these backlog events, similar to the approach used in `org.apache.flink.test.streaming.runtime.RecordAttributesPropagationITCase.TwoInputOperator`.

Hi @wuchong,

Thank you for the detailed review and guidance. I have updated the PR to address all your comments:

- Backlog Flag: Introduced the `enableBacklogReporting` flag in `FlinkSource`. In `FlinkTableSource`, it is disabled for log tables, in batch mode, or when explicitly turned off via the `scan.backlog.report.enable` configuration. In `FlussSourceBuilder`, it is disabled for non-PK tables or when the configuration is off.
- State Management: Added the `isBacklogProcessed` boolean to `SourceEnumeratorState`. The enumerator only initializes backlog processing when `enableBacklogReporting` is true and `isBacklogProcessed` is false. The serializer (version 3) persists this flag; for legacy states (versions 0–2), it is initialized to true to ensure backward compatibility and skip redundant backlog processing on restore.
- Offset Handling: Removed the backlog offset recording from the split itself. The enumerator now tracks backlog boundary offsets in a separate `bucketsWithBacklogOffset` map and transmits them to readers via `MarkedBacklogOffsetEvent` upon split assignment. Readers report completion back via `FinishedBacklogEvent`.
- Integration Test: Added `BacklogProcessingITCase` with three test cases:
  1. `testBacklogEventsPropagation` (verifies the isBacklog=true → false transition)
  2. `testBacklogReportingDisabled` (verifies no backlog events when disabled)
  3. `testBacklogStateRestoredFromSavepoint` (verifies no re-entry of backlog processing after restore)

Could you please take another look when you have time?
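
For readers following the thread, the two conditions described above (when reporting is enabled, and when the enumerator actually starts backlog processing) can be sketched as plain boolean logic. This is only an illustration, not the code in the PR: the class `BacklogGateSketch`, both method names, and the parameters `hasPrimaryKey`, `streamingMode`, and `configEnabled` are hypothetical stand-ins for the checks done in `FlinkTableSource`, `FlussSourceBuilder`, and the enumerator.

```java
// Illustrative sketch only; all names here are hypothetical, not Fluss APIs.
class BacklogGateSketch {

    // Source-side decision: reporting is off for log (non-PK) tables,
    // for batch mode, or when the config option is turned off.
    static boolean enableBacklogReporting(
            boolean hasPrimaryKey, boolean streamingMode, boolean configEnabled) {
        return hasPrimaryKey && streamingMode && configEnabled;
    }

    // Enumerator-side gate: start backlog processing only if reporting is
    // enabled and the restored (or fresh) state has not already processed it.
    static boolean shouldInitBacklog(
            boolean enableBacklogReporting, boolean isBacklogProcessed) {
        return enableBacklogReporting && !isBacklogProcessed;
    }

    public static void main(String[] args) {
        boolean enabled = enableBacklogReporting(true, true, true);
        System.out.println(shouldInitBacklog(enabled, false)); // fresh start: true
        System.out.println(shouldInitBacklog(enabled, true));  // restored, already processed: false
        // Log table: reporting disabled, so backlog processing never starts.
        System.out.println(shouldInitBacklog(enableBacklogReporting(false, true, true), false)); // false
    }
}
```

Keeping the two checks separate mirrors the split described in the comment: the source decides whether reporting applies at all, while the enumerator state decides whether it still needs to run.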

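
The backward-compatibility rule for the serializer (version 3 persists `isBacklogProcessed`, versions 0–2 default it to true) can be sketched as below. This is a simplified illustration under stated assumptions, not the PR's serializer: `BacklogStateSerdeSketch` and its methods are hypothetical, the payload holds only the flag, and the real `SourceEnumeratorState` fields are omitted. The `(version, bytes)` shape loosely follows Flink's `SimpleVersionedSerializer#deserialize(int, byte[])`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative sketch only; names and payload layout are hypothetical.
class BacklogStateSerdeSketch {
    static final int CURRENT_VERSION = 3;

    // Version 3 layout in this sketch: just the isBacklogProcessed flag.
    static byte[] serialize(boolean isBacklogProcessed) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeBoolean(isBacklogProcessed);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static boolean deserializeIsBacklogProcessed(int version, byte[] data) {
        if (version < 0 || version > CURRENT_VERSION) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        if (version < CURRENT_VERSION) {
            // Legacy states never wrote the flag: treat the backlog as already
            // processed so a restore does not re-enter backlog processing.
            return true;
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readBoolean();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(deserializeIsBacklogProcessed(2, new byte[0]));      // true (legacy default)
        System.out.println(deserializeIsBacklogProcessed(3, serialize(false))); // false (round trip)
    }
}
```

Defaulting legacy versions to true is the conservative choice described in the comment: an upgraded job skips backlog reporting rather than repeating it after restore.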