This is an automated email from the ASF dual-hosted git repository. kirs pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new f9b38638488 branch-2.1: [Fix](Job)Fix some issues in the Insert job. #44543 (#44597) f9b38638488 is described below commit f9b38638488b5a7eb824d99399b94e79858e1446 Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> AuthorDate: Sat Nov 30 11:18:55 2024 +0800 branch-2.1: [Fix](Job)Fix some issues in the Insert job. #44543 (#44597) Cherry-picked from #44543 --------- Co-authored-by: Calvin Kirs <guoqi...@selectdb.com> --- .../src/main/java/org/apache/doris/job/base/AbstractJob.java | 8 +++++--- .../org/apache/doris/job/extensions/insert/InsertTask.java | 3 +++ .../main/java/org/apache/doris/job/manager/JobManager.java | 3 +++ .../src/main/java/org/apache/doris/job/task/AbstractTask.java | 3 +++ regression-test/suites/job_p0/test_base_insert_job.groovy | 11 ++++++++++- 5 files changed, 24 insertions(+), 4 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java index 62ac0c4d59d..906b86494fb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java @@ -155,6 +155,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C } for (T task : runningTasks) { task.cancel(); + canceledTaskCount.incrementAndGet(); } runningTasks = new CopyOnWriteArrayList<>(); logUpdateOperation(); @@ -185,6 +186,7 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C runningTasks.stream().filter(task -> task.getTaskId().equals(taskId)).findFirst() .orElseThrow(() -> new JobException("Not found task id: " + taskId)).cancel(); runningTasks.removeIf(task -> task.getTaskId().equals(taskId)); + canceledTaskCount.incrementAndGet(); if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) { updateJobStatus(JobStatus.FINISHED); } @@ -418,13 +420,13 @@ public abstract class AbstractJob<T extends AbstractTask, C> implements Job<T, C /** * Generates a common error message when the execution queue is full. * - * @param taskId The ID of the task. - * @param queueConfigName The name of the queue configuration. + * @param taskId The ID of the task. + * @param queueConfigName The name of the queue configuration. * @param executeThreadConfigName The name of the execution thread configuration. * @return A formatted error message. */ protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String queueConfigName, - String executeThreadConfigName) { + String executeThreadConfigName) { return String.format("Dispatch task failed, jobId: %d, jobName: %s, taskId: %d, the queue size is full, " + "you can increase the queue size by setting the property " + "%s in the fe.conf file or increase the value of " diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java index e7d5b8b1d54..fcf0a6b33f9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java @@ -209,6 +209,9 @@ public class InsertTask extends AbstractTask { @Override public void onFail() throws JobException { + if (isCanceled.get()) { + return; + } isFinished.set(true); super.onFail(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java index 39646bab18f..47a3a0c5c19 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java @@ -189,6 +189,9 @@ public class JobManager<T extends AbstractJob<?, C>, C> implements Writable { public void alterJobStatus(Long jobId, JobStatus status) throws JobException { checkJobExist(jobId); jobMap.get(jobId).updateJobStatus(status); + if (status.equals(JobStatus.RUNNING)) { + jobScheduler.scheduleOneJob(jobMap.get(jobId)); + } jobMap.get(jobId).logUpdateOperation(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java index f78446aaf85..8a230c0bd38 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java @@ -167,6 +167,9 @@ public abstract class AbstractTask implements Task { run(); onSuccess(); } catch (Exception e) { + if (TaskStatus.CANCELED.equals(status)) { + return; + } this.errMsg = e.getMessage(); onFail(); log.warn("execute task error, job id is {}, task id is {}", jobId, taskId, e); diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy b/regression-test/suites/job_p0/test_base_insert_job.groovy index 76264d8ae94..97fda38bf2c 100644 --- a/regression-test/suites/job_p0/test_base_insert_job.groovy +++ b/regression-test/suites/job_p0/test_base_insert_job.groovy @@ -221,9 +221,11 @@ suite("test_base_insert_job") { RESUME JOB where jobname = '${jobName}' """ println(tasks.size()) + // test resume job success Awaitility.await("resume-job-test").atMost(60, SECONDS).until({ def afterResumeTasks = sql """ select status from tasks("type"="insert") where JobName= '${jobName}' """ println "resume tasks :" + afterResumeTasks + //resume tasks size should be greater than before pause afterResumeTasks.size() > tasks.size() }) @@ -249,7 +251,6 @@ suite("test_base_insert_job") { CREATE JOB ${jobName} ON SCHEDULE at '2023-11-13 14:18:07' comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); """ } catch (Exception e) { - println e.getMessage() assert e.getMessage().contains("startTimeMs must be greater than current time") } // assert end time less than start time @@ -283,6 +284,14 @@ suite("test_base_insert_job") { } catch (Exception e) { assert e.getMessage().contains("Invalid interval time unit: years") } + // assert interval time unit is -1 + try { + sql """ + CREATE JOB test_error_starts ON SCHEDULE every -1 second comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values ('2023-03-18','1','12213'); + """ + } catch (Exception e) { + //ignore + } // test keyword as job name sql """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org