This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new d09b94c1347 Refactor MigrationJobAPI (#29241)
d09b94c1347 is described below
commit d09b94c1347a2d3794ec971dd47518db7789342e
Author: Liang Zhang <[email protected]>
AuthorDate: Thu Nov 30 23:14:35 2023 +0800
Refactor MigrationJobAPI (#29241)
---
.../data/pipeline/scenario/migration/api/MigrationJobAPI.java | 11 ++++++-----
1 file changed, 6 insertions(+), 5 deletions(-)
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
index 4412612d67f..2b0beae44af 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/MigrationJobAPI.java
@@ -39,12 +39,11 @@ import org.apache.shardingsphere.data.pipeline.core.exception.metadata.NoAnyRule
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.option.PipelineJobOption;
-import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
+import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
+import org.apache.shardingsphere.data.pipeline.core.job.option.TransmissionJobOption;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobConfigurationManager;
import org.apache.shardingsphere.data.pipeline.core.job.service.PipelineJobManager;
-import org.apache.shardingsphere.data.pipeline.core.job.api.TransmissionJobAPI;
import org.apache.shardingsphere.data.pipeline.core.metadata.PipelineDataSourcePersistService;
import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobOption;
import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
@@ -93,6 +92,8 @@ import java.util.stream.Collectors;
@Slf4j
public final class MigrationJobAPI implements TransmissionJobAPI {
+ private final TransmissionJobOption jobOption;
+
private final PipelineJobManager jobManager;
private final PipelineJobConfigurationManager jobConfigManager;
@@ -100,7 +101,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
private final PipelineDataSourcePersistService dataSourcePersistService;
public MigrationJobAPI() {
- PipelineJobOption jobOption = new MigrationJobOption();
+ jobOption = new MigrationJobOption();
jobManager = new PipelineJobManager(jobOption);
jobConfigManager = new PipelineJobConfigurationManager(jobOption);
dataSourcePersistService = new PipelineDataSourcePersistService();
@@ -163,7 +164,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
result.setTargetTableSchemaMap(buildTargetTableSchemaMap(sourceDataNodes));
result.setTablesFirstDataNodes(new JobDataNodeLine(tablesFirstDataNodes).marshal());
result.setJobShardingDataNodes(JobDataNodeLineConvertUtils.convertDataNodesToLines(sourceDataNodes).stream().map(JobDataNodeLine::marshal).collect(Collectors.toList()));
- ((TransmissionJobOption) TypedSPILoader.getService(PipelineJobType.class, getType()).getOption()).extendYamlJobConfiguration(contextKey, result);
+ jobOption.extendYamlJobConfiguration(contextKey, result);
return result;
}