github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3249036074


##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -135,22 +152,26 @@ public String getSourceType() {
 
     @Override
     public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
-        JdbcOffset nextOffset = new JdbcOffset();
-        if (!remainingSplits.isEmpty()) {
-            int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
-            List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
-            nextOffset.setSplits(snapshotSplits);
-            return nextOffset;
-        } else if (currentOffset != null && currentOffset.snapshotSplit()) {
-            // initial mode: snapshot to binlog
-            // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here
-            BinlogSplit binlogSplit = new BinlogSplit();
-            binlogSplit.setFinishedSplits(finishedSplits);
-            nextOffset.setSplits(Collections.singletonList(binlogSplit));
-            return nextOffset;
-        } else {
-            // only binlog
-            return currentOffset == null ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset;
+        synchronized (splitsLock) {
+            JdbcOffset nextOffset = new JdbcOffset();
+            if (!remainingSplits.isEmpty()) {
+                int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
+                List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
+                nextOffset.setSplits(snapshotSplits);
+                return nextOffset;
+            } else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) {
+                // initial mode: snapshot to binlog. noMoreSplits() guards against switching while
+                // splitting is still in progress (remainingSplits empty doesn't mean fully cut).
+                // snapshot-only mode is intercepted by hasReachedEnd() before reaching here.
+                BinlogSplit binlogSplit = new BinlogSplit();
+                binlogSplit.setFinishedSplits(new ArrayList<>(finishedSplits));
+                nextOffset.setSplits(Collections.singletonList(binlogSplit));
+                return nextOffset;
+            } else {
+                // only binlog
+                return currentOffset == null
+                        ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset;

Review Comment:
   This fallback can still start a binlog task before any snapshot split has
been produced. On the first `PENDING` scheduler tick for an `initial` or
`snapshot` job, `advanceSplitsIfNeed()` may call `advanceSplits()`, which sets
`cdcSplitProgress.currentSplittingTable` and then returns if
`/api/fetchSplits` yields an empty batch. `handlePendingState()` still creates
the first streaming task, and because `currentOffset == null` and
`remainingSplits` is empty, this branch returns an empty `BinlogSplit`. That
task can then read from the binlog without the initial snapshot ever being
scheduled. This is distinct from the already-raised post-commit reuse issue: it
happens before the first snapshot task exists. Please make snapshot modes
return a non-consumable (deferred) state, or have the scheduler skip task
creation, while `checkNeedSplitChunks(sourceProperties)` is true,
`remainingSplits` is empty, and `noMoreSplits()` is false.
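A minimal sketch of where such a guard could sit, assuming a simplified in-memory model rather than the Doris classes. `DeferredOffsetSketch`, `Offset`, `addSplits`, `markSplittingDone` and `commit` are hypothetical names; strings stand in for `SnapshotSplit`/`BinlogSplit`; the `needSplitChunks` field stands in for `checkNeedSplitChunks(sourceProperties)` and `splittingDone` for `noMoreSplits()`. Returning `Optional.empty()` as a "defer, don't create a task this tick" signal is an assumption that combines the two options above:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical, simplified model of the offset decision in getNextOffset().
 * Strings stand in for SnapshotSplit/BinlogSplit; this is not the Doris API.
 */
public class DeferredOffsetSketch {

    /** Either a batch of snapshot splits, or a binlog offset carrying the finished splits. */
    public record Offset(boolean binlog, List<String> splits) {}

    private final Object splitsLock = new Object();
    private final List<String> remainingSplits = new ArrayList<>();
    private final List<String> finishedSplits = new ArrayList<>();
    private final boolean needSplitChunks;  // stand-in for checkNeedSplitChunks(sourceProperties)
    private final int snapshotParallelism;
    private boolean splittingDone;          // stand-in for noMoreSplits()
    private Offset currentOffset;           // null until the first task commits

    public DeferredOffsetSketch(boolean needSplitChunks, int snapshotParallelism) {
        this.needSplitChunks = needSplitChunks;
        this.snapshotParallelism = snapshotParallelism;
    }

    /** Returns empty when the scheduler should not create a task on this tick. */
    public Optional<Offset> nextOffset() {
        synchronized (splitsLock) {
            if (!remainingSplits.isEmpty()) {
                int n = Math.min(remainingSplits.size(), snapshotParallelism);
                return Optional.of(new Offset(false, new ArrayList<>(remainingSplits.subList(0, n))));
            }
            if (needSplitChunks && !splittingDone) {
                // Snapshot mode, no pending splits, splitter not finished:
                // defer instead of falling through to an empty binlog split.
                return Optional.empty();
            }
            if (currentOffset != null && !currentOffset.binlog()) {
                // Snapshot finished: switch to binlog with a copy of the finished splits.
                return Optional.of(new Offset(true, new ArrayList<>(finishedSplits)));
            }
            // Otherwise (e.g. binlog-only mode, or already reading binlog).
            return Optional.of(currentOffset == null
                    ? new Offset(true, Collections.emptyList()) : currentOffset);
        }
    }

    /** Stand-in for a splitter batch arriving (e.g. from /api/fetchSplits). */
    public void addSplits(List<String> splits) {
        synchronized (splitsLock) {
            remainingSplits.addAll(splits);
        }
    }

    public void markSplittingDone() {
        synchronized (splitsLock) {
            splittingDone = true;
        }
    }

    /** Stand-in for a task commit: snapshot splits move from remaining to finished. */
    public void commit(Offset committed) {
        synchronized (splitsLock) {
            if (!committed.binlog()) {
                remainingSplits.removeAll(committed.splits());
                finishedSplits.addAll(committed.splits());
            }
            currentOffset = committed;
        }
    }
}
```

With this shape, `handlePendingState()` (or its equivalent) would treat an empty result as "stay pending and retry on the next tick" rather than creating a task from an empty `BinlogSplit`. Placing the guard before the snapshot-to-binlog branch also covers the later case where every split cut so far has committed but the splitter has not finished yet.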



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

