This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 599c81460ce31937fa82ff1584539ab4cd8ae1d9
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri Aug 30 15:03:24 2024 +0800

    [Feat](Job)After a job is paused, it can be manually triggered to execute. (#39565)
    
    ## Proposed changes
    - After a job is paused, it can be manually triggered to execute.
    - Update the return fields of the Insert task query to include a new
    start time field.
---
 .../org/apache/doris/job/base/AbstractJob.java     | 26 +++++++++++++++++-----
 .../doris/job/extensions/insert/InsertTask.java    |  8 ++++++-
 .../apache/doris/job/scheduler/JobScheduler.java   |  5 ++---
 3 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 3f595d6daf5..94a0b0146cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -212,8 +212,9 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
     }
 
     public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
-        if (!getJobStatus().equals(JobStatus.RUNNING)) {
-            log.warn("job is not running, job id is {}", jobId);
+        if (!canCreateTask(taskType)) {
+            log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId,
+                    jobStatus, taskType);
             return new ArrayList<>();
         }
         if (!isReadyForScheduling(taskContext)) {
@@ -235,6 +236,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
         }
     }
 
+    private boolean canCreateTask(TaskType taskType) {
+        JobStatus currentJobStatus = getJobStatus();
+
+        switch (taskType) {
+            case SCHEDULED:
+                return currentJobStatus.equals(JobStatus.RUNNING);
+            case MANUAL:
+                return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED);
+            default:
+                throw new IllegalArgumentException("Unsupported TaskType: " + taskType);
+        }
+    }
+
     public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
         tasks.forEach(task -> {
             task.setTaskType(taskType);
@@ -307,7 +321,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
     @Override
     public void onTaskFail(T task) throws JobException {
         failedTaskCount.incrementAndGet();
-        updateJobStatusIfEnd(false);
+        updateJobStatusIfEnd(false, task.getTaskType());
         runningTasks.remove(task);
         logUpdateOperation();
     }
@@ -315,16 +329,16 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C
     @Override
     public void onTaskSuccess(T task) throws JobException {
         succeedTaskCount.incrementAndGet();
-        updateJobStatusIfEnd(true);
+        updateJobStatusIfEnd(true, task.getTaskType());
         runningTasks.remove(task);
         logUpdateOperation();
 
     }
 
 
-    private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException {
+    private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType) throws JobException {
         JobExecuteType executeType = getJobConfig().getExecuteType();
-        if (executeType.equals(JobExecuteType.MANUAL)) {
+        if (executeType.equals(JobExecuteType.MANUAL) || taskType.equals(TaskType.MANUAL)) {
             return;
         }
         switch (executeType) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index 0fe2a8364aa..ee5abed8392 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask {
             new Column("Status", ScalarType.createStringType()),
             new Column("ErrorMsg", ScalarType.createStringType()),
             new Column("CreateTime", ScalarType.createStringType()),
+            new Column("StartTime", ScalarType.createStringType()),
             new Column("FinishTime", ScalarType.createStringType()),
             new Column("TrackingUrl", ScalarType.createStringType()),
             new Column("LoadStatistic", ScalarType.createStringType()),
@@ -247,6 +248,8 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(errorMsg));
         // create time
         trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+        trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
+                : TimeUtils.longToTimeString(getStartTimeMs())));
         // load end time
         trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
         // tracking url
@@ -274,7 +277,10 @@ public class InsertTask extends AbstractTask {
         trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
-        trow.addToColumnValue(new TCell().setStringVal(""));
+        trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? ""
+                : TimeUtils.longToTimeString(getStartTimeMs())));
+        trow.addToColumnValue(new TCell().setStringVal(null == getFinishTimeMs() ? ""
+                : TimeUtils.longToTimeString(getFinishTimeMs())));
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(""));
         trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 7f0133bf957..33d12c30a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -192,10 +192,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable {
                 clearEndJob(job);
                 continue;
             }
-            if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) {
-                continue;
+            if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) {
+                cycleTimerJobScheduler(job, lastTimeWindowMs);
             }
-            cycleTimerJobScheduler(job, lastTimeWindowMs);
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to