github-actions[bot] commented on code in PR #64301:
URL: https://github.com/apache/doris/pull/64301#discussion_r3449782272


##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java:
##########
@@ -361,72 +365,69 @@ private String getFrontendAddress() {
         return Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort();
     }
 
-    public boolean isTimeout() {
+    // Local pre-check, no RPC: gates whether to pull real progress this tick.
+    boolean isLocalTimeout() {
+        if (startTimeMs == null) {
+            return false;
+        }
+        return System.currentTimeMillis() - lastProgressMs > getTaskTimeoutMs();
+    }
+
+    boolean isTimeout(StreamingTaskStatus status) {
         if (startTimeMs == null) {
             // It's still pending, waiting for scheduling.
             return false;
         }
+        long now = System.currentTimeMillis();
+        if (status != null && status.getScannedRows() > lastScannedRows) {
+            lastScannedRows = status.getScannedRows();
+            lastProgressMs = now;
+        }
         long timeoutMs = getTaskTimeoutMs();
-        long elapsed = System.currentTimeMillis() - startTimeMs;
+        long elapsed = now - lastProgressMs;
         if (elapsed > timeoutMs) {
-            log.info("Task {} timeout detected: elapsed={}ms, timeoutMs={}ms", taskId, elapsed, timeoutMs);
+            log.info("Task {} timeout detected: no progress for {}ms, timeoutMs={}ms",
+                    taskId, elapsed, timeoutMs);
             return true;
         }
         return false;
     }
 
     // Read multiplier live so config changes affect already-running tasks.
     private long getTaskTimeoutMs() {
-        return Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L;
+        return Math.max(
+                Config.streaming_task_timeout_multiplier * jobProperties.getMaxIntervalSecond() * 1000L,
+                Config.streaming_task_min_timeout_sec * 1000L);
     }
 
-    /**
-     * When a task encounters a write error, it will time out.
-     * The job needs to obtain the reason for the timeout,
-     * such as a data quality error, and needs to expose it to the user.
-     */
-    public String getTimeoutReason() {
+    StreamingTaskStatus fetchTaskStatus() {
         if (runningBackendId <= 0) {
-            log.info("No running backend for task {}", runningBackendId);
-            return "";
+            return null;
         }
         Backend backend = Env.getCurrentSystemInfo().getBackend(runningBackendId);
+        if (backend == null) {
+            return null;
+        }
         try {

Review Comment:
   [P2] Please keep a fallback to `/api/getFailReason/{taskId}` here for 
rolling upgrades. At the base version the cdc_client only exposes 
`/api/getFailReason` and `writeRecordsAsync()` records async stream-load 
failures in `taskErrorMaps`; it does not have `/api/getTaskStatus` or the new 
push endpoint. With a new FE talking to an old BE/cdc_client, this RPC returns 
non-OK/null, `processTimeoutTasks()` treats `status == null` as a generic 
timeout, and the real stream-load/data-quality error is never read from 
`taskErrorMaps`. Please call the old fail-reason endpoint when `getTaskStatus` 
is unavailable/non-OK before defaulting to `"task failed cause timeout"`.
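A minimal sketch of the fallback order requested above, not the actual Doris code. It assumes hypothetical names (`CdcClient`, `TaskStatus`, `resolveFailReason`, `fixed`) that stand in for the real HTTP calls to `/api/getTaskStatus` and `/api/getFailReason/{taskId}`; only the endpoint paths and the `"task failed cause timeout"` default come from the comment. The new endpoint is tried first, the legacy endpoint is consulted only when the new one is unavailable or non-OK, and the generic timeout message is the last resort.

```java
// Hypothetical sketch: CdcClient, TaskStatus, resolveFailReason and fixed are
// illustrative names, not APIs from the Doris codebase.
public class TimeoutReasonFallback {

    /** Stand-in for the payload of /api/getTaskStatus (assumed shape). */
    record TaskStatus(boolean ok, String errorMsg) {}

    /** Abstraction over the two cdc_client endpoints (assumed). */
    interface CdcClient {
        // New endpoint; returns null when it is missing (old cdc_client) or the RPC fails.
        TaskStatus getTaskStatus(long taskId);

        // Legacy endpoint /api/getFailReason/{taskId}; null or "" when nothing was recorded.
        String getFailReason(long taskId);
    }

    static final String DEFAULT_REASON = "task failed cause timeout";

    static String resolveFailReason(CdcClient client, long taskId) {
        TaskStatus status = client.getTaskStatus(taskId);
        if (status != null && status.ok()) {
            // New cdc_client answered: trust its error message if it has one.
            String msg = status.errorMsg();
            return (msg == null || msg.isEmpty()) ? DEFAULT_REASON : msg;
        }
        // New endpoint unavailable or non-OK: ask the legacy endpoint, which
        // still surfaces async stream-load errors from an old cdc_client.
        String legacy = client.getFailReason(taskId);
        if (legacy != null && !legacy.isEmpty()) {
            return legacy;
        }
        // Nothing recorded anywhere: fall back to the generic timeout reason.
        return DEFAULT_REASON;
    }

    /** Test double returning canned answers for both endpoints. */
    static CdcClient fixed(TaskStatus status, String legacyReason) {
        return new CdcClient() {
            @Override
            public TaskStatus getTaskStatus(long taskId) {
                return status;
            }

            @Override
            public String getFailReason(long taskId) {
                return legacyReason;
            }
        };
    }

    public static void main(String[] args) {
        // Old cdc_client during a rolling upgrade: only the legacy endpoint answers.
        System.out.println(resolveFailReason(fixed(null, "data quality error"), 42L));
        // prints "data quality error"

        // New cdc_client reports the error itself.
        System.out.println(resolveFailReason(fixed(new TaskStatus(true, "stream load failed"), null), 42L));
        // prints "stream load failed"

        // No information from either endpoint.
        System.out.println(resolveFailReason(fixed(null, ""), 42L));
        // prints "task failed cause timeout"
    }
}
```

The sample error strings in `main` are placeholders. Keeping the legacy call behind the new one means a mixed-version cluster degrades to the old behavior instead of hiding the real error behind a generic timeout.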


