This is an automated email from the ASF dual-hosted git repository.
panjuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 41db48e6bb9 Refactor MigrationJobAPI (#29213)
41db48e6bb9 is described below
commit 41db48e6bb9323858a975a3003db3e3d7ef7e4f9
Author: Liang Zhang <[email protected]>
AuthorDate: Mon Nov 27 03:29:19 2023 +0800
Refactor MigrationJobAPI (#29213)
---
.../ShowMigrationSourceStorageUnitsExecutor.java | 5 +-
.../handler/update/MigrateTableUpdater.java | 5 +-
.../RegisterMigrationSourceStorageUnitUpdater.java | 5 +-
...nregisterMigrationSourceStorageUnitUpdater.java | 5 +-
.../api/impl/ConsistencyCheckJobAPI.java | 4 +-
...tionJobOption.java => MigrationJobManager.java} | 165 +++-------------
.../migration/api/impl/MigrationJobOption.java | 209 ---------------------
.../migration/api/impl/MigrationJobAPITest.java | 17 +-
8 files changed, 48 insertions(+), 367 deletions(-)
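In practical terms, the refactoring moves the DistSQL-facing entry points (createJobAndStart, addMigrationSourceResources, dropMigrationSourceResources, listMigrationSourceResources) off MigrationJobOption and onto a new MigrationJobManager that wraps it, with createJobAndStart renamed to start. A minimal before/after sketch of a handler call site, based on the hunks below (contextKey and statement stand in for the handlers' real arguments):

    // before: MigrationJobOption exposed job management directly
    private final MigrationJobOption jobOption = new MigrationJobOption();
    String jobId = jobOption.createJobAndStart(contextKey, statement);

    // after: MigrationJobManager wraps the option and owns the entry points
    private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
    String jobId = jobManager.start(contextKey, statement);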
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
index 2b5a4b5db72..bab525e6822 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/query/ShowMigrationSourceStorageUnitsExecutor.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.query;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.query.QueryableRALExecutor;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -35,11 +36,11 @@ import java.util.List;
*/
public final class ShowMigrationSourceStorageUnitsExecutor implements QueryableRALExecutor<ShowMigrationSourceStorageUnitsStatement> {
- private final MigrationJobOption jobOption = new MigrationJobOption();
+ private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
@Override
public Collection<LocalDataQueryResultRow> getRows(final ShowMigrationSourceStorageUnitsStatement sqlStatement) {
- Iterator<Collection<Object>> data = jobOption.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
+ Iterator<Collection<Object>> data = jobManager.listMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY)).iterator();
Collection<LocalDataQueryResultRow> result = new LinkedList<>();
while (data.hasNext()) {
result.add(new LocalDataQueryResultRow((List<Object>) data.next()));
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
index aeb49d9005f..23f220d73fb 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/MigrateTableUpdater.java
@@ -20,6 +20,7 @@ package org.apache.shardingsphere.migration.distsql.handler.update;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.core.exception.job.MissingRequiredTargetDatabaseException;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
@@ -32,13 +33,13 @@ import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStateme
@Slf4j
public final class MigrateTableUpdater implements RALUpdater<MigrateTableStatement> {
- private final MigrationJobOption jobOption = new MigrationJobOption();
+ private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
@Override
public void executeUpdate(final String databaseName, final MigrateTableStatement sqlStatement) {
String targetDatabaseName = null == sqlStatement.getTargetDatabaseName() ? databaseName : sqlStatement.getTargetDatabaseName();
ShardingSpherePreconditions.checkNotNull(targetDatabaseName, MissingRequiredTargetDatabaseException::new);
- jobOption.createJobAndStart(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
+ jobManager.start(new PipelineContextKey(InstanceType.PROXY), new MigrateTableStatement(sqlStatement.getSourceTargetEntries(), targetDatabaseName));
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
index b89d63262ca..d26fee0d0a1 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/RegisterMigrationSourceStorageUnitUpdater.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.handler.validate.DataSourcePoolPropertiesValidateHandler;
@@ -42,7 +43,7 @@ import java.util.Map;
*/
public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdater<RegisterMigrationSourceStorageUnitStatement> {
- private final MigrationJobOption jobOption = new MigrationJobOption();
+ private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
private final DataSourcePoolPropertiesValidateHandler validateHandler = new DataSourcePoolPropertiesValidateHandler();
@@ -55,7 +56,7 @@ public final class RegisterMigrationSourceStorageUnitUpdater implements RALUpdat
DatabaseType databaseType = DatabaseTypeFactory.get(urlBasedDataSourceSegment.getUrl());
Map<String, DataSourcePoolProperties> propsMap = DataSourceSegmentsConverter.convert(databaseType, dataSources);
validateHandler.validate(propsMap);
- jobOption.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
+ jobManager.addMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), propsMap);
}
@Override
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
index 37d7e8cc00f..3f4cdfe20d2 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/UnregisterMigrationSourceStorageUnitUpdater.java
@@ -18,6 +18,7 @@
package org.apache.shardingsphere.migration.distsql.handler.update;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
@@ -28,11 +29,11 @@ import org.apache.shardingsphere.migration.distsql.statement.UnregisterMigration
*/
public final class UnregisterMigrationSourceStorageUnitUpdater implements RALUpdater<UnregisterMigrationSourceStorageUnitStatement> {
- private final MigrationJobOption jobOption = new MigrationJobOption();
+ private final MigrationJobManager jobManager = new MigrationJobManager(new MigrationJobOption());
@Override
public void executeUpdate(final String databaseName, final UnregisterMigrationSourceStorageUnitStatement sqlStatement) {
- jobOption.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
+ jobManager.dropMigrationSourceResources(new PipelineContextKey(InstanceType.PROXY), sqlStatement.getNames());
}
@Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 472648bfdae..7d0c0135b74 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -147,11 +147,11 @@ public final class ConsistencyCheckJobAPI {
* @param parentJobId parent job id
*/
public void drop(final String parentJobId) {
- jobManager.stop(parentJobId);
+ String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
+ jobManager.stop(latestCheckJobId);
PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
PipelineGovernanceFacade governanceFacade = PipelineAPIFactory.getPipelineGovernanceFacade(contextKey);
Collection<String> checkJobIds = governanceFacade.getJobFacade().getCheck().listCheckJobIds(parentJobId);
- String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
Optional<Integer> previousSequence = ConsistencyCheckSequence.getPreviousSequence(
checkJobIds.stream().map(ConsistencyCheckJobId::parseSequence).collect(Collectors.toList()), ConsistencyCheckJobId.parseSequence(latestCheckJobId));
if (previousSequence.isPresent()) {
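Worth noting in the hunk above: drop() previously stopped the parent job id directly, while the refactored version first resolves the latest consistency-check job id and stops that job, computing latestCheckJobId before it is used further down. A minimal sketch of the resulting method head, with the remaining cleanup unchanged from the context lines shown:

    public void drop(final String parentJobId) {
        String latestCheckJobId = PipelineAPIFactory.getPipelineGovernanceFacade(
                PipelineJobIdUtils.parseContextKey(parentJobId)).getJobFacade().getCheck().getLatestCheckJobId(parentJobId);
        jobManager.stop(latestCheckJobId);
        // ... list check job ids, compute the previous sequence, and clean up as in the context lines above
    }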
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
similarity index 61%
copy from kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
copy to kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
index c3b8886940d..f356ed53733 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobManager.java
@@ -21,47 +21,21 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
-import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
-import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
@@ -94,28 +68,36 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
/**
- * Migration job option.
+ * Migration job manager.
*/
@Slf4j
-public final class MigrationJobOption implements TransmissionJobOption {
+public final class MigrationJobManager {
- private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
+ private final MigrationJobOption jobOption;
+
+ private final PipelineJobManager jobManager;
+
+ private final PipelineDataSourcePersistService dataSourcePersistService;
+
+ public MigrationJobManager(final MigrationJobOption jobOption) {
+ this.jobOption = jobOption;
+ jobManager = new PipelineJobManager(jobOption);
+ dataSourcePersistService = new PipelineDataSourcePersistService();
+ }
/**
- * Create job migration config and start.
+ * Start migration job.
*
* @param contextKey context key
* @param param create migration job parameter
* @return job id
*/
- public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) {
+ public String start(final PipelineContextKey contextKey, final MigrateTableStatement param) {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
- new PipelineJobManager(this).start(jobConfig);
+ jobManager.start(jobConfig);
return jobConfig.getJobId();
}
@@ -163,7 +145,7 @@ public final class MigrationJobOption implements TransmissionJobOption {
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- extendYamlJobConfiguration(contextKey, result);
+ jobOption.extendYamlJobConfiguration(contextKey, result);
return result;
}
@@ -185,9 +167,7 @@ public final class MigrationJobOption implements TransmissionJobOption {
}
private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
- if (rules.isEmpty()) {
- throw new NoAnyRuleExistsException(databaseName);
- }
+ ShardingSpherePreconditions.checkState(!rules.isEmpty(), () -> new NoAnyRuleExistsException(databaseName));
YamlRootConfiguration result = new YamlRootConfiguration();
result.setDatabaseName(databaseName);
result.setDataSources(yamlDataSources);
@@ -201,95 +181,6 @@ public final class MigrationJobOption implements TransmissionJobOption {
return result;
}
- @Override
- public PipelineJobInfo getJobInfo(final String jobId) {
- PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
- List<String> sourceTables = new LinkedList<>();
- new PipelineJobConfigurationManager(this).<MigrationJobConfiguration>getJobConfiguration(jobId).getJobShardingDataNodes()
- .forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> sourceTables.add(DataNodeUtils.formatWithSchema(dataNode)))));
- return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
- }
-
- @Override
- public void extendYamlJobConfiguration(final PipelineContextKey contextKey, final YamlPipelineJobConfiguration yamlJobConfig) {
- YamlMigrationJobConfiguration config = (YamlMigrationJobConfiguration) yamlJobConfig;
- if (null == yamlJobConfig.getJobId()) {
- config.setJobId(new MigrationJobId(contextKey, config.getJobShardingDataNodes()).marshal());
- }
- }
-
- @SuppressWarnings("unchecked")
- @Override
- public YamlMigrationJobConfigurationSwapper getYamlJobConfigurationSwapper() {
- return new YamlMigrationJobConfigurationSwapper();
- }
-
- @Override
- public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
- IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
- jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
- Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
- ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig = buildImporterConfiguration(
- jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- MigrationTaskConfiguration result = new MigrationTaskConfiguration(
- incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
- log.info("buildTaskConfiguration, result={}", result);
- return result;
- }
-
- private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- Collection<CreateTableConfiguration> result = new LinkedList<>();
- for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
- String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
- DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
- String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
- DataNode dataNode = each.getDataNodes().get(0);
- PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
- CreateTableConfiguration createTableConfig = new CreateTableConfiguration(
- sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
- jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
- result.add(createTableConfig);
- }
- log.info("buildCreateTableConfigurations, result={}", result);
- return result;
- }
-
- private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
- final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- int retryTimes = jobConfig.getRetryTimes();
- int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
- }
-
- @Override
- public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
- PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
- return new MigrationProcessContext(jobConfig.getJobId(), processConfig);
- }
-
- @Override
- public PipelineDataConsistencyChecker buildDataConsistencyChecker(final PipelineJobConfiguration jobConfig, final TransmissionProcessContext processContext,
- final ConsistencyCheckJobItemProgressContext progressContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration) jobConfig, processContext, progressContext);
- }
-
- @Override
- public Optional<String> getToBeStartDisabledNextJobType() {
- return Optional.of("CONSISTENCY_CHECK");
- }
-
- @Override
- public Optional<String> getToBeStoppedPreviousJobType() {
- return Optional.of("CONSISTENCY_CHECK");
- }
-
/**
* Add migration source resources.
*
@@ -297,7 +188,7 @@ public final class MigrationJobOption implements TransmissionJobOption {
* @param propsMap data source pool properties map
*/
public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
- Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
+ Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, jobOption.getType());
Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
if (existDataSources.containsKey(entry.getKey())) {
@@ -307,7 +198,7 @@ public final class MigrationJobOption implements TransmissionJobOption {
ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
result.putAll(propsMap);
- dataSourcePersistService.persist(contextKey, getType(), result);
+ dataSourcePersistService.persist(contextKey, jobOption.getType(), result);
}
/**
@@ -317,13 +208,13 @@ public final class MigrationJobOption implements TransmissionJobOption {
* @param resourceNames resource names
*/
public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
- Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
+ Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, jobOption.getType());
List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
for (String each : resourceNames) {
metaDataDataSource.remove(each);
}
- dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
+ dataSourcePersistService.persist(contextKey, jobOption.getType(), metaDataDataSource);
}
/**
@@ -333,7 +224,7 @@ public final class MigrationJobOption implements TransmissionJobOption {
* @return migration source resources
*/
public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
- Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
+ Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, jobOption.getType());
Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
String dataSourceName = entry.getKey();
@@ -367,14 +258,4 @@ public final class MigrationJobOption implements TransmissionJobOption {
}
return "";
}
-
- @Override
- public Class<MigrationJob> getJobClass() {
- return MigrationJob.class;
- }
-
- @Override
- public String getType() {
- return "MIGRATION";
- }
}
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
index c3b8886940d..c62eb313f46 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobOption.java
@@ -20,41 +20,29 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.yaml.YamlPipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
-import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.common.datasource.yaml.YamlPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
-import org.apache.shardingsphere.data.pipeline.common.metadata.loader.PipelineSchemaUtils;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.exception.connection.RegisterMigrationSourceStorageUnitException;
-import org.apache.shardingsphere.data.pipeline.core.exception.connection.UnregisterMigrationSourceStorageUnitException;
-import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRuleExistsException;
-import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
-import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJob;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobId;
import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
@@ -64,36 +52,14 @@ import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
-import org.apache.shardingsphere.infra.config.rule.RuleConfiguration;
-import org.apache.shardingsphere.infra.database.core.connector.ConnectionProperties;
-import org.apache.shardingsphere.infra.database.core.connector.ConnectionPropertiesParser;
import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeFactory;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
import org.apache.shardingsphere.infra.datanode.DataNode;
-import org.apache.shardingsphere.infra.datasource.pool.props.domain.DataSourcePoolProperties;
-import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import org.apache.shardingsphere.infra.metadata.database.ShardingSphereDatabase;
-import org.apache.shardingsphere.infra.metadata.database.resource.unit.StorageUnit;
-import org.apache.shardingsphere.infra.util.json.JsonUtils;
-import org.apache.shardingsphere.infra.yaml.config.pojo.YamlRootConfiguration;
-import org.apache.shardingsphere.infra.yaml.config.swapper.resource.YamlDataSourceConfigurationSwapper;
-import org.apache.shardingsphere.infra.yaml.config.swapper.rule.YamlRuleConfigurationSwapperEngine;
-import org.apache.shardingsphere.migration.distsql.statement.MigrateTableStatement;
-import org.apache.shardingsphere.migration.distsql.statement.pojo.SourceTargetEntry;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -104,103 +70,6 @@ import java.util.stream.Collectors;
@Slf4j
public final class MigrationJobOption implements TransmissionJobOption {
- private final PipelineDataSourcePersistService dataSourcePersistService = new PipelineDataSourcePersistService();
-
- /**
- * Create job migration config and start.
- *
- * @param contextKey context key
- * @param param create migration job parameter
- * @return job id
- */
- public String createJobAndStart(final PipelineContextKey contextKey, final MigrateTableStatement param) {
- MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(buildYamlJobConfiguration(contextKey, param));
- new PipelineJobManager(this).start(jobConfig);
- return jobConfig.getJobId();
- }
-
- private YamlMigrationJobConfiguration buildYamlJobConfiguration(final PipelineContextKey contextKey, final MigrateTableStatement param) {
- YamlMigrationJobConfiguration result = new YamlMigrationJobConfiguration();
- result.setTargetDatabaseName(param.getTargetDatabaseName());
- Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, "MIGRATION");
- Map<String, List<DataNode>> sourceDataNodes = new LinkedHashMap<>();
- Map<String, YamlPipelineDataSourceConfiguration> configSources = new LinkedHashMap<>();
- List<SourceTargetEntry> sourceTargetEntries = new ArrayList<>(new HashSet<>(param.getSourceTargetEntries())).stream().sorted(Comparator.comparing(SourceTargetEntry::getTargetTableName)
- .thenComparing(each -> DataNodeUtils.formatWithSchema(each.getSource()))).collect(Collectors.toList());
- YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
- for (SourceTargetEntry each : sourceTargetEntries) {
- sourceDataNodes.computeIfAbsent(each.getTargetTableName(), key -> new LinkedList<>()).add(each.getSource());
- ShardingSpherePreconditions.checkState(1 == sourceDataNodes.get(each.getTargetTableName()).size(),
- () -> new PipelineInvalidParameterException("more than one source table for " + each.getTargetTableName()));
- String dataSourceName = each.getSource().getDataSourceName();
- if (configSources.containsKey(dataSourceName)) {
- continue;
- }
- ShardingSpherePreconditions.checkState(metaDataDataSource.containsKey(dataSourceName),
- () -> new PipelineInvalidParameterException(dataSourceName + " doesn't exist. Run `SHOW MIGRATION SOURCE STORAGE UNITS;` to verify it."));
- Map<String, Object> sourceDataSourcePoolProps = dataSourceConfigSwapper.swapToMap(metaDataDataSource.get(dataSourceName));
- StandardPipelineDataSourceConfiguration sourceDataSourceConfig = new StandardPipelineDataSourceConfiguration(sourceDataSourcePoolProps);
- configSources.put(dataSourceName, buildYamlPipelineDataSourceConfiguration(sourceDataSourceConfig.getType(), sourceDataSourceConfig.getParameter()));
- DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(sourceDataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
- if (null == each.getSource().getSchemaName() && dialectDatabaseMetaData.isSchemaAvailable()) {
- each.getSource().setSchemaName(PipelineSchemaUtils.getDefaultSchema(sourceDataSourceConfig));
- }
- DatabaseType sourceDatabaseType = sourceDataSourceConfig.getDatabaseType();
- if (null == result.getSourceDatabaseType()) {
- result.setSourceDatabaseType(sourceDatabaseType.getType());
- } else if (!result.getSourceDatabaseType().equals(sourceDatabaseType.getType())) {
- throw new PipelineInvalidParameterException("Source storage units have different database types");
- }
- }
- result.setSources(configSources);
- ShardingSphereDatabase targetDatabase = PipelineContextManager.getProxyContext().getContextManager().getMetaDataContexts().getMetaData().getDatabase(param.getTargetDatabaseName());
- PipelineDataSourceConfiguration targetPipelineDataSourceConfig = buildTargetPipelineDataSourceConfiguration(targetDatabase);
- result.setTarget(buildYamlPipelineDataSourceConfiguration(targetPipelineDataSourceConfig.getType(), targetPipelineDataSourceConfig.getParameter()));
- result.setTargetDatabaseType(targetPipelineDataSourceConfig.getDatabaseType().getType());
- List<JobDataNodeEntry> tablesFirstDataNodes = sourceDataNodes.entrySet().stream()
- .map(entry -> new JobDataNodeEntry(entry.getKey(), entry.getValue().subList(0, 1))).collect(Collectors.toList());
- result.setTargetTableNames(new ArrayList<>(sourceDataNodes.keySet()).stream().sorted().collect(Collectors.toList()));
- result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
- result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
- result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- extendYamlJobConfiguration(contextKey, result);
- return result;
- }
-
- private YamlPipelineDataSourceConfiguration buildYamlPipelineDataSourceConfiguration(final String type, final String param) {
- YamlPipelineDataSourceConfiguration result = new YamlPipelineDataSourceConfiguration();
- result.setType(type);
- result.setParameter(param);
- return result;
- }
-
- private PipelineDataSourceConfiguration buildTargetPipelineDataSourceConfiguration(final ShardingSphereDatabase targetDatabase) {
- Map<String, Map<String, Object>> targetPoolProps = new HashMap<>();
- YamlDataSourceConfigurationSwapper dataSourceConfigSwapper = new YamlDataSourceConfigurationSwapper();
- for (Entry<String, StorageUnit> entry : targetDatabase.getResourceMetaData().getStorageUnits().entrySet()) {
- targetPoolProps.put(entry.getKey(), dataSourceConfigSwapper.swapToMap(entry.getValue().getDataSourcePoolProperties()));
- }
- YamlRootConfiguration targetRootConfig = buildYamlRootConfiguration(targetDatabase.getName(), targetPoolProps, targetDatabase.getRuleMetaData().getConfigurations());
- return new ShardingSpherePipelineDataSourceConfiguration(targetRootConfig);
- }
-
- private YamlRootConfiguration buildYamlRootConfiguration(final String databaseName, final Map<String, Map<String, Object>> yamlDataSources, final Collection<RuleConfiguration> rules) {
- if (rules.isEmpty()) {
- throw new NoAnyRuleExistsException(databaseName);
- }
- YamlRootConfiguration result = new YamlRootConfiguration();
- result.setDatabaseName(databaseName);
- result.setDataSources(yamlDataSources);
- result.setRules(new YamlRuleConfigurationSwapperEngine().swapToYamlRuleConfigurations(rules));
- return result;
- }
-
- private Map<String, String> buildTargetTableSchemaMap(final Map<String, List<DataNode>> sourceDataNodes) {
- Map<String, String> result = new LinkedHashMap<>();
- sourceDataNodes.forEach((tableName, dataNodes) -> result.put(tableName, dataNodes.get(0).getSchemaName()));
- return result;
- }
-
@Override
public PipelineJobInfo getJobInfo(final String jobId) {
PipelineJobMetaData jobMetaData = new PipelineJobMetaData(PipelineJobIdUtils.getElasticJobConfigurationPOJO(jobId));
@@ -290,84 +159,6 @@ public final class MigrationJobOption implements TransmissionJobOption {
return Optional.of("CONSISTENCY_CHECK");
}
- /**
- * Add migration source resources.
- *
- * @param contextKey context key
- * @param propsMap data source pool properties map
- */
- public void addMigrationSourceResources(final PipelineContextKey contextKey, final Map<String, DataSourcePoolProperties> propsMap) {
- Map<String, DataSourcePoolProperties> existDataSources = dataSourcePersistService.load(contextKey, getType());
- Collection<String> duplicateDataSourceNames = new HashSet<>(propsMap.size(), 1F);
- for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
- if (existDataSources.containsKey(entry.getKey())) {
- duplicateDataSourceNames.add(entry.getKey());
- }
- }
- ShardingSpherePreconditions.checkState(duplicateDataSourceNames.isEmpty(), () -> new RegisterMigrationSourceStorageUnitException(duplicateDataSourceNames));
- Map<String, DataSourcePoolProperties> result = new LinkedHashMap<>(existDataSources);
- result.putAll(propsMap);
- dataSourcePersistService.persist(contextKey, getType(), result);
- }
-
- /**
- * Drop migration source resources.
- *
- * @param contextKey context key
- * @param resourceNames resource names
- */
- public void dropMigrationSourceResources(final PipelineContextKey contextKey, final Collection<String> resourceNames) {
- Map<String, DataSourcePoolProperties> metaDataDataSource = dataSourcePersistService.load(contextKey, getType());
- List<String> noExistResources = resourceNames.stream().filter(each -> !metaDataDataSource.containsKey(each)).collect(Collectors.toList());
- ShardingSpherePreconditions.checkState(noExistResources.isEmpty(), () -> new UnregisterMigrationSourceStorageUnitException(noExistResources));
- for (String each : resourceNames) {
- metaDataDataSource.remove(each);
- }
- dataSourcePersistService.persist(contextKey, getType(), metaDataDataSource);
- }
-
- /**
- * Query migration source resources list.
- *
- * @param contextKey context key
- * @return migration source resources
- */
- public Collection<Collection<Object>> listMigrationSourceResources(final PipelineContextKey contextKey) {
- Map<String, DataSourcePoolProperties> propsMap = dataSourcePersistService.load(contextKey, getType());
- Collection<Collection<Object>> result = new ArrayList<>(propsMap.size());
- for (Entry<String, DataSourcePoolProperties> entry : propsMap.entrySet()) {
- String dataSourceName = entry.getKey();
- DataSourcePoolProperties value = entry.getValue();
- Collection<Object> props = new LinkedList<>();
- props.add(dataSourceName);
- String url = String.valueOf(value.getConnectionPropertySynonyms().getStandardProperties().get("url"));
- DatabaseType databaseType = DatabaseTypeFactory.get(url);
- props.add(databaseType.getType());
- ConnectionProperties connectionProps = DatabaseTypedSPILoader.getService(ConnectionPropertiesParser.class, databaseType).parse(url, "", null);
- props.add(connectionProps.getHostname());
- props.add(connectionProps.getPort());
- props.add(connectionProps.getCatalog());
- Map<String, Object> standardProps = value.getPoolPropertySynonyms().getStandardProperties();
- props.add(getStandardProperty(standardProps, "connectionTimeoutMilliseconds"));
- props.add(getStandardProperty(standardProps, "idleTimeoutMilliseconds"));
- props.add(getStandardProperty(standardProps, "maxLifetimeMilliseconds"));
- props.add(getStandardProperty(standardProps, "maxPoolSize"));
- props.add(getStandardProperty(standardProps, "minPoolSize"));
- props.add(getStandardProperty(standardProps, "readOnly"));
- Map<String, Object> otherProps = value.getCustomProperties().getProperties();
- props.add(otherProps.isEmpty() ? "" : JsonUtils.toJsonString(otherProps));
- result.add(props);
- }
- return result;
- }
-
- private String getStandardProperty(final Map<String, Object> standardProps, final String key) {
- if (standardProps.containsKey(key) && null != standardProps.get(key)) {
- return standardProps.get(key).toString();
- }
- return "";
- }
-
@Override
public Class<MigrationJob> getJobClass() {
return MigrationJob.class;
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 4cb3930a110..32e144ab645 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -40,6 +40,7 @@ import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManag
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
@@ -95,6 +96,8 @@ class MigrationJobAPITest {
private static MigrationJobAPI jobAPI;
+ private static MigrationJobManager migrationJobManager;
+
private static PipelineJobConfigurationManager jobConfigManager;
private static PipelineJobManager jobManager;
@@ -110,6 +113,7 @@ class MigrationJobAPITest {
PipelineContextUtils.mockModeConfigAndContextManager();
jobOption = new MigrationJobOption();
jobAPI = new MigrationJobAPI();
+ migrationJobManager = new MigrationJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
jobManager = new PipelineJobManager(jobOption);
transmissionJobManager = new TransmissionJobManager(jobOption);
@@ -120,12 +124,13 @@ class MigrationJobAPITest {
props.put("jdbcUrl", jdbcUrl);
props.put("username", "root");
props.put("password", "root");
- jobOption.addMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
+ migrationJobManager.addMigrationSourceResources(
+ PipelineContextUtils.getContextKey(), Collections.singletonMap("ds_0", new DataSourcePoolProperties("com.zaxxer.hikari.HikariDataSource", props)));
}
@AfterAll
static void afterClass() {
- jobOption.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
+ migrationJobManager.dropMigrationSourceResources(PipelineContextUtils.getContextKey(), Collections.singletonList("ds_0"));
}
@Test
@@ -253,20 +258,20 @@ class MigrationJobAPITest {
void assertCreateJobConfigFailedOnMoreThanOneSourceTable() {
List<SourceTargetEntry> sourceTargetEntries = Stream.of("t_order_0", "t_order_1")
.map(each -> new SourceTargetEntry("logic_db", new DataNode("ds_0", each), "t_order")).collect(Collectors.toList());
- assertThrows(PipelineInvalidParameterException.class, () -> jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ assertThrows(PipelineInvalidParameterException.class, () -> migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
}
@Test
void assertCreateJobConfigFailedOnDataSourceNotExist() {
List<SourceTargetEntry> sourceTargetEntries = Collections.singletonList(new SourceTargetEntry("logic_db", new DataNode("ds_not_exists", "t_order"), "t_order"));
- assertThrows(PipelineInvalidParameterException.class, () -> jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
+ assertThrows(PipelineInvalidParameterException.class, () -> migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(sourceTargetEntries, "logic_db")));
}
@Test
void assertCreateJobConfig() throws SQLException {
initIntPrimaryEnvironment();
SourceTargetEntry sourceTargetEntry = new SourceTargetEntry("logic_db", new DataNode("ds_0", "t_order"), "t_order");
- String jobId = jobOption.createJobAndStart(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
+ String jobId = migrationJobManager.start(PipelineContextUtils.getContextKey(), new MigrateTableStatement(Collections.singletonList(sourceTargetEntry), "logic_db"));
MigrationJobConfiguration actual = jobConfigManager.getJobConfiguration(jobId);
assertThat(actual.getTargetDatabaseName(), is("logic_db"));
List<JobDataNodeLine> dataNodeLines = actual.getJobShardingDataNodes();
@@ -294,7 +299,7 @@ class MigrationJobAPITest {
@Test
void assertShowMigrationSourceResources() {
- Collection<Collection<Object>> actual = jobOption.listMigrationSourceResources(PipelineContextUtils.getContextKey());
+ Collection<Collection<Object>> actual = migrationJobManager.listMigrationSourceResources(PipelineContextUtils.getContextKey());
assertThat(actual.size(), is(1));
Collection<Object> objects = actual.iterator().next();
assertThat(objects.toArray()[0], is("ds_0"));