This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 599c81460ce31937fa82ff1584539ab4cd8ae1d9 Author: Calvin Kirs <[email protected]> AuthorDate: Fri Aug 30 15:03:24 2024 +0800 [Feat](Job)After a job is paused, it can be manually triggered to execute. (#39565) ## Proposed changes - After a job is paused, it can be manually triggered to execute. - Update the return fields of the Insert task query to include a new start time field. --- .../org/apache/doris/job/base/AbstractJob.java | 26 +++++++++++++++++----- .../doris/job/extensions/insert/InsertTask.java | 8 ++++++- .../apache/doris/job/scheduler/JobScheduler.java | 5 ++--- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 3f595d6daf5..94a0b0146cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -212,8 +212,9 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C } public List<T> commonCreateTasks(TaskType taskType, C taskContext) { - if (!getJobStatus().equals(JobStatus.RUNNING)) { - log.warn("job is not running, job id is {}", jobId); + if (!canCreateTask(taskType)) { + log.info("job is not ready for scheduling, job id is {},job status is {}, taskType is {}", jobId, + jobStatus, taskType); return new ArrayList<>(); } if (!isReadyForScheduling(taskContext)) { @@ -235,6 +236,19 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C } } + private boolean canCreateTask(TaskType taskType) { + JobStatus currentJobStatus = getJobStatus(); + + switch (taskType) { + case SCHEDULED: + return currentJobStatus.equals(JobStatus.RUNNING); + case MANUAL: + return currentJobStatus.equals(JobStatus.RUNNING) || currentJobStatus.equals(JobStatus.PAUSED); + default: + throw new IllegalArgumentException("Unsupported TaskType: " + taskType); + } + } + public void initTasks(Collection<? extends T> tasks, TaskType taskType) { tasks.forEach(task -> { task.setTaskType(taskType); @@ -307,7 +321,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C @Override public void onTaskFail(T task) throws JobException { failedTaskCount.incrementAndGet(); - updateJobStatusIfEnd(false); + updateJobStatusIfEnd(false, task.getTaskType()); runningTasks.remove(task); logUpdateOperation(); } @@ -315,16 +329,16 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C @Override public void onTaskSuccess(T task) throws JobException { succeedTaskCount.incrementAndGet(); - updateJobStatusIfEnd(true); + updateJobStatusIfEnd(true, task.getTaskType()); runningTasks.remove(task); logUpdateOperation(); } - private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException { + private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType) throws JobException { JobExecuteType executeType = getJobConfig().getExecuteType(); - if (executeType.equals(JobExecuteType.MANUAL)) { + if (executeType.equals(JobExecuteType.MANUAL) || taskType.equals(TaskType.MANUAL)) { return; } switch (executeType) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index 0fe2a8364aa..ee5abed8392 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask { new Column("Status", ScalarType.createStringType()), new Column("ErrorMsg", ScalarType.createStringType()), new Column("CreateTime", ScalarType.createStringType()), + new Column("StartTime", ScalarType.createStringType()), new Column("FinishTime", ScalarType.createStringType()), new Column("TrackingUrl", ScalarType.createStringType()), new Column("LoadStatistic", ScalarType.createStringType()), @@ -247,6 +248,8 @@ public class InsertTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(errorMsg)); // create time trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" + : TimeUtils.longToTimeString(getStartTimeMs()))); // load end time trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs()))); // tracking url @@ -274,7 +277,10 @@ public class InsertTask extends AbstractTask { trow.addToColumnValue(new TCell().setStringVal(getStatus().name())); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs()))); - trow.addToColumnValue(new TCell().setStringVal("")); + trow.addToColumnValue(new TCell().setStringVal(null == getStartTimeMs() ? "" + : TimeUtils.longToTimeString(getStartTimeMs()))); + trow.addToColumnValue(new TCell().setStringVal(null == getFinishTimeMs() ? "" + : TimeUtils.longToTimeString(getFinishTimeMs()))); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal("")); trow.addToColumnValue(new TCell().setStringVal(userIdentity.getQualifiedUser())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java index 7f0133bf957..33d12c30a4b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java @@ -192,10 +192,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C> implements Closeable { clearEndJob(job); continue; } - if (!job.getJobStatus().equals(JobStatus.RUNNING) && !job.getJobConfig().checkIsTimerJob()) { - continue; + if (job.getJobStatus().equals(JobStatus.RUNNING) && job.getJobConfig().checkIsTimerJob()) { + cycleTimerJobScheduler(job, lastTimeWindowMs); } - cycleTimerJobScheduler(job, lastTimeWindowMs); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
