This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 55de454a0c9 [Feat](Job)After a job is paused, it can be manually
triggered to execute. (#39565)
55de454a0c9 is described below
commit 55de454a0c9bac16bd01ec7ea2b0a09a0d9b3ec2
Author: Calvin Kirs <[email protected]>
AuthorDate: Fri Aug 30 15:03:24 2024 +0800
[Feat](Job)After a job is paused, it can be manually triggered to execute.
(#39565)
## Proposed changes
- After a job is paused, it can be manually triggered to execute.
- Update the return fields of the Insert task query to include a new
start time field.
---
.../org/apache/doris/job/base/AbstractJob.java | 26 +++++++++++++++++-----
.../doris/job/extensions/insert/InsertTask.java | 8 ++++++-
.../apache/doris/job/scheduler/JobScheduler.java | 5 ++---
3 files changed, 29 insertions(+), 10 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 3f595d6daf5..94a0b0146cd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -212,8 +212,9 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
public List<T> commonCreateTasks(TaskType taskType, C taskContext) {
- if (!getJobStatus().equals(JobStatus.RUNNING)) {
- log.warn("job is not running, job id is {}", jobId);
+ if (!canCreateTask(taskType)) {
+ log.info("job is not ready for scheduling, job id is {},job status
is {}, taskType is {}", jobId,
+ jobStatus, taskType);
return new ArrayList<>();
}
if (!isReadyForScheduling(taskContext)) {
@@ -235,6 +236,19 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
}
}
+ private boolean canCreateTask(TaskType taskType) {
+ JobStatus currentJobStatus = getJobStatus();
+
+ switch (taskType) {
+ case SCHEDULED:
+ return currentJobStatus.equals(JobStatus.RUNNING);
+ case MANUAL:
+ return currentJobStatus.equals(JobStatus.RUNNING) ||
currentJobStatus.equals(JobStatus.PAUSED);
+ default:
+ throw new IllegalArgumentException("Unsupported TaskType: " +
taskType);
+ }
+ }
+
public void initTasks(Collection<? extends T> tasks, TaskType taskType) {
tasks.forEach(task -> {
task.setTaskType(taskType);
@@ -307,7 +321,7 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
@Override
public void onTaskFail(T task) throws JobException {
failedTaskCount.incrementAndGet();
- updateJobStatusIfEnd(false);
+ updateJobStatusIfEnd(false, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();
}
@@ -315,16 +329,16 @@ public abstract class AbstractJob<T extends AbstractTask,
C> implements Job<T, C
@Override
public void onTaskSuccess(T task) throws JobException {
succeedTaskCount.incrementAndGet();
- updateJobStatusIfEnd(true);
+ updateJobStatusIfEnd(true, task.getTaskType());
runningTasks.remove(task);
logUpdateOperation();
}
- private void updateJobStatusIfEnd(boolean taskSuccess) throws JobException
{
+ private void updateJobStatusIfEnd(boolean taskSuccess, TaskType taskType)
throws JobException {
JobExecuteType executeType = getJobConfig().getExecuteType();
- if (executeType.equals(JobExecuteType.MANUAL)) {
+ if (executeType.equals(JobExecuteType.MANUAL) ||
taskType.equals(TaskType.MANUAL)) {
return;
}
switch (executeType) {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index a8317ada51e..266f3f8476f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -58,6 +58,7 @@ public class InsertTask extends AbstractTask {
new Column("Status", ScalarType.createStringType()),
new Column("ErrorMsg", ScalarType.createStringType()),
new Column("CreateTime", ScalarType.createStringType()),
+ new Column("StartTime", ScalarType.createStringType()),
new Column("FinishTime", ScalarType.createStringType()),
new Column("TrackingUrl", ScalarType.createStringType()),
new Column("LoadStatistic", ScalarType.createStringType()),
@@ -246,6 +247,8 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(errorMsg));
// create time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? ""
+ : TimeUtils.longToTimeString(getStartTimeMs())));
// load end time
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getFinishTimeMs())));
// tracking url
@@ -273,7 +276,10 @@ public class InsertTask extends AbstractTask {
trow.addToColumnValue(new TCell().setStringVal(getStatus().name()));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(TimeUtils.longToTimeString(getCreateTimeMs())));
- trow.addToColumnValue(new TCell().setStringVal(""));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getStartTimeMs() ? ""
+ : TimeUtils.longToTimeString(getStartTimeMs())));
+ trow.addToColumnValue(new TCell().setStringVal(null ==
getFinishTimeMs() ? ""
+ : TimeUtils.longToTimeString(getFinishTimeMs())));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new TCell().setStringVal(""));
trow.addToColumnValue(new
TCell().setStringVal(userIdentity.getQualifiedUser()));
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
index 7f0133bf957..33d12c30a4b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/scheduler/JobScheduler.java
@@ -192,10 +192,9 @@ public class JobScheduler<T extends AbstractJob<?, C>, C>
implements Closeable {
clearEndJob(job);
continue;
}
- if (!job.getJobStatus().equals(JobStatus.RUNNING) &&
!job.getJobConfig().checkIsTimerJob()) {
- continue;
+ if (job.getJobStatus().equals(JobStatus.RUNNING) &&
job.getJobConfig().checkIsTimerJob()) {
+ cycleTimerJobScheduler(job, lastTimeWindowMs);
}
- cycleTimerJobScheduler(job, lastTimeWindowMs);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]