This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 2576aaa2a96 Refactor TransmissionJobAPI (#29197)
2576aaa2a96 is described below
commit 2576aaa2a967b7628efb68d276079f1d26839b2b
Author: Liang Zhang <[email protected]>
AuthorDate: Fri Nov 24 15:58:18 2023 +0800
Refactor TransmissionJobAPI (#29197)
---
.../pipeline/core/job/service/TransmissionJobAPI.java | 19 +++++++++----------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 12 ++++++------
.../data/pipeline/cdc/core/job/CDCJob.java | 2 +-
.../task/ConsistencyCheckTasksRunner.java | 4 ++--
.../pipeline/scenario/migration/MigrationJob.java | 2 +-
.../scenario/migration/api/impl/MigrationJobAPI.java | 16 ++++++++--------
.../migration/api/impl/MigrationJobAPITest.java | 4 ++--
7 files changed, 29 insertions(+), 30 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
index 7a9431b794e..8a1def38509 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/service/TransmissionJobAPI.java
@@ -52,20 +52,20 @@ public interface TransmissionJobAPI extends PipelineJobAPI {
/**
* Build task configuration.
*
- * @param pipelineJobConfig pipeline job configuration
+ * @param jobConfig pipeline job configuration
* @param jobShardingItem job sharding item
- * @param pipelineProcessConfig pipeline process configuration
+ * @param processConfig pipeline process configuration
* @return task configuration
*/
- PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
pipelineJobConfig, int jobShardingItem, PipelineProcessConfiguration
pipelineProcessConfig);
+ PipelineTaskConfiguration buildTaskConfiguration(PipelineJobConfiguration
jobConfig, int jobShardingItem, PipelineProcessConfiguration processConfig);
/**
- * Build pipeline process context.
+ * Build transmission process context.
*
- * @param pipelineJobConfig pipeline job configuration
- * @return pipeline process context
+ * @param jobConfig pipeline job configuration
+ * @return transmission process context
*/
- TransmissionProcessContext
buildPipelineProcessContext(PipelineJobConfiguration pipelineJobConfig);
+ TransmissionProcessContext buildProcessContext(PipelineJobConfiguration
jobConfig);
/**
* Extend YAML job configuration.
@@ -78,13 +78,12 @@ public interface TransmissionJobAPI extends PipelineJobAPI {
/**
* Build pipeline data consistency checker.
*
- * @param pipelineJobConfig job configuration
+ * @param jobConfig job configuration
* @param processContext process context
* @param progressContext consistency check job item progress context
* @return all logic tables check result
*/
- PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(PipelineJobConfiguration pipelineJobConfig,
TransmissionProcessContext processContext,
-
ConsistencyCheckJobItemProgressContext progressContext);
+ PipelineDataConsistencyChecker
buildDataConsistencyChecker(PipelineJobConfiguration jobConfig,
TransmissionProcessContext processContext,
ConsistencyCheckJobItemProgressContext progressContext);
/**
* Commit pipeline job.
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 72d7179bf33..839f61bf204 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -245,11 +245,11 @@ public final class CDCJobAPI implements
TransmissionJobAPI {
}
@Override
- public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ public CDCTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
CDCJobConfiguration jobConfig = (CDCJobConfiguration)
pipelineJobConfig;
TableAndSchemaNameMapper tableAndSchemaNameMapper = new
TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
IncrementalDumperContext dumperContext = buildDumperContext(jobConfig,
jobShardingItem, tableAndSchemaNameMapper);
- ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, pipelineProcessConfig,
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
+ ImporterConfiguration importerConfig =
buildImporterConfiguration(jobConfig, processConfig,
jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext,
importerConfig);
log.debug("buildTaskConfiguration, result={}", result);
return result;
@@ -277,9 +277,9 @@ public final class CDCJobAPI implements TransmissionJobAPI {
}
@Override
- public CDCProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
+ public CDCProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
TransmissionJobManager jobManager = new TransmissionJobManager(this);
- return new CDCProcessContext(pipelineJobConfig.getJobId(),
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId())));
+ return new CDCProcessContext(jobConfig.getJobId(),
jobManager.showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId())));
}
@Override
@@ -325,8 +325,8 @@ public final class CDCJobAPI implements TransmissionJobAPI {
}
@Override
- public PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final TransmissionProcessContext processContext,
-
final ConsistencyCheckJobItemProgressContext progressContext) {
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
+ final
ConsistencyCheckJobItemProgressContext progressContext) {
throw new UnsupportedOperationException();
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
index 31cde63332d..78d0cd9e4b2 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/core/job/CDCJob.java
@@ -109,7 +109,7 @@ public final class CDCJob extends AbstractPipelineJob
implements SimpleJob {
private CDCJobItemContext buildPipelineJobItemContext(final
CDCJobConfiguration jobConfig, final int shardingItem) {
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(jobConfig.getJobId(), shardingItem);
- CDCProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
+ CDCProcessContext jobProcessContext =
jobAPI.buildProcessContext(jobConfig);
CDCTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new CDCJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager,
sink);
}
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
index 58db6fc27c9..291d724dfdb 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/task/ConsistencyCheckTasksRunner.java
@@ -105,8 +105,8 @@ public final class ConsistencyCheckTasksRunner implements
PipelineTasksRunner {
TransmissionJobAPI jobAPI = (TransmissionJobAPI)
TypedSPILoader.getService(PipelineJobAPI.class, jobType.getType());
PipelineJobConfiguration parentJobConfig = new
PipelineJobManager(jobAPI).getJobConfiguration(parentJobId);
try {
- PipelineDataConsistencyChecker checker =
jobAPI.buildPipelineDataConsistencyChecker(
- parentJobConfig,
jobAPI.buildPipelineProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
+ PipelineDataConsistencyChecker checker =
jobAPI.buildDataConsistencyChecker(
+ parentJobConfig,
jobAPI.buildProcessContext(parentJobConfig),
jobItemContext.getProgressContext());
consistencyChecker.set(checker);
Map<String, TableDataConsistencyCheckResult> checkResultMap =
checker.check(checkJobConfig.getAlgorithmTypeName(),
checkJobConfig.getAlgorithmProps());
log.info("job {} with check algorithm '{}' data consistency
checker result: {}, stopping: {}",
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
index fd958c35895..3e46a817d49 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/MigrationJob.java
@@ -63,7 +63,7 @@ public final class MigrationJob extends
AbstractSimplePipelineJob {
int shardingItem = shardingContext.getShardingItem();
MigrationJobConfiguration jobConfig = new
YamlMigrationJobConfigurationSwapper().swapToObject(shardingContext.getJobParameter());
Optional<TransmissionJobItemProgress> initProgress =
jobItemManager.getProgress(shardingContext.getJobName(), shardingItem);
- MigrationProcessContext jobProcessContext =
jobAPI.buildPipelineProcessContext(jobConfig);
+ MigrationProcessContext jobProcessContext =
jobAPI.buildProcessContext(jobConfig);
MigrationTaskConfiguration taskConfig =
jobAPI.buildTaskConfiguration(jobConfig, shardingItem,
jobProcessContext.getPipelineProcessConfig());
return new MigrationJobItemContext(jobConfig, shardingItem,
initProgress.orElse(null), jobProcessContext, taskConfig, dataSourceManager);
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 6d9eb905423..331211fdf2d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -231,7 +231,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
@Override
- public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
+ public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration processConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
IncrementalDumperContext incrementalDumperContext = new
MigrationIncrementalDumperContextCreator(
jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
@@ -240,7 +240,7 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
Map<CaseInsensitiveIdentifier, Set<String>> shardingColumnsMap = new
ShardingColumnsExtractor().getShardingColumnsMap(
((ShardingSpherePipelineDataSourceConfiguration)
jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
ImporterConfiguration importerConfig = buildImporterConfiguration(
- jobConfig, pipelineProcessConfig, shardingColumnsMap,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
+ jobConfig, processConfig, shardingColumnsMap,
incrementalDumperContext.getCommonContext().getTableAndSchemaNameMapper());
MigrationTaskConfiguration result = new MigrationTaskConfiguration(
incrementalDumperContext.getCommonContext().getDataSourceName(),
createTableConfigs, incrementalDumperContext, importerConfig);
log.info("buildTaskConfiguration, result={}", result);
@@ -275,15 +275,15 @@ public final class MigrationJobAPI implements
TransmissionJobAPI {
}
@Override
- public MigrationProcessContext buildPipelineProcessContext(final
PipelineJobConfiguration pipelineJobConfig) {
- PipelineProcessConfiguration processConfig = new
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(pipelineJobConfig.getJobId()));
- return new MigrationProcessContext(pipelineJobConfig.getJobId(),
processConfig);
+ public MigrationProcessContext buildProcessContext(final
PipelineJobConfiguration jobConfig) {
+ PipelineProcessConfiguration processConfig = new
TransmissionJobManager(this).showProcessConfiguration(PipelineJobIdUtils.parseContextKey(jobConfig.getJobId()));
+ return new MigrationProcessContext(jobConfig.getJobId(),
processConfig);
}
@Override
- public PipelineDataConsistencyChecker
buildPipelineDataConsistencyChecker(final PipelineJobConfiguration
pipelineJobConfig, final TransmissionProcessContext processContext,
-
final ConsistencyCheckJobItemProgressContext progressContext) {
- return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
pipelineJobConfig, processContext, progressContext);
+ public PipelineDataConsistencyChecker buildDataConsistencyChecker(final
PipelineJobConfiguration jobConfig, final TransmissionProcessContext
processContext,
+ final
ConsistencyCheckJobItemProgressContext progressContext) {
+ return new MigrationDataConsistencyChecker((MigrationJobConfiguration)
jobConfig, processContext, progressContext);
}
@Override
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
index 7017e4e41e3..13c3f46d689 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/migration/api/impl/MigrationJobAPITest.java
@@ -184,8 +184,8 @@ class MigrationJobAPITest {
initTableData(jobConfig);
Optional<String> jobId = jobManager.start(jobConfig);
assertTrue(jobId.isPresent());
- Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobAPI.buildPipelineDataConsistencyChecker(
- jobConfig, jobAPI.buildPipelineProcessContext(jobConfig), new
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE",
null);
+ Map<String, TableDataConsistencyCheckResult> checkResultMap =
jobAPI.buildDataConsistencyChecker(
+ jobConfig, jobAPI.buildProcessContext(jobConfig), new
ConsistencyCheckJobItemProgressContext(jobId.get(), 0, "H2")).check("FIXTURE",
null);
assertThat(checkResultMap.size(), is(1));
String checkKey = "t_order";
assertTrue(checkResultMap.get(checkKey).isMatched());