This is an automated email from the ASF dual-hosted git repository.
liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cf7a5880147 [fix](streaming-job) recompute derived fields after replay
and ALTER (#62936)
cf7a5880147 is described below
commit cf7a5880147e9333786021b35557a9daa7554c1f
Author: wudi <[email protected]>
AuthorDate: Thu May 14 23:10:18 2026 +0800
[fix](streaming-job) recompute derived fields after replay and ALTER
(#62936)
### What problem does this PR solve?
Problem Summary:
`StreamingInsertJob` initializes two derived fields from
`jobProperties.max_interval` in `init()`:
- `sampleWindowMs` = `max_interval * 10 * 1000` — used by
`checkDataQuality()` for the `load.max_filter_ratio` time window
- `jobConfig.timerDefinition.interval` = `max_interval` — used by
`JobScheduler` to compute next trigger time
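`sampleWindowMs` is not persisted in the gson image, while
`timerDefinition.interval` is persisted but never re-derived from
`max_interval`. Neither field is refreshed on the following two paths: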
1. **gson replay (`gsonPostProcess`)**: after FE checkpoint restart,
`sampleWindowMs` stays at default `0`. The time-window check `(now -
sampleStartTime) > sampleWindowMs` is then always true, so the sample
window expires on every commit. The window-accumulation contract used by
`load.max_filter_ratio` degrades to single-batch judgment, and a job
recovered from image can be wrongly paused on a small bad batch that
should be diluted by the surrounding window.
2. **ALTER PROPERTIES (`modifyPropertiesInternal`)**: changing
`max_interval` only updates `properties` and `jobProperties`. Neither
`sampleWindowMs` nor `timerDefinition.interval` is refreshed. The
scheduler keeps reading the old interval (the new value never reaches
`JobExecutionConfiguration.getTriggerDelayTimes`), so ALTER
`max_interval` never takes effect — not even after FE restart, since
image carries the stale `interval` too.
### Fix
Extract a single `recomputeDerivedFields()` that re-derives all state
computed from `jobProperties` (it is a no-op when `jobProperties` is
null):
- `sampleWindowMs = maxIntervalSec * 10 * 1000`
- `timerDefinition.interval = maxIntervalSec`
- reset `sampleStartTime` / `sampleWindowScannedRows` /
`sampleWindowFilteredRows`
Call it at every entry point where `jobProperties` is rebuilt:
- `init()` (job creation)
- `gsonPostProcess()` (image replay)
- `modifyPropertiesInternal()` (ALTER PROPERTIES)
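Resetting the sample counters on ALTER is intentional: changing
`max_interval` redefines the window itself, so accumulated counts from
the old window have no meaningful interpretation in the new one. Note
that `recomputeDerivedFields()` resets the counters and
`sampleStartTime` unconditionally each time it runs (including on image
replay), not only when `max_interval` actually changed.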
### Release note
Fix the streaming insert job's sample window and scheduler interval not
being recomputed after FE checkpoint replay or ALTER PROPERTIES.
---
.../insert/streaming/StreamingInsertJob.java | 20 +++++++-
.../StreamingInsertJobCheckDataQualityTest.java | 54 ++++++++++++++++++++++
2 files changed, 72 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f0fd31623fa..e86e7fcda64 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -297,21 +297,35 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
try {
this.jobProperties = new StreamingJobProperties(properties);
jobProperties.validate();
- this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 *
1000;
resolveCloudCluster();
// build time definition
JobExecutionConfiguration execConfig = getJobConfig();
TimerDefinition timerDefinition = new TimerDefinition();
- timerDefinition.setInterval(jobProperties.getMaxIntervalSecond());
timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
execConfig.setTimerDefinition(timerDefinition);
+ recomputeDerivedFields();
} catch (AnalysisException ae) {
log.warn("parse streaming insert job failed, props: {}",
properties, ae);
throw new RuntimeException(ae.getMessage());
}
}
+ private void recomputeDerivedFields() {
+ if (jobProperties == null) {
+ return;
+ }
+ long maxIntervalSec = jobProperties.getMaxIntervalSecond();
+ this.sampleWindowMs = maxIntervalSec * 10 * 1000;
+ JobExecutionConfiguration execConfig = getJobConfig();
+ if (execConfig != null && execConfig.getTimerDefinition() != null) {
+ execConfig.getTimerDefinition().setInterval(maxIntervalSec);
+ }
+ this.sampleStartTime = System.currentTimeMillis();
+ this.sampleWindowScannedRows = 0L;
+ this.sampleWindowFilteredRows = 0L;
+ }
+
private void resolveCloudCluster() throws AnalysisException {
String requested = validateComputeGroupProperty(properties);
if (requested != null) {
@@ -938,6 +952,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
}
this.properties.putAll(inputProperties);
this.jobProperties = new StreamingJobProperties(this.properties);
+ recomputeDerivedFields();
}
private void resetCloudProgress(Offset offset) throws JobException {
@@ -1317,6 +1332,7 @@ public class StreamingInsertJob extends
AbstractJob<StreamingJobSchedulerTask, M
if (jobProperties == null && properties != null) {
jobProperties = new StreamingJobProperties(properties);
}
+ recomputeDerivedFields();
if (null == getSucceedTaskCount()) {
setSucceedTaskCount(new AtomicLong(0));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
index a0ca82075d4..06c5e1a5c75 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
@@ -18,6 +18,8 @@
package org.apache.doris.job.extensions.insert.streaming;
import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
import org.apache.doris.job.cdc.request.CommitOffsetRequest;
import org.apache.doris.job.common.JobStatus;
import org.apache.doris.job.exception.JobException;
@@ -144,6 +146,58 @@ public class StreamingInsertJobCheckDataQualityTest {
Assert.assertTrue((long) Deencapsulation.getField(job,
"sampleStartTime") > oldStartTime);
}
+ private static StreamingInsertJob jobWithProperties(String maxIntervalSec)
{
+ StreamingInsertJob job =
Deencapsulation.newInstance(StreamingInsertJob.class);
+ Map<String, String> props = new HashMap<>();
+ props.put(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY,
maxIntervalSec);
+ Deencapsulation.setField(job, "properties", props);
+ Deencapsulation.setField(job, "jobProperties", new
StreamingJobProperties(props));
+
+ JobExecutionConfiguration cfg = new JobExecutionConfiguration();
+ cfg.setTimerDefinition(new TimerDefinition());
+ Deencapsulation.setField(job, "jobConfig", cfg);
+ return job;
+ }
+
+ @Test
+ public void testRecomputeDerivedFieldsRebuildsSampleWindowMs() throws
Exception {
+ StreamingInsertJob job = jobWithProperties("10");
+ Deencapsulation.setField(job, "sampleWindowMs", 0L);
+ Deencapsulation.setField(job, "sampleStartTime", 0L);
+
+ Deencapsulation.invoke(job, "recomputeDerivedFields");
+
+ Assert.assertEquals(100_000L, (long) Deencapsulation.getField(job,
"sampleWindowMs"));
+ Assert.assertTrue((long) Deencapsulation.getField(job,
"sampleStartTime") > 0L);
+ }
+
+ @Test
+ public void testModifyPropertiesInternalRefreshesDerivedFields() throws
Exception {
+ StreamingInsertJob job = jobWithProperties("10");
+ JobExecutionConfiguration cfg = (JobExecutionConfiguration)
Deencapsulation.getField(job, "jobConfig");
+ cfg.getTimerDefinition().setInterval(10L);
+ Deencapsulation.setField(job, "sampleWindowMs", 100_000L);
+
+ Map<String, String> alter = new HashMap<>();
+ alter.put(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY, "30");
+ Deencapsulation.invoke(job, "modifyPropertiesInternal", alter);
+
+ Assert.assertEquals(300_000L, (long) Deencapsulation.getField(job,
"sampleWindowMs"));
+ Assert.assertEquals(30L,
cfg.getTimerDefinition().getInterval().longValue());
+ }
+
+ @Test
+ public void testRecomputeDerivedFieldsResetsSampleCounters() throws
Exception {
+ StreamingInsertJob job = jobWithProperties("10");
+ Deencapsulation.setField(job, "sampleWindowScannedRows", 1000L);
+ Deencapsulation.setField(job, "sampleWindowFilteredRows", 100L);
+
+ Deencapsulation.invoke(job, "recomputeDerivedFields");
+
+ Assert.assertEquals(0L, (long) Deencapsulation.getField(job,
"sampleWindowScannedRows"));
+ Assert.assertEquals(0L, (long) Deencapsulation.getField(job,
"sampleWindowFilteredRows"));
+ }
+
@Test
public void testMissingMaxFilterRatioIsNoop() throws Exception {
StreamingInsertJob job = newJob(0.10, 60_000L);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]