This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a04b41fc7a Remove TransmissionJobOption.buildTaskConfiguration() 
(#29247)
3a04b41fc7a is described below

commit 3a04b41fc7a16af1129ca59ff05b7ca93fdc1c25
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Dec 1 15:40:01 2023 +0800

    Remove TransmissionJobOption.buildTaskConfiguration() (#29247)
---
 .../core/job/option/TransmissionJobOption.java     | 12 -----
 .../shardingsphere/data/pipeline/cdc/CDCJob.java   | 54 +++++++++++++++++--
 .../data/pipeline/cdc/CDCJobOption.java            | 51 ------------------
 .../pipeline/scenario/migration/MigrationJob.java  | 63 +++++++++++++++++++++-
 .../scenario/migration/MigrationJobOption.java     | 63 ----------------------
 .../pipeline/core/util/PipelineContextUtils.java   | 55 ++++++++++++++++++-
 6 files changed, 164 insertions(+), 134 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
index 2564468586f..3582b71ebe6 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/option/TransmissionJobOption.java
@@ -18,13 +18,11 @@
 package org.apache.shardingsphere.data.pipeline.core.job.option;
 
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.yaml.YamlTransmissionJobItemProgressSwapper;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.task.config.PipelineTaskConfiguration;
 
 /**
  * Transmission job option.
@@ -45,16 +43,6 @@ public interface TransmissionJobOption extends 
PipelineJobOption {
      */
     PipelineJobInfo getJobInfo(String jobId);
     
-    /**
-     * Build task configuration.
-     *
-     * @param jobConfig pipeline job configuration
-     * @param jobShardingItem job sharding item
-     * @param processConfig pipeline process configuration
-     * @return task configuration
-     */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration 
jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
-    
     /**
      * Build transmission process context.
      *
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
index 84cdcc78ed9..c70c81d0122 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJob.java
@@ -20,25 +20,38 @@ package org.apache.shardingsphere.data.pipeline.cdc;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.api.CDCJobAPI;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCJobItemContext;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.cdc.core.importer.sink.CDCSocketSink;
 import org.apache.shardingsphere.data.pipeline.cdc.core.prepare.CDCJobPreparer;
 import org.apache.shardingsphere.data.pipeline.cdc.core.task.CDCTasksRunner;
 import org.apache.shardingsphere.data.pipeline.cdc.generator.CDCResponseUtils;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
+import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
+import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
+import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteCallback;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.FinishedPosition;
 import org.apache.shardingsphere.data.pipeline.common.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.AbstractPipelineJob;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -55,8 +68,11 @@ import 
org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 import java.util.Collection;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 /**
  * CDC job.
@@ -93,7 +109,7 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
                 log.info("stopping true, ignore");
                 return;
             }
-            CDCJobItemContext jobItemContext = 
buildPipelineJobItemContext(jobConfig, shardingItem);
+            CDCJobItemContext jobItemContext = 
buildCDCJobItemContext(jobConfig, shardingItem);
             PipelineTasksRunner tasksRunner = new 
CDCTasksRunner(jobItemContext);
             if (!addTasksRunner(shardingItem, tasksRunner)) {
                 continue;
@@ -111,13 +127,43 @@ public final class CDCJob extends AbstractPipelineJob 
implements SimpleJob {
         executeIncrementalTasks(jobItemContexts);
     }
     
-    private CDCJobItemContext buildPipelineJobItemContext(final 
CDCJobConfiguration jobConfig, final int shardingItem) {
+    private CDCJobItemContext buildCDCJobItemContext(final CDCJobConfiguration 
jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
         CDCProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
-        CDCTaskConfiguration taskConfig = 
jobOption.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
+        CDCTaskConfiguration taskConfig = buildTaskConfiguration(jobConfig, 
shardingItem, jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
     }
     
+    private CDCTaskConfiguration buildTaskConfiguration(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, tableAndSchemaNameMapper);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, 
importerConfig);
+        log.debug("buildTaskConfiguration, result={}", result);
+        return result;
+    }
+    
+    private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
+        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
+        StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
+        return new IncrementalDumperContext(
+                new DumperCommonContext(dataSourceName, 
actualDataSourceConfig, 
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), 
tableAndSchemaNameMapper),
+                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
+    }
+    
+    private ImporterConfiguration buildImporterConfiguration(final 
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig, final Collection<String> schemaTableNames,
+                                                             final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
+                jobConfig.getDataSourceConfig().getParameter());
+        CDCProcessContext processContext = new 
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
+                
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
+    }
+    
     private void prepare(final Collection<CDCJobItemContext> jobItemContexts) {
         try {
             jobPreparer.initTasks(jobItemContexts);
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
index 081e86e8247..331017ac3f3 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/CDCJobOption.java
@@ -18,39 +18,20 @@
 package org.apache.shardingsphere.data.pipeline.cdc;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.job.CDCJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.cdc.config.task.CDCTaskConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.cdc.config.yaml.YamlCDCJobConfigurationSwapper;
 import org.apache.shardingsphere.data.pipeline.cdc.context.CDCProcessContext;
-import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
-import org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLine;
-import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeLineConvertUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceConfigurationFactory;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 
-import java.util.Collection;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
 /**
  * CDC job option.
  */
@@ -80,38 +61,6 @@ public final class CDCJobOption implements 
TransmissionJobOption {
         return new PipelineJobInfo(jobMetaData, jobConfig.getDatabaseName(), 
String.join(", ", jobConfig.getSchemaTableNames()));
     }
     
-    @Override
-    public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
-        CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
-        TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
-        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, tableAndSchemaNameMapper);
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
-        CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, 
importerConfig);
-        log.debug("buildTaskConfiguration, result={}", result);
-        return result;
-    }
-    
-    private IncrementalDumperContext buildDumperContext(final 
CDCJobConfiguration jobConfig, final int jobShardingItem, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        JobDataNodeLine dataNodeLine = 
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        String dataSourceName = 
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
-        StandardPipelineDataSourceConfiguration actualDataSourceConfig = 
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
-        return new IncrementalDumperContext(
-                new DumperCommonContext(dataSourceName, 
actualDataSourceConfig, 
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine), 
tableAndSchemaNameMapper),
-                jobConfig.getJobId(), jobConfig.isDecodeWithTX());
-    }
-    
-    private ImporterConfiguration buildImporterConfiguration(final 
CDCJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        PipelineDataSourceConfiguration dataSourceConfig = 
PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
-                jobConfig.getDataSourceConfig().getParameter());
-        CDCProcessContext processContext = new 
CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor()
-                
.getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(),
 
schemaTableNames.stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet()));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, 
tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
-    }
-    
     @Override
     public CDCProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
         TransmissionJobManager jobManager = new TransmissionJobManager(this);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index ccdadabf7f9..63fdd02f23d 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -18,25 +18,46 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionJobItemContext;
+import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.TransmissionJobItemProgress;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
+import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.job.AbstractSimplePipelineJob;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobItemManager;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.PipelineTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.core.task.runner.TransmissionTasksRunner;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.prepare.MigrationJobPreparer;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.elasticjob.api.ShardingContext;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Migration job.
@@ -63,10 +84,48 @@ public final class MigrationJob extends 
AbstractSimplePipelineJob {
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
         MigrationProcessContext jobProcessContext = 
jobOption.buildProcessContext(jobConfig);
-        MigrationTaskConfiguration taskConfig = 
jobOption.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
+        MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
     }
     
+    private MigrationTaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
+        IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+        Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
+                ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
+                
incrementalDumperContext.getCommonContext().getDataSourceName(), 
createTableConfigs, incrementalDumperContext, importerConfig);
+        log.info("buildTaskConfiguration, result={}", result);
+        return result;
+    }
+    
+    private Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        Collection<CreateTableConfiguration> result = new LinkedList<>();
+        for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
+            String sourceSchemaName = 
tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
+            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
+            String targetSchemaName = 
dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+            DataNode dataNode = each.getDataNodes().get(0);
+            PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSources().get(dataNode.getDataSourceName());
+            CreateTableConfiguration createTableConfig = new 
CreateTableConfiguration(sourceDataSourceConfig, new 
CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+                    jobConfig.getTarget(), new 
CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
+            result.add(createTableConfig);
+        }
+        log.info("buildCreateTableConfigurations, result={}", result);
+        return result;
+    }
+    
+    private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
+                                                             final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
MigrationProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        int retryTimes = jobConfig.getRetryTimes();
+        int concurrency = jobConfig.getConcurrency();
+        return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
+    }
+    
     @Override
     protected PipelineTasksRunner buildPipelineTasksRunner(final 
PipelineJobItemContext pipelineJobItemContext) {
         return new TransmissionTasksRunner((TransmissionJobItemContext) 
pipelineJobItemContext);
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
index d2554aed14b..03b8f537bf4 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJobOption.java
@@ -18,45 +18,26 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration;
 
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.job.PipelineJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.TransmissionProcessContext;
 import org.apache.shardingsphere.data.pipeline.common.datanode.DataNodeUtils;
-import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
-import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobMetaData;
-import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency.MigrationDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.yaml.YamlMigrationJobConfigurationSwapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
-import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.util.Collection;
 import java.util.LinkedList;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Migration job option.
@@ -94,50 +75,6 @@ public final class MigrationJobOption implements 
TransmissionJobOption {
         return new PipelineJobInfo(jobMetaData, null, String.join(",", 
sourceTables));
     }
     
-    @Override
-    public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
-        MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
-        IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(
-                
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
-        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
-                ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(
-                jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
-        MigrationTaskConfiguration result = new MigrationTaskConfiguration(
-                
incrementalDumperContext.getCommonContext().getDataSourceName(), 
createTableConfigs, incrementalDumperContext, importerConfig);
-        log.info("buildTaskConfiguration, result={}", result);
-        return result;
-    }
-    
-    private Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        Collection<CreateTableConfiguration> result = new LinkedList<>();
-        for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
-            String sourceSchemaName = 
tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
-            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
-            String targetSchemaName = 
dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
-            DataNode dataNode = each.getDataNodes().get(0);
-            PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSources().get(dataNode.getDataSourceName());
-            CreateTableConfiguration createTableConfig = new 
CreateTableConfiguration(
-                    sourceDataSourceConfig, new 
CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
-                    jobConfig.getTarget(), new 
CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
-            result.add(createTableConfig);
-        }
-        log.info("buildCreateTableConfigurations, result={}", result);
-        return result;
-    }
-    
-    private ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
-                                                             final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
-        MigrationProcessContext processContext = new 
MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
-        JobRateLimitAlgorithm writeRateLimitAlgorithm = 
processContext.getWriteRateLimitAlgorithm();
-        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
-        int retryTimes = jobConfig.getRetryTimes();
-        int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
-    }
-    
     @Override
     public MigrationProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
         PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
index 3faeb76ed4a..efa91b9cae2 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/util/PipelineContextUtils.java
@@ -18,7 +18,10 @@
 package org.apache.shardingsphere.test.it.data.pipeline.core.util;
 
 import lombok.SneakyThrows;
+import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.CreateTableConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineProcessConfigurationUtils;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.YamlPipelineProcessConfiguration;
@@ -27,17 +30,27 @@ import 
org.apache.shardingsphere.data.pipeline.common.config.process.yaml.swappe
 import org.apache.shardingsphere.data.pipeline.common.context.PipelineContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextKey;
 import 
org.apache.shardingsphere.data.pipeline.common.context.PipelineContextManager;
+import 
org.apache.shardingsphere.data.pipeline.common.datanode.JobDataNodeEntry;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.execute.ExecuteEngine;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import 
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.common.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
+import 
org.apache.shardingsphere.data.pipeline.common.spi.algorithm.JobRateLimitAlgorithm;
+import 
org.apache.shardingsphere.data.pipeline.common.util.ShardingColumnsExtractor;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationTaskConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest.MigrationIncrementalDumperContextCreator;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.migration.context.MigrationProcessContext;
 import 
org.apache.shardingsphere.driver.api.yaml.YamlShardingSphereDataSourceFactory;
 import 
org.apache.shardingsphere.driver.jdbc.core.datasource.ShardingSphereDataSource;
 import org.apache.shardingsphere.infra.config.mode.ModeConfiguration;
+import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.instance.metadata.InstanceType;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereColumn;
 import 
org.apache.shardingsphere.infra.metadata.database.schema.model.ShardingSphereTable;
@@ -57,9 +70,13 @@ import org.mockito.internal.configuration.plugins.Plugins;
 
 import java.sql.Types;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Pipeline context utility class.
@@ -166,7 +183,7 @@ public final class PipelineContextUtils {
         PipelineProcessConfiguration processConfig = 
mockPipelineProcessConfiguration();
         MigrationProcessContext processContext = new 
MigrationProcessContext(jobConfig.getJobId(), processConfig);
         int jobShardingItem = 0;
-        MigrationTaskConfiguration taskConfig = new 
MigrationJobOption().buildTaskConfiguration(jobConfig, jobShardingItem, 
processConfig);
+        MigrationTaskConfiguration taskConfig = 
buildTaskConfiguration(jobConfig, jobShardingItem, processConfig);
         return new MigrationJobItemContext(jobConfig, jobShardingItem, null, 
processContext, taskConfig, new DefaultPipelineDataSourceManager());
     }
     
@@ -178,4 +195,38 @@ public final class PipelineContextUtils {
         
PipelineProcessConfigurationUtils.fillInDefaultValue(yamlProcessConfig);
         return new 
YamlPipelineProcessConfigurationSwapper().swapToObject(yamlProcessConfig);
     }
+    
+    private static MigrationTaskConfiguration buildTaskConfiguration(final 
MigrationJobConfiguration jobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
+        IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
+        Collection<CreateTableConfiguration> createTableConfigs = 
buildCreateTableConfigurations(jobConfig, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        Set<CaseInsensitiveIdentifier> targetTableNames = 
jobConfig.getTargetTableNames().stream().map(CaseInsensitiveIdentifier::new).collect(Collectors.toSet());
+        Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
+                ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+        return new 
MigrationTaskConfiguration(incrementalDumperContext.getCommonContext().getDataSourceName(),
 createTableConfigs, incrementalDumperContext, importerConfig);
+    }
+    
+    private static Collection<CreateTableConfiguration> 
buildCreateTableConfigurations(final MigrationJobConfiguration jobConfig, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        Collection<CreateTableConfiguration> result = new LinkedList<>();
+        for (JobDataNodeEntry each : 
jobConfig.getTablesFirstDataNodes().getEntries()) {
+            String sourceSchemaName = 
tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
+            DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
+            String targetSchemaName = 
dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
+            DataNode dataNode = each.getDataNodes().get(0);
+            PipelineDataSourceConfiguration sourceDataSourceConfig = 
jobConfig.getSources().get(dataNode.getDataSourceName());
+            CreateTableConfiguration createTableConfig = new 
CreateTableConfiguration(sourceDataSourceConfig, new 
CaseInsensitiveQualifiedTable(sourceSchemaName, dataNode.getTableName()),
+                    jobConfig.getTarget(), new 
CaseInsensitiveQualifiedTable(targetSchemaName, each.getLogicTableName()));
+            result.add(createTableConfig);
+        }
+        return result;
+    }
+    
+    private static ImporterConfiguration buildImporterConfiguration(final 
MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration 
pipelineProcessConfig,
+                                                                    final 
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap, final 
TableAndSchemaNameMapper tableAndSchemaNameMapper) {
+        int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
+        JobRateLimitAlgorithm writeRateLimitAlgorithm = new 
MigrationProcessContext(jobConfig.getJobId(), 
pipelineProcessConfig).getWriteRateLimitAlgorithm();
+        int retryTimes = jobConfig.getRetryTimes();
+        int concurrency = jobConfig.getConcurrency();
+        return new ImporterConfiguration(jobConfig.getTarget(), 
shardingColumnsMap, tableAndSchemaNameMapper, batchSize, 
writeRateLimitAlgorithm, retryTimes, concurrency);
+    }
 }


Reply via email to