This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new ccb5cb531a8 Refactor PipelineJobRunnerManager (#32731)
ccb5cb531a8 is described below

commit ccb5cb531a85c78fb78dd7a89495ad5ba95f27c7
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 30 19:32:51 2024 +0800

    Refactor PipelineJobRunnerManager (#32731)
---
 .../data/pipeline/core/job/AbstractSeparablePipelineJob.java     | 6 +++---
 .../data/pipeline/core/job/engine/PipelineJobRunnerManager.java  | 9 ++++-----
 2 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 3cfdc634b5c..196034b3549 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -75,9 +75,9 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
         int shardingItem = shardingContext.getShardingItem();
-        log.info("Execute job {}-{}", jobId, shardingItem);
+        log.info("Execute job {}-{}.", jobId, shardingItem);
         if (jobRunnerManager.isStopping()) {
-            log.info("Stopping true, ignore");
+            log.info("Job is stopping, ignore.");
             return;
         }
         PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
@@ -115,7 +115,7 @@ public abstract class AbstractSeparablePipelineJob<T extends PipelineJobConfigur
         String jobId = jobItemContext.getJobId();
         PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId, shardingItem);
         prepare(jobItemContext);
-        log.info("Start tasks runner, jobId={}, shardingItem={}", jobId, shardingItem);
+        log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId, shardingItem);
         tasksRunner.start();
         return true;
     }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index a1083185d45..c5bb8bc0be5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -33,7 +33,6 @@ import org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
 import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
@@ -50,6 +49,8 @@ public final class PipelineJobRunnerManager {
     
     private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
     
+    private final PipelineJobRunnerCleaner cleaner;
+    
     private final AtomicBoolean stopping = new AtomicBoolean(false);
     
     private final AtomicReference<JobBootstrap> jobBootstrap = new AtomicReference<>();
@@ -59,8 +60,6 @@ public final class PipelineJobRunnerManager {
     @Getter
     private final PipelineDataSourceManager dataSourceManager = new PipelineDataSourceManager();
     
-    private final PipelineJobRunnerCleaner cleaner;
-    
     public PipelineJobRunnerManager() {
         this(null);
     }
@@ -99,7 +98,7 @@ public final class PipelineJobRunnerManager {
      * @return sharding items
      */
     public Collection<Integer> getShardingItems() {
-        return new ArrayList<>(tasksRunners.keySet());
+        return tasksRunners.keySet();
     }
     
     /**
@@ -111,7 +110,7 @@ public final class PipelineJobRunnerManager {
      */
     public boolean addTasksRunner(final int shardingItem, final PipelineTasksRunner tasksRunner) {
         if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
-            log.warn("shardingItem {} tasks runner exists, ignore", shardingItem);
+            log.warn("Tasks runner on sharding item {} exists, ignore.", shardingItem);
             return false;
         }
         String jobId = tasksRunner.getJobItemContext().getJobId();

Reply via email to