This is an automated email from the ASF dual-hosted git repository.
zhangliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 1bfdb658196 Refactor AbstractSeparablePipelineJob and
AbstractInseparablePipelineJob (#32751)
1bfdb658196 is described below
commit 1bfdb658196fea850082c6836b9435830d10fc68
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Aug 31 21:20:47 2024 +0800
Refactor AbstractSeparablePipelineJob and AbstractInseparablePipelineJob
(#32751)
---
.../data/pipeline/core/job/AbstractInseparablePipelineJob.java | 7 +++++--
.../data/pipeline/core/job/AbstractSeparablePipelineJob.java | 6 ++++--
.../java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java | 3 +--
.../pipeline/scenario/consistencycheck/ConsistencyCheckJob.java | 5 -----
.../data/pipeline/scenario/migration/MigrationJob.java | 9 ++-------
5 files changed, 12 insertions(+), 18 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
index bcbcca15ede..9839cb8dc0a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractInseparablePipelineJob.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionJobItemContext;
@@ -29,6 +28,7 @@ import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.finishe
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.config.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
+import
org.apache.shardingsphere.data.pipeline.core.job.engine.cleaner.PipelineJobRunnerCleaner;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.PipelineJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -52,13 +52,16 @@ import java.util.concurrent.CompletableFuture;
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
*/
-@RequiredArgsConstructor
@Getter
@Slf4j
public abstract class AbstractInseparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> implements PipelineJob {
private final PipelineJobRunnerManager jobRunnerManager;
+ protected AbstractInseparablePipelineJob(final PipelineJobRunnerCleaner
cleaner) {
+ jobRunnerManager = new PipelineJobRunnerManager(cleaner);
+ }
+
@SuppressWarnings("unchecked")
@Override
public final void execute(final ShardingContext shardingContext) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
index c77303c1feb..ae16f08672a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractSeparablePipelineJob.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.job;
import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
@@ -47,13 +46,16 @@ import java.sql.SQLException;
* @param <I> type of pipeline job item context
* @param <P> type of pipeline job item progress
*/
-@RequiredArgsConstructor
@Getter
@Slf4j
public abstract class AbstractSeparablePipelineJob<T extends
PipelineJobConfiguration, I extends PipelineJobItemContext, P extends
PipelineJobItemProgress> implements PipelineJob {
private final PipelineJobRunnerManager jobRunnerManager;
+ protected AbstractSeparablePipelineJob() {
+ jobRunnerManager = new PipelineJobRunnerManager();
+ }
+
@Override
public final void execute(final ShardingContext shardingContext) {
String jobId = shardingContext.getJobName();
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index ad69003cd6a..8fcd1de568f 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -45,7 +45,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.AbstractInseparablePipel
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobRegistry;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
-import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
@@ -76,7 +75,7 @@ public final class CDCJob extends
AbstractInseparablePipelineJob<CDCJobConfigura
private final PipelineSink sink;
public CDCJob(final PipelineSink sink) {
- super(new PipelineJobRunnerManager(new CDCJobRunnerCleaner(sink)));
+ super(new CDCJobRunnerCleaner(sink));
this.sink = sink;
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
index 2c73aef56b7..fca9649325c 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJob.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -32,10 +31,6 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.task.Co
*/
public final class ConsistencyCheckJob extends
AbstractSeparablePipelineJob<ConsistencyCheckJobConfiguration,
ConsistencyCheckJobItemContext, ConsistencyCheckJobItemProgress> {
- public ConsistencyCheckJob() {
- super(new PipelineJobRunnerManager());
- }
-
@Override
public ConsistencyCheckJobItemContext buildJobItemContext(final
ConsistencyCheckJobConfiguration jobConfig,
final int
shardingItem, final ConsistencyCheckJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index 09e140a72ef..aea9d599519 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -25,11 +25,8 @@ import
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfigurati
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.incremental.IncrementalDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.job.AbstractSeparablePipelineJob;
-import
org.apache.shardingsphere.data.pipeline.core.job.engine.PipelineJobRunnerManager;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.config.PipelineProcessConfiguration;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
-import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.core.preparer.datasource.param.CreateTableConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
import
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
@@ -42,6 +39,8 @@ import
org.apache.shardingsphere.data.pipeline.scenario.migration.ingest.dumper.
import
org.apache.shardingsphere.data.pipeline.scenario.migration.preparer.MigrationJobPreparer;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
import java.sql.SQLException;
import java.util.Collection;
@@ -56,10 +55,6 @@ public final class MigrationJob extends
AbstractSeparablePipelineJob<MigrationJo
private final MigrationJobPreparer jobPreparer = new
MigrationJobPreparer();
- public MigrationJob() {
- super(new PipelineJobRunnerManager());
- }
-
@Override
protected MigrationJobItemContext buildJobItemContext(final
MigrationJobConfiguration jobConfig,
final int
shardingItem, final TransmissionJobItemProgress jobItemProgress, final
TransmissionProcessContext jobProcessContext) {