This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 720a2504a5d Refactor PipelineJobAPI (#29007)
720a2504a5d is described below
commit 720a2504a5dbd82ba73af2c25ce3ea6de16501df
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 11 13:09:41 2023 +0800
Refactor PipelineJobAPI (#29007)
---
.../data/pipeline/core/job/service/PipelineJobAPI.java | 13 ++++---------
.../core/job/service/impl/AbstractPipelineJobAPIImpl.java | 5 -----
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 6 ++----
.../scenario/consistencycheck/ConsistencyCheckJobType.java | 4 +---
.../consistencycheck/api/impl/ConsistencyCheckJobAPI.java | 9 +++------
.../data/pipeline/scenario/migration/MigrationJobType.java | 4 +---
.../scenario/migration/api/impl/MigrationJobAPI.java | 7 ++-----
7 files changed, 13 insertions(+), 35 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
index 94e5c5e34dc..5a2bfefcbc5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/PipelineJobAPI.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.core.job.service;
-import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
@@ -27,8 +26,8 @@ import
org.apache.shardingsphere.data.pipeline.common.context.PipelineProcessCon
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.PipelineJobItemProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
+import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import org.apache.shardingsphere.infra.spi.annotation.SingletonSPI;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPI;
@@ -41,13 +40,6 @@ import java.util.Optional;
@SingletonSPI
public interface PipelineJobAPI extends TypedSPI {
- /**
- * Get job type.
- *
- * @return job type
- */
- JobType getJobType();
-
/**
* Marshal pipeline job id.
*
@@ -177,4 +169,7 @@ public interface PipelineJobAPI extends TypedSPI {
* @param shardingItem sharding item
*/
void cleanJobItemErrorMessage(String jobId, int shardingItem);
+
+ @Override
+ String getType();
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 6c353f52694..9a0cf65b82c 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -168,11 +168,6 @@ public abstract class AbstractPipelineJobAPIImpl
implements PipelineJobAPI {
return result;
}
- @Override
- public String getType() {
- return getJobType().getType();
- }
-
@Override
public String getJobItemErrorMessage(final String jobId, final int
shardingItem) {
return
Optional.ofNullable(PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).getJobItemErrorMessage(jobId,
shardingItem)).orElse("");
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 09ceb2debd5..e203e1a2533 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -24,7 +24,6 @@ import org.apache.commons.codec.digest.DigestUtils;
import
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
import
org.apache.shardingsphere.data.pipeline.cdc.api.pojo.StreamDataParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
@@ -52,7 +51,6 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipeli
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.JobItemIncrementalTasksProgress;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
@@ -368,7 +366,7 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
}
@Override
- public JobType getJobType() {
- return new CDCJobType();
+ public String getType() {
+ return "STREAMING";
}
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
index ce87dac4dfa..87e71d934de 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobType.java
@@ -24,11 +24,9 @@ import
org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
*/
public final class ConsistencyCheckJobType implements JobType {
- public static final String TYPE_CODE = "02";
-
@Override
public String getCode() {
- return TYPE_CODE;
+ return "02";
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index b9d1a11426f..087ee683394 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.im
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
@@ -31,8 +30,6 @@ import
org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlConsistencyCheckJobItemProgressSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.common.pojo.ConsistencyCheckJobItemInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
@@ -47,9 +44,9 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.InventoryIncreme
import
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobAPI;
import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractPipelineJobAPIImpl;
+import
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJob;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobId;
-import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.ConsistencyCheckJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.yaml.YamlConsistencyCheckJobConfiguration;
@@ -405,7 +402,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
}
@Override
- public JobType getJobType() {
- return JobCodeRegistry.getJobType(ConsistencyCheckJobType.TYPE_CODE);
+ public String getType() {
+ return "CONSISTENCY_CHECK";
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
index 92d305bca0f..ab7c82ebd90 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobType.java
@@ -24,11 +24,9 @@ import
org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
*/
public final class MigrationJobType implements JobType {
- public static final String TYPE_CODE = "01";
-
@Override
public String getCode() {
- return TYPE_CODE;
+ return "01";
}
@Override
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 1c310065bc5..2e273e15c40 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -38,8 +38,6 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import
org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.job.PipelineJobId;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobCodeRegistry;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
@@ -63,7 +61,6 @@ import
org.apache.shardingsphere.data.pipeline.core.job.service.impl.AbstractInv
import
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
@@ -490,8 +487,8 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
}
@Override
- public JobType getJobType() {
- return JobCodeRegistry.getJobType(MigrationJobType.TYPE_CODE);
+ public String getType() {
+ return "MIGRATION";
}
@Override