This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 3a04b41fc7a Remove TransmissionJobOption.buildTaskConfiguration() (#29247)
3a04b41fc7a is described below
commit 3a04b41fc7a16af1129ca59ff05b7ca93fdc1c25
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 1 15:40:01 2023 +0800
Remove TransmissionJobOption.buildTaskConfiguration() (#29247)
---
.../core/job/option/TransmissionJobOption.java | 12 -----
.../shardingsphere/data/pipeline/cdc/CDCJob.java | 54 +++++++++++++++++--
.../data/pipeline/cdc/CDCJobOption.java | 51 ------------------
.../pipeline/scenario/migration/MigrationJob.java | 63 +++++++++++++++++++++-
.../scenario/migration/MigrationJobOption.java | 63 ----------------------
.../pipeline/core/util/PipelineContextUtils.java | 55 ++++++++++++++++++-
6 files changed, 164 insertions(+), 134 deletions(-)
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index 2564468586f..3582b71ebe6 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -18,13 +18,11 @@
package org.apache.shardingsphere.data.pipeline.core.job.option;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
/**
* Transmission job option.
@@ -45,16 +43,6 @@ public interface TransmissionJobOption extends PipelineJobOption {
*/
PipelineJobInfo getJobInfo(String jobId);
- /**
- * Build task configuration.
- *
- * @param jobConfig pipeline job configuration
- * @param jobShardingItem job sharding item
- * @param processConfig pipeline process configuration
- * @return task configuration
- */
- PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
-
/**
* Build transmission process context.
*
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 84cdcc78ed9..c70c81d0122 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -20,25 +20,38 @@ package org.apache.shardingsphere.data.pipeline.cdc;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
import org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -55,8 +68,11 @@ import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
/**
* CDC job.
@@ -93,7 +109,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
log.info("stopping true, ignore");
return;
}
- CDCJobItemContext jobItemContext = buildPipelineJobItemContext(jobConfig, shardingItem);
+ CDCJobItemContext jobItemContext = buildCDCJobItemContext(jobConfig, shardingItem);
PipelineTasksRunner tasksRunner = new CDCTasksRunner(jobItemContext);
if (!addTasksRunner(shardingItem, tasksRunner)) {
continue;
@@ -111,13 +127,43 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
executeIncrementalTasks(jobItemContexts);
}
- private CDCJobItemContext buildPipelineJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
+ private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
CDCProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
- CDCTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+ CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, sink);
}
+ private CDCTaskConfiguration buildTaskConfiguration(final CDCJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
+ TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+ IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
+ ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+ CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
+ log.debug("buildTaskConfiguration, result={}", result);
+ return result;
+ }
+
+ private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+ String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+ StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
+ return new IncrementalDumperContext(
+ new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
+ jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+ }
+
+ private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
+ final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
+ jobConfig.getDataSourceConfig().getParameter());
+ CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
+ .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
+ return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+ }
+
private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
try {
jobPreparer.initTasks(jobItemContexts);
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 081e86e8247..331017ac3f3 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -18,39 +18,20 @@
package org.apache.shardingsphere.data.pipeline.cdc;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
import org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
/**
* CDC job option.
*/
@@ -80,38 +61,6 @@ public final class CDCJobOption implements TransmissionJobOption {
return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), String.join(", ", jobConfig.getSchemaTableNames()));
}
- @Override
- public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
- CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
- TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
- IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
- ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
- CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
- log.debug("buildTaskConfiguration, result={}", result);
- return result;
- }
-
- private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
- String dataSourceName = dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
- StandardPipelineDataSourceConfiguration actualDataSourceConfig = jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- return new IncrementalDumperContext(
- new DumperCommonContext(dataSourceName, actualDataSourceConfig, JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), tableAndSchemaNameMapper),
- jobConfig.getJobId(), jobConfig.isDecodeWithTX());
- }
-
- private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
- final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
- jobConfig.getDataSourceConfig().getParameter());
- CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
- .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
- return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
- }
-
@Override
public CDCProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
TransmissionJobManager jobManager = new TransmissionJobManager(this);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index ccdadabf7f9..63fdd02f23d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -18,25 +18,46 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
import org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
import org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Migration job.
@@ -63,10 +84,48 @@ public final class MigrationJob extends AbstractSimplePipelineJob {
MigrationJobConfiguration jobConfig = new YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress = jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
MigrationProcessContext jobProcessContext = jobOption.buildProcessContext(jobConfig);
- MigrationTaskConfiguration taskConfig = jobOption.buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
+ MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, shardingItem, jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem, initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
+ private MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
+ IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+ Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+ Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+ ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
+ ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+ MigrationTaskConfiguration result = new MigrationTaskConfiguration(
+ incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
+ log.info("buildTaskConfiguration, result={}", result);
+ return result;
+ }
+
+ private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ Collection<CreateTableConfiguration> result = new LinkedList<>();
+ for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
+ String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
+ DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
+ String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+ DataNode dataNode = each.getDataNodes().get(0);
+ PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
+ CreateTableConfiguration createTableConfig = new CreateTableConfiguration(sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+ jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
+ result.add(createTableConfig);
+ }
+ log.info("buildCreateTableConfigurations, result={}", result);
+ return result;
+ }
+
+ private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+ final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
+ int retryTimes = jobConfig.getRetryTimes();
+ int concurrency = jobConfig.getConcurrency();
+ return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+ }
+
@Override
protected PipelineTasksRunner buildPipelineTasksRunner(final PipelineJobItemContext pipelineJobItemContext) {
return new TransmissionTasksRunner((TransmissionJobItemContext) pipelineJobItemContext);
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index d2554aed14b..03b8f537bf4 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -18,45 +18,26 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
-import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
import org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
-import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
import java.util.Collection;
import java.util.LinkedList;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* Migration job option.
@@ -94,50 +75,6 @@ public final class MigrationJobOption implements TransmissionJobOption {
return new PipelineJobInfo(jobMetaData, null, String.join(",", sourceTables));
}
- @Override
- public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
- MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
- IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
- jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
- Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
- Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
- ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
- ImporterConfiguration importerConfig = buildImporterConfiguration(
- jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
- MigrationTaskConfiguration result = new MigrationTaskConfiguration(
- incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
- log.info("buildTaskConfiguration, result={}", result);
- return result;
- }
-
- private Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- Collection<CreateTableConfiguration> result = new LinkedList<>();
- for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
- String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
- DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
- String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
- DataNode dataNode = each.getDataNodes().get(0);
- PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
- CreateTableConfiguration createTableConfig = new CreateTableConfiguration(
- sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
- jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
- result.add(createTableConfig);
- }
- log.info("buildCreateTableConfigurations, result={}", result);
- return result;
- }
-
- private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
- final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
- MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
- JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
- int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
- int retryTimes = jobConfig.getRetryTimes();
- int concurrency = jobConfig.getConcurrency();
- return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
- }
-
@Override
public MigrationProcessContext buildProcessContext(final PipelineJobConfiguration jobConfig) {
PipelineProcessConfiguration processConfig = new TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 3faeb76ed4a..efa91b9cae2 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -18,7 +18,10 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.util;
import lombok.SneakyThrows;
+import org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
+import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineProcessConfiguration;
@@ -27,17 +30,27 @@ import org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swappe
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
import org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
import org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
import org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
import org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
import org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
import org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
@@ -57,9 +70,13 @@ import org.mockito.internal.configuration.plugins.Plugins;
import java.sql.Types;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
/**
* Pipeline context utility class.
@@ -166,7 +183,7 @@ public final class PipelineContextUtils {
PipelineProcessConfiguration processConfig = mockPipelineProcessConfiguration();
MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), processConfig);
int jobShardingItem = 0;
- MigrationTaskConfiguration taskConfig = new MigrationJobOption().buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
+ MigrationTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
return new MigrationJobItemContext(jobConfig, jobShardingItem, null, processContext, taskConfig, new DefaultPipelineDataSourceManager());
}
@@ -178,4 +195,38 @@ public final class PipelineContextUtils {
PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
return new YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
}
+
+ private static MigrationTaskConfiguration buildTaskConfiguration(final MigrationJobConfiguration jobConfig, final int jobShardingItem, final PipelineProcessConfiguration processConfig) {
+ IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+ Collection<CreateTableConfiguration> createTableConfigs = buildCreateTableConfigurations(jobConfig, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+ Set<CaseInsensitiveIdentifier> targetTableNames = jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+ Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
+ ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
+ ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+ return new MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(), createTableConfigs, incrementalDumperContext, importerConfig);
+ }
+
+ private static Collection<CreateTableConfiguration> buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ Collection<CreateTableConfiguration> result = new LinkedList<>();
+ for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
+ String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
+ DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
+ String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+ DataNode dataNode = each.getDataNodes().get(0);
+ PipelineDataSourceConfiguration sourceDataSourceConfig = jobConfig.getSources().get(dataNode.getDataSourceName());
+ CreateTableConfiguration createTableConfig = new CreateTableConfiguration(sourceDataSourceConfig, new CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+ jobConfig.getTarget(), new CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
+ result.add(createTableConfig);
+ }
+ return result;
+ }
+
+ private static ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
+ final Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+ int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+ JobRateLimitAlgorithm writeRateLimitAlgorithm = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig).getWriteRateLimitAlgorithm();
+ int retryTimes = jobConfig.getRetryTimes();
+ int concurrency = jobConfig.getConcurrency();
+ return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+ }
}
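
The net effect of the patch above is that buildTaskConfiguration() is no longer declared on the TransmissionJobOption SPI; CDCJob, MigrationJob, and the pipeline test utilities now assemble their task configurations through private helpers. Below is a minimal, self-contained Java sketch of that pattern. All names in the sketch (Option, TaskConfig, Job, "ds_0") are hypothetical stand-ins, not the ShardingSphere classes shown in the diff.

    // Hypothetical sketch: the task-configuration factory is a private detail of the
    // concrete job rather than a member of the shared option interface.
    interface Option {
        // Before the refactoring, an option interface like this would also have
        // declared buildTaskConfiguration(...); now it only carries option metadata.
        String describe(String jobId);
    }

    final class TaskConfig {
        final String dataSourceName;

        TaskConfig(final String dataSourceName) {
            this.dataSourceName = dataSourceName;
        }
    }

    final class Job {
        private final Option option = jobId -> "demo job " + jobId;

        // After the refactoring: each concrete job builds its own task configuration.
        private TaskConfig buildTaskConfiguration(final String dataSourceName) {
            return new TaskConfig(dataSourceName);
        }

        void execute(final String jobId) {
            System.out.println(option.describe(jobId));
            System.out.println("task bound to " + buildTaskConfiguration("ds_0").dataSourceName);
        }

        public static void main(final String[] args) {
            new Job().execute("j0101");
        }
    }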