This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e88c218390 [Improve](Job)Job internal interface provides immediate
scheduling (#23735)
e88c218390 is described below
commit e88c218390cede307b593044be2f0dfebc9bd0d3
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri Sep 1 12:50:08 2023 +0800
[Improve](Job)Job internal interface provides immediate scheduling (#23735)
Delete meaningless job status
System scheduling is executed in the time wheel
Optimize window calculation code
---
.../sql-reference/Show-Statements/SHOW-JOB.md | 1 -
.../sql-reference/Show-Statements/SHOW-JOB.md | 1 -
.../doris/scheduler/constants/JobStatus.java | 2 --
.../doris/scheduler/constants/SystemJob.java | 42 ----------------------
.../doris/scheduler/disruptor/TaskHandler.java | 35 +++---------------
.../java/org/apache/doris/scheduler/job/Job.java | 29 +++++++++++----
.../doris/scheduler/manager/TimerJobManager.java | 33 +++++++----------
.../scheduler/disruptor/TimerJobManagerTest.java | 12 +++++++
8 files changed, 51 insertions(+), 104 deletions(-)
diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
index 60c1dfd1cd..ec11fd3168 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
@@ -71,7 +71,6 @@ Result description:
* PAUSED: Paused
* STOPPED: end (manually triggered by the user)
* FINISHED: Finished
- * WAITING_FINISH: pending completion
### Example
diff --git
a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
index 009e2cb4e4..1b5da59908 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
@@ -71,7 +71,6 @@ SHOW JOBS 用于展示当前 DB 下所有作业的运行状态,SHOW JOB FOR jo
* PAUSED:暂停
* STOPPED:结束(用户手动触发)
* FINISHED: 完成
- * WAITING_FINISH: 待结束
### Example
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
index ae204ef948..f01686a521 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
@@ -37,8 +37,6 @@ public enum JobStatus {
*/
STOPPED,
- WAITING_FINISH,
-
/**
* When the task is finished, the finished state will be triggered.
*/
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
deleted file mode 100644
index 3428c1724f..0000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.scheduler.constants;
-
-import lombok.Getter;
-
-/**
- * System scheduler event job
- * They will start when scheduler starts,don't use this job in other place,it
just for system inner scheduler
- */
-public enum SystemJob {
-
- /**
- * System cycle scheduler event job, it will start cycle scheduler
- */
- SYSTEM_SCHEDULER_JOB("system_scheduler_event_job", 1L);
-
- @Getter
- private final String description;
- @Getter
- private final Long id;
-
- SystemJob(String description, Long id) {
- this.description = description;
- this.id = id;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 297537f712..0b309ec3ce 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -18,8 +18,6 @@
package org.apache.doris.scheduler.disruptor;
import org.apache.doris.catalog.Env;
-import org.apache.doris.scheduler.constants.JobStatus;
-import org.apache.doris.scheduler.constants.SystemJob;
import org.apache.doris.scheduler.exception.JobException;
import org.apache.doris.scheduler.executor.TransientTaskExecutor;
import org.apache.doris.scheduler.job.Job;
@@ -71,10 +69,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
*/
@Override
public void onEvent(TaskEvent event) {
- if (checkIsSystemEvent(event)) {
- onSystemEvent();
- return;
- }
switch (event.getTaskType()) {
case TimerJobTask:
onTimerJobTaskHandle(event);
@@ -97,14 +91,14 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
long jobId = taskEvent.getId();
Job job = timerJobManager.getJob(jobId);
if (job == null) {
- log.info("Event job is null, eventJobId: {}", jobId);
+ log.info("job is null, jobId: {}", jobId);
return;
}
- if (!job.isRunning() &&
!job.getJobStatus().equals(JobStatus.WAITING_FINISH)) {
- log.info("Event job is not running, eventJobId: {}", jobId);
+ if (!job.isRunning()) {
+ log.info("job is not running, eventJobId: {}", jobId);
return;
}
- log.debug("Event job is running, eventJobId: {}", jobId);
+ log.debug("job is running, eventJobId: {}", jobId);
JobTask jobTask = new JobTask(jobId);
try {
jobTask.setStartTimeMs(System.currentTimeMillis());
@@ -147,27 +141,6 @@ public class TaskHandler implements WorkHandler<TaskEvent>
{
}
}
- /**
- * Handles a system event by scheduling batch scheduler tasks.
- */
- private void onSystemEvent() {
- try {
- timerJobManager.batchSchedulerTasks();
- } catch (Exception e) {
- log.error("System batch scheduler execute failed", e);
- }
- }
-
- /**
- * Checks whether the specified event task is a system event.
- *
- * @param event The event task to be checked.
- * @return true if the event task is a system event, false otherwise.
- */
- private boolean checkIsSystemEvent(TaskEvent event) {
- return Objects.equals(event.getId(),
SystemJob.SYSTEM_SCHEDULER_JOB.getId());
- }
-
private void updateJobStatusIfPastEndTime(Job job) {
if (job.isExpired()) {
job.finish();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
index 3d29b0b842..e65cf07754 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -18,6 +18,7 @@
package org.apache.doris.scheduler.job;
import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.TimeUtils;
@@ -129,6 +130,14 @@ public class Job implements Writable {
@SerializedName("errMsg")
private String errMsg;
+ /**
+ * if we want to start the job immediately, we can set this flag to true.
+ * The default value is false.
+ * when we set this flag to true, the start time will be set to current
time.
+ * we don't need to serialize this field.
+ */
+ private boolean immediatelyStart = false;
+
public boolean isRunning() {
return jobStatus == JobStatus.RUNNING;
}
@@ -187,21 +196,29 @@ public class Job implements Writable {
this.jobStatus = JobStatus.STOPPED;
}
- public boolean checkJobParam() {
+ public void checkJobParam() throws DdlException {
if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
- return false;
+ throw new DdlException("startTimeMs must be greater than current
time");
+ }
+ if (immediatelyStart && startTimeMs != 0L) {
+ throw new DdlException("immediately start and startTimeMs can't be
set at the same time");
+ }
+ if (immediatelyStart) {
+ startTimeMs = System.currentTimeMillis();
}
if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
- return false;
+ throw new DdlException("endTimeMs must be greater than current
time");
}
if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
- return false;
+ throw new DdlException("cycle job must set intervalMs");
}
if (null == jobCategory) {
- return false;
+ throw new DdlException("jobCategory must be set");
+ }
+ if (null == executor) {
+ throw new DdlException("Job executor must be set");
}
- return null != executor;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index 2b3b922b37..9000dba470 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -53,10 +53,12 @@ public class TimerJobManager implements Closeable, Writable
{
private long lastBatchSchedulerTimestamp;
+ private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
+
/**
- * batch scheduler interval time
+ * batch scheduler interval ms time
*/
- private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60
* 1000L;
+ private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS =
BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
private boolean isClosed = false;
@@ -88,9 +90,7 @@ public class TimerJobManager implements Closeable, Writable {
}
public Long registerJob(Job job) throws DdlException {
- if (!job.checkJobParam()) {
- throw new DdlException("Job param is invalid, please check time
param");
- }
+ job.checkJobParam();
checkIsJobNameUsed(job.getDbName(), job.getJobName(),
job.getJobCategory());
jobMap.putIfAbsent(job.getJobId(), job);
initAndSchedulerJob(job);
@@ -144,9 +144,9 @@ public class TimerJobManager implements Closeable, Writable
{
Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs,
job.getStartTimeMs(),
job.getIntervalMs(), job.isCycleJob());
job.setNextExecuteTimeMs(nextExecuteTimeMs);
- if (job.getNextExecuteTimeMs() <
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
+ if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
List<Long> executeTimestamp = findTasksBetweenTime(job,
- BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS +
lastBatchSchedulerTimestamp,
+ lastBatchSchedulerTimestamp,
job.getNextExecuteTimeMs());
if (!executeTimestamp.isEmpty()) {
for (Long timestamp : executeTimestamp) {
@@ -300,11 +300,6 @@ public class TimerJobManager implements Closeable,
Writable {
List<Long> jobExecuteTimes = new ArrayList<>();
if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
jobExecuteTimes.add(nextExecuteTime);
- if (job.isStreamingJob()) {
- job.setJobStatus(JobStatus.RUNNING);
- } else {
- job.setJobStatus(JobStatus.WAITING_FINISH);
- }
return jobExecuteTimes;
}
while (endTimeEndWindow >= nextExecuteTime) {
@@ -323,18 +318,18 @@ public class TimerJobManager implements Closeable,
Writable {
private void executeJobIdsWithinLastTenMinutesWindow() {
// if the task executes for more than 10 minutes, it will be delay, so,
// set lastBatchSchedulerTimestamp to current time
- if (lastBatchSchedulerTimestamp +
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) {
+ if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) {
this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
}
+ this.lastBatchSchedulerTimestamp +=
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
if (jobMap.isEmpty()) {
- this.lastBatchSchedulerTimestamp +=
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
return;
}
jobMap.forEach((k, v) -> {
if (v.isRunning() && (v.getNextExecuteTimeMs()
- + v.getIntervalMs() < lastBatchSchedulerTimestamp +
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
+ + v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
List<Long> executeTimes = findTasksBetweenTime(
- v, lastBatchSchedulerTimestamp +
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
+ v, lastBatchSchedulerTimestamp,
v.getNextExecuteTimeMs());
if (!executeTimes.isEmpty()) {
for (Long executeTime : executeTimes) {
@@ -343,7 +338,6 @@ public class TimerJobManager implements Closeable, Writable
{
}
}
});
- this.lastBatchSchedulerTimestamp +=
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
}
/**
@@ -351,10 +345,7 @@ public class TimerJobManager implements Closeable,
Writable {
* Jobs will be re-registered after the task is completed
*/
private void cycleSystemSchedulerTasks() {
- dorisTimer.newTimeout(timeout -> {
- batchSchedulerTasks();
- cycleSystemSchedulerTasks();
- }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
+ dorisTimer.newTimeout(timeout -> batchSchedulerTasks(),
BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
/**
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
index 86c0dcdca9..fd871be962 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
@@ -132,6 +132,18 @@ public class TimerJobManagerTest {
Assertions.assertEquals(2, testExecuteCount.get());
}
+ @Test
+ public void testCycleSchedulerWithImmediatelyStart(@Mocked Env env) throws
DdlException {
+ setContext(env);
+ long startTimestamp = System.currentTimeMillis();
+ job.setImmediatelyStart(true);
+ timerJobManager.registerJob(job);
+ //consider the time of the first execution and give some buffer time
+ Awaitility.await().atMost(16, TimeUnit.SECONDS).until(() ->
System.currentTimeMillis()
+ >= startTimestamp + 15000L);
+ Assertions.assertEquals(3, testExecuteCount.get());
+ }
+
@Test
public void testOneTimeJob(@Mocked Env env) throws DdlException {
setContext(env);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]