This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 97d04001d7e branch-4.1: [fix](streaming-job) start counting task max interval after the first record is received #63141 (#63163)
97d04001d7e is described below

commit 97d04001d7ef49a7e10589d242fd16ce8fa0dc6b
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 13 17:08:04 2026 +0800

    branch-4.1: [fix](streaming-job) start counting task max interval after the first record is received #63141 (#63163)
    
    Cherry-picked from #63141
    
    Co-authored-by: wudi <[email protected]>
---
 .../cdcclient/service/PipelineCoordinator.java     | 30 +++++++++++++++++-----
 1 file changed, 24 insertions(+), 6 deletions(-)

diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 46fabe2d418..9b2dd5d357f 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -454,15 +454,17 @@ public class PipelineCoordinator {
 
             isSnapshotSplit = 
sourceReader.isSnapshotSplit(readResult.getSplit());
             long startTime = System.currentTimeMillis();
+            long streamingStartTime = -1;
             long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 
1000;
             boolean shouldStop = false;
             boolean lastMessageIsHeartbeat = false;
 
             LOG.info(
-                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}",
+                    "Start polling records for jobId={} taskId={}, 
isSnapshotSplit={}, maxIntervalMillis={}",
                     writeRecordRequest.getJobId(),
                     writeRecordRequest.getTaskId(),
-                    isSnapshotSplit);
+                    isSnapshotSplit,
+                    maxIntervalMillis);
 
             // 2. poll record
             while (!shouldStop) {
@@ -472,9 +474,14 @@ public class PipelineCoordinator {
                     Thread.sleep(100);
 
                     // Check if should stop
-                    long elapsedTime = System.currentTimeMillis() - startTime;
+                    long elapsedTime =
+                            streamingStartTime > 0
+                                    ? System.currentTimeMillis() - 
streamingStartTime
+                                    : 0;
                     boolean timeoutReached =
-                            maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+                            streamingStartTime > 0
+                                    && maxIntervalMillis > 0
+                                    && elapsedTime >= maxIntervalMillis;
 
                     if (shouldStop(
                             isSnapshotSplit,
@@ -488,6 +495,15 @@ public class PipelineCoordinator {
                     continue;
                 }
 
+                if (streamingStartTime < 0) {
+                    streamingStartTime = System.currentTimeMillis();
+                    LOG.info(
+                            "Streaming phase started after {} ms setup for 
jobId={} taskId={}",
+                            streamingStartTime - startTime,
+                            writeRecordRequest.getJobId(),
+                            writeRecordRequest.getTaskId());
+                }
+
                 while (recordIterator.hasNext()) {
                     SourceRecord element = recordIterator.next();
 
@@ -501,9 +517,11 @@ public class PipelineCoordinator {
                         }
 
                         // If already timeout, stop immediately when heartbeat 
received
-                        long elapsedTime = System.currentTimeMillis() - 
startTime;
+                        long elapsedTime = System.currentTimeMillis() - 
streamingStartTime;
                         boolean timeoutReached =
-                                maxIntervalMillis > 0 && elapsedTime >= 
maxIntervalMillis;
+                                streamingStartTime > 0
+                                        && maxIntervalMillis > 0
+                                        && elapsedTime >= maxIntervalMillis;
 
                         if (!isSnapshotSplit && timeoutReached) {
                             LOG.info(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to