This is an automated email from the ASF dual-hosted git repository.

liaoxin01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new cf7a5880147 [fix](streaming-job) recompute derived fields after replay 
and ALTER (#62936)
cf7a5880147 is described below

commit cf7a5880147e9333786021b35557a9daa7554c1f
Author: wudi <[email protected]>
AuthorDate: Thu May 14 23:10:18 2026 +0800

    [fix](streaming-job) recompute derived fields after replay and ALTER 
(#62936)
    
    ### What problem does this PR solve?
    
    Problem Summary:
    
    `StreamingInsertJob` initializes two derived fields from
    `jobProperties.max_interval` in `init()`:
    - `sampleWindowMs` = `max_interval * 10 * 1000` — used by
    `checkDataQuality()` for the `load.max_filter_ratio` time window
    - `jobConfig.timerDefinition.interval` = `max_interval` — used by
    `JobScheduler` to compute next trigger time
    
    `sampleWindowMs` is not persisted in the gson image, while
    `timerDefinition.interval` is persisted only as the value last written
    to it. Neither is refreshed in two paths:
    
    1. **gson replay (`gsonPostProcess`)**: after FE checkpoint restart,
    `sampleWindowMs` stays at default `0`. The time-window check `(now -
    sampleStartTime) > sampleWindowMs` is then always true, so the sample
    window expires on every commit. The window-accumulation contract used by
    `load.max_filter_ratio` degrades to single-batch judgment, and a job
    recovered from image can be wrongly paused on a small bad batch that
    should be diluted by the surrounding window.
    
    2. **ALTER PROPERTIES (`modifyPropertiesInternal`)**: changing
    `max_interval` only updates `properties` and `jobProperties`. Neither
    `sampleWindowMs` nor `timerDefinition.interval` is refreshed. The
    scheduler keeps reading the old interval (the new value never reaches
    `JobExecutionConfiguration.getTriggerDelayTimes`), so ALTER
    `max_interval` never takes effect — not even after FE restart, since
    image carries the stale `interval` too.
    
    ### Fix
    
    Extract a single `recomputeDerivedFields()` that re-derives all
    transient state from `jobProperties`:
    - `sampleWindowMs = maxIntervalSec * 10 * 1000`
    - `timerDefinition.interval = maxIntervalSec`
    - restart the sample window: `sampleStartTime` = current time,
    `sampleWindowScannedRows` = `sampleWindowFilteredRows` = 0
    
    Call it at every entry point where `jobProperties` is rebuilt:
    - `init()` (job creation)
    - `gsonPostProcess()` (image replay)
    - `modifyPropertiesInternal()` (ALTER PROPERTIES)
    
    Resetting the sample counters on ALTER is intentional: changing
    `max_interval` redefines the window itself, so accumulated counts from
    the old window have no meaningful interpretation in the new one.
    
    ### Release note
    
    Fix streaming insert job sample window and scheduler interval not being
    restored after FE checkpoint replay or ALTER PROPERTIES.
---
 .../insert/streaming/StreamingInsertJob.java       | 20 +++++++-
 .../StreamingInsertJobCheckDataQualityTest.java    | 54 ++++++++++++++++++++++
 2 files changed, 72 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index f0fd31623fa..e86e7fcda64 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -297,21 +297,35 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         try {
             this.jobProperties = new StreamingJobProperties(properties);
             jobProperties.validate();
-            this.sampleWindowMs = jobProperties.getMaxIntervalSecond() * 10 * 
1000;
             resolveCloudCluster();
             // build time definition
             JobExecutionConfiguration execConfig = getJobConfig();
             TimerDefinition timerDefinition = new TimerDefinition();
-            timerDefinition.setInterval(jobProperties.getMaxIntervalSecond());
             timerDefinition.setIntervalUnit(IntervalUnit.SECOND);
             
timerDefinition.setStartTimeMs(execConfig.getTimerDefinition().getStartTimeMs());
             execConfig.setTimerDefinition(timerDefinition);
+            recomputeDerivedFields();
         } catch (AnalysisException ae) {
             log.warn("parse streaming insert job failed, props: {}", 
properties, ae);
             throw new RuntimeException(ae.getMessage());
         }
     }
 
+    private void recomputeDerivedFields() {
+        if (jobProperties == null) {
+            return;
+        }
+        long maxIntervalSec = jobProperties.getMaxIntervalSecond();
+        this.sampleWindowMs = maxIntervalSec * 10 * 1000;
+        JobExecutionConfiguration execConfig = getJobConfig();
+        if (execConfig != null && execConfig.getTimerDefinition() != null) {
+            execConfig.getTimerDefinition().setInterval(maxIntervalSec);
+        }
+        this.sampleStartTime = System.currentTimeMillis();
+        this.sampleWindowScannedRows = 0L;
+        this.sampleWindowFilteredRows = 0L;
+    }
+
     private void resolveCloudCluster() throws AnalysisException {
         String requested = validateComputeGroupProperty(properties);
         if (requested != null) {
@@ -938,6 +952,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         }
         this.properties.putAll(inputProperties);
         this.jobProperties = new StreamingJobProperties(this.properties);
+        recomputeDerivedFields();
     }
 
     private void resetCloudProgress(Offset offset) throws JobException {
@@ -1317,6 +1332,7 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         if (jobProperties == null && properties != null) {
             jobProperties = new StreamingJobProperties(properties);
         }
+        recomputeDerivedFields();
 
         if (null == getSucceedTaskCount()) {
             setSucceedTaskCount(new AtomicLong(0));
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
index a0ca82075d4..06c5e1a5c75 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJobCheckDataQualityTest.java
@@ -18,6 +18,8 @@
 package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.common.jmockit.Deencapsulation;
+import org.apache.doris.job.base.JobExecutionConfiguration;
+import org.apache.doris.job.base.TimerDefinition;
 import org.apache.doris.job.cdc.request.CommitOffsetRequest;
 import org.apache.doris.job.common.JobStatus;
 import org.apache.doris.job.exception.JobException;
@@ -144,6 +146,58 @@ public class StreamingInsertJobCheckDataQualityTest {
         Assert.assertTrue((long) Deencapsulation.getField(job, 
"sampleStartTime") > oldStartTime);
     }
 
+    private static StreamingInsertJob jobWithProperties(String maxIntervalSec) 
{
+        StreamingInsertJob job = 
Deencapsulation.newInstance(StreamingInsertJob.class);
+        Map<String, String> props = new HashMap<>();
+        props.put(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY, 
maxIntervalSec);
+        Deencapsulation.setField(job, "properties", props);
+        Deencapsulation.setField(job, "jobProperties", new 
StreamingJobProperties(props));
+
+        JobExecutionConfiguration cfg = new JobExecutionConfiguration();
+        cfg.setTimerDefinition(new TimerDefinition());
+        Deencapsulation.setField(job, "jobConfig", cfg);
+        return job;
+    }
+
+    @Test
+    public void testRecomputeDerivedFieldsRebuildsSampleWindowMs() throws 
Exception {
+        StreamingInsertJob job = jobWithProperties("10");
+        Deencapsulation.setField(job, "sampleWindowMs", 0L);
+        Deencapsulation.setField(job, "sampleStartTime", 0L);
+
+        Deencapsulation.invoke(job, "recomputeDerivedFields");
+
+        Assert.assertEquals(100_000L, (long) Deencapsulation.getField(job, 
"sampleWindowMs"));
+        Assert.assertTrue((long) Deencapsulation.getField(job, 
"sampleStartTime") > 0L);
+    }
+
+    @Test
+    public void testModifyPropertiesInternalRefreshesDerivedFields() throws 
Exception {
+        StreamingInsertJob job = jobWithProperties("10");
+        JobExecutionConfiguration cfg = (JobExecutionConfiguration) 
Deencapsulation.getField(job, "jobConfig");
+        cfg.getTimerDefinition().setInterval(10L);
+        Deencapsulation.setField(job, "sampleWindowMs", 100_000L);
+
+        Map<String, String> alter = new HashMap<>();
+        alter.put(StreamingJobProperties.MAX_INTERVAL_SECOND_PROPERTY, "30");
+        Deencapsulation.invoke(job, "modifyPropertiesInternal", alter);
+
+        Assert.assertEquals(300_000L, (long) Deencapsulation.getField(job, 
"sampleWindowMs"));
+        Assert.assertEquals(30L, 
cfg.getTimerDefinition().getInterval().longValue());
+    }
+
+    @Test
+    public void testRecomputeDerivedFieldsResetsSampleCounters() throws 
Exception {
+        StreamingInsertJob job = jobWithProperties("10");
+        Deencapsulation.setField(job, "sampleWindowScannedRows", 1000L);
+        Deencapsulation.setField(job, "sampleWindowFilteredRows", 100L);
+
+        Deencapsulation.invoke(job, "recomputeDerivedFields");
+
+        Assert.assertEquals(0L, (long) Deencapsulation.getField(job, 
"sampleWindowScannedRows"));
+        Assert.assertEquals(0L, (long) Deencapsulation.getField(job, 
"sampleWindowFilteredRows"));
+    }
+
     @Test
     public void testMissingMaxFilterRatioIsNoop() throws Exception {
         StreamingInsertJob job = newJob(0.10, 60_000L);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to