This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 2576aaa2a96 Refactor TransmissionJobAPI (#29197)
2576aaa2a96 is described below

commit 2576aaa2a967b7628efb68d276079f1d26839b2b
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 24 15:58:18 2023 +0800

    Refactor TransmissionJobAPI (#29197)
---
 .../pipeline/core/job/service/TransmissionJobAPI.java | 19 +++++++++----------
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java         | 12 ++++++------
 .../data/pipeline/cdc/core/job/CDCJob.java            |  2 +-
 .../task/ConsistencyCheckTasksRunner.java             |  4 ++--
 .../pipeline/scenario/migration/MigrationJob.java     |  2 +-
 .../scenario/migration/api/impl/MigrationJobAPI.java  | 16 ++++++++--------
 .../migration/api/impl/MigrationJobAPITest.java       |  4 ++--
 7 files changed, 29 insertions(+), 30 deletions(-)

diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
index 7a9431b794e..8a1def38509 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
@@ -52,20 +52,20 @@ public interface TransmissionJobAPI extends PipelineJobAPI {
     /**
      * Build task configuration.
      *
-     * @param pipelineJobConfig pipeline job configuration
+     * @param jobConfig pipeline job configuration
      * @param jobShardingItem job sharding item
-     * @param pipelineProcessConfig pipeline process configuration
+     * @param processConfig pipeline process configuration
      * @return task configuration
      */
-    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration pipelineProcessConfig);
+    PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
     
     /**
-     * Build pipeline process context.
+     * Build transmission process context.
      *
-     * @param pipelineJobConfig pipeline job configuration
-     * @return pipeline process context
+     * @param jobConfig pipeline job configuration
+     * @return transmission process context
      */
-    TransmissionProcessContext buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+    TransmissionProcessContext buildProcessContext(PipelineJobConfiguration jobConfig);
     
     /**
      * Extend YAML job configuration.
@@ -78,13 +78,12 @@ public interface TransmissionJobAPI extends PipelineJobAPI {
     /**
      * Build pipeline data consistency checker.
      *
-     * @param pipelineJobConfig job configuration
+     * @param jobConfig job configuration
      * @param processContext process context
      * @param progressContext consistency check job item progress context
      * @return all logic tables check result
      */
-    PipelineDataConsistencyChecker buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig, TransmissionProcessContext processContext,
-                                                                       ConsistencyCheckJobItemProgressContext progressContext);
+    PipelineDataConsistencyChecker buildDataConsistencyChecker(PipelineJobConfiguration jobConfig, TransmissionProcessContext processContext, ConsistencyCheckJobItemProgressContext progressContext);
     
     /**
      * Commit pipeline job.
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 72d7179bf33..839f61bf204 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -245,11 +245,11 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     }
     
     @Override
-    public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
+    public CDCTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) 
pipelineJobConfig;
         TableAndSchemaNameMapper tableAndSchemaNameMapper = new 
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
         IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, 
jobShardingItem, tableAndSchemaNameMapper);
-        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, pipelineProcessConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+        ImporterConfiguration importerConfig = 
buildImporterConfiguration(jobConfig, processConfig, 
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, 
importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
         return result;
@@ -277,9 +277,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     }
     
     @Override
-    public CDCProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
+    public CDCProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
         TransmissionJobManager jobManager = new TransmissionJobManager(this);
-        return new CDCProcessContext(pipelineJobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
+        return new CDCProcessContext(jobConfig.getJobId(), 
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
     }
     
     @Override
@@ -325,8 +325,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
     }
     
     @Override
-    public PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final TransmissionProcessContext processContext,
-                                                                              
final ConsistencyCheckJobItemProgressContext progressContext) {
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
+                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
         throw new UnsupportedOperationException();
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 31cde63332d..78d0cd9e4b2 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -109,7 +109,7 @@ public final class CDCJob extends AbstractPipelineJob implements SimpleJob {
     
     private CDCJobItemContext buildPipelineJobItemContext(final 
CDCJobConfiguration jobConfig, final int shardingItem) {
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
-        CDCProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
+        CDCProcessContext jobProcessContext = 
jobAPI.buildProcessContext(jobConfig);
         CDCTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new CDCJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager, 
sink);
     }
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 58db6fc27c9..291d724dfdb 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -105,8 +105,8 @@ public final class ConsistencyCheckTasksRunner implements PipelineTasksRunner {
             TransmissionJobAPI jobAPI = (TransmissionJobAPI) 
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
             PipelineJobConfiguration parentJobConfig = new 
PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
             try {
-                PipelineDataConsistencyChecker checker = 
jobAPI.buildPipelineDataConsistencyChecker(
-                        parentJobConfig, 
jobAPI.buildPipelineProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
+                PipelineDataConsistencyChecker checker = 
jobAPI.buildDataConsistencyChecker(
+                        parentJobConfig, 
jobAPI.buildProcessContext(parentJobConfig), 
jobItemContext.getProgressContext());
                 consistencyChecker.set(checker);
                 Map<String, TableDataConsistencyCheckResult> checkResultMap = 
checker.check(checkJobConfig.getAlgorithmTypeName(), 
checkJobConfig.getAlgorithmProps());
                 log.info("job {} with check algorithm '{}' data consistency 
checker result: {}, stopping: {}",
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index fd958c35895..3e46a817d49 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -63,7 +63,7 @@ public final class MigrationJob extends AbstractSimplePipelineJob {
         int shardingItem = shardingContext.getShardingItem();
         MigrationJobConfiguration jobConfig = new 
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
         Optional<TransmissionJobItemProgress> initProgress = 
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
-        MigrationProcessContext jobProcessContext = 
jobAPI.buildPipelineProcessContext(jobConfig);
+        MigrationProcessContext jobProcessContext = 
jobAPI.buildProcessContext(jobConfig);
         MigrationTaskConfiguration taskConfig = 
jobAPI.buildTaskConfiguration(jobConfig, shardingItem, 
jobProcessContext.getPipelineProcessConfig());
         return new MigrationJobItemContext(jobConfig, shardingItem, 
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
     }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 6d9eb905423..331211fdf2d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -231,7 +231,7 @@ public final class MigrationJobAPI implements TransmissionJobAPI {
     }
     
     @Override
-    public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration pipelineProcessConfig) {
+    public MigrationTaskConfiguration buildTaskConfiguration(final 
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final 
PipelineProcessConfiguration processConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) 
pipelineJobConfig;
         IncrementalDumperContext incrementalDumperContext = new 
MigrationIncrementalDumperContextCreator(
                 
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
@@ -240,7 +240,7 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
         Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new 
ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) 
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
         ImporterConfiguration importerConfig = buildImporterConfiguration(
-                jobConfig, pipelineProcessConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+                jobConfig, processConfig, shardingColumnsMap, 
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
         MigrationTaskConfiguration result = new MigrationTaskConfiguration(
                 
incrementalDumperContext.getCommonContext().getDataSourceName(), 
createTableConfigs, incrementalDumperContext, importerConfig);
         log.info("buildTaskConfiguration, result={}", result);
@@ -275,15 +275,15 @@ public final class MigrationJobAPI implements 
TransmissionJobAPI {
     }
     
     @Override
-    public MigrationProcessContext buildPipelineProcessContext(final 
PipelineJobConfiguration pipelineJobConfig) {
-        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
-        return new MigrationProcessContext(pipelineJobConfig.getJobId(), 
processConfig);
+    public MigrationProcessContext buildProcessContext(final 
PipelineJobConfiguration jobConfig) {
+        PipelineProcessConfiguration processConfig = new 
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+        return new MigrationProcessContext(jobConfig.getJobId(), 
processConfig);
     }
     
     @Override
-    public PipelineDataConsistencyChecker 
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration 
pipelineJobConfig, final TransmissionProcessContext processContext,
-                                                                              
final ConsistencyCheckJobItemProgressContext progressContext) {
-        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
pipelineJobConfig, processContext, progressContext);
+    public PipelineDataConsistencyChecker buildDataConsistencyChecker(final 
PipelineJobConfiguration jobConfig, final TransmissionProcessContext 
processContext,
+                                                                      final 
ConsistencyCheckJobItemProgressContext progressContext) {
+        return new MigrationDataConsistencyChecker((MigrationJobConfiguration) 
jobConfig, processContext, progressContext);
     }
     
     @Override
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 7017e4e41e3..13c3f46d689 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -184,8 +184,8 @@ class MigrationJobAPITest {
         initTableData(jobConfig);
         Optional<String> jobId = jobManager.start(jobConfig);
         assertTrue(jobId.isPresent());
-        Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobAPI.buildPipelineDataConsistencyChecker(
-                jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", 
null);
+        Map<String, TableDataConsistencyCheckResult> checkResultMap = 
jobAPI.buildDataConsistencyChecker(
+                jobConfig, jobAPI.buildProcessContext(jobConfig), new 
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE", 
null);
         assertThat(checkResultMap.size(), is(1));
         String checkKey = "t_order";
         assertTrue(checkResultMap.get(checkKey).isMatched());

Reply via email to