github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3249036074
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -135,22 +152,26 @@ public String getSourceType() {
@Override
public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
- JdbcOffset nextOffset = new JdbcOffset();
- if (!remainingSplits.isEmpty()) {
- int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
- List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
- nextOffset.setSplits(snapshotSplits);
- return nextOffset;
- } else if (currentOffset != null && currentOffset.snapshotSplit()) {
- // initial mode: snapshot to binlog
- // snapshot-only mode must be intercepted by hasReachedEnd() before reaching here
- BinlogSplit binlogSplit = new BinlogSplit();
- binlogSplit.setFinishedSplits(finishedSplits);
- nextOffset.setSplits(Collections.singletonList(binlogSplit));
- return nextOffset;
- } else {
- // only binlog
- return currentOffset == null ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset;
+ synchronized (splitsLock) {
+ JdbcOffset nextOffset = new JdbcOffset();
+ if (!remainingSplits.isEmpty()) {
+ int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
+ List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
+ nextOffset.setSplits(snapshotSplits);
+ return nextOffset;
+ } else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) {
+ // initial mode: snapshot to binlog. noMoreSplits() guards against switching while
+ // splitting is still in progress (remainingSplits empty doesn't mean fully cut).
+ // snapshot-only mode is intercepted by hasReachedEnd() before reaching here.
+ BinlogSplit binlogSplit = new BinlogSplit();
+ binlogSplit.setFinishedSplits(new ArrayList<>(finishedSplits));
+ nextOffset.setSplits(Collections.singletonList(binlogSplit));
+ return nextOffset;
+ } else {
+ // only binlog
+ return currentOffset == null
+ ? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset;
Review Comment:
This fallback can still start a binlog task before any snapshot split has
been produced. On the first `PENDING` scheduler tick for an
`initial`/`snapshot` job, `advanceSplitsIfNeed()` may call `advanceSplits()`,
which sets `cdcSplitProgress.currentSplittingTable` and then returns early if
`/api/fetchSplits` yields an empty batch. `handlePendingState()` still creates
the first streaming task, and because `currentOffset == null` and
`remainingSplits` is empty, this branch returns an empty `BinlogSplit`. That
task can then read from the binlog without the initial snapshot ever being
scheduled.

This is distinct from the already-raised post-commit reuse issue: it happens
before the first snapshot task exists. Please make snapshot modes return a
non-consumable/deferred state (or have the scheduler skip task creation) while
`checkNeedSplitChunks(sourceProperties)` is true, `remainingSplits` is empty,
and `noMoreSplits()` is false.
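
For illustration only, the requested guard could look roughly like the
standalone sketch below. It is not the provider's actual code: the class
`OffsetDecisionSketch`, the `Decision` enum, and the `decide` method are
hypothetical names, and the boolean parameters merely stand in for
`checkNeedSplitChunks(sourceProperties)`, `noMoreSplits()`, and
`currentOffset != null && currentOffset.snapshotSplit()`.

```java
import java.util.List;

/** Hypothetical, simplified model of the branch order; not Doris code. */
final class OffsetDecisionSketch {

    /** What a scheduler tick should do with the next offset. */
    enum Decision { SNAPSHOT, SWITCH_TO_BINLOG, BINLOG_ONLY, DEFER }

    static Decision decide(boolean needSplitChunks,
                           List<String> remainingSplits,
                           boolean noMoreSplits,
                           boolean currentOffsetIsSnapshot) {
        if (!remainingSplits.isEmpty()) {
            // Snapshot splits are ready: hand them out first.
            return Decision.SNAPSHOT;
        }
        if (needSplitChunks && !noMoreSplits) {
            // Splitting is still in progress and nothing is ready yet.
            // Defer instead of falling through to a binlog split.
            return Decision.DEFER;
        }
        if (currentOffsetIsSnapshot) {
            // All splits were cut and consumed: move from snapshot to binlog.
            return Decision.SWITCH_TO_BINLOG;
        }
        // No snapshot phase is pending: plain binlog reading.
        return Decision.BINLOG_ONLY;
    }
}
```

With this ordering, the first-tick case described above (splitting required,
`remainingSplits` empty, `noMoreSplits()` false, no current offset) maps to
`DEFER`, so the caller can skip task creation rather than receive an empty
`BinlogSplit`.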
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]