Copilot commented on code in PR #64423:
URL: https://github.com/apache/doris/pull/64423#discussion_r3395246701


##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +244,63 @@ public void close(String jobId) {
         }
     }
 
+    /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+    public void keepAlive(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        if (context != null) {
+            context.lastAliveTime = System.currentTimeMillis();
+        }
+    }

Review Comment:
   keepAlive() updates lastAliveTime without taking the per-job lock, so 
releaseIdleReaders() can acquire the lock, observe a stale lastAliveTime, and 
reclaim/release an actively used reader. This is a real race because 
keepAlive() is called from polling loops but does not synchronize with the 
reaper’s tryLock().
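
A minimal sketch of one possible fix follows. It assumes the per-job locks are `ReentrantLock`s, so a polling loop that already holds its job's lock can re-enter. keepAlive() takes the same lock the reaper uses and reports whether the context still exists, so a caller whose reader was reaped in the meantime knows to rebuild it instead of reusing it. `LivenessSketch`, `register()`, the boolean return value, and the injected `now` timestamps are hypothetical. Only `JobContext`, `jobContexts`, `jobLocks`, and `lastAliveTime` mirror names from the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified stand-in for the per-job state kept in Env.java.
class LivenessSketch {
    static final class JobContext {
        long lastAliveTime; // only read or written while holding the job's lock

        JobContext(long lastAliveTime) {
            this.lastAliveTime = lastAliveTime;
        }
    }

    final Map<String, JobContext> jobContexts = new ConcurrentHashMap<>();
    final Map<String, Lock> jobLocks = new ConcurrentHashMap<>();

    void register(String jobId, long now) {
        jobLocks.putIfAbsent(jobId, new ReentrantLock());
        jobContexts.put(jobId, new JobContext(now));
    }

    /** Refreshes liveness under the per-job lock; false means the reader was already reaped. */
    boolean keepAlive(String jobId, long now) {
        Lock lock = jobLocks.get(jobId);
        if (lock == null) {
            return false;
        }
        lock.lock(); // waits for an in-flight reaper pass on this job to finish
        try {
            JobContext context = jobContexts.get(jobId);
            if (context == null) {
                return false; // reaped: the caller must rebuild, not reuse, its reader
            }
            context.lastAliveTime = now;
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Reaper: the staleness check and the removal run under the same lock as keepAlive(). */
    List<String> releaseIdle(long now, long timeoutMs) {
        List<String> reaped = new ArrayList<>();
        for (String jobId : jobContexts.keySet()) {
            Lock lock = jobLocks.get(jobId);
            if (lock == null || !lock.tryLock()) {
                continue; // job is busy right now, so it is being driven; skip this round
            }
            try {
                JobContext context = jobContexts.get(jobId);
                if (context != null && now - context.lastAliveTime > timeoutMs) {
                    jobContexts.remove(jobId);
                    reaped.add(jobId);
                }
            } finally {
                lock.unlock();
            }
        }
        return reaped;
    }
}
```

With this shape keepAlive() may wait for an in-progress reaper pass on the same job, which stays short as long as the potentially blocking release() runs outside the lock.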



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -168,13 +168,40 @@ private void sendWriteRequest() throws JobException {
             log.warn("cdc_client RPC timeout api=/api/writeRecords taskId={} jobId={} backend={}:{} timeout_sec={}",
                     taskId, getJobId(), backend.getHost(), backend.getBrpcPort(),
                     Config.streaming_cdc_heavy_rpc_timeout_sec);
+            // the request may have been dispatched, the retry must not reuse the reader
+            markJobNeedRebuildReader();
             throw new JobException("cdc_client RPC timeout: /api/writeRecords taskId=" + taskId);
         } catch (ExecutionException | InterruptedException ex) {
             log.error("Send write request failed: ", ex);
+            markJobNeedRebuildReader();
             throw new JobException(ex);
         }

Review Comment:
   InterruptedException is caught and wrapped into JobException without 
restoring the thread interrupt flag. This can break cooperative 
cancellation/shutdown in the scheduler thread that runs streaming tasks.
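
A sketch of the usual idiom follows. It assumes the RPC result is awaited through a `java.util.concurrent.Future`, as the caught exception types suggest. InterruptedException is caught separately, `Thread.currentThread().interrupt()` is called before wrapping, and the `markJobNeedRebuildReader()` hook still runs on every failure path. `SketchJobException`, `RpcWaitSketch`, and passing the rebuild hook as a `Runnable` are stand-ins so the example compiles on its own; they are not Doris APIs.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for Doris's JobException so the sketch is self-contained.
class SketchJobException extends Exception {
    SketchJobException(String message) {
        super(message);
    }

    SketchJobException(Throwable cause) {
        super(cause);
    }
}

class RpcWaitSketch {
    /** Waits for an RPC result; every failure marks the reader for rebuild before rethrowing. */
    static <T> T awaitRpc(Future<T> future, long timeoutSec, Runnable markJobNeedRebuildReader)
            throws SketchJobException {
        try {
            return future.get(timeoutSec, TimeUnit.SECONDS);
        } catch (TimeoutException ex) {
            markJobNeedRebuildReader.run();
            throw new SketchJobException("cdc_client RPC timeout");
        } catch (InterruptedException ex) {
            markJobNeedRebuildReader.run();
            // Restore the flag that get() cleared, so the scheduler can still see the cancellation.
            Thread.currentThread().interrupt();
            throw new SketchJobException(ex);
        } catch (ExecutionException ex) {
            markJobNeedRebuildReader.run();
            throw new SketchJobException(ex);
        }
    }
}
```

Restoring the flag instead of swallowing it lets code further up the scheduler's stack, which may check the interrupt status or call another blocking method, still observe the shutdown request.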



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Env.java:
##########
@@ -192,12 +244,63 @@ public void close(String jobId) {
         }
     }
 
+    /** Liveness evidence (FE heartbeat or active poll): keep this job's reader alive. */
+    public void keepAlive(String jobId) {
+        JobContext context = jobContexts.get(jobId);
+        if (context != null) {
+            context.lastAliveTime = System.currentTimeMillis();
+        }
+    }
+
+    // Release (keep slot) readers FE no longer drives; maxIntervalMs<=0 = untracked (e.g. TVF),
+    // skip.
+    private void releaseIdleReaders() {
+        long now = System.currentTimeMillis();
+        for (String jobId : jobContexts.keySet()) {
+            Lock lock = jobLocks.get(jobId);
+            if (lock == null || !lock.tryLock()) {
+                continue;
+            }
+            try {
+                JobContext context = jobContexts.get(jobId);
+                if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
+                    continue;
+                }
+                long timeout =
+                        Math.max(
+                                (long) Constants.IDLE_READER_TIMEOUT_MULTIPLIER
+                                        * context.maxIntervalMs,
+                                Constants.IDLE_READER_MIN_TIMEOUT_MS);
+                if (now - context.lastAliveTime <= timeout) {
+                    continue;
+                }
+                LOG.info(
+                        "Releasing idle reader for job {}, idle {} ms, keep slot",
+                        jobId,
+                        now - context.lastAliveTime);
+                jobContexts.remove(jobId);
+                if (context.reader != null && context.jobConfig != null) {
+                    try {
+                        context.reader.release(context.jobConfig);
+                    } catch (Exception ex) {

Review Comment:
   releaseIdleReaders() calls context.reader.release(...) while holding the 
per-job lock. release() can perform network/IO and may block, which can stall 
getReaderAndClaim()/detachReaderIfOwner() for the same job and cause request 
pileups. Consider removing the context under the lock, then releasing the 
reader outside the lock (similar to staleReader release in getReaderAndClaim).
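
The restructuring suggested above can be sketched as a two-phase pass: decide and detach under each job's lock, then call the potentially blocking release() with no lock held. Everything here is a simplified, hypothetical stand-in: `IdleReader`, `DeferredReleaseSketch`, `idleTimeout()`, and passing the multiplier and minimum timeout as parameters in place of the `Constants` values, whose actual numbers are not shown in the diff. The eligibility checks and the max(multiplier * maxIntervalMs, minimum) timeout follow the diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical reader abstraction; release() may block on network or disk IO.
interface IdleReader {
    void release() throws Exception;
}

class DeferredReleaseSketch {
    static final class JobContext {
        final IdleReader reader;
        final long lastAliveTime;
        final long maxIntervalMs;

        JobContext(IdleReader reader, long lastAliveTime, long maxIntervalMs) {
            this.reader = reader;
            this.lastAliveTime = lastAliveTime;
            this.maxIntervalMs = maxIntervalMs;
        }
    }

    final Map<String, JobContext> jobContexts = new ConcurrentHashMap<>();
    final Map<String, ReentrantLock> jobLocks = new ConcurrentHashMap<>();

    /** Same shape as the diff: max(multiplier * maxIntervalMs, minTimeoutMs). */
    static long idleTimeout(long maxIntervalMs, long multiplier, long minTimeoutMs) {
        return Math.max(multiplier * maxIntervalMs, minTimeoutMs);
    }

    /** Detaches idle contexts under each job lock, then releases their readers lock-free. */
    int releaseIdleReaders(long now, long multiplier, long minTimeoutMs) {
        List<JobContext> detached = new ArrayList<>();
        for (String jobId : jobContexts.keySet()) {
            ReentrantLock lock = jobLocks.get(jobId);
            if (lock == null || !lock.tryLock()) {
                continue;
            }
            try {
                JobContext context = jobContexts.get(jobId);
                if (context == null || context.lastAliveTime <= 0 || context.maxIntervalMs <= 0) {
                    continue; // untracked job (e.g. TVF) or never marked alive
                }
                long timeout = idleTimeout(context.maxIntervalMs, multiplier, minTimeoutMs);
                if (now - context.lastAliveTime <= timeout) {
                    continue;
                }
                jobContexts.remove(jobId); // cheap map update stays under the lock
                detached.add(context);
            } finally {
                lock.unlock();
            }
        }
        for (JobContext context : detached) {
            if (context.reader == null) {
                continue;
            }
            try {
                context.reader.release(); // potentially slow IO, no job lock held
            } catch (Exception ex) {
                // the real code would log this; the sketch just moves on to the next reader
            }
        }
        return detached.size();
    }
}
```

Removing the context under the lock is what keeps the deferred release safe, assuming getReaderAndClaim() looks readers up through jobContexts: once the entry is gone, the detached reader can no longer be handed to a new caller.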



##########
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java:
##########
@@ -742,6 +767,10 @@ public String getTaskFailReason(String taskId) {
      */
     private void cleanupReaderResources(
             SourceReader sourceReader, String jobId, SplitReadResult readResult) {
+        boolean isSnapshotSplit =
+                readResult != null
+                        && readResult.getSplit() != null
+                        && sourceReader.isSnapshotSplit(readResult.getSplit());
         try {

Review Comment:
   cleanupReaderResources() now only calls finishSplitRecords() for snapshot 
splits. If prepareAndSubmitSplit throws before assigning readResult (or returns 
a split but setup fails later), this method will skip finishSplitRecords() and 
can leave partially-initialized readers/futures running and leaking resources. 
At minimum, finishSplitRecords() should run when readResult/split is null 
(unknown state).
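
A sketch of the minimum fix the comment asks for: treat a missing readResult or split as an unknown state and run finishSplitRecords() in that case too. `SplitAwareReader`, `SplitReadResultSketch`, `shouldFinishSplitRecords()`, and representing a split as a `String` are hypothetical simplifications of `SourceReader`, `SplitReadResult`, and the real split type.

```java
// Hypothetical, reduced view of the reader used by cleanupReaderResources().
interface SplitAwareReader {
    boolean isSnapshotSplit(String split);
}

// Hypothetical stand-in for SplitReadResult; the split may be null if setup failed early.
final class SplitReadResultSketch {
    private final String split;

    SplitReadResultSketch(String split) {
        this.split = split;
    }

    String getSplit() {
        return split;
    }
}

class CleanupDecisionSketch {
    /**
     * Finish split records for snapshot splits and, conservatively, whenever the split is
     * unknown because setup failed before readResult or its split was assigned.
     */
    static boolean shouldFinishSplitRecords(SplitAwareReader reader, SplitReadResultSketch readResult) {
        if (readResult == null || readResult.getSplit() == null) {
            return true; // unknown state: clean up rather than risk leaking readers or futures
        }
        return reader.isSnapshotSplit(readResult.getSplit());
    }
}
```

In cleanupReaderResources() this flag would then guard the existing finishSplitRecords() call in place of the bare isSnapshotSplit check.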



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

