github-actions[bot] commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3395273092
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -591,7 +602,21 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
writeRecordRequest.getTaskId());
} finally {
- cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult);
+ stillOwner =
+ Env.getCurrentEnv()
+ .isOwner(writeRecordRequest.getJobId(), writeRecordRequest.getTaskId());
+ // A displaced task must not touch the reader (finishSplitRecords would kill the
+ // successor's fetcher) nor commit anything.
+ if (stillOwner) {
+ cleanupReaderResources(sourceReader, writeRecordRequest.getJobId(), readResult);
+ }
+ }
+ if (!stillOwner) {
+ LOG.info(
Review Comment:
This early return leaves the job-scoped `DorisBatchStreamLoad` in
`batchStreamLoadMap` untouched. The path is reachable after a task has already
called `writeRecord()` and then loses ownership because FE
released/rebuilt/rebound the reader; in that case the old task skips
`forceFlush()` and `commitOffset()`, but its rows can still remain in
`bufferMap`/`flushQueue`. The next task reuses the same loader in
`getOrCreateBatchStreamLoad()` and only clears `LoadStatistic`, so it can flush
the previous task's uncommitted rows under the new task while FE never advanced
the old offset. Please discard or isolate the stream-load buffers when skipping
commit, without closing a successor task's active loader.
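
One way to isolate the buffers, sketched under assumptions: tag the job-scoped buffer with the task that owns its rows, drop stale rows when a different task starts writing, and let a displaced task discard only what it still owns. `TaskScopedBuffer` and its methods are hypothetical names for illustration; they are not part of `DorisBatchStreamLoad`, and the real `bufferMap`/`flushQueue` handling would need the same ownership check applied there.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the job-scoped loader; not the real DorisBatchStreamLoad API.
final class TaskScopedBuffer {
    private final List<String> rows = new ArrayList<>();
    private long ownerTaskId = -1L; // -1 means no task has written yet

    // Buffer a row. Rows left behind by a different task are dropped first,
    // because that task never committed its offset.
    synchronized void write(long taskId, String row) {
        if (taskId != ownerTaskId) {
            rows.clear();
            ownerTaskId = taskId;
        }
        rows.add(row);
    }

    // Called by a displaced task in place of flush + commit.
    // Returns the number of rows dropped; 0 if a successor already owns the buffer.
    synchronized int discardIfOwnedBy(long taskId) {
        if (taskId != ownerTaskId) {
            return 0;
        }
        int dropped = rows.size();
        rows.clear();
        return dropped;
    }

    // Hand the buffered rows to the caller for flushing; only the owning task gets them.
    synchronized List<String> drainForFlush(long taskId) {
        if (taskId != ownerTaskId) {
            return new ArrayList<>();
        }
        List<String> out = new ArrayList<>(rows);
        rows.clear();
        return out;
    }
}
```

In this sketch the `!stillOwner` branch would call `discardIfOwnedBy(taskId)` before returning. The ownership check is what keeps a late-running displaced task from clearing rows that the successor has already buffered.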
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java:
##########
@@ -646,6 +657,18 @@ offsetProvider, getConvertedSourceProperties(), targetDb, targetProperties, jobP
getCreateUser(), cloudCluster);
}
+ // Binlog phase: prefer the bound BE in selectBackend; rebind + persist on change.
+ public Backend resolveBoundBackend() throws JobException {
+ Backend selected = StreamingJobUtils.selectBackend(cloudCluster, boundBackendId);
Review Comment:
When the preferred BE is no longer load-available, this rebinds the job to a
new BE but does not release the live reader on the old `boundBackendId`.
`isLoadAvailable()` can be false while the BE process is still alive, and the
binlog path intentionally keeps the old cdc_client reader open across tasks.
Dispatching the next binlog task to the new BE can therefore create a second
reader for the same source/PG replication slot or duplicate MySQL binlog
consumption until the idle reaper eventually runs. Please cleanly release the
previous bound BE's reader before persisting the new binding, or prevent rebind
until that reader is known stopped.
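
A minimal sketch of the requested ordering, assuming a release call that reports whether the old reader is confirmed stopped. `BackendBinding`, `resolve`, and the `releaseReader` callback are hypothetical names for illustration; they do not correspond to existing methods in `StreamingInsertJob` or `StreamingJobUtils`.

```java
import java.util.function.LongPredicate;

// Hypothetical model of the rebind decision; not the real StreamingInsertJob code.
final class BackendBinding {
    private long boundBackendId;
    // Assumed callback: returns true once the reader on the given BE is known stopped.
    private final LongPredicate releaseReader;

    BackendBinding(long initialBackendId, LongPredicate releaseReader) {
        this.boundBackendId = initialBackendId;
        this.releaseReader = releaseReader;
    }

    long getBoundBackendId() {
        return boundBackendId;
    }

    // Keep the current binding when the selection is unchanged. Otherwise release the
    // old reader first and only then record the new binding.
    long resolve(long selectedBackendId) {
        if (selectedBackendId == boundBackendId) {
            return boundBackendId;
        }
        if (!releaseReader.test(boundBackendId)) {
            // Refuse to rebind: a second reader on the same source would be created.
            throw new IllegalStateException(
                    "reader on backend " + boundBackendId + " not confirmed stopped");
        }
        boundBackendId = selectedBackendId; // the real code would persist the binding here
        return boundBackendId;
    }
}
```

Failing the dispatch when the release cannot be confirmed is one possible policy; retrying the release or deferring the rebind would satisfy the same constraint, as long as the new binding is never persisted while the old reader may still be running.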
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]