This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 40e396d45aa Remove SPI from PipelineJobOption (#29206)
40e396d45aa is described below
commit 40e396d45aa1d4fc2952e6f359e9c1dfdc9e613f
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 25 20:50:36 2023 +0800
Remove SPI from PipelineJobOption (#29206)
* Refactor PipelineJobConfigurationManager
* Refactor MigrationJobAPI
* Remove SPI from PipelineJobOption
* Remove SPI from PipelineJobOption
---
.../data/pipeline/common/job/PipelineJobId.java | 4 ++--
.../data/pipeline/common/job/type/JobCodeRegistry.java | 6 +++---
.../job/type/{JobType.java => PipelineJobType.java} | 12 ++++++++++--
.../impl/AbstractJobConfigurationChangedProcessor.java | 4 ++--
.../data/pipeline/core/job/AbstractPipelineJob.java | 3 ++-
.../data/pipeline/core/job/AbstractPipelineJobId.java | 4 ++--
.../data/pipeline/core/job/PipelineJobIdUtils.java | 4 ++--
.../pipeline/core/job/option/PipelineJobOption.java | 9 ++++++---
.../persist/PipelineJobProgressPersistService.java | 4 ++--
.../job/service/PipelineJobConfigurationManager.java | 3 +--
.../pipeline/core/job/service/PipelineJobManager.java | 5 +++--
.../core/task/runner/TransmissionTasksRunner.java | 5 +++--
.../data/pipeline/common/job/type/FixtureJobType.java | 11 ++++++++++-
...here.data.pipeline.common.job.type.PipelineJobType} | 0
.../handler/query/ShowStreamingJobStatusExecutor.java | 4 ++--
.../handler/query/ShowStreamingRuleExecutor.java | 4 ++--
.../handler/query/ShowMigrationJobStatusExecutor.java | 4 ++--
.../data/pipeline/cdc/api/job/type/CDCJobType.java | 11 +++++++++--
...here.data.pipeline.common.job.type.PipelineJobType} | 0
...ere.data.pipeline.core.job.option.PipelineJobOption | 18 ------------------
.../data/pipeline/cdc/core/job/CDCJobIdTest.java | 4 ++--
.../consistencycheck/ConsistencyCheckJobType.java | 11 +++++++++--
...nsistencyCheckJobConfigurationChangedProcessor.java | 4 ++--
.../task/ConsistencyCheckTasksRunner.java | 9 ++++-----
...here.data.pipeline.common.job.type.PipelineJobType} | 0
...ere.data.pipeline.core.job.option.PipelineJobOption | 18 ------------------
.../pipeline/scenario/migration/MigrationJobType.java | 11 +++++++++--
.../scenario/migration/api/impl/MigrationJobAPI.java | 12 +++++++-----
.../MigrationJobConfigurationChangedProcessor.java | 4 ++--
...here.data.pipeline.common.job.type.PipelineJobType} | 0
...ere.data.pipeline.core.job.option.PipelineJobOption | 18 ------------------
.../ral/queryable/ShowMigrationRuleExecutor.java | 4 ++--
.../ral/updatable/AlterTransmissionRuleUpdater.java | 4 ++--
.../data/pipeline/cases/PipelineContainerComposer.java | 8 ++++----
.../pipeline/core/util/JobConfigurationBuilder.java | 4 ++--
35 files changed, 108 insertions(+), 118 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
index 88ed08a4922..9f8039a30c1 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/PipelineJobId.java
@@ -18,7 +18,7 @@
package org.apache.shardingsphere.data.pipeline.common.job;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
/**
* Pipeline job id.
@@ -30,7 +30,7 @@ public interface PipelineJobId {
*
* @return type
*/
- JobType getJobType();
+ PipelineJobType getJobType();
/**
* Get format version.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
index cdd230ebf03..d7fce319ffa 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobCodeRegistry.java
@@ -33,10 +33,10 @@ import java.util.Map;
@Slf4j
public final class JobCodeRegistry {
- private static final Map<String, JobType> JOB_CODE_AND_TYPE_MAP = new
HashMap<>();
+ private static final Map<String, PipelineJobType> JOB_CODE_AND_TYPE_MAP =
new HashMap<>();
static {
- for (JobType each :
ShardingSphereServiceLoader.getServiceInstances(JobType.class)) {
+ for (PipelineJobType each :
ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
Preconditions.checkArgument(2 == each.getCode().length(), "Job
type code length is not 2.");
JOB_CODE_AND_TYPE_MAP.put(each.getCode(), each);
}
@@ -48,7 +48,7 @@ public final class JobCodeRegistry {
* @param jobTypeCode job type code
* @return job type
*/
- public static JobType getJobType(final String jobTypeCode) {
+ public static PipelineJobType getJobType(final String jobTypeCode) {
Preconditions.checkArgument(JOB_CODE_AND_TYPE_MAP.containsKey(jobTypeCode),
"Can not get job type by `%s`.", jobTypeCode);
return JOB_CODE_AND_TYPE_MAP.get(jobTypeCode);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
similarity index 81%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
index bb599664c64..4781e9c4552 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/JobType.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/job/type/PipelineJobType.java
@@ -17,14 +17,15 @@
package org.apache.shardingsphere.data.pipeline.common.job.type;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
/**
- * Job type.
+ * Pipeline job type.
*/
@SingletonSPI
-public interface JobType extends TypedSPI {
+public interface PipelineJobType extends TypedSPI {
/**
* Get job type code.
@@ -33,6 +34,13 @@ public interface JobType extends TypedSPI {
*/
String getCode();
+ /**
+ * Get job option.
+ *
+ * @return job option
+ */
+ PipelineJobOption getOption();
+
@Override
String getType();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
index 53c1af69305..b331b378543 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/config/processor/impl/AbstractJobConfigurationChangedProcessor.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.JobConfigurationChangedProcessor;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -89,7 +89,7 @@ public abstract class
AbstractJobConfigurationChangedProcessor implements JobCon
protected abstract AbstractPipelineJob buildPipelineJob(String jobId);
- protected abstract JobType getJobType();
+ protected abstract PipelineJobType getJobType();
@Override
public String getType() {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
index 78eecd175d1..8cf5e7ea051 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJob.java
@@ -22,6 +22,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJob;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.listener.PipelineElasticJobListener;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import
org.apache.shardingsphere.data.pipeline.common.util.PipelineDistributedBarrier;
@@ -65,7 +66,7 @@ public abstract class AbstractPipelineJob implements
PipelineJob {
protected AbstractPipelineJob(final String jobId) {
this.jobId = jobId;
- jobOption = TypedSPILoader.getService(PipelineJobOption.class,
PipelineJobIdUtils.parseJobType(jobId).getType());
+ jobOption = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption();
}
/**
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
index 9a8834dd831..17ffc1d82db 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/AbstractPipelineJobId.java
@@ -22,7 +22,7 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
/**
* Abstract pipeline job id.
@@ -33,7 +33,7 @@ public abstract class AbstractPipelineJobId implements
PipelineJobId {
public static final String CURRENT_VERSION = "02";
- private final JobType jobType;
+ private final PipelineJobType jobType;
private final PipelineContextKey contextKey;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
index 5611ac02e1c..65a825e0497 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/PipelineJobIdUtils.java
@@ -27,7 +27,7 @@ import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.common.util.InstanceTypeUtils;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
@@ -64,7 +64,7 @@ public final class PipelineJobIdUtils {
* @param jobId job id
* @return job type
*/
- public static JobType parseJobType(final String jobId) {
+ public static PipelineJobType parseJobType(final String jobId) {
verifyJobId(jobId);
return JobCodeRegistry.getJobType(jobId.substring(1, 3));
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
index 55f0c235f1d..ce65bf34c57 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/PipelineJobOption.java
@@ -24,7 +24,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobConf
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.job.yaml.YamlPipelineJobItemProgressSwapper;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
-import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
import java.util.Optional;
@@ -33,7 +32,7 @@ import java.util.Optional;
* Pipeline job option.
*/
@SingletonSPI
-public interface PipelineJobOption extends TypedSPI {
+public interface PipelineJobOption {
/**
* Get YAML pipeline job configuration swapper.
@@ -95,6 +94,10 @@ public interface PipelineJobOption extends TypedSPI {
*/
Class<? extends PipelineJob> getJobClass();
- @Override
+ /**
+ * Get job type.
+ *
+ * @return job type
+ */
String getType();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
index 0f7bc16a222..26c95849c91 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/persist/PipelineJobProgressPersistService.java
@@ -21,9 +21,9 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -130,7 +130,7 @@ public final class PipelineJobProgressPersistService {
}
persistContext.getHasNewEvents().set(false);
long startTimeMillis = System.currentTimeMillis();
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class,
PipelineJobIdUtils.parseJobType(jobId).getType())
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobId).getType()).getOption()
.getYamlJobItemProgressSwapper()).updateProgress(jobItemContext.get());
persistContext.getBeforePersistingProgressMillis().set(null);
if (6 == ThreadLocalRandom.current().nextInt(100)) {
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
index c02dc1355a8..a6c49fcb965 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobConfigurationManager.java
@@ -58,8 +58,7 @@ public final class PipelineJobConfigurationManager {
public JobConfigurationPOJO convertToJobConfigurationPOJO(final
PipelineJobConfiguration jobConfig) {
JobConfigurationPOJO result = new JobConfigurationPOJO();
result.setJobName(jobConfig.getJobId());
- int shardingTotalCount =
jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO() ? 1 :
jobConfig.getJobShardingCount();
- result.setShardingTotalCount(shardingTotalCount);
+
result.setShardingTotalCount(jobOption.isForceNoShardingWhenConvertToJobConfigurationPOJO()
? 1 : jobConfig.getJobShardingCount());
result.setJobParameter(YamlEngine.marshal(jobOption.getYamlJobConfigurationSwapper().swapToYamlConfiguration(jobConfig)));
String createTimeFormat =
LocalDateTime.now().format(DateTimeFormatterFactory.getStandardFormatter());
result.getProps().setProperty("create_time", createTimeFormat);
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
index 717125ea114..27540a94a9d 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobManager.java
@@ -23,6 +23,7 @@ import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConf
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.PipelineGovernanceFacade;
@@ -109,7 +110,7 @@ public final class PipelineJobManager {
private void startNextDisabledJob(final String jobId, final String
toBeStartDisabledNextJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class,
toBeStartDisabledNextJobType)).startDisabledJob(optional);
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
toBeStartDisabledNextJobType).getOption()).startDisabledJob(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -131,7 +132,7 @@ public final class PipelineJobManager {
private void stopPreviousJob(final String jobId, final String
toBeStoppedPreviousJobType) {
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(jobId)).getJobFacade().getCheck().getLatestCheckJobId(jobId).ifPresent(optional
-> {
try {
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class,
toBeStoppedPreviousJobType)).stop(optional);
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
toBeStoppedPreviousJobType).getOption()).stop(optional);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
index 6291279a019..655dbbb85d3 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/runner/TransmissionTasksRunner.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.PipelineJobNotFoundException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
@@ -66,7 +67,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
this.jobItemContext = jobItemContext;
inventoryTasks = jobItemContext.getInventoryTasks();
incrementalTasks = jobItemContext.getIncrementalTasks();
- jobOption = TypedSPILoader.getService(PipelineJobOption.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType());
+ jobOption = TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption();
jobManager = new PipelineJobManager(jobOption);
jobItemManager = new
PipelineJobItemManager<>(jobOption.getYamlJobItemProgressSwapper());
}
@@ -89,7 +90,7 @@ public class TransmissionTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
if
(PipelineJobProgressDetector.isAllInventoryTasksFinished(inventoryTasks)) {
log.info("All inventory tasks finished.");
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
index a1976f940ab..d4de294616f 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/job/type/FixtureJobType.java
@@ -17,16 +17,25 @@
package org.apache.shardingsphere.data.pipeline.common.job.type;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+
+import static org.mockito.Mockito.mock;
+
/**
* Fixture job type.
*/
-public final class FixtureJobType implements JobType {
+public final class FixtureJobType implements PipelineJobType {
@Override
public String getCode() {
return "00";
}
+ @Override
+ public PipelineJobOption getOption() {
+ return mock(PipelineJobOption.class);
+ }
+
@Override
public String getType() {
return "FIXTURE";
diff --git
a/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
b/kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to
kernel/data-pipeline/core/src/test/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
index add405c928f..b385437e6ca 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingJobStatusExecutor.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingStatusStatement;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -41,7 +41,7 @@ public final class ShowStreamingJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingStatusStatement sqlStatement) {
- TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class, new CDCJobType().getType());
+ TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, new
CDCJobType().getType()).getOption();
List<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each,
currentTimeMillis)).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
index 535299f0da2..843bb10a8e8 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/cdc/distsql/handler/query/ShowStreamingRuleExecutor.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.cdc.distsql.handler.query;
import
org.apache.shardingsphere.cdc.distsql.statement.ShowStreamingRuleStatement;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowStreamingRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowStreamingRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class, "STREAMING"))
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, "STREAMING").getOption())
.showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
index bb146b4856a..b6a597a82fb 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationJobStatusExecutor.java
@@ -18,8 +18,8 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.pojo.TransmissionJobItemInfo;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowMigrationJobStatusExecutor implements
QueryableRALExecuto
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationStatusStatement sqlStatement) {
- TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION");
+ TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption();
List<TransmissionJobItemInfo> jobItemInfos = new
TransmissionJobManager(jobOption).getJobItemInfos(sqlStatement.getJobId());
long currentTimeMillis = System.currentTimeMillis();
return jobItemInfos.stream().map(each -> generateResultRow(each,
currentTimeMillis)).collect(Collectors.toList());
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
index 95cfcbdb538..c073f45c896 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/job/type/CDCJobType.java
@@ -17,18 +17,25 @@
package org.apache.shardingsphere.data.pipeline.cdc.api.job.type;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
/**
* CDC job type.
*/
-public final class CDCJobType implements JobType {
+public final class CDCJobType implements PipelineJobType {
@Override
public String getCode() {
return "03";
}
+ @Override
+ public PipelineJobOption getOption() {
+ return new CDCJobOption();
+ }
+
@Override
public String getType() {
return "STREAMING";
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from
kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to
kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
b/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
deleted file mode 100644
index 136bec61f32..00000000000
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.cdc.api.impl.CDCJobOption
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
index 70856a72c1c..cb9a8b6eeb0 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/test/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJobIdTest.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.cdc.core.job;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import org.apache.shardingsphere.data.pipeline.cdc.constant.CDCSinkType;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.junit.jupiter.api.Test;
@@ -37,7 +37,7 @@ class CDCJobIdTest {
PipelineContextKey contextKey = new PipelineContextKey("sharding_db",
InstanceType.PROXY);
CDCJobId pipelineJobId = new CDCJobId(contextKey,
Arrays.asList("test", "t_order"), false, CDCSinkType.SOCKET.name());
String jobId =
PipelineJobIdUtils.marshalJobIdCommonPrefix(pipelineJobId) + "abcd";
- JobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
+ PipelineJobType actualJobType = PipelineJobIdUtils.parseJobType(jobId);
assertThat(actualJobType, instanceOf(CDCJobType.class));
}
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index 87e71d934de..ccc0dc3ad8d 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -17,18 +17,25 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption;
/**
* Consistency check job type.
*/
-public final class ConsistencyCheckJobType implements JobType {
+public final class ConsistencyCheckJobType implements PipelineJobType {
@Override
public String getCode() {
return "02";
}
+ @Override
+ public PipelineJobOption getOption() {
+ return new ConsistencyCheckJobOption();
+ }
+
@Override
public String getType() {
return "CONSISTENCY_CHECK";
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
index 89b4a40fc3a..7d90fee414b 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/metadata/processor/ConsistencyCheckJobConfigurationChangedProcessor.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.metadata.processor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
@@ -41,7 +41,7 @@ public final class
ConsistencyCheckJobConfigurationChangedProcessor extends Abst
}
@Override
- protected JobType getJobType() {
+ protected PipelineJobType getJobType() {
return new ConsistencyCheckJobType();
}
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index e5dc6c5ccc3..65fe73c5eae 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -26,11 +26,10 @@ import
org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import
org.apache.shardingsphere.data.pipeline.common.execute.PipelineLifecycleRunnable;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
@@ -85,7 +84,7 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
if (jobItemContext.isStopping()) {
return;
}
- new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobOption.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType())
+ new
PipelineJobItemManager<>(TypedSPILoader.getService(PipelineJobType.class,
PipelineJobIdUtils.parseJobType(jobItemContext.getJobId()).getType()).getOption()
.getYamlJobItemProgressSwapper()).persistProgress(jobItemContext);
CompletableFuture<?> future =
jobItemContext.getProcessContext().getConsistencyCheckExecuteEngine().submit(checkExecutor);
ExecuteEngine.trigger(Collections.singletonList(future), new
CheckExecuteCallback());
@@ -102,8 +101,8 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
@Override
protected void runBlocking() {
jobItemManager.persistProgress(jobItemContext);
- JobType jobType = PipelineJobIdUtils.parseJobType(parentJobId);
- TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class, jobType.getType());
+ PipelineJobType jobType =
PipelineJobIdUtils.parseJobType(parentJobId);
+ TransmissionJobOption jobOption = (TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, jobType.getType()).getOption();
PipelineJobConfiguration parentJobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(parentJobId);
try {
PipelineDataConsistencyChecker checker =
jobOption.buildDataConsistencyChecker(
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from
kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to
kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
b/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
deleted file mode 100644
index 31edf286e07..00000000000
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobOption
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index ab7c82ebd90..5afb2f50f7a 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -17,18 +17,25 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
+import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
/**
* Migration job type.
*/
-public final class MigrationJobType implements JobType {
+public final class MigrationJobType implements PipelineJobType {
@Override
public String getCode() {
return "01";
}
+ @Override
+ public PipelineJobOption getOption() {
+ return new MigrationJobOption();
+ }
+
@Override
public String getType() {
return "MIGRATION";
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 649e0c994fa..b89e63824d3 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -21,6 +21,7 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -49,10 +50,11 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
public void commit(final String jobId) {
log.info("Commit job {}", jobId);
final long startTimeMillis = System.currentTimeMillis();
- PipelineJobManager jobManager = new
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class,
getType()));
+ PipelineJobOption jobOption = new MigrationJobOption();
+ PipelineJobManager jobManager = new PipelineJobManager(jobOption);
jobManager.stop(jobId);
dropCheckJobs(jobId);
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class,
getType())).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(jobOption).getJobConfiguration(jobId);
refreshTableMetadata(jobId, jobConfig.getTargetDatabaseName());
jobManager.drop(jobId);
log.info("Commit cost {} ms", System.currentTimeMillis() -
startTimeMillis);
@@ -70,7 +72,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
final long startTimeMillis = System.currentTimeMillis();
dropCheckJobs(jobId);
cleanTempTableOnRollback(jobId);
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class,
getType())).drop(jobId);
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
getType()).getOption()).drop(jobId);
log.info("Rollback job {} cost {} ms", jobId,
System.currentTimeMillis() - startTimeMillis);
}
@@ -81,7 +83,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
for (String each : checkJobIds) {
try {
- new
PipelineJobManager(TypedSPILoader.getService(PipelineJobOption.class,
getType())).drop(each);
+ new
PipelineJobManager(TypedSPILoader.getService(PipelineJobType.class,
getType()).getOption()).drop(each);
// CHECKSTYLE:OFF
} catch (final RuntimeException ex) {
// CHECKSTYLE:ON
@@ -91,7 +93,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
private void cleanTempTableOnRollback(final String jobId) throws
SQLException {
- MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobOption.class,
getType())).getJobConfiguration(jobId);
+ MigrationJobConfiguration jobConfig = new
PipelineJobConfigurationManager(TypedSPILoader.getService(PipelineJobType.class,
getType()).getOption()).getJobConfiguration(jobId);
PipelineCommonSQLBuilder pipelineSQLBuilder = new
PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
TableAndSchemaNameMapper mapping = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
try (
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
index a15921a9adb..fd2c2dc48b6 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/metadata/processor/MigrationJobConfigurationChangedProcessor.java
@@ -18,7 +18,7 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.metadata.processor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.config.processor.impl.AbstractJobConfigurationChangedProcessor;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
@@ -44,7 +44,7 @@ public final class MigrationJobConfigurationChangedProcessor
extends AbstractJob
}
@Override
- protected JobType getJobType() {
+ protected PipelineJobType getJobType() {
return new MigrationJobType();
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
similarity index 100%
rename from
kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.JobType
rename to
kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
b/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
deleted file mode 100644
index c89d6bd11b8..00000000000
---
a/kernel/data-pipeline/scenario/migration/src/main/resources/META-INF/services/org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption
+++ /dev/null
@@ -1,18 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
index 1bf9b0046ed..2e6cf74c7c1 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/queryable/ShowMigrationRuleExecutor.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.queryable;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import
org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
@@ -40,7 +40,7 @@ public final class ShowMigrationRuleExecutor implements
QueryableRALExecutor<Sho
@Override
public Collection<LocalDataQueryResultRow> getRows(final
ShowMigrationRuleStatement sqlStatement) {
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class, "MIGRATION"))
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class, "MIGRATION").getOption())
.showProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY));
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
result.add(new
LocalDataQueryResultRow(getString(processConfig.getRead()),
getString(processConfig.getWrite()),
getString(processConfig.getStreamChannel())));
diff --git
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
index 210da1a21d6..97c9b0dd3a2 100644
---
a/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
+++
b/proxy/backend/core/src/main/java/org/apache/shardingsphere/proxy/backend/handler/distsql/ral/updatable/AlterTransmissionRuleUpdater.java
@@ -19,7 +19,7 @@ package
org.apache.shardingsphere.proxy.backend.handler.distsql.ral.updatable;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
@@ -35,7 +35,7 @@ public final class AlterTransmissionRuleUpdater implements
RALUpdater<AlterTrans
@Override
public void executeUpdate(final String databaseName, final
AlterTransmissionRuleStatement sqlStatement) {
- TransmissionJobManager jobManager = new
TransmissionJobManager((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class,
sqlStatement.getJobTypeName()));
+ TransmissionJobManager jobManager = new
TransmissionJobManager((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class,
sqlStatement.getJobTypeName()).getOption());
PipelineProcessConfiguration processConfig =
TransmissionProcessConfigurationSegmentConverter.convert(sqlStatement.getProcessConfigSegment());
jobManager.alterProcessConfiguration(new
PipelineContextKey(InstanceType.PROXY), processConfig);
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
index edd99c6e21f..9c56035dc9d 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/PipelineContainerComposer.java
@@ -26,7 +26,7 @@ import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDa
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.infra.database.core.connector.url.JdbcUrlAppender;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -118,7 +118,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
private Thread increaseTaskThread;
- public PipelineContainerComposer(final PipelineTestParameter testParam,
final JobType jobType) {
+ public PipelineContainerComposer(final PipelineTestParameter testParam,
final PipelineJobType jobType) {
databaseType = testParam.getDatabaseType();
containerComposer =
PipelineE2EEnvironment.getInstance().getItEnvType() ==
PipelineEnvTypeEnum.DOCKER
? new DockerContainerComposer(testParam.getDatabaseType(),
testParam.getStorageContainerImage(), testParam.getStorageContainerCount())
@@ -140,7 +140,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
}
@SneakyThrows(SQLException.class)
- private void init(final JobType jobType) {
+ private void init(final PipelineJobType jobType) {
String jdbcUrl = containerComposer.getProxyJdbcUrl(databaseType
instanceof PostgreSQLDatabaseType || databaseType instanceof
OpenGaussDatabaseType ? "postgres" : "");
try (Connection connection = DriverManager.getConnection(jdbcUrl,
ProxyContainerConstants.USERNAME, ProxyContainerConstants.PASSWORD)) {
cleanUpPipelineJobs(connection, jobType);
@@ -150,7 +150,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
cleanUpDataSource();
}
- private void cleanUpPipelineJobs(final Connection connection, final
JobType jobType) throws SQLException {
+ private void cleanUpPipelineJobs(final Connection connection, final
PipelineJobType jobType) throws SQLException {
if (PipelineEnvTypeEnum.NATIVE !=
PipelineE2EEnvironment.getInstance().getItEnvType()) {
return;
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
index eb19714abaf..84054e5e48e 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/JobConfigurationBuilder.java
@@ -27,7 +27,7 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceFactory;
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import
org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
+import org.apache.shardingsphere.data.pipeline.common.job.type.PipelineJobType;
import
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -96,7 +96,7 @@ public final class JobConfigurationBuilder {
result.setSources(sources);
result.setTarget(createYamlPipelineDataSourceConfiguration(new
ShardingSpherePipelineDataSourceConfiguration(
ConfigurationFileUtils.readFile("migration_sharding_sphere_jdbc_target.yaml").replace("${databaseNameSuffix}",
databaseNameSuffix))));
- ((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobOption.class,
"MIGRATION")).extendYamlJobConfiguration(contextKey, result);
+ ((TransmissionJobOption)
TypedSPILoader.getService(PipelineJobType.class,
"MIGRATION").getOption()).extendYamlJobConfiguration(contextKey, result);
return result;
}