This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5ea9876b6c6 branch-4.0: [fix](streaming-job) start counting task max interval after the first record is received #63141 (#63162)
5ea9876b6c6 is described below
commit 5ea9876b6c6dd78b2a9d4d48e0184b9dd194cde3
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 13 17:08:57 2026 +0800
branch-4.0: [fix](streaming-job) start counting task max interval after the first record is received #63141 (#63162)
Cherry-picked from #63141
Co-authored-by: wudi <[email protected]>
---
.../cdcclient/service/PipelineCoordinator.java | 30 +++++++++++++++++-----
1 file changed, 24 insertions(+), 6 deletions(-)
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
index 614c506619f..003ab813829 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java
@@ -248,14 +248,16 @@ public class PipelineCoordinator {
boolean isSnapshotSplit =
sourceReader.isSnapshotSplit(readResult.getSplit());
long startTime = System.currentTimeMillis();
+ long streamingStartTime = -1;
long maxIntervalMillis = writeRecordRequest.getMaxInterval() *
1000;
boolean shouldStop = false;
boolean lastMessageIsHeartbeat = false;
LOG.info(
- "Start polling records for jobId={} taskId={},
isSnapshotSplit={}",
+ "Start polling records for jobId={} taskId={},
isSnapshotSplit={}, maxIntervalMillis={}",
writeRecordRequest.getJobId(),
writeRecordRequest.getTaskId(),
- isSnapshotSplit);
+ isSnapshotSplit,
+ maxIntervalMillis);
// 2. poll record
while (!shouldStop) {
@@ -265,9 +267,14 @@ public class PipelineCoordinator {
Thread.sleep(100);
// Check if should stop
- long elapsedTime = System.currentTimeMillis() - startTime;
+ long elapsedTime =
+ streamingStartTime > 0
+ ? System.currentTimeMillis() -
streamingStartTime
+ : 0;
boolean timeoutReached =
- maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+ streamingStartTime > 0
+ && maxIntervalMillis > 0
+ && elapsedTime >= maxIntervalMillis;
if (shouldStop(
isSnapshotSplit,
@@ -281,6 +288,15 @@ public class PipelineCoordinator {
continue;
}
+ if (streamingStartTime < 0) {
+ streamingStartTime = System.currentTimeMillis();
+ LOG.info(
+ "Streaming phase started after {} ms setup for
jobId={} taskId={}",
+ streamingStartTime - startTime,
+ writeRecordRequest.getJobId(),
+ writeRecordRequest.getTaskId());
+ }
+
while (recordIterator.hasNext()) {
SourceRecord element = recordIterator.next();
@@ -294,9 +310,11 @@ public class PipelineCoordinator {
}
// If already timeout, stop immediately when heartbeat
received
- long elapsedTime = System.currentTimeMillis() -
startTime;
+ long elapsedTime = System.currentTimeMillis() -
streamingStartTime;
boolean timeoutReached =
- maxIntervalMillis > 0 && elapsedTime >=
maxIntervalMillis;
+ streamingStartTime > 0
+ && maxIntervalMillis > 0
+ && elapsedTime >= maxIntervalMillis;
if (!isSnapshotSplit && timeoutReached) {
LOG.info(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]