This is an automated email from the ASF dual-hosted git repository.

zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 1bfdb658196 Refactor AbstractSeparablePipelineJob and 
AbstractInseparablePipelineJob (#32751)
1bfdb658196 is described below

commit 1bfdb658196fea850082c6836b9435830d10fc68
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 21:20:47 2024 +0800

    Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob 
(#32751)
---
 .../data/pipeline/core/job/AbstractInseparablePipelineJob.java   | 7 +++++--
 .../data/pipeline/core/job/AbstractSeparablePipelineJob.java     | 6 ++++--
 .../java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 3 +--
 .../pipeline/scenario/consistencycheck/ConsistencyCheckJob.java  | 5 -----
 .../data/pipeline/scenario/migration/MigrationJob.java           | 9 ++-------
 5 files changed, 12 insertions(+), 18 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index bcbcca15ede..9839cb8dc0a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
@@ -29,6 +28,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finishe
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import 
org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -52,13 +52,16 @@ import java.util.concurrent.CompletableFuture;
  * @param <I> type of pipeline job item context
  * @param <P> type of pipeline job item progress
  */
-@RequiredArgsConstructor
 @Getter
 @Slf4j
 public abstract class AbstractInseparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends 
PipelineJobItemProgress> implements PipelineJob {
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
+    protected AbstractInseparablePipelineJob(final PipelineJobRunnerCleaner 
cleaner) {
+        jobRunnerManager = new PipelineJobRunnerManager(cleaner);
+    }
+    
     @SuppressWarnings("unchecked")
     @Override
     public final void execute(final ShardingContext shardingContext) {
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index c77303c1feb..ae16f08672a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.job;
 
 import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -47,13 +46,16 @@ import java.sql.SQLException;
  * @param <I> type of pipeline job item context
  * @param <P> type of pipeline job item progress
  */
-@RequiredArgsConstructor
 @Getter
 @Slf4j
 public abstract class AbstractSeparablePipelineJob<T extends 
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends 
PipelineJobItemProgress> implements PipelineJob {
     
     private final PipelineJobRunnerManager jobRunnerManager;
     
+    protected AbstractSeparablePipelineJob() {
+        jobRunnerManager = new PipelineJobRunnerManager();
+    }
+    
     @Override
     public final void execute(final ShardingContext shardingContext) {
         String jobId = shardingContext.getJobName();
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index ad69003cd6a..8fcd1de568f 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -45,7 +45,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipel
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -76,7 +75,7 @@ public final class CDCJob extends 
AbstractInseparablePipelineJob<CDCJobConfigura
     private final PipelineSink sink;
     
     public CDCJob(final PipelineSink sink) {
-        super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
+        super(new CDCJobRunnerCleaner(sink));
         this.sink = sink;
     }
     
diff --git 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 2c73aef56b7..fca9649325c 100644
--- 
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++ 
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -32,10 +31,6 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.Co
  */
 public final class ConsistencyCheckJob extends 
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration, 
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
     
-    public ConsistencyCheckJob() {
-        super(new PipelineJobRunnerManager());
-    }
-    
     @Override
     public ConsistencyCheckJobItemContext buildJobItemContext(final 
ConsistencyCheckJobConfiguration jobConfig,
                                                               final int 
shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final 
TransmissionProcessContext jobProcessContext) {
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 09e140a72ef..aea9d599519 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -25,11 +25,8 @@ import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfigurati
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
-import 
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -42,6 +39,8 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.datanode.DataNode;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 
 import java.sql.SQLException;
 import java.util.Collection;
@@ -56,10 +55,6 @@ public final class MigrationJob extends 
AbstractSeparablePipelineJob<MigrationJo
     
     private final MigrationJobPreparer jobPreparer = new 
MigrationJobPreparer();
     
-    public MigrationJob() {
-        super(new PipelineJobRunnerManager());
-    }
-    
     @Override
     protected MigrationJobItemContext buildJobItemContext(final 
MigrationJobConfiguration jobConfig,
                                                           final int 
shardingItem, final TransmissionJobItemProgress jobItemProgress, final 
TransmissionProcessContext jobProcessContext) {

Reply via email to