This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new f9b38638488 branch-2.1: [Fix](Job)Fix some issues in the Insert job. 
#44543 (#44597)
f9b38638488 is described below

commit f9b38638488b5a7eb824d99399b94e79858e1446
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Sat Nov 30 11:18:55 2024 +0800

    branch-2.1: [Fix](Job)Fix some issues in the Insert job. #44543 (#44597)
    
    Cherry-picked from #44543
    
    ---------
    
    Co-authored-by: Calvin Kirs <guoqi...@selectdb.com>
---
 .../src/main/java/org/apache/doris/job/base/AbstractJob.java  |  8 +++++---
 .../org/apache/doris/job/extensions/insert/InsertTask.java    |  3 +++
 .../main/java/org/apache/doris/job/manager/JobManager.java    |  3 +++
 .../src/main/java/org/apache/doris/job/task/AbstractTask.java |  3 +++
 regression-test/suites/job_p0/test_base_insert_job.groovy     | 11 ++++++++++-
 5 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
index 62ac0c4d59d..906b86494fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/base/AbstractJob.java
@@ -155,6 +155,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         }
         for (T task : runningTasks) {
             task.cancel();
+            canceledTaskCount.incrementAndGet();
         }
         runningTasks = new CopyOnWriteArrayList<>();
         logUpdateOperation();
@@ -185,6 +186,7 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
         runningTasks.stream().filter(task -> 
task.getTaskId().equals(taskId)).findFirst()
                 .orElseThrow(() -> new JobException("Not found task id: " + 
taskId)).cancel();
         runningTasks.removeIf(task -> task.getTaskId().equals(taskId));
+        canceledTaskCount.incrementAndGet();
         if (jobConfig.getExecuteType().equals(JobExecuteType.ONE_TIME)) {
             updateJobStatus(JobStatus.FINISHED);
         }
@@ -418,13 +420,13 @@ public abstract class AbstractJob<T extends AbstractTask, 
C> implements Job<T, C
     /**
      * Generates a common error message when the execution queue is full.
      *
-     * @param taskId                The ID of the task.
-     * @param queueConfigName       The name of the queue configuration.
+     * @param taskId                  The ID of the task.
+     * @param queueConfigName         The name of the queue configuration.
      * @param executeThreadConfigName The name of the execution thread 
configuration.
      * @return A formatted error message.
      */
     protected String commonFormatMsgWhenExecuteQueueFull(Long taskId, String 
queueConfigName,
-                                                            String 
executeThreadConfigName) {
+                                                         String 
executeThreadConfigName) {
         return String.format("Dispatch task failed, jobId: %d, jobName: %s, 
taskId: %d, the queue size is full, "
                         + "you can increase the queue size by setting the 
property "
                         + "%s in the fe.conf file or increase the value of "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
index e7d5b8b1d54..fcf0a6b33f9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/InsertTask.java
@@ -209,6 +209,9 @@ public class InsertTask extends AbstractTask {
 
     @Override
     public void onFail() throws JobException {
+        if (isCanceled.get()) {
+            return;
+        }
         isFinished.set(true);
         super.onFail();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
index 39646bab18f..47a3a0c5c19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/manager/JobManager.java
@@ -189,6 +189,9 @@ public class JobManager<T extends AbstractJob<?, C>, C> 
implements Writable {
     public void alterJobStatus(Long jobId, JobStatus status) throws 
JobException {
         checkJobExist(jobId);
         jobMap.get(jobId).updateJobStatus(status);
+        if (status.equals(JobStatus.RUNNING)) {
+            jobScheduler.scheduleOneJob(jobMap.get(jobId));
+        }
         jobMap.get(jobId).logUpdateOperation();
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
index f78446aaf85..8a230c0bd38 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/task/AbstractTask.java
@@ -167,6 +167,9 @@ public abstract class AbstractTask implements Task {
             run();
             onSuccess();
         } catch (Exception e) {
+            if (TaskStatus.CANCELED.equals(status)) {
+                return;
+            }
             this.errMsg = e.getMessage();
             onFail();
             log.warn("execute task error, job id is {}, task id is {}", jobId, 
taskId, e);
diff --git a/regression-test/suites/job_p0/test_base_insert_job.groovy 
b/regression-test/suites/job_p0/test_base_insert_job.groovy
index 76264d8ae94..97fda38bf2c 100644
--- a/regression-test/suites/job_p0/test_base_insert_job.groovy
+++ b/regression-test/suites/job_p0/test_base_insert_job.groovy
@@ -221,9 +221,11 @@ suite("test_base_insert_job") {
         RESUME JOB where jobname =  '${jobName}'
     """
     println(tasks.size())
+    // test resume job success
     Awaitility.await("resume-job-test").atMost(60, SECONDS).until({
         def afterResumeTasks = sql """ select status from 
tasks("type"="insert") where JobName= '${jobName}'   """
         println "resume tasks :" + afterResumeTasks
+        //resume tasks size should be greater than before pause
         afterResumeTasks.size() > tasks.size()
     })
 
@@ -249,7 +251,6 @@ suite("test_base_insert_job") {
             CREATE JOB ${jobName}  ON SCHEDULE at '2023-11-13 14:18:07'   
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
         """
     } catch (Exception e) {
-        println e.getMessage()
         assert e.getMessage().contains("startTimeMs must be greater than 
current time")
     }
     // assert end time less than start time
@@ -283,6 +284,14 @@ suite("test_base_insert_job") {
     } catch (Exception e) {
         assert e.getMessage().contains("Invalid interval time unit: years")
     }
+    // assert interval time unit is -1
+    try {
+        sql """
+            CREATE JOB test_error_starts  ON SCHEDULE every -1 second    
comment 'test' DO insert into ${tableName} (timestamp, type, user_id) values 
('2023-03-18','1','12213');
+        """
+    } catch (Exception e) {
+        //ignore
+    }
 
     // test keyword as job name
     sql """


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to