Copilot commented on code in PR #2510:
URL: https://github.com/apache/fluss/pull/2510#discussion_r2741306455
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -962,4 +1009,23 @@ private Map<Integer, Integer> putRows(TablePath tablePath, int rowsNum) throws E
}
return bucketRows;
}
+
+ private static class MockBacklogSplitEnumeratorContext
+ extends MockSplitEnumeratorContext<SourceSplitBase> {
+
+ private volatile boolean isProcessingBacklogCalled;
Review Comment:
The field name `isProcessingBacklogCalled` is misleading. It suggests it
tracks whether a method was called, but it actually stores the state value
passed to `setIsProcessingBacklog()`. This naming inconsistency could confuse
developers.
Consider renaming to `processingBacklogState` or `isProcessingBacklogValue`
to better reflect that it stores the state value, not call tracking information.
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -962,4 +1009,23 @@ private Map<Integer, Integer> putRows(TablePath tablePath, int rowsNum) throws E
}
return bucketRows;
}
+
+ private static class MockBacklogSplitEnumeratorContext
+ extends MockSplitEnumeratorContext<SourceSplitBase> {
+
+ private volatile boolean isProcessingBacklogCalled;
+
+ public MockBacklogSplitEnumeratorContext(int parallelism) {
+ super(parallelism);
+ }
+
+ @Override
+ public void setIsProcessingBacklog(boolean isProcessingBacklog) {
+ this.isProcessingBacklogCalled = isProcessingBacklog;
+ }
+
+ public boolean isProcessingBacklogCalled() {
+ return isProcessingBacklogCalled;
+ }
Review Comment:
The method name `isProcessingBacklogCalled()` is misleading. It suggests it
returns whether the method was called, but it actually returns the last value
that was passed to `setIsProcessingBacklog()`. This naming could confuse
developers maintaining this test.
Consider renaming to `getIsProcessingBacklog()` or
`getProcessingBacklogState()` to accurately reflect that it returns the current
state value, not whether a method was called.
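Taken together, the field and accessor comments amount to one small rename of the mock. A minimal standalone sketch (a simplified stand-in class, not the actual `MockSplitEnumeratorContext` subclass) of what the renamed member and getter could look like:

```java
// Simplified stand-in for the test mock (no Flink dependencies). The point
// is the naming: the field stores the last state value passed in, so both
// the field and the accessor say "state" rather than "called".
class MockBacklogContextSketch {
    // Last value passed to setIsProcessingBacklog(); volatile because the
    // enumerator may call it from a different thread than the test asserts on.
    private volatile boolean processingBacklogState;

    public void setIsProcessingBacklog(boolean isProcessingBacklog) {
        this.processingBacklogState = isProcessingBacklog;
    }

    public boolean getProcessingBacklogState() {
        return processingBacklogState;
    }
}
```

With this naming, the test assertions read as state checks, e.g. `assertThat(context.getProcessingBacklogState()).isTrue()`.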
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -517,10 +581,34 @@ private FlinkRecordsWithSplitIds forBoundedSplitRecords(
flinkSourceReaderMetrics);
}
+ private void sendBacklogFinishedEvent(Set<TableBucket> backlogFinishedTbls) {
+ if (backlogFinishedTbls.isEmpty() || context == null) {
+ String msg =
+ backlogFinishedTbls.isEmpty()
+ ? String.format(
+ "No table bucket finished backlog phase for tableId = %s",
+ table.getTableInfo().getTableId())
+ : "context is null then no operator event could be sent ";
+ LOG.warn(msg);
+ return;
+ }
Review Comment:
The warning message is misleading when `backlogFinishedTbls` is empty. This
method is called multiple times during normal operation (e.g., from lines 196,
546, etc.), and it's expected that the set might be empty in some cases. The
warning suggests something is wrong, but empty sets are a valid scenario.
Consider either removing the warning when the set is empty, or changing it
to a debug-level log since this is normal behavior, not a warning condition.
```suggestion
        if (context == null) {
            LOG.warn("context is null, no operator event could be sent");
            return;
        }
        if (backlogFinishedTbls.isEmpty()) {
            LOG.debug(
                    "No table bucket finished backlog phase for tableId = {}",
                    table.getTableInfo().getTableId());
            return;
        }
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -759,6 +760,52 @@ void testPartitionsExpiredInFlussButExistInLake(
}
}
+ @Test
+ void testReportBacklogStatus() throws Throwable {
Review Comment:
The test name `testReportBacklogStatus` doesn't accurately describe what
it's testing. The test actually verifies that the enumerator correctly handles
BacklogFinishEvent messages and updates the processing backlog status, which is
more about event handling and state management rather than "reporting" status
(which typically means publishing/exposing status to external systems).
Consider renaming to something more descriptive like
`testHandleBacklogFinishEventAndUpdateStatus` or
`testBacklogStatusTransitionOnEventCompletion` to better reflect the actual
test behavior.
```suggestion
void testHandleBacklogFinishEventAndUpdateStatus() throws Throwable {
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -159,12 +186,14 @@ public RecordsWithSplitIds<RecordAndPos> fetch() throws IOException {
return forBoundedSplitRecords(currentBoundedSplit,
recordIterator);
}
} else {
+ sendBacklogFinishedEvent(onlySnapshotBuckets);
// may need to finish empty log splits
if (!emptyLogSplits.isEmpty()) {
FlinkRecordsWithSplitIds records =
new FlinkRecordsWithSplitIds(
new HashSet<>(emptyLogSplits),
flinkSourceReaderMetrics);
emptyLogSplits.clear();
+ sendBacklogFinishedEvent(subscribedBuckets.keySet());
Review Comment:
Backlog finish events are being sent for all subscribed buckets when
processing empty log splits, but this could include buckets that have backlog
offsets and haven't reached them yet. This could cause the enumerator to
prematurely mark backlog as finished for buckets that are still processing
historical data.
Consider filtering to only send events for buckets that either don't have a
backlog marked offset or have already sent their backlog finish event. For
example, filter `subscribedBuckets.keySet()` to exclude buckets present in
`backlogMarkedOffsets` but not in `backlogEventSentTbls`.
```suggestion
                // Only send backlog finish events for buckets that either don't have a
                // backlog marked offset or have already sent their backlog finish event.
                Set<TableBucket> bucketsToNotify = new HashSet<>(subscribedBuckets.keySet());
                bucketsToNotify.removeIf(
                        bucket ->
                                backlogMarkedOffsets.containsKey(bucket)
                                        && !backlogEventSentTbls.contains(bucket));
                if (!bucketsToNotify.isEmpty()) {
                    sendBacklogFinishedEvent(bucketsToNotify);
                }
```
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumerator.java:
##########
@@ -307,6 +315,18 @@ public void start() {
}
}
+ private void initializeBacklog() {
+ context.setIsProcessingBacklog(true);
+ hasBacklogTbls.clear();
+ try {
+ recordBacklogBoundaryOffsets();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException(
+ String.format("Failed to record initial end offsets for table: %s", tablePath),
+ ExceptionUtils.stripCompletionException(e));
+ }
+ }
Review Comment:
When the table is empty or all buckets have no data (offset <= 0),
`hasBacklogTbls` will be empty after `recordBacklogBoundaryOffsets()`
completes. However, `isProcessingBacklog` is set to `true` at line 319 and
remains true until a BacklogFinishEvent is received. For tables with no backlog
data, `isProcessingBacklog` should be set to `false` immediately after checking
backlog boundaries.
Consider adding a check after `recordBacklogBoundaryOffsets()`: if
`hasBacklogTbls.isEmpty()`, set `context.setIsProcessingBacklog(false)`
immediately rather than waiting for events that may never come (or come from
buckets that were never tracked).
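The suggested check can be sketched as follows. This is a standalone simplification with hypothetical types (a `Map` of bucket id to end offset stands in for `recordBacklogBoundaryOffsets()`, which only tracks buckets with offset > 0), not the real `FlinkSourceEnumerator` API:

```java
// Standalone sketch of the suggested fix: after recording backlog boundary
// offsets, an empty hasBacklogTbls means there is no backlog to drain, so
// the processing-backlog flag is reset immediately instead of waiting for
// BacklogFinishEvents that will never arrive.
class BacklogInitSketch {
    private final java.util.Set<Long> hasBacklogTbls = new java.util.HashSet<>();
    private boolean processingBacklog;

    // bucketEndOffsets maps bucket id -> end offset; stands in for
    // recordBacklogBoundaryOffsets(), which only tracks offsets > 0.
    void initializeBacklog(java.util.Map<Long, Long> bucketEndOffsets) {
        processingBacklog = true;
        hasBacklogTbls.clear();
        bucketEndOffsets.forEach(
                (bucket, offset) -> {
                    if (offset > 0) {
                        hasBacklogTbls.add(bucket);
                    }
                });
        // Suggested addition: an empty table (or all offsets <= 0) has no
        // backlog, so report "not processing backlog" right away.
        if (hasBacklogTbls.isEmpty()) {
            processingBacklog = false;
        }
    }

    boolean isProcessingBacklog() {
        return processingBacklog;
    }
}
```

With this addition, the empty-table case resolves synchronously in `start()` rather than depending on reader events.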
##########
fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/source/reader/FlinkSourceSplitReader.java:
##########
@@ -483,6 +542,11 @@ public String next() {
finishedSplits,
flinkSourceReaderMetrics);
stoppingOffsets.forEach(recordsWithSplitIds::setTableBucketStoppingOffset);
+ if (scanRecords.isEmpty()) {
+ sendBacklogFinishedEvent(subscribedBuckets.keySet());
Review Comment:
Backlog finish events are being sent for all subscribed buckets when scan
records are empty, but empty scan results can occur transiently (e.g., during a
poll timeout) and don't necessarily mean all buckets have finished their
backlog. This could cause the enumerator to prematurely mark backlog as
finished for buckets that still have historical data to process but temporarily
returned no records.
Consider only sending backlog finish events for buckets that don't have a
backlog marked offset tracked in `backlogMarkedOffsets`. Buckets with tracked
backlog offsets should only send finish events when they reach their backlog
boundary through the normal path (lines 507-510).
```suggestion
            // Only send backlog finished events for buckets that do not have a tracked
            // backlog offset. Buckets with tracked backlog offsets should only send
            // backlog finished events when they reach their backlog boundary through
            // the normal path (see handling around backlogFinishedTbl).
            Set<TableBucket> bucketsWithoutBacklogMark =
                    new HashSet<>(subscribedBuckets.keySet());
            bucketsWithoutBacklogMark.removeAll(backlogMarkedOffsets.keySet());
            if (!bucketsWithoutBacklogMark.isEmpty()) {
                sendBacklogFinishedEvent(bucketsWithoutBacklogMark);
            }
```
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/source/enumerator/FlinkSourceEnumeratorTest.java:
##########
@@ -759,6 +760,52 @@ void testPartitionsExpiredInFlussButExistInLake(
}
}
+ @Test
+ void testReportBacklogStatus() throws Throwable {
+ long tableId = createTable(DEFAULT_TABLE_PATH, DEFAULT_PK_TABLE_DESCRIPTOR);
+ int numSubtasks = 3;
+
+ try (MockBacklogSplitEnumeratorContext context =
+ new MockBacklogSplitEnumeratorContext(numSubtasks)) {
+ FlinkSourceEnumerator enumerator =
+ new FlinkSourceEnumerator(
+ DEFAULT_TABLE_PATH,
+ flussConf,
+ true,
+ false,
+ context,
+ OffsetsInitializer.full(),
+ DEFAULT_SCAN_PARTITION_DISCOVERY_INTERVAL_MS,
+ streaming,
+ null,
+ null);
+ enumerator.start();
+ assertThat(context.isProcessingBacklogCalled()).isTrue();
+
+ // Register readers
+ for (int i = 0; i < numSubtasks; i++) {
+ registerReader(context, enumerator, i);
+ }
+
+ context.runNextOneTimeCallable();
+
+ // Simulate BacklogFinishEvent from reader
+ TableBucket bucket0 = new TableBucket(tableId, 0);
+ TableBucket bucket1 = new TableBucket(tableId, 1);
+ TableBucket bucket2 = new TableBucket(tableId, 2);
+
+ BacklogFinishEvent event0 = new BacklogFinishEvent(bucket0);
+ BacklogFinishEvent event1 = new BacklogFinishEvent(bucket1);
+ BacklogFinishEvent event2 = new BacklogFinishEvent(bucket2);
+
+ enumerator.handleSourceEvent(0, event0);
+ enumerator.handleSourceEvent(1, event1);
+ enumerator.handleSourceEvent(2, event2);
+
+ assertThat(context.isProcessingBacklogCalled()).isFalse();
+ }
+ }
Review Comment:
This test creates an empty table (no data inserted) and expects to receive
BacklogFinishEvents for all buckets. However, for an empty table, no buckets
will have backlog offsets tracked in `hasBacklogTbls` (see line 1102 in
FlinkSourceEnumerator - only buckets with offset > 0 are tracked). The test
currently passes because the reader sends BacklogFinishEvents for all
subscribed buckets in certain scenarios (lines 196, 546), but these are being
flagged as bugs that incorrectly send events for buckets with active backlog.
Consider either: (1) inserting data into the table so buckets have actual
backlog to track, or (2) explicitly testing the empty-table scenario as a
separate test case that verifies `isProcessingBacklog` is set to false
immediately when no buckets have backlog.