This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e88c218390 [Improve](Job)Job internal interface provides immediate scheduling (#23735)
e88c218390 is described below

commit e88c218390cede307b593044be2f0dfebc9bd0d3
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri Sep 1 12:50:08 2023 +0800

    [Improve](Job)Job internal interface provides immediate scheduling (#23735)
    
    Delete meaningless job status
    System scheduling is executed in the time wheel
    Optimize window calculation code
---
 .../sql-reference/Show-Statements/SHOW-JOB.md      |  1 -
 .../sql-reference/Show-Statements/SHOW-JOB.md      |  1 -
 .../doris/scheduler/constants/JobStatus.java       |  2 --
 .../doris/scheduler/constants/SystemJob.java       | 42 ----------------------
 .../doris/scheduler/disruptor/TaskHandler.java     | 35 +++---------------
 .../java/org/apache/doris/scheduler/job/Job.java   | 29 +++++++++++----
 .../doris/scheduler/manager/TimerJobManager.java   | 33 +++++++----------
 .../scheduler/disruptor/TimerJobManagerTest.java   | 12 +++++++
 8 files changed, 51 insertions(+), 104 deletions(-)

diff --git a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
index 60c1dfd1cd..ec11fd3168 100644
--- a/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
+++ b/docs/en/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
@@ -71,7 +71,6 @@ Result description:
          * PAUSED: Paused
          * STOPPED: end (manually triggered by the user)
          * FINISHED: Finished
-         * WAITING_FINISH: pending completion
 
 ### Example
 
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
index 009e2cb4e4..1b5da59908 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Show-Statements/SHOW-JOB.md
@@ -71,7 +71,6 @@ SHOW JOBS 用于展示当前 DB 下所有作业的运行状态,SHOW JOB FOR jo
         * PAUSED:暂停
         * STOPPED:结束(用户手动触发)
         * FINISHED: 完成
-        * WAITING_FINISH: 待结束
 
 ### Example
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
index ae204ef948..f01686a521 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/JobStatus.java
@@ -37,8 +37,6 @@ public enum JobStatus {
      */
     STOPPED,
 
-    WAITING_FINISH,
-
     /**
      * When the task is finished, the finished state will be triggered.
      */
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
deleted file mode 100644
index 3428c1724f..0000000000
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/constants/SystemJob.java
+++ /dev/null
@@ -1,42 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.scheduler.constants;
-
-import lombok.Getter;
-
-/**
- * System scheduler event job
- * They will start when scheduler starts,don't use this job in other place,it just for system inner scheduler
- */
-public enum SystemJob {
-
-    /**
-     * System cycle scheduler event job, it will start cycle scheduler
-     */
-    SYSTEM_SCHEDULER_JOB("system_scheduler_event_job", 1L);
-
-    @Getter
-    private final String description;
-    @Getter
-    private final Long id;
-
-    SystemJob(String description, Long id) {
-        this.description = description;
-        this.id = id;
-    }
-}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
index 297537f712..0b309ec3ce 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/disruptor/TaskHandler.java
@@ -18,8 +18,6 @@
 package org.apache.doris.scheduler.disruptor;
 
 import org.apache.doris.catalog.Env;
-import org.apache.doris.scheduler.constants.JobStatus;
-import org.apache.doris.scheduler.constants.SystemJob;
 import org.apache.doris.scheduler.exception.JobException;
 import org.apache.doris.scheduler.executor.TransientTaskExecutor;
 import org.apache.doris.scheduler.job.Job;
@@ -71,10 +69,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
      */
     @Override
     public void onEvent(TaskEvent event) {
-        if (checkIsSystemEvent(event)) {
-            onSystemEvent();
-            return;
-        }
         switch (event.getTaskType()) {
             case TimerJobTask:
                 onTimerJobTaskHandle(event);
@@ -97,14 +91,14 @@ public class TaskHandler implements WorkHandler<TaskEvent> {
         long jobId = taskEvent.getId();
         Job job = timerJobManager.getJob(jobId);
         if (job == null) {
-            log.info("Event job is null, eventJobId: {}", jobId);
+            log.info("job is null, jobId: {}", jobId);
             return;
         }
-        if (!job.isRunning() && 
!job.getJobStatus().equals(JobStatus.WAITING_FINISH)) {
-            log.info("Event job is not running, eventJobId: {}", jobId);
+        if (!job.isRunning()) {
+            log.info("job is not running, eventJobId: {}", jobId);
             return;
         }
-        log.debug("Event job is running, eventJobId: {}", jobId);
+        log.debug("job is running, eventJobId: {}", jobId);
         JobTask jobTask = new JobTask(jobId);
         try {
             jobTask.setStartTimeMs(System.currentTimeMillis());
@@ -147,27 +141,6 @@ public class TaskHandler implements WorkHandler<TaskEvent> 
{
         }
     }
 
-    /**
-     * Handles a system event by scheduling batch scheduler tasks.
-     */
-    private void onSystemEvent() {
-        try {
-            timerJobManager.batchSchedulerTasks();
-        } catch (Exception e) {
-            log.error("System batch scheduler execute failed", e);
-        }
-    }
-
-    /**
-     * Checks whether the specified event task is a system event.
-     *
-     * @param event The event task to be checked.
-     * @return true if the event task is a system event, false otherwise.
-     */
-    private boolean checkIsSystemEvent(TaskEvent event) {
-        return Objects.equals(event.getId(), 
SystemJob.SYSTEM_SCHEDULER_JOB.getId());
-    }
-
     private void updateJobStatusIfPastEndTime(Job job) {
         if (job.isExpired()) {
             job.finish();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
index 3d29b0b842..e65cf07754 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/scheduler/job/Job.java
@@ -18,6 +18,7 @@
 package org.apache.doris.scheduler.job;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.TimeUtils;
@@ -129,6 +130,14 @@ public class Job implements Writable {
     @SerializedName("errMsg")
     private String errMsg;
 
+    /**
+     * if we want to start the job immediately, we can set this flag to true.
+     * The default value is false.
+     * when we set this flag to true, the start time will be set to current time.
+     * we don't need to serialize this field.
+     */
+    private boolean immediatelyStart = false;
+
     public boolean isRunning() {
         return jobStatus == JobStatus.RUNNING;
     }
@@ -187,21 +196,29 @@ public class Job implements Writable {
         this.jobStatus = JobStatus.STOPPED;
     }
 
-    public boolean checkJobParam() {
+    public void checkJobParam() throws DdlException {
         if (startTimeMs != 0L && startTimeMs < System.currentTimeMillis()) {
-            return false;
+            throw new DdlException("startTimeMs must be greater than current 
time");
+        }
+        if (immediatelyStart && startTimeMs != 0L) {
+            throw new DdlException("immediately start and startTimeMs can't be 
set at the same time");
+        }
+        if (immediatelyStart) {
+            startTimeMs = System.currentTimeMillis();
         }
         if (endTimeMs != 0L && endTimeMs < System.currentTimeMillis()) {
-            return false;
+            throw new DdlException("endTimeMs must be greater than current 
time");
         }
 
         if (isCycleJob && (intervalMs == null || intervalMs <= 0L)) {
-            return false;
+            throw new DdlException("cycle job must set intervalMs");
         }
         if (null == jobCategory) {
-            return false;
+            throw new DdlException("jobCategory must be set");
+        }
+        if (null == executor) {
+            throw new DdlException("Job executor must be set");
         }
-        return null != executor;
     }
 
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
index 2b3b922b37..9000dba470 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/scheduler/manager/TimerJobManager.java
@@ -53,10 +53,12 @@ public class TimerJobManager implements Closeable, Writable 
{
 
     private long lastBatchSchedulerTimestamp;
 
+    private static final long BATCH_SCHEDULER_INTERVAL_SECONDS = 600;
+
     /**
-     * batch scheduler interval time
+     * batch scheduler interval ms time
      */
-    private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 10 * 60 
* 1000L;
+    private static final long BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS = 
BATCH_SCHEDULER_INTERVAL_SECONDS * 1000L;
 
     private boolean isClosed = false;
 
@@ -88,9 +90,7 @@ public class TimerJobManager implements Closeable, Writable {
     }
 
     public Long registerJob(Job job) throws DdlException {
-        if (!job.checkJobParam()) {
-            throw new DdlException("Job param is invalid, please check time 
param");
-        }
+        job.checkJobParam();
         checkIsJobNameUsed(job.getDbName(), job.getJobName(), 
job.getJobCategory());
         jobMap.putIfAbsent(job.getJobId(), job);
         initAndSchedulerJob(job);
@@ -144,9 +144,9 @@ public class TimerJobManager implements Closeable, Writable 
{
         Long nextExecuteTimeMs = findFistExecuteTime(currentTimeMs, 
job.getStartTimeMs(),
                 job.getIntervalMs(), job.isCycleJob());
         job.setNextExecuteTimeMs(nextExecuteTimeMs);
-        if (job.getNextExecuteTimeMs() < 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + lastBatchSchedulerTimestamp) {
+        if (job.getNextExecuteTimeMs() < lastBatchSchedulerTimestamp) {
             List<Long> executeTimestamp = findTasksBetweenTime(job,
-                    BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS + 
lastBatchSchedulerTimestamp,
+                    lastBatchSchedulerTimestamp,
                     job.getNextExecuteTimeMs());
             if (!executeTimestamp.isEmpty()) {
                 for (Long timestamp : executeTimestamp) {
@@ -300,11 +300,6 @@ public class TimerJobManager implements Closeable, 
Writable {
         List<Long> jobExecuteTimes = new ArrayList<>();
         if (!job.isCycleJob() && (nextExecuteTime < endTimeEndWindow)) {
             jobExecuteTimes.add(nextExecuteTime);
-            if (job.isStreamingJob()) {
-                job.setJobStatus(JobStatus.RUNNING);
-            } else {
-                job.setJobStatus(JobStatus.WAITING_FINISH);
-            }
             return jobExecuteTimes;
         }
         while (endTimeEndWindow >= nextExecuteTime) {
@@ -323,18 +318,18 @@ public class TimerJobManager implements Closeable, 
Writable {
     private void executeJobIdsWithinLastTenMinutesWindow() {
         // if the task executes for more than 10 minutes, it will be delay, so,
         // set lastBatchSchedulerTimestamp to current time
-        if (lastBatchSchedulerTimestamp + 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS < System.currentTimeMillis()) {
+        if (lastBatchSchedulerTimestamp < System.currentTimeMillis()) {
             this.lastBatchSchedulerTimestamp = System.currentTimeMillis();
         }
+        this.lastBatchSchedulerTimestamp += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
         if (jobMap.isEmpty()) {
-            this.lastBatchSchedulerTimestamp += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
             return;
         }
         jobMap.forEach((k, v) -> {
             if (v.isRunning() && (v.getNextExecuteTimeMs()
-                    + v.getIntervalMs() < lastBatchSchedulerTimestamp + 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS)) {
+                    + v.getIntervalMs() < lastBatchSchedulerTimestamp)) {
                 List<Long> executeTimes = findTasksBetweenTime(
-                        v, lastBatchSchedulerTimestamp + 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS,
+                        v, lastBatchSchedulerTimestamp,
                         v.getNextExecuteTimeMs());
                 if (!executeTimes.isEmpty()) {
                     for (Long executeTime : executeTimes) {
@@ -343,7 +338,6 @@ public class TimerJobManager implements Closeable, Writable 
{
                 }
             }
         });
-        this.lastBatchSchedulerTimestamp += 
BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS;
     }
 
     /**
@@ -351,10 +345,7 @@ public class TimerJobManager implements Closeable, 
Writable {
      * Jobs will be re-registered after the task is completed
      */
     private void cycleSystemSchedulerTasks() {
-        dorisTimer.newTimeout(timeout -> {
-            batchSchedulerTasks();
-            cycleSystemSchedulerTasks();
-        }, BATCH_SCHEDULER_INTERVAL_MILLI_SECONDS, TimeUnit.MILLISECONDS);
+        dorisTimer.newTimeout(timeout -> batchSchedulerTasks(), 
BATCH_SCHEDULER_INTERVAL_SECONDS, TimeUnit.SECONDS);
     }
 
     /**
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
index 86c0dcdca9..fd871be962 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/scheduler/disruptor/TimerJobManagerTest.java
@@ -132,6 +132,18 @@ public class TimerJobManagerTest {
         Assertions.assertEquals(2, testExecuteCount.get());
     }
 
+    @Test
+    public void testCycleSchedulerWithImmediatelyStart(@Mocked Env env) throws 
DdlException {
+        setContext(env);
+        long startTimestamp = System.currentTimeMillis();
+        job.setImmediatelyStart(true);
+        timerJobManager.registerJob(job);
+        //consider the time of the first execution and give some buffer time
+        Awaitility.await().atMost(16, TimeUnit.SECONDS).until(() -> 
System.currentTimeMillis()
+                >= startTimestamp + 15000L);
+        Assertions.assertEquals(3, testExecuteCount.get());
+    }
+
     @Test
     public void testOneTimeJob(@Mocked Env env) throws DdlException {
         setContext(env);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to