github-actions[bot] commented on code in PR #63079:
URL: https://github.com/apache/doris/pull/63079#discussion_r3240901259
##########
fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java:
##########
@@ -420,13 +457,22 @@ public void validateAlterOffset(String offset) throws Exception {
*/
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
+ synchronized (splitsLock) {
+ this.cachedSyncTables = job.getSyncTables();
+ }
+
String offsetProviderPersist = job.getOffsetProviderPersist();
if (offsetProviderPersist != null) {
JdbcSourceOffsetProvider replayFromPersist =
GsonUtils.GSON.fromJson(offsetProviderPersist, JdbcSourceOffsetProvider.class);
this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
this.tableSchemas = replayFromPersist.getTableSchemas();
+ synchronized (splitsLock) {
+ this.committedSplitProgress = replayFromPersist.getCommittedSplitProgress() != null
+ ? replayFromPersist.getCommittedSplitProgress() : new SplitProgress();
+ this.cdcSplitProgress = this.committedSplitProgress.copy();
+ }
log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}",
getJobId(),
binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
Review Comment:
This unconditional resume step breaks replay for an `initial` job that had
already finished the snapshot phase and persisted `binlogOffsetPersist`. In the
branch above (`MapUtils.isNotEmpty(binlogOffsetPersist)`), replay restores only
`currentOffset` as a `BinlogSplit`; `finishedSplits` and `remainingSplits` stay
empty. This call then clears `cdcSplitProgress`, so `noMoreSplits()` treats every
`cachedSyncTables` entry as untouched, and the next scheduler tick calls
`advanceSplits()`, repopulating snapshot splits even though the job is already
in the binlog phase. Because `getNextOffset()` prioritizes `remainingSplits`, a
later task can re-read snapshot chunks and produce duplicate data. Please mark
split progress as terminal (or skip asynchronous split advancing) when replay
restores a binlog offset, and add a test that restarts the job after the
snapshot-to-binlog transition.
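To make the failure sequence concrete, here is a minimal, self-contained Java model of it. Everything in it is hypothetical: `SplitProgressModel`, `ReplayModel`, `replay`, `schedulerTick`, and `nextOffset` are simplified stand-ins loosely named after `cdcSplitProgress`, `noMoreSplits()`, `advanceSplits()`, and `getNextOffset()`, not the real Doris classes, and the binlog position string is made up. The `markTerminalOnBinlog` flag models the first suggested fix, marking split progress terminal when replay restores a binlog offset.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the split-progress state (cdcSplitProgress).
final class SplitProgressModel {
    private final Set<String> touchedTables = new HashSet<>();
    // Terminal flag: set when replay proves the snapshot phase is over.
    private boolean snapshotFinished;

    void markTouched(String table) {
        touchedTables.add(table);
    }

    void markSnapshotFinished() {
        snapshotFinished = true;
    }

    // Stand-in for noMoreSplits(): done if terminal, or if every table was split.
    boolean noMoreSplits(List<String> syncTables) {
        return snapshotFinished || touchedTables.containsAll(syncTables);
    }
}

// Hypothetical, heavily simplified offset provider used only to replay the scenario.
final class ReplayModel {
    private List<String> cachedSyncTables = List.of();
    private SplitProgressModel progress = new SplitProgressModel();
    private final Deque<String> remainingSplits = new ArrayDeque<>();
    private String currentOffset;

    // Replay after restart. Split progress starts empty, matching the state the
    // review describes after the unconditional resume step clears it.
    void replay(List<String> syncTables, String restoredBinlogOffset, boolean markTerminalOnBinlog) {
        cachedSyncTables = List.copyOf(syncTables);
        progress = new SplitProgressModel();
        if (restoredBinlogOffset != null) {
            currentOffset = "binlog:" + restoredBinlogOffset;
            if (markTerminalOnBinlog) {
                // Suggested fix: a restored binlog offset means the snapshot is complete.
                progress.markSnapshotFinished();
            }
        }
    }

    // Stand-in for a scheduler tick that calls advanceSplits() when splits remain.
    void schedulerTick() {
        if (progress.noMoreSplits(cachedSyncTables)) {
            return;
        }
        for (String table : cachedSyncTables) {
            remainingSplits.add("snapshot:" + table);
            progress.markTouched(table);
        }
    }

    // Stand-in for getNextOffset(): pending snapshot splits win over the binlog offset.
    String nextOffset() {
        return remainingSplits.isEmpty() ? currentOffset : remainingSplits.poll();
    }
}
```

Replaying with the flag off reproduces the reported behavior: after one scheduler tick, the first offset handed out is a snapshot split rather than the restored binlog position. With the flag on, the job resumes at the binlog offset. The alternative fix, skipping split advancing once `currentOffset` is a binlog split, would give the same result in this model, and the replay-then-tick sequence is roughly the shape a restart test after the snapshot-to-binlog transition could take.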
--
This is an automated message from the Apache Git Service.