This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new c4d0e1e6935 branch-2.1: [fix](job scheduler) specifies both startTime
and immediate, it will trigger one fewer task execution #50624 (#50897)
c4d0e1e6935 is described below
commit c4d0e1e6935f3a83a8ee4d9d254c19b7f8b09a2b
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed May 14 22:59:48 2025 +0800
branch-2.1: [fix](job scheduler) specifies both startTime and immediate, it
will trigger one fewer task execution #50624 (#50897)
Cherry-picked from #50624
Co-authored-by: zhangdong <[email protected]>
---
.../doris/job/base/JobExecutionConfiguration.java | 10 +--
.../job/base/JobExecutionConfigurationTest.java | 11 ++-
.../data/mtmv_p0/test_immediate_starttime_mtmv.out | Bin 0 -> 134 bytes
.../mtmv_p0/test_immediate_starttime_mtmv.groovy | 82 +++++++++++++++++++++
4 files changed, 95 insertions(+), 8 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 80e8b0cf5e3..629c88c19b5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -136,12 +136,6 @@ public class JobExecutionConfiguration {
}
long intervalValue =
timerDefinition.getIntervalUnit().getIntervalMs(timerDefinition.getInterval());
long jobStartTimeMs = timerDefinition.getStartTimeMs();
- if (isImmediate()) {
- jobStartTimeMs += intervalValue;
- if (jobStartTimeMs > endTimeMs) {
- return delayTimeSeconds;
- }
- }
return getExecutionDelaySeconds(startTimeMs, endTimeMs,
jobStartTimeMs,
intervalValue, currentTimeMs);
}
@@ -171,6 +165,10 @@ public class JobExecutionConfiguration {
long firstTriggerTime = windowStartTimeMs + (intervalMs -
((windowStartTimeMs - startTimeMs)
% intervalMs)) % intervalMs;
+ // the first trigger time must not be earlier than the start time
+ if (firstTriggerTime < startTimeMs) {
+ firstTriggerTime = startTimeMs;
+ }
if (firstTriggerTime < currentTimeMs) {
// Calculate how many intervals to add to get the largest trigger
time < currentTimeMs
long intervalsToAdd = (currentTimeMs - firstTriggerTime) /
intervalMs;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index fb0600b281f..8196c000959 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -53,13 +53,20 @@ public class JobExecutionConfigurationTest {
configuration.setExecuteType(JobExecuteType.RECURRING);
TimerDefinition timerDefinition = new TimerDefinition();
- timerDefinition.setStartTimeMs(100000L); // Start time set to 1 second
in the future
- timerDefinition.setInterval(10L); // Interval set to 10 milliseconds
+ timerDefinition.setStartTimeMs(700000L); // Start time set to 700
seconds in the future
+ timerDefinition.setInterval(10L); // Interval set to 10 minutes
timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
configuration.setTimerDefinition(timerDefinition);
List<Long> delayTimes = configuration.getTriggerDelayTimes(
0L, 0L, 1100000L);
+ // trigger times earlier than the start time should be filtered out
+ Assertions.assertEquals(1, delayTimes.size());
+ Assertions.assertArrayEquals(new Long[]{700L}, delayTimes.toArray());
+
+ timerDefinition.setStartTimeMs(100000L); // Start time set to 100
seconds in the future
+ delayTimes = configuration.getTriggerDelayTimes(
+ 0L, 0L, 1100000L);
Assertions.assertEquals(2, delayTimes.size());
Assertions.assertArrayEquals(new Long[]{100L, 700L},
delayTimes.toArray());
diff --git a/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out
b/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out
new file mode 100644
index 00000000000..79ac76f677c
Binary files /dev/null and
b/regression-test/data/mtmv_p0/test_immediate_starttime_mtmv.out differ
diff --git
a/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy
b/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy
new file mode 100644
index 00000000000..8732eb20fda
--- /dev/null
+++ b/regression-test/suites/mtmv_p0/test_immediate_starttime_mtmv.groovy
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+import java.time.Instant;
+import java.time.ZoneId;
+import org.junit.Assert;
+
+suite("test_immediate_starttime_mtmv","mtmv") {
+ String suiteName = "test_immediate_starttime_mtmv"
+ String tableName = "${suiteName}_table"
+ String mvName = "${suiteName}_mv"
+
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ k2 INT,
+ k3 varchar(32)
+ )
+ DISTRIBUTED BY HASH(k2) BUCKETS 2
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+ sql """
+ insert into ${tableName} values (2,1),(2,2);
+ """
+ def currentMs = System.currentTimeMillis() + 10000;
+ def dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs),
ZoneId.systemDefault());
+ def formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ def startTime= dateTime.format(formatter);
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS '${startTime}'
+ DISTRIBUTED BY hash(k2) BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1'
+ )
+ AS
+ SELECT * from ${tableName};
+ """
+ Thread.sleep(20000)
+ order_qt_immediate "SELECT count(*) from tasks('type'='mv') where
MvName='${mvName}'"
+
+ sql """drop materialized view if exists ${mvName};"""
+ currentMs = System.currentTimeMillis() + 10000;
+ dateTime = LocalDateTime.ofInstant(Instant.ofEpochMilli(currentMs),
ZoneId.systemDefault());
+ formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ startTime= dateTime.format(formatter);
+ sql """
+ CREATE MATERIALIZED VIEW ${mvName}
+ build deferred REFRESH AUTO ON SCHEDULE EVERY 1 DAY STARTS
'${startTime}'
+ DISTRIBUTED BY hash(k2) BUCKETS 2
+ PROPERTIES (
+ 'replication_num' = '1'
+ )
+ AS
+ SELECT * from ${tableName};
+ """
+ Thread.sleep(20000)
+ order_qt_deferred "SELECT count(*) from tasks('type'='mv') where
MvName='${mvName}'"
+ sql """drop table if exists `${tableName}`"""
+ sql """drop materialized view if exists ${mvName};"""
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]