This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 95ffc7a65fc branch-2.1: [fix](job)Fix millisecond offset issue in time
window scheduling trigger time calculation #45176 (#45353)
95ffc7a65fc is described below
commit 95ffc7a65fcfd80557d4438dac4e76e9d04a558e
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 12 19:09:15 2024 +0800
branch-2.1: [fix](job)Fix millisecond offset issue in time window
scheduling trigger time calculation #45176 (#45353)
Cherry-picked from #45176
Co-authored-by: Calvin Kirs <[email protected]>
---
.../main/java/org/apache/doris/common/util/TimeUtils.java | 11 +++++++++++
.../apache/doris/job/base/JobExecutionConfiguration.java | 2 +-
.../java/org/apache/doris/job/base/TimerDefinition.java | 7 ++++++-
.../java/org/apache/doris/job/scheduler/JobScheduler.java | 13 +++++++++----
.../doris/job/base/JobExecutionConfigurationTest.java | 7 +++++++
5 files changed, 34 insertions(+), 6 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
index e7066846c30..d88971a6e72 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/TimeUtils.java
@@ -257,6 +257,17 @@ public class TimeUtils {
return d.getTime();
}
+ /**
+ * Converts a millisecond timestamp to a second-level timestamp.
+ *
+ * @param timestamp The millisecond timestamp to be converted.
+ * @return The timestamp rounded to the nearest second (in milliseconds).
+ */
+ public static long convertToSecondTimestamp(long timestamp) {
+ // Divide by 1000 to convert to seconds, then multiply by 1000 to
return to milliseconds with no fractional part
+ return (timestamp / 1000) * 1000;
+ }
+
public static long timeStringToLong(String timeStr, TimeZone timeZone) {
DateTimeFormatter dateFormatTimeZone = getDatetimeFormatWithTimeZone();
dateFormatTimeZone.withZone(timeZone.toZoneId());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
index 4c6ef4d2037..d564b114312 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/base/JobExecutionConfiguration.java
@@ -155,7 +155,7 @@ public class JobExecutionConfiguration {
return 0L;
}
- return (startTimeMs - currentTimeMs) / 1000;
+ return (startTimeMs * 1000 / 1000 - currentTimeMs) / 1000;
}
// Returns a list of delay times in seconds for executing the job within
the specified window
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
index 9068a18f693..96181877b9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/TimerDefinition.java
@@ -17,6 +17,7 @@
package org.apache.doris.job.base;
+import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.job.common.IntervalUnit;
import com.google.gson.annotations.SerializedName;
@@ -40,11 +41,15 @@ public class TimerDefinition {
public void checkParams() {
if (null == startTimeMs) {
- startTimeMs = System.currentTimeMillis() +
intervalUnit.getIntervalMs(interval);
+ long currentTimeMs =
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+ startTimeMs = currentTimeMs + intervalUnit.getIntervalMs(interval);
}
if (null != endTimeMs && endTimeMs < startTimeMs) {
throw new IllegalArgumentException("endTimeMs must be greater than
the start time");
}
+ if (null != endTimeMs) {
+ endTimeMs = TimeUtils.convertToSecondTimestamp(endTimeMs);
+ }
if (null != intervalUnit) {
if (null == interval) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 921f333791c..2bd6fc04dac 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -84,7 +84,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
taskDisruptorGroupManager = new TaskDisruptorGroupManager();
taskDisruptorGroupManager.init();
this.timerJobDisruptor =
taskDisruptorGroupManager.getDispatchDisruptor();
- latestBatchSchedulerTimerTaskTimeMs = System.currentTimeMillis();
+ long currentTimeMs =
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+ latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
}
@@ -94,7 +95,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
- log.info("re-register system scheduler timer tasks" +
TimeUtils.longToTimeString(System.currentTimeMillis()));
+ log.info("re-register system scheduler timer tasks, time is " +
TimeUtils
+ .longToTimeStringWithms(System.currentTimeMillis()));
timerTaskScheduler.newTimeout(timeout -> {
batchSchedulerTimerJob();
cycleSystemSchedulerTasks();
@@ -144,7 +146,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
private void cycleTimerJobScheduler(T job, long startTimeWindowMs) {
- List<Long> delaySeconds =
job.getJobConfig().getTriggerDelayTimes(System.currentTimeMillis(),
+ long currentTimeMs =
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+ startTimeWindowMs =
TimeUtils.convertToSecondTimestamp(startTimeWindowMs);
+ List<Long> delaySeconds =
job.getJobConfig().getTriggerDelayTimes(currentTimeMs,
startTimeWindowMs, latestBatchSchedulerTimerTaskTimeMs);
if (CollectionUtils.isEmpty(delaySeconds)) {
log.info("skip job {} scheduler timer job, delay seconds is
empty", job.getJobName());
@@ -190,7 +194,8 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
long lastTimeWindowMs = latestBatchSchedulerTimerTaskTimeMs;
if (latestBatchSchedulerTimerTaskTimeMs < System.currentTimeMillis()) {
- this.latestBatchSchedulerTimerTaskTimeMs =
System.currentTimeMillis();
+ long currentTimeMs =
TimeUtils.convertToSecondTimestamp(System.currentTimeMillis());
+ this.latestBatchSchedulerTimerTaskTimeMs = currentTimeMs;
}
this.latestBatchSchedulerTimerTaskTimeMs +=
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
log.info("execute timer job ids within last ten minutes window, last
time window is {}",
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
index cce0a93c01d..163b2494189 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/job/base/JobExecutionConfigurationTest.java
@@ -75,7 +75,14 @@ public class JobExecutionConfigurationTest {
timerDefinition.setInterval(1L);
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second *
5 + 10L, second * 3, second * 7).size());
Assertions.assertEquals(3, configuration.getTriggerDelayTimes(second *
5, second * 5, second * 7).size());
+ timerDefinition.setStartTimeMs(1672531200000L);
+ timerDefinition.setIntervalUnit(IntervalUnit.MINUTE);
+ timerDefinition.setInterval(1L);
+ Assertions.assertArrayEquals(new Long[]{0L},
configuration.getTriggerDelayTimes(1672531800000L, 1672531200000L,
1672531800000L).toArray());
+
+ List<Long> expectDelayTimes =
configuration.getTriggerDelayTimes(1672531200000L, 1672531200000L,
1672531850000L);
+ Assertions.assertArrayEquals(new Long[]{0L, 60L, 120L, 180L, 240L,
300L, 360L, 420L, 480L, 540L, 600L}, expectDelayTimes.toArray());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]