This is an automated email from the ASF dual-hosted git repository.
duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 704277558e5 Refactor JobDataNodeLineConvertUtils (#28983)
704277558e5 is described below
commit 704277558e56e835268c5bbd0d87f31ad900bf19
Author: Liang Zhang <[email protected]>
AuthorDate: Wed Nov 8 10:48:11 2023 +0800
Refactor JobDataNodeLineConvertUtils (#28983)
* Refactor JobDataNodeLineConvertUtils
* Refactor JobDataNodeLineConvertUtils
---
.../common/datanode/JobDataNodeLineConvertUtils.java | 13 +++++++------
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 7 +------
.../ingest/MigrationIncrementalDumperContextCreator.java | 2 +-
3 files changed, 9 insertions(+), 13 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
index 26e3ef3df33..40f077f5d13 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
import
org.apache.shardingsphere.data.pipeline.common.metadata.CaseInsensitiveIdentifier;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import org.apache.shardingsphere.infra.datanode.DataNode;
import java.util.LinkedHashMap;
@@ -68,18 +69,18 @@ public final class JobDataNodeLineConvertUtils {
}
/**
- * Build table name map.
+ * Build actual and logic table name mapper.
*
* @param dataNodeLine data node line
- * @return actual table and logic table map
+ * @return actual and logic table name mapper
*/
- public static Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier>
buildTableNameMap(final JobDataNodeLine dataNodeLine) {
- Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> result = new
LinkedHashMap<>();
+ public static ActualAndLogicTableNameMapper buildTableNameMapper(final
JobDataNodeLine dataNodeLine) {
+ Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> map = new
LinkedHashMap<>();
for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
for (DataNode dataNode : each.getDataNodes()) {
- result.put(new
CaseInsensitiveIdentifier(dataNode.getTableName()), new
CaseInsensitiveIdentifier(each.getLogicTableName()));
+ map.put(new
CaseInsensitiveIdentifier(dataNode.getTableName()), new
CaseInsensitiveIdentifier(each.getLogicTableName()));
}
}
- return result;
+ return new ActualAndLogicTableNameMapper(map);
}
}
diff --git
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index c81675d26da..ec1bd493e28 100644
---
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -68,7 +68,6 @@ import
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
import org.apache.shardingsphere.data.pipeline.core.importer.sink.PipelineSink;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.DumperCommonContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.IncrementalDumperContext;
-import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobCenter;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -89,7 +88,6 @@ import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.util.Collection;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -280,11 +278,8 @@ public final class CDCJobAPI extends
AbstractInventoryIncrementalJobAPIImpl {
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
String dataSourceName =
dataNodeLine.getEntries().iterator().next().getDataNodes().iterator().next().getDataSourceName();
StandardPipelineDataSourceConfiguration actualDataSourceConfig =
jobConfig.getDataSourceConfig().getActualDataSourceConfiguration(dataSourceName);
- Map<CaseInsensitiveIdentifier, CaseInsensitiveIdentifier> tableNameMap
= new LinkedHashMap<>();
- dataNodeLine.getEntries()
- .forEach(each -> each.getDataNodes().forEach(node ->
tableNameMap.put(new CaseInsensitiveIdentifier(node.getTableName()), new
CaseInsensitiveIdentifier(each.getLogicTableName()))));
return new IncrementalDumperContext(
- new DumperCommonContext(dataSourceName,
actualDataSourceConfig, new ActualAndLogicTableNameMapper(tableNameMap),
tableAndSchemaNameMapper),
+ new DumperCommonContext(dataSourceName,
actualDataSourceConfig,
JobDataNodeLineConvertUtils.buildTableNameMapper(dataNodeLine),
tableAndSchemaNameMapper),
jobConfig.getJobId(), jobConfig.isDecodeWithTX());
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 1c4bc71d2e0..e24a578120a 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -38,7 +38,7 @@ public final class MigrationIncrementalDumperContextCreator
implements Increment
@Override
public IncrementalDumperContext createDumperContext(final JobDataNodeLine
jobDataNodeLine) {
String dataSourceName =
jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
- ActualAndLogicTableNameMapper tableNameMapper = new
ActualAndLogicTableNameMapper(JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine));
+ ActualAndLogicTableNameMapper tableNameMapper =
JobDataNodeLineConvertUtils.buildTableNameMapper(jobDataNodeLine);
TableAndSchemaNameMapper tableAndSchemaNameMapper = new
TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
return new IncrementalDumperContext(
new DumperCommonContext(dataSourceName,
jobConfig.getSources().get(dataSourceName), tableNameMapper,
tableAndSchemaNameMapper), jobConfig.getJobId(), false);