This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new ccb5cb531a8 Refactor PipelineJobRunnerManager (#32731)
ccb5cb531a8 is described below
commit ccb5cb531a85c78fb78dd7a89495ad5ba95f27c7
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Aug 30 19:32:51 2024 +0800
Refactor PipelineJobRunnerManager (#32731)
---
.../data/pipeline/core/job/AbstractSeparablePipelineJob.java | 6 +++---
.../data/pipeline/core/job/engine/PipelineJobRunnerManager.java | 9 ++++-----
2 files changed, 7 insertions(+), 8 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index 3cfdc634b5c..196034b3549 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -75,9 +75,9 @@ public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfigur
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
int shardingItem = shardingContext.getShardingItem();
- log.info("Execute job {}-{}", jobId, shardingItem);
+ log.info("Execute job {}-{}.", jobId, shardingItem);
if (jobRunnerManager.isStopping()) {
- log.info("Stopping true, ignore");
+ log.info("Job is stopping, ignore.");
return;
}
PipelineJobType jobType = PipelineJobIdUtils.parseJobType(jobId);
@@ -115,7 +115,7 @@ public abstract class AbstractSeparablePipelineJob<T
extends PipelineJobConfigur
String jobId = jobItemContext.getJobId();
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemFacade().getErrorMessage().clean(jobId,
shardingItem);
prepare(jobItemContext);
- log.info("Start tasks runner, jobId={}, shardingItem={}", jobId,
shardingItem);
+ log.info("Start tasks runner, jobId={}, shardingItem={}.", jobId,
shardingItem);
tasksRunner.start();
return true;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
index a1083185d45..c5bb8bc0be5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/engine/PipelineJobRunnerManager.java
@@ -33,7 +33,6 @@ import
org.apache.shardingsphere.elasticjob.infra.spi.ElasticJobServiceLoader;
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.JobBootstrap;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
@@ -50,6 +49,8 @@ public final class PipelineJobRunnerManager {
private static final long JOB_WAITING_TIMEOUT_MILLS = 2000L;
+ private final PipelineJobRunnerCleaner cleaner;
+
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final AtomicReference<JobBootstrap> jobBootstrap = new
AtomicReference<>();
@@ -59,8 +60,6 @@ public final class PipelineJobRunnerManager {
@Getter
private final PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
- private final PipelineJobRunnerCleaner cleaner;
-
public PipelineJobRunnerManager() {
this(null);
}
@@ -99,7 +98,7 @@ public final class PipelineJobRunnerManager {
* @return sharding items
*/
public Collection<Integer> getShardingItems() {
- return new ArrayList<>(tasksRunners.keySet());
+ return tasksRunners.keySet();
}
/**
@@ -111,7 +110,7 @@ public final class PipelineJobRunnerManager {
*/
public boolean addTasksRunner(final int shardingItem, final
PipelineTasksRunner tasksRunner) {
if (null != tasksRunners.putIfAbsent(shardingItem, tasksRunner)) {
- log.warn("shardingItem {} tasks runner exists, ignore",
shardingItem);
+ log.warn("Tasks runner on sharding item {} exists, ignore.",
shardingItem);
return false;
}
String jobId = tasksRunner.getJobItemContext().getJobId();