This is an automated email from the ASF dual-hosted git repository.

panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 213166a31ec Refactor usage of PipelineJobAPI.getType() (#29006)
213166a31ec is described below

commit 213166a31ec7cb6947bfca25e542f09e59b5e407
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 11 08:48:04 2023 +0800

    Refactor usage of PipelineJobAPI.getType() (#29006)
---
 .../pipeline/common/metadata/node/PipelineMetaDataNode.java  |  9 ++++-----
 .../registrycenter/repository/GovernanceRepositoryAPI.java   |  9 ++++-----
 .../repository/GovernanceRepositoryAPIImpl.java              |  9 ++++-----
 .../service/impl/AbstractInventoryIncrementalJobAPIImpl.java |  4 ++--
 .../core/job/service/impl/AbstractPipelineJobAPIImpl.java    |  2 +-
 .../core/metadata/PipelineDataSourcePersistService.java      |  5 ++---
 .../core/metadata/PipelineMetaDataPersistService.java        |  5 ++---
 .../metadata/PipelineProcessConfigurationPersistService.java |  5 ++---
 .../common/metadata/node/PipelineMetaDataNodeTest.java       |  5 ++---
 .../scenario/migration/api/impl/MigrationJobAPI.java         | 12 ++++++------
 .../PipelineProcessConfigurationPersistServiceTest.java      |  7 ++-----
 .../scenario/migration/api/impl/MigrationJobAPITest.java     |  7 +++----
 12 files changed, 34 insertions(+), 45 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
index ab2b84017dd..19d71a8d791 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
@@ -19,7 +19,6 @@ package 
org.apache.shardingsphere.data.pipeline.common.metadata.node;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 
 import java.util.regex.Pattern;
 
@@ -41,14 +40,14 @@ public final class PipelineMetaDataNode {
      * @param jobType job type
      * @return data sources path
      */
-    public static String getMetaDataDataSourcesPath(final JobType jobType) {
+    public static String getMetaDataDataSourcesPath(final String jobType) {
         return String.join("/", getMetaDataRootPath(jobType), "data_sources");
     }
     
-    private static String getMetaDataRootPath(final JobType jobType) {
+    private static String getMetaDataRootPath(final String jobType) {
         return null == jobType
                 ? String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, 
"metadata")
-                : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, 
jobType.getType().toLowerCase(), "metadata");
+                : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, 
jobType.toLowerCase(), "metadata");
     }
     
     /**
@@ -57,7 +56,7 @@ public final class PipelineMetaDataNode {
      * @param jobType job type
      * @return data sources path
      */
-    public static String getMetaDataProcessConfigPath(final JobType jobType) {
+    public static String getMetaDataProcessConfigPath(final String jobType) {
         return String.join("/", getMetaDataRootPath(jobType), 
"process_config");
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index ccc68d2a091..61502c65757 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,7 +17,6 @@
 
 package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
 
@@ -192,7 +191,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobType job type
      * @return data source properties
      */
-    String getMetaDataDataSources(JobType jobType);
+    String getMetaDataDataSources(String jobType);
     
     /**
      * Persist meta data data sources.
@@ -200,7 +199,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobType job type
      * @param metaDataDataSources data source properties
      */
-    void persistMetaDataDataSources(JobType jobType, String 
metaDataDataSources);
+    void persistMetaDataDataSources(String jobType, String 
metaDataDataSources);
     
     /**
      * Get meta data process configuration.
@@ -208,7 +207,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobType job type, nullable
      * @return process configuration YAML text
      */
-    String getMetaDataProcessConfiguration(JobType jobType);
+    String getMetaDataProcessConfiguration(String jobType);
     
     /**
      * Persist meta data process configuration.
@@ -216,7 +215,7 @@ public interface GovernanceRepositoryAPI {
      * @param jobType job type, nullable
      * @param processConfigYamlText process configuration YAML text
      */
-    void persistMetaDataProcessConfiguration(JobType jobType, String 
processConfigYamlText);
+    void persistMetaDataProcessConfiguration(String jobType, String 
processConfigYamlText);
     
     /**
      * Get job item error msg.
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index b2bd85fa747..d1da0da09ac 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -20,7 +20,6 @@ package 
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
 import com.google.common.base.Strings;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -167,22 +166,22 @@ public final class GovernanceRepositoryAPIImpl implements 
GovernanceRepositoryAP
     }
     
     @Override
-    public String getMetaDataDataSources(final JobType jobType) {
+    public String getMetaDataDataSources(final String jobType) {
         return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
     }
     
     @Override
-    public void persistMetaDataDataSources(final JobType jobType, final String 
metaDataDataSources) {
+    public void persistMetaDataDataSources(final String jobType, final String 
metaDataDataSources) {
         
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), 
metaDataDataSources);
     }
     
     @Override
-    public String getMetaDataProcessConfiguration(final JobType jobType) {
+    public String getMetaDataProcessConfiguration(final String jobType) {
         return 
repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
     }
     
     @Override
-    public void persistMetaDataProcessConfiguration(final JobType jobType, 
final String processConfigYamlText) {
+    public void persistMetaDataProcessConfiguration(final String jobType, 
final String processConfigYamlText) {
         
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), 
processConfigYamlText);
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index ecf481e03cb..8d79fbf093f 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -80,12 +80,12 @@ public abstract class 
AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
     @Override
     public void alterProcessConfiguration(final PipelineContextKey contextKey, 
final PipelineProcessConfiguration processConfig) {
         // TODO check rateLimiter type match or not
-        processConfigPersistService.persist(contextKey, getJobType(), 
processConfig);
+        processConfigPersistService.persist(contextKey, getType(), 
processConfig);
     }
     
     @Override
     public PipelineProcessConfiguration showProcessConfiguration(final 
PipelineContextKey contextKey) {
-        return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 getJobType()));
+        return 
PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey,
 getType()));
     }
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 4978dbd6192..6c353f52694 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -71,7 +71,7 @@ public abstract class AbstractPipelineJobAPIImpl implements 
PipelineJobAPI {
     
     private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey 
contextKey) {
         return 
PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each
 -> !each.getJobName().startsWith("_"))
-                .filter(each -> 
PipelineJobIdUtils.parseJobType(each.getJobName()).getCode().equals(getJobType().getCode()));
+                .filter(each -> 
PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
     }
     
     // TODO Add getJobInfo
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index ad7ed42c007..00119f7d408 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.metadata;
 
 import com.google.common.base.Strings;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -39,7 +38,7 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
     
     @Override
     @SuppressWarnings("unchecked")
-    public Map<String, DataSourcePoolProperties> load(final PipelineContextKey 
contextKey, final JobType jobType) {
+    public Map<String, DataSourcePoolProperties> load(final PipelineContextKey 
contextKey, final String jobType) {
         String dataSourcesProps = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSources(jobType);
         if (Strings.isNullOrEmpty(dataSourcesProps)) {
             return Collections.emptyMap();
@@ -51,7 +50,7 @@ public final class PipelineDataSourcePersistService 
implements PipelineMetaDataP
     }
     
     @Override
-    public void persist(final PipelineContextKey contextKey, final JobType 
jobType, final Map<String, DataSourcePoolProperties> propsMap) {
+    public void persist(final PipelineContextKey contextKey, final String 
jobType, final Map<String, DataSourcePoolProperties> propsMap) {
         Map<String, Map<String, Object>> dataSourceMap = new 
LinkedHashMap<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             dataSourceMap.put(entry.getKey(), 
swapper.swapToMap(entry.getValue()));
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
index 30ef52253d0..2b615e17712 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.core.metadata;
 
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 
 /**
  * Pipeline meta data persist service.
@@ -34,7 +33,7 @@ public interface PipelineMetaDataPersistService<T> {
      * @param jobType job type, nullable
      * @return configurations
      */
-    T load(PipelineContextKey contextKey, JobType jobType);
+    T load(PipelineContextKey contextKey, String jobType);
     
     /**
      * Persist meta data.
@@ -43,5 +42,5 @@ public interface PipelineMetaDataPersistService<T> {
      * @param jobType job type, nullable
      * @param configs configurations
      */
-    void persist(PipelineContextKey contextKey, JobType jobType, T configs);
+    void persist(PipelineContextKey contextKey, String jobType, T configs);
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index e498002d9c4..4df71589bf2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -22,7 +22,6 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelinePro
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 
@@ -34,7 +33,7 @@ public final class PipelineProcessConfigurationPersistService 
implements Pipelin
     private final YamlPipelineProcessConfigurationSwapper swapper = new 
YamlPipelineProcessConfigurationSwapper();
     
     @Override
-    public PipelineProcessConfiguration load(final PipelineContextKey 
contextKey, final JobType jobType) {
+    public PipelineProcessConfiguration load(final PipelineContextKey 
contextKey, final String jobType) {
         String yamlText = 
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
         if (Strings.isNullOrEmpty(yamlText)) {
             return null;
@@ -44,7 +43,7 @@ public final class PipelineProcessConfigurationPersistService 
implements Pipelin
     }
     
     @Override
-    public void persist(final PipelineContextKey contextKey, final JobType 
jobType, final PipelineProcessConfiguration processConfig) {
+    public void persist(final PipelineContextKey contextKey, final String 
jobType, final PipelineProcessConfiguration processConfig) {
         String yamlText = 
YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
         
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType,
 yamlText);
     }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
index a4fe7687c5a..d33cf56282c 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.data.pipeline.common.metadata.node;
 
-import org.apache.shardingsphere.data.pipeline.common.job.type.FixtureJobType;
 import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.Test;
 
@@ -38,12 +37,12 @@ class PipelineMetaDataNodeTest {
     
     @Test
     void assertGetMetaDataDataSourcesPath() {
-        
MatcherAssert.assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(new 
FixtureJobType()), is(migrationMetaDataRootPath + "/data_sources"));
+        
MatcherAssert.assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath("FIXTURE"),
 is(migrationMetaDataRootPath + "/data_sources"));
     }
     
     @Test
     void assertGetMetaDataProcessConfigPath() {
-        assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(new 
FixtureJobType()), is(migrationMetaDataRootPath + "/process_config"));
+        
assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath("FIXTURE"), 
is(migrationMetaDataRootPath + "/process_config"));
     }
     
     @Test
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 3eb8f36e54c..1c310065bc5 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -135,7 +135,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
     private YamlMigrationJobConfiguration buildYamlJobConfiguration(final 
PipelineContextKey contextKey, final MigrateTableStatement param) {
         YamlMigrationJobConfiguration result = new 
YamlMigrationJobConfiguration();
         result.setTargetDatabaseName(param.getTargetDatabaseName());
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, new MigrationJobType());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, "MIGRATION");
         Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
         Map<String, YamlPipelineDataSourceConfiguration> configSources = new 
LinkedHashMap<>();
         List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new 
HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
@@ -405,7 +405,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
      * @param propsMap data source pool properties map
      */
     public void addMigrationSourceResources(final PipelineContextKey 
contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
-        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, getJobType());
+        Map<String, DataSourcePoolProperties> existDataSources = 
dataSourcePersistService.load(contextKey, getType());
         Collection<String> duplicateDataSourceNames = new 
HashSet<>(propsMap.size(), 1F);
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             if (existDataSources.containsKey(entry.getKey())) {
@@ -415,7 +415,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
         
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () 
-> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
         Map<String, DataSourcePoolProperties> result = new 
LinkedHashMap<>(existDataSources);
         result.putAll(propsMap);
-        dataSourcePersistService.persist(contextKey, getJobType(), result);
+        dataSourcePersistService.persist(contextKey, getType(), result);
     }
     
     /**
@@ -425,13 +425,13 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
      * @param resourceNames resource names
      */
     public void dropMigrationSourceResources(final PipelineContextKey 
contextKey, final Collection<String> resourceNames) {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, getJobType());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = 
dataSourcePersistService.load(contextKey, getType());
         List<String> noExistResources = resourceNames.stream().filter(each -> 
!metaDataDataSource.containsKey(each)).collect(Collectors.toList());
         ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () 
-> new UnregisterMigrationSourceStorageUnitException(noExistResources));
         for (String each : resourceNames) {
             metaDataDataSource.remove(each);
         }
-        dataSourcePersistService.persist(contextKey, getJobType(), 
metaDataDataSource);
+        dataSourcePersistService.persist(contextKey, getType(), 
metaDataDataSource);
     }
     
     /**
@@ -441,7 +441,7 @@ public final class MigrationJobAPI extends 
AbstractInventoryIncrementalJobAPIImp
      * @return migration source resources
      */
     public Collection<Collection<Object>> listMigrationSourceResources(final 
PipelineContextKey contextKey) {
-        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, getJobType());
+        Map<String, DataSourcePoolProperties> propsMap = 
dataSourcePersistService.load(contextKey, getType());
         Collection<Collection<Object>> result = new 
ArrayList<>(propsMap.size());
         for (Entry<String, DataSourcePoolProperties> entry : 
propsMap.entrySet()) {
             String dataSourceName = entry.getKey();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 9f0720c57a4..0c9142a61d0 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -22,9 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPi
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineReadConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineWriteConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
 import 
org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
 import 
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -56,9 +54,8 @@ class PipelineProcessConfigurationPersistServiceTest {
         String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
         PipelineProcessConfiguration processConfig = new 
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
         PipelineProcessConfigurationPersistService persistService = new 
PipelineProcessConfigurationPersistService();
-        JobType jobType = new MigrationJobType();
-        persistService.persist(PipelineContextUtils.getContextKey(), jobType, 
processConfig);
-        String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
 jobType)));
+        persistService.persist(PipelineContextUtils.getContextKey(), 
"MIGRATION", processConfig);
+        String actualYamlText = YamlEngine.marshal(new 
YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(),
 "MIGRATION")));
         assertThat(actualYamlText, is(expectedYamlText));
     }
 }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4c4c5570879..c51bddd7599 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -19,9 +19,9 @@ package 
org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.api.i
 
 import lombok.SneakyThrows;
 import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
@@ -34,7 +34,6 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -252,7 +251,7 @@ class MigrationJobAPITest {
     @Test
     void assertAddMigrationSourceResources() {
         PipelineDataSourcePersistService persistService = new 
PipelineDataSourcePersistService();
-        Map<String, DataSourcePoolProperties> actual = 
persistService.load(PipelineContextUtils.getContextKey(), new 
MigrationJobType());
+        Map<String, DataSourcePoolProperties> actual = 
persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION");
         assertTrue(actual.containsKey("ds_0"));
     }
     
@@ -288,7 +287,7 @@ class MigrationJobAPITest {
     }
     
     private void initIntPrimaryEnvironment() throws SQLException {
-        Map<String, DataSourcePoolProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), 
new MigrationJobType());
+        Map<String, DataSourcePoolProperties> metaDataDataSource = new 
PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), 
"MIGRATION");
         DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
         try (
                 PipelineDataSourceWrapper dataSource = new 
PipelineDataSourceWrapper(DataSourcePoolCreator.create(props), databaseType);

Reply via email to