This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 213166a31ec Refactor usage of PipelineJobAPI.getType() (#29006)
213166a31ec is described below
commit 213166a31ec7cb6947bfca25e542f09e59b5e407
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 11 08:48:04 2023 +0800
Refactor usage of PipelineJobAPI.getType() (#29006)
---
.../pipeline/common/metadata/node/PipelineMetaDataNode.java | 9 ++++-----
.../registrycenter/repository/GovernanceRepositoryAPI.java | 9 ++++-----
.../repository/GovernanceRepositoryAPIImpl.java | 9 ++++-----
.../service/impl/AbstractInventoryIncrementalJobAPIImpl.java | 4 ++--
.../core/job/service/impl/AbstractPipelineJobAPIImpl.java | 2 +-
.../core/metadata/PipelineDataSourcePersistService.java | 5 ++---
.../core/metadata/PipelineMetaDataPersistService.java | 5 ++---
.../metadata/PipelineProcessConfigurationPersistService.java | 5 ++---
.../common/metadata/node/PipelineMetaDataNodeTest.java | 5 ++---
.../scenario/migration/api/impl/MigrationJobAPI.java | 12 ++++++------
.../PipelineProcessConfigurationPersistServiceTest.java | 7 ++-----
.../scenario/migration/api/impl/MigrationJobAPITest.java | 7 +++----
12 files changed, 34 insertions(+), 45 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
index ab2b84017dd..19d71a8d791 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNode.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.common.metadata.node;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import java.util.regex.Pattern;
@@ -41,14 +40,14 @@ public final class PipelineMetaDataNode {
* @param jobType job type
* @return data sources path
*/
- public static String getMetaDataDataSourcesPath(final JobType jobType) {
+ public static String getMetaDataDataSourcesPath(final String jobType) {
return String.join("/", getMetaDataRootPath(jobType), "data_sources");
}
- private static String getMetaDataRootPath(final JobType jobType) {
+ private static String getMetaDataRootPath(final String jobType) {
return null == jobType
? String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, "metadata")
- : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, jobType.getType().toLowerCase(), "metadata");
+ : String.join("/", PipelineNodePath.DATA_PIPELINE_ROOT, jobType.toLowerCase(), "metadata");
}
/**
@@ -57,7 +56,7 @@ public final class PipelineMetaDataNode {
* @param jobType job type
* @return data sources path
*/
- public static String getMetaDataProcessConfigPath(final JobType jobType) {
+ public static String getMetaDataProcessConfigPath(final String jobType) {
return String.join("/", getMetaDataRootPath(jobType),
"process_config");
}
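For reference, a minimal caller sketch (not part of this commit) of the String-based path helpers changed above; the "MIGRATION" literal and the null fallback mirror other hunks in this diff, and the class location is taken from the file path.

import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;

public final class MetaDataPathExample {
    
    public static void main(final String[] args) {
        // Job-type specific path: the type string is lowercased into the node path by getMetaDataRootPath.
        System.out.println(PipelineMetaDataNode.getMetaDataDataSourcesPath("MIGRATION"));
        // A null job type falls back to the shared ".../metadata" root, per the null check above.
        System.out.println(PipelineMetaDataNode.getMetaDataProcessConfigPath(null));
    }
}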
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
index ccc68d2a091..61502c65757 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPI.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.mode.repository.cluster.listener.DataChangedEventListener;
@@ -192,7 +191,7 @@ public interface GovernanceRepositoryAPI {
* @param jobType job type
* @return data source properties
*/
- String getMetaDataDataSources(JobType jobType);
+ String getMetaDataDataSources(String jobType);
/**
* Persist meta data data sources.
@@ -200,7 +199,7 @@ public interface GovernanceRepositoryAPI {
* @param jobType job type
* @param metaDataDataSources data source properties
*/
- void persistMetaDataDataSources(JobType jobType, String metaDataDataSources);
+ void persistMetaDataDataSources(String jobType, String metaDataDataSources);
/**
* Get meta data process configuration.
@@ -208,7 +207,7 @@ public interface GovernanceRepositoryAPI {
* @param jobType job type, nullable
* @return process configuration YAML text
*/
- String getMetaDataProcessConfiguration(JobType jobType);
+ String getMetaDataProcessConfiguration(String jobType);
/**
* Persist meta data process configuration.
@@ -216,7 +215,7 @@ public interface GovernanceRepositoryAPI {
* @param jobType job type, nullable
* @param processConfigYamlText process configuration YAML text
*/
- void persistMetaDataProcessConfiguration(JobType jobType, String processConfigYamlText);
+ void persistMetaDataProcessConfiguration(String jobType, String processConfigYamlText);
/**
* Get job item error msg.
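A minimal usage sketch (illustrative, not part of this commit) of the String-based overloads on GovernanceRepositoryAPI declared above; the factory call and the "MIGRATION" literal are taken from other hunks in this diff, while the round-trip helper itself is assumed.

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;

public final class GovernanceRepositoryUsageExample {
    
    // Persist a process configuration YAML for the migration job type and read it back.
    public static String roundTrip(final PipelineContextKey contextKey, final String processConfigYamlText) {
        GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey);
        repositoryAPI.persistMetaDataProcessConfiguration("MIGRATION", processConfigYamlText);
        return repositoryAPI.getMetaDataProcessConfiguration("MIGRATION");
    }
}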
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
index b2bd85fa747..d1da0da09ac 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/registrycenter/repository/GovernanceRepositoryAPIImpl.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.data.pipeline.common.registrycenter.repository
import com.google.common.base.Strings;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.common.metadata.node.PipelineMetaDataNode;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -167,22 +166,22 @@ public final class GovernanceRepositoryAPIImpl implements GovernanceRepositoryAP
}
@Override
- public String getMetaDataDataSources(final JobType jobType) {
+ public String getMetaDataDataSources(final String jobType) {
return repository.getDirectly(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType));
}
@Override
- public void persistMetaDataDataSources(final JobType jobType, final String metaDataDataSources) {
+ public void persistMetaDataDataSources(final String jobType, final String metaDataDataSources) {
repository.persist(PipelineMetaDataNode.getMetaDataDataSourcesPath(jobType), metaDataDataSources);
}
@Override
- public String getMetaDataProcessConfiguration(final JobType jobType) {
+ public String getMetaDataProcessConfiguration(final String jobType) {
return repository.getDirectly(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType));
}
@Override
- public void persistMetaDataProcessConfiguration(final JobType jobType, final String processConfigYamlText) {
+ public void persistMetaDataProcessConfiguration(final String jobType, final String processConfigYamlText) {
repository.persist(PipelineMetaDataNode.getMetaDataProcessConfigPath(jobType), processConfigYamlText);
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
index ecf481e03cb..8d79fbf093f 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractInventoryIncrementalJobAPIImpl.java
@@ -80,12 +80,12 @@ public abstract class AbstractInventoryIncrementalJobAPIImpl extends AbstractPip
@Override
public void alterProcessConfiguration(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
// TODO check rateLimiter type match or not
- processConfigPersistService.persist(contextKey, getJobType(), processConfig);
+ processConfigPersistService.persist(contextKey, getType(), processConfig);
}
@Override
public PipelineProcessConfiguration showProcessConfiguration(final PipelineContextKey contextKey) {
- return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getJobType()));
+ return PipelineProcessConfigurationUtils.convertWithDefaultValue(processConfigPersistService.load(contextKey, getType()));
}
@Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
index 4978dbd6192..6c353f52694 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/impl/AbstractPipelineJobAPIImpl.java
@@ -71,7 +71,7 @@ public abstract class AbstractPipelineJobAPIImpl implements PipelineJobAPI {
private Stream<JobBriefInfo> getJobBriefInfos(final PipelineContextKey contextKey) {
return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream().filter(each -> !each.getJobName().startsWith("_"))
- .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getCode().equals(getJobType().getCode()));
+ .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(getType()));
}
// TODO Add getJobInfo
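A sketch (illustrative only) of the comparison that the hunk above switches from numeric type codes to type strings; the listJobNamesByType helper is hypothetical, while the factory call, PipelineJobIdUtils, and the underscore filter come from the changed method itself.

import java.util.List;
import java.util.stream.Collectors;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;

public final class JobBriefInfoFilterExample {
    
    // Collect the names of jobs whose parsed job type matches the given type string.
    public static List<String> listJobNamesByType(final PipelineContextKey contextKey, final String jobType) {
        return PipelineAPIFactory.getJobStatisticsAPI(contextKey).getAllJobsBriefInfo().stream()
                .filter(each -> !each.getJobName().startsWith("_"))
                .filter(each -> PipelineJobIdUtils.parseJobType(each.getJobName()).getType().equals(jobType))
                .map(each -> each.getJobName())
                .collect(Collectors.toList());
    }
}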
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
index ad7ed42c007..00119f7d408 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineDataSourcePersistService.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.data.pipeline.core.metadata;
import com.google.common.base.Strings;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -39,7 +38,7 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
@Override
@SuppressWarnings("unchecked")
- public Map<String, DataSourcePoolProperties> load(final PipelineContextKey contextKey, final JobType jobType) {
+ public Map<String, DataSourcePoolProperties> load(final PipelineContextKey contextKey, final String jobType) {
String dataSourcesProps = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataDataSources(jobType);
if (Strings.isNullOrEmpty(dataSourcesProps)) {
return Collections.emptyMap();
@@ -51,7 +50,7 @@ public final class PipelineDataSourcePersistService implements PipelineMetaDataP
}
@Override
- public void persist(final PipelineContextKey contextKey, final JobType jobType, final Map<String, DataSourcePoolProperties> propsMap) {
+ public void persist(final PipelineContextKey contextKey, final String jobType, final Map<String, DataSourcePoolProperties> propsMap) {
Map<String, Map<String, Object>> dataSourceMap = new LinkedHashMap<>(propsMap.size(), 1F);
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
dataSourceMap.put(entry.getKey(), swapper.swapToMap(entry.getValue()));
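A minimal reload sketch (not part of this commit) for the String-based PipelineDataSourcePersistService changed above; the call shape and the "MIGRATION" literal mirror the test hunks further below.

import java.util.Map;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;

public final class DataSourcePersistExample {
    
    // Reload persisted data source pool properties for the migration job type;
    // load() returns an empty map when nothing has been persisted yet.
    public static Map<String, DataSourcePoolProperties> reload(final PipelineContextKey contextKey) {
        return new PipelineDataSourcePersistService().load(contextKey, "MIGRATION");
    }
}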
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
index 30ef52253d0..2b615e17712 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineMetaDataPersistService.java
@@ -18,7 +18,6 @@
package org.apache.shardingsphere.data.pipeline.core.metadata;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
/**
* Pipeline meta data persist service.
@@ -34,7 +33,7 @@ public interface PipelineMetaDataPersistService<T> {
* @param jobType job type, nullable
* @return configurations
*/
- T load(PipelineContextKey contextKey, JobType jobType);
+ T load(PipelineContextKey contextKey, String jobType);
/**
* Persist meta data.
@@ -43,5 +42,5 @@ public interface PipelineMetaDataPersistService<T> {
* @param jobType job type, nullable
* @param configs configurations
*/
- void persist(PipelineContextKey contextKey, JobType jobType, T configs);
+ void persist(PipelineContextKey contextKey, String jobType, T configs);
}
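A hypothetical implementation sketch of the generic interface above, storing raw YAML text through the GovernanceRepositoryAPI methods changed earlier in this diff; it mirrors the structure of PipelineProcessConfigurationPersistService below and is not part of the commit.

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineMetaDataPersistService;

// Hypothetical passthrough persist service keyed by the job type string.
public final class RawYamlPersistService implements PipelineMetaDataPersistService<String> {
    
    @Override
    public String load(final PipelineContextKey contextKey, final String jobType) {
        return PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
    }
    
    @Override
    public void persist(final PipelineContextKey contextKey, final String jobType, final String configs) {
        PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType, configs);
    }
}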
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
index e498002d9c4..4df71589bf2 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/metadata/PipelineProcessConfigurationPersistService.java
@@ -22,7 +22,6 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.PipelinePro
import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -34,7 +33,7 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
private final YamlPipelineProcessConfigurationSwapper swapper = new YamlPipelineProcessConfigurationSwapper();
@Override
- public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final JobType jobType) {
+ public PipelineProcessConfiguration load(final PipelineContextKey contextKey, final String jobType) {
String yamlText = PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).getMetaDataProcessConfiguration(jobType);
if (Strings.isNullOrEmpty(yamlText)) {
return null;
@@ -44,7 +43,7 @@ public final class PipelineProcessConfigurationPersistService implements Pipelin
}
@Override
- public void persist(final PipelineContextKey contextKey, final JobType jobType, final PipelineProcessConfiguration processConfig) {
+ public void persist(final PipelineContextKey contextKey, final String jobType, final PipelineProcessConfiguration processConfig) {
String yamlText = YamlEngine.marshal(swapper.swapToYamlConfiguration(processConfig));
PipelineAPIFactory.getGovernanceRepositoryAPI(contextKey).persistMetaDataProcessConfiguration(jobType, yamlText);
}
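A round-trip sketch (illustrative, not from this commit) for the persist service above; it condenses what the updated PipelineProcessConfigurationPersistServiceTest below exercises, and the PipelineProcessConfiguration import location is inferred from the truncated hunk header at the top of this file's diff.

import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;

public final class ProcessConfigRoundTripExample {
    
    // Persist a process configuration for the migration job type and read it back;
    // load() returns null when no configuration has been persisted for that type.
    public static PipelineProcessConfiguration roundTrip(final PipelineContextKey contextKey, final PipelineProcessConfiguration processConfig) {
        PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
        persistService.persist(contextKey, "MIGRATION", processConfig);
        return persistService.load(contextKey, "MIGRATION");
    }
}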
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
index a4fe7687c5a..d33cf56282c 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/common/metadata/node/PipelineMetaDataNodeTest.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.data.pipeline.common.metadata.node;
-import org.apache.shardingsphere.data.pipeline.common.job.type.FixtureJobType;
import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.Test;
@@ -38,12 +37,12 @@ class PipelineMetaDataNodeTest {
@Test
void assertGetMetaDataDataSourcesPath() {
- MatcherAssert.assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath(new FixtureJobType()), is(migrationMetaDataRootPath + "/data_sources"));
+ MatcherAssert.assertThat(PipelineMetaDataNode.getMetaDataDataSourcesPath("FIXTURE"), is(migrationMetaDataRootPath + "/data_sources"));
}
@Test
void assertGetMetaDataProcessConfigPath() {
- assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath(new FixtureJobType()), is(migrationMetaDataRootPath + "/process_config"));
+ assertThat(PipelineMetaDataNode.getMetaDataProcessConfigPath("FIXTURE"), is(migrationMetaDataRootPath + "/process_config"));
}
@Test
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 3eb8f36e54c..1c310065bc5 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -135,7 +135,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
result.setTargetDatabaseName(param.getTargetDatabaseName());
- Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, new MigrationJobType());
+ Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
@@ -405,7 +405,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
* @param propsMap data source pool properties map
*/
public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
- Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getJobType());
+ Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
if (existDataSources.containsKey(entry.getKey())) {
@@ -415,7 +415,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
result.putAll(propsMap);
- dataSourcePersistService.persist(contextKey, getJobType(), result);
+ dataSourcePersistService.persist(contextKey, getType(), result);
}
/**
@@ -425,13 +425,13 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
* @param resourceNames resource names
*/
public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
- Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getJobType());
+ Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
for (String each : resourceNames) {
metaDataDataSource.remove(each);
}
- dataSourcePersistService.persist(contextKey, getJobType(), metaDataDataSource);
+ dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
}
/**
@@ -441,7 +441,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
* @return migration source resources
*/
public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
- Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getJobType());
+ Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
String dataSourceName = entry.getKey();
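A caller sketch (not part of this commit) for the migration source resource methods shown above; the MigrationJobAPI instance and the properties map are assumed to be supplied by the caller, and the register/list/drop sequence is illustrative.

import java.util.Collection;
import java.util.Map;

import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;

public final class MigrationSourceResourceExample {
    
    // Register source storage units, list what is persisted, then unregister them again.
    public static Collection<Collection<Object>> registerListAndDrop(final MigrationJobAPI jobAPI, final PipelineContextKey contextKey,
                                                                     final Map<String, DataSourcePoolProperties> propsMap) {
        jobAPI.addMigrationSourceResources(contextKey, propsMap);
        Collection<Collection<Object>> result = jobAPI.listMigrationSourceResources(contextKey);
        jobAPI.dropMigrationSourceResources(contextKey, propsMap.keySet());
        return result;
    }
}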
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
index 9f0720c57a4..0c9142a61d0 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/metadata/PipelineProcessConfigurationPersistServiceTest.java
@@ -22,9 +22,7 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPi
import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineWriteConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swapper.YamlPipelineProcessConfigurationSwapper;
-import org.apache.shardingsphere.data.pipeline.common.job.type.JobType;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineProcessConfigurationPersistService;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
import org.apache.shardingsphere.infra.yaml.config.pojo.algorithm.YamlAlgorithmConfiguration;
import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
@@ -56,9 +54,8 @@ class PipelineProcessConfigurationPersistServiceTest {
String expectedYamlText = YamlEngine.marshal(yamlProcessConfig);
PipelineProcessConfiguration processConfig = new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
PipelineProcessConfigurationPersistService persistService = new PipelineProcessConfigurationPersistService();
- JobType jobType = new MigrationJobType();
- persistService.persist(PipelineContextUtils.getContextKey(), jobType, processConfig);
- String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), jobType)));
+ persistService.persist(PipelineContextUtils.getContextKey(), "MIGRATION", processConfig);
+ String actualYamlText = YamlEngine.marshal(new YamlPipelineProcessConfigurationSwapper().swapToYamlConfiguration(persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION")));
assertThat(actualYamlText, is(expectedYamlText));
}
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4c4c5570879..c51bddd7599 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -19,9 +19,9 @@ package org.apache.shardingsphere.test.it.data.pipeline.scenario.migration.api.i
import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
@@ -34,7 +34,6 @@ import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInva
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -252,7 +251,7 @@ class MigrationJobAPITest {
@Test
void assertAddMigrationSourceResources() {
PipelineDataSourcePersistService persistService = new PipelineDataSourcePersistService();
- Map<String, DataSourcePoolProperties> actual = persistService.load(PipelineContextUtils.getContextKey(), new MigrationJobType());
+ Map<String, DataSourcePoolProperties> actual = persistService.load(PipelineContextUtils.getContextKey(), "MIGRATION");
assertTrue(actual.containsKey("ds_0"));
}
@@ -288,7 +287,7 @@ class MigrationJobAPITest {
}
private void initIntPrimaryEnvironment() throws SQLException {
- Map<String, DataSourcePoolProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), new MigrationJobType());
+ Map<String, DataSourcePoolProperties> metaDataDataSource = new PipelineDataSourcePersistService().load(PipelineContextUtils.getContextKey(), "MIGRATION");
DataSourcePoolProperties props = metaDataDataSource.get("ds_0");
try (
PipelineDataSourceWrapper dataSource = new PipelineDataSourceWrapper(DataSourcePoolCreator.create(props), databaseType);