Copilot commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3395246701
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +244,63 @@ public void close(String jobId) {
}
}
+ /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+ public void keepAlive(String jobId) {
+ JobContext context = jobContexts.get(jobId);
+ if (context != null) {
+ context.lastAliveTime = System.currentTimeMillis();
+ }
+ }
Review Comment:
keepAlive() updates lastAliveTime without taking the per-job lock, so
releaseIdleReaders() can acquire the lock, observe a stale lastAliveTime, and
reclaim/release an actively used reader. This is a real race because
keepAlive() is called from polling loops but does not synchronize with the
reaper’s tryLock().
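One possible shape for a fix, as a minimal sketch rather than the PR's actual code: keepAlive() takes the same per-job lock the reaper uses, so the timestamp update and the idle check are serialized. The class name KeepAliveSketch, the register() and reapIfIdle() helpers, and the explicit `now` parameters are assumptions made for illustration; only jobContexts, jobLocks, lastAliveTime, and keepAlive come from the diff.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the relevant parts of Env; not the real class.
class KeepAliveSketch {
    static final class JobContext {
        // volatile so a reader on another thread never sees a stale value
        volatile long lastAliveTime;
    }

    private final Map<String, JobContext> jobContexts = new ConcurrentHashMap<>();
    private final Map<String, Lock> jobLocks = new ConcurrentHashMap<>();

    // Hypothetical helper: create a context and its per-job lock.
    void register(String jobId, long now) {
        JobContext context = new JobContext();
        context.lastAliveTime = now;
        jobContexts.put(jobId, context);
        jobLocks.put(jobId, new ReentrantLock());
    }

    // Update the timestamp under the per-job lock. Returns false if the job
    // is unknown or its context was already reclaimed.
    boolean keepAlive(String jobId, long now) {
        Lock lock = jobLocks.get(jobId);
        if (lock == null) {
            return false;
        }
        lock.lock();
        try {
            JobContext context = jobContexts.get(jobId);
            if (context == null) {
                return false;
            }
            context.lastAliveTime = now;
            return true;
        } finally {
            lock.unlock();
        }
    }

    // Reaper side: skip the job if the lock is busy, otherwise decide under
    // the lock whether the context has been idle longer than timeoutMs.
    boolean reapIfIdle(String jobId, long now, long timeoutMs) {
        Lock lock = jobLocks.get(jobId);
        if (lock == null || !lock.tryLock()) {
            return false;
        }
        try {
            JobContext context = jobContexts.get(jobId);
            if (context == null || now - context.lastAliveTime <= timeoutMs) {
                return false;
            }
            jobContexts.remove(jobId);
            return true;
        } finally {
            lock.unlock();
        }
    }
}
```

Returning a boolean from keepAlive() lets a caller notice that the context was already reclaimed. Whether blocking on the lock is acceptable in the polling loop is a separate design question this sketch does not settle.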
##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -168,13 +168,40 @@ private void sendWriteRequest() throws JobException {
log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}",
taskId, getJobId(), backend.getHost(),
backend.getBrpcPort(),
Config.streaming_cdc_heavy_rpc_timeout_sec);
+ // the request may have been dispatched, the retry must not reuse the reader
+ markJobNeedRebuildReader();
throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId);
} catch (ExecutionException | InterruptedException ex) {
log.error("Send write request failed: ", ex);
+ markJobNeedRebuildReader();
throw new JobException(ex);
}
Review Comment:
InterruptedException is caught and wrapped into JobException without
restoring the thread interrupt flag. This can break cooperative
cancellation/shutdown in the scheduler thread that runs streaming tasks.
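A minimal sketch of the usual pattern, assuming the two exception types are handled in separate catch blocks: re-assert the interrupt flag before wrapping. InterruptSketch, its nested JobException, and the await() helper are hypothetical stand-ins, not the classes in StreamingMultiTblTask.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in; only the catch structure is the point here.
class InterruptSketch {
    // Placeholder for the project's JobException.
    static class JobException extends Exception {
        JobException(Throwable cause) {
            super(cause);
        }
    }

    static String await(Future<String> future) throws JobException {
        try {
            return future.get();
        } catch (InterruptedException ex) {
            // Catching InterruptedException clears the flag; set it again so
            // callers further up the stack can still observe the interrupt.
            Thread.currentThread().interrupt();
            throw new JobException(ex);
        } catch (ExecutionException ex) {
            throw new JobException(ex);
        }
    }
}
```

In the diff above, the markJobNeedRebuildReader() call would presumably stay in both branches; it is left out here to keep the sketch self-contained.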
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +244,63 @@ public void close(String jobId) {
}
}
+ /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+ public void keepAlive(String jobId) {
+ JobContext context = jobContexts.get(jobId);
+ if (context != null) {
+ context.lastAliveTime = System.currentTimeMillis();
+ }
+ }
+
+ // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+ // skip.
+ private void releaseIdleReaders() {
+ long now = System.currentTimeMillis();
+ for (String jobId : jobContexts.keySet()) {
+ Lock lock = jobLocks.get(jobId);
+ if (lock == null || !lock.tryLock()) {
+ continue;
+ }
+ try {
+ JobContext context = jobContexts.get(jobId);
+ if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+ continue;
+ }
+ long timeout =
+ Math.max(
+ (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+ * context.maxIntervalMs,
+ Constants.IDLE_READER_MIN_TIMEOUT_MS);
+ if (now - context.lastAliveTime <= timeout) {
+ continue;
+ }
+ LOG.info(
+ "Releasing idle reader for job {}, idle {} ms, keep slot",
+ jobId,
+ now - context.lastAliveTime);
+ jobContexts.remove(jobId);
+ if (context.reader != null && context.jobConfig != null) {
+ try {
+ context.reader.release(context.jobConfig);
+ } catch (Exception ex) {
Review Comment:
releaseIdleReaders() calls context.reader.release(...) while holding the
per-job lock. release() can perform network/IO and may block, which can stall
getReaderAndClaim()/detachReaderIfOwner() for the same job and cause request
pileups. Consider removing the context under the lock, then releasing the
reader outside the lock (similar to staleReader release in getReaderAndClaim).
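The suggestion could look roughly like the sketch below: detach the entry while holding the lock, unlock, then run the potentially slow release. ReleaseOutsideLockSketch, its Reader interface, and the register(), holdsLock(), and releaseDetached() helpers are illustrative assumptions; the idle-timeout check from the diff is omitted to keep the example short.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for the reaper path in Env; not the real class.
class ReleaseOutsideLockSketch {
    // Placeholder for a reader whose release may do blocking IO.
    interface Reader {
        void release() throws Exception;
    }

    private final Map<String, Reader> readers = new ConcurrentHashMap<>();
    private final Map<String, ReentrantLock> jobLocks = new ConcurrentHashMap<>();

    void register(String jobId, Reader reader) {
        readers.put(jobId, reader);
        jobLocks.put(jobId, new ReentrantLock());
    }

    // Helper used to observe whether the calling thread holds the job lock.
    boolean holdsLock(String jobId) {
        ReentrantLock lock = jobLocks.get(jobId);
        return lock != null && lock.isHeldByCurrentThread();
    }

    // Detach under the lock, release after unlocking.
    boolean releaseDetached(String jobId) {
        ReentrantLock lock = jobLocks.get(jobId);
        if (lock == null || !lock.tryLock()) {
            return false;
        }
        Reader detached;
        try {
            // Only the cheap map mutation happens while the lock is held.
            detached = readers.remove(jobId);
        } finally {
            lock.unlock();
        }
        if (detached == null) {
            return false;
        }
        try {
            // The lock is free again, so other requests for this job can proceed.
            detached.release();
        } catch (Exception ex) {
            // A failed release must not abort the reaper loop; real code would log it.
        }
        return true;
    }
}
```

Once the entry is removed from the map, no other thread can look the reader up, so releasing it without the lock does not reintroduce the race.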
##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -742,6 +767,10 @@ public String getTaskFailReason(String taskId) {
*/
private void cleanupReaderResources(
SourceReader sourceReader, String jobId, SplitReadResult readResult) {
+ boolean isSnapshotSplit =
+ readResult != null
+ && readResult.getSplit() != null
+ && sourceReader.isSnapshotSplit(readResult.getSplit());
try {
Review Comment:
cleanupReaderResources() now only calls finishSplitRecords() for snapshot
splits. If prepareAndSubmitSplit throws before assigning readResult (or returns
a split but setup fails later), this method will skip finishSplitRecords() and
can leave partially-initialized readers/futures running and leaking resources.
At minimum, finishSplitRecords() should run when readResult/split is null
(unknown state).
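The decision could be expressed as a small predicate that treats a missing result or split as "clean up anyway". This is only a sketch: CleanupDecisionSketch, the simplified Split and SplitReadResult types, and shouldFinishSplitRecords() are hypothetical, and the real code asks sourceReader.isSnapshotSplit(...) rather than the split itself.

```java
// Hypothetical stand-in types; only the null-handling rule is the point.
class CleanupDecisionSketch {
    // Simplified placeholder for a split that knows its own kind.
    interface Split {
        boolean isSnapshot();
    }

    // Simplified placeholder for the result of preparing a split.
    static final class SplitReadResult {
        final Split split;

        SplitReadResult(Split split) {
            this.split = split;
        }
    }

    // True when split records should be finished during cleanup.
    static boolean shouldFinishSplitRecords(SplitReadResult readResult) {
        if (readResult == null || readResult.split == null) {
            // Unknown state: setup may have failed halfway, so clean up defensively.
            return true;
        }
        return readResult.split.isSnapshot();
    }
}
```

With this rule, only a known non-snapshot split skips the finish step.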