This is an automated email from the ASF dual-hosted git repository.
zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 50b94625c18 Add ActualAndLogicTableNameMapper (#28941)
50b94625c18 is described below
commit 50b94625c18d3cc254b3262907bb37b6ebf14843
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 4 22:32:45 2023 +0800
Add ActualAndLogicTableNameMapper (#28941)
* Rename TableAndSchemaNameMapperTest
* Move TableAndSchemaNameMapper
* Add ActualAndLogicTableNameMapper
* Add ActualAndLogicTableNameMapper
* Add ActualAndLogicTableNameMapper
* Add ActualAndLogicTableNameMapper
* Refactor DumperCommonContext
---
.../ingest/dumper/context/DumperCommonContext.java | 55 ++--------------------
.../dumper/context/InventoryDumperContext.java | 2 +-
.../ActualAndLogicTableNameMapper.java} | 49 +++----------------
.../context/mapper}/TableAndSchemaNameMapper.java | 2 +-
.../mapper/TableAndSchemaNameMapperTest.java | 52 ++++++++++++++++++++
.../common/config/ImporterConfiguration.java | 2 +-
.../data/pipeline/core/dumper/InventoryDumper.java | 11 ++---
.../preparer/InventoryRecordsCountCalculator.java | 3 +-
.../core/preparer/InventoryTaskSplitter.java | 11 ++---
.../preparer/datasource/DataSourceCheckEngine.java | 2 +-
.../datasource/DataSourceCheckEngineTest.java | 2 +-
.../mysql/ingest/MySQLIncrementalDumper.java | 11 +++--
.../mysql/ingest/MySQLIncrementalDumperTest.java | 7 +--
.../postgresql/ingest/wal/WALEventConverter.java | 9 ++--
.../postgresql/ingest/PostgreSQLWALDumperTest.java | 7 +--
.../ingest/wal/WALEventConverterTest.java | 7 +--
.../data/pipeline/cdc/api/impl/CDCJobAPI.java | 7 +--
.../migration/api/impl/MigrationJobAPI.java | 2 +-
.../MigrationIncrementalDumperContextCreator.java | 5 +-
.../core/importer/PipelineDataSourceSinkTest.java | 2 +-
20 files changed, 111 insertions(+), 137 deletions(-)
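
For orientation before reading the patch: the change replaces the raw Map<ActualTableName, LogicTableName> field on DumperCommonContext with a dedicated ActualAndLogicTableNameMapper, and relocates TableAndSchemaNameMapper beside it under ingest.dumper.context.mapper. A minimal sketch of how a dumper resolves names after this change (method names are taken from the hunks below; the table and schema values are made up for illustration):

    // Illustrative wiring only: "t_order_0", "t_order" and "public" are example values.
    ActualAndLogicTableNameMapper tableNameMapper = new ActualAndLogicTableNameMapper(
            Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
    TableAndSchemaNameMapper tableAndSchemaNameMapper =
            new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public"));
    // Formerly dumperContext.getLogicTableName(...) / dumperContext.getSchemaName(...):
    LogicTableName logicTableName = tableNameMapper.getLogicTableName("t_order_0");
    boolean known = tableNameMapper.containsTable("t_order_0");
    String schemaName = tableAndSchemaNameMapper.getSchemaName(logicTableName);
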
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 4a640fdb7bf..9166f609df9 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -20,16 +20,13 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-
-import java.util.Map;
/**
- * Base dumper context.
+ * Dumper common context.
*/
@Getter
@Setter
@@ -40,53 +37,9 @@ public abstract class DumperCommonContext {
private PipelineDataSourceConfiguration dataSourceConfig;
- private Map<ActualTableName, LogicTableName> tableNameMap;
+ private ActualAndLogicTableNameMapper tableNameMapper;
private TableAndSchemaNameMapper tableAndSchemaNameMapper;
private IngestPosition position;
-
- /**
- * Get logic table name.
- *
- * @param actualTableName actual table name
- * @return logic table name
- */
- public LogicTableName getLogicTableName(final String actualTableName) {
- return tableNameMap.get(new ActualTableName(actualTableName));
- }
-
- private LogicTableName getLogicTableName(final ActualTableName actualTableName) {
- return tableNameMap.get(actualTableName);
- }
-
- /**
- * Whether contains table.
- *
- * @param actualTableName actual table name
- * @return contains or not
- */
- public boolean containsTable(final String actualTableName) {
- return tableNameMap.containsKey(new ActualTableName(actualTableName));
- }
-
- /**
- * Get schema name.
- *
- * @param logicTableName logic table name
- * @return schema name. nullable
- */
- public String getSchemaName(final LogicTableName logicTableName) {
- return tableAndSchemaNameMapper.getSchemaName(logicTableName);
- }
-
- /**
- * Get schema name.
- *
- * @param actualTableName actual table name
- * @return schema name, can be nullable
- */
- public String getSchemaName(final ActualTableName actualTableName) {
- return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
- }
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index d72f07c7eaf..e96375d8c6e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -54,7 +54,7 @@ public final class InventoryDumperContext extends DumperCommonContext {
public InventoryDumperContext(final DumperCommonContext dumperContext) {
setDataSourceName(dumperContext.getDataSourceName());
setDataSourceConfig(dumperContext.getDataSourceConfig());
- setTableNameMap(dumperContext.getTableNameMap());
+ setTableNameMapper(dumperContext.getTableNameMapper());
setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
}
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
similarity index 53%
copy from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
copy to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
index 4a640fdb7bf..475c1742d5f 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
@@ -15,36 +15,25 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;
import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import java.util.Map;
/**
- * Base dumper context.
+ * Actual table name and logic table name mapper.
*/
+@RequiredArgsConstructor
@Getter
-@Setter
-@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
-public abstract class DumperCommonContext {
+@ToString
+public final class ActualAndLogicTableNameMapper {
- private String dataSourceName;
-
- private PipelineDataSourceConfiguration dataSourceConfig;
-
- private Map<ActualTableName, LogicTableName> tableNameMap;
-
- private TableAndSchemaNameMapper tableAndSchemaNameMapper;
-
- private IngestPosition position;
+ private final Map<ActualTableName, LogicTableName> tableNameMap;
/**
* Get logic table name.
@@ -56,10 +45,6 @@ public abstract class DumperCommonContext {
return tableNameMap.get(new ActualTableName(actualTableName));
}
- private LogicTableName getLogicTableName(final ActualTableName actualTableName) {
- return tableNameMap.get(actualTableName);
- }
-
/**
* Whether contains table.
*
@@ -69,24 +54,4 @@ public abstract class DumperCommonContext {
public boolean containsTable(final String actualTableName) {
return tableNameMap.containsKey(new ActualTableName(actualTableName));
}
-
- /**
- * Get schema name.
- *
- * @param logicTableName logic table name
- * @return schema name. nullable
- */
- public String getSchemaName(final LogicTableName logicTableName) {
- return tableAndSchemaNameMapper.getSchemaName(logicTableName);
- }
-
- /**
- * Get schema name.
- *
- * @param actualTableName actual table name
- * @return schema name, can be nullable
- */
- public String getSchemaName(final ActualTableName actualTableName) {
- return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
- }
}
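
Stripped of the copy-diff noise above, the new class reads approximately as follows (reconstructed from the hunks of this patch; the Apache license header and exact formatting are omitted):

    package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;

    import lombok.Getter;
    import lombok.RequiredArgsConstructor;
    import lombok.ToString;
    import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
    import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;

    import java.util.Map;

    /**
     * Actual table name and logic table name mapper.
     */
    @RequiredArgsConstructor
    @Getter
    @ToString
    public final class ActualAndLogicTableNameMapper {

        private final Map<ActualTableName, LogicTableName> tableNameMap;

        /**
         * Get logic table name.
         *
         * @param actualTableName actual table name
         * @return logic table name
         */
        public LogicTableName getLogicTableName(final String actualTableName) {
            return tableNameMap.get(new ActualTableName(actualTableName));
        }

        /**
         * Whether contains table.
         *
         * @param actualTableName actual table name
         * @return contains or not
         */
        public boolean containsTable(final String actualTableName) {
            return tableNameMap.containsKey(new ActualTableName(actualTableName));
        }
    }
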
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
similarity index 97%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
index 3999329ae63..329a24d3a1e 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.api.context;
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;
import lombok.ToString;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java
new file mode 100644
index 00000000000..465ee71c7ea
--- /dev/null
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class TableAndSchemaNameMapperTest {
+
+ @Test
+ void assertConstructFromNull() {
+ assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map<String, String>) null));
+ }
+
+ @Test
+ void assertConstructFromValueNullMap() {
+ assertNull(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
+ }
+
+ @Test
+ void assertConstructFromMap() {
+ assertThat(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+ }
+
+ @Test
+ void assertConstructFromCollection() {
+ assertThat(new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public"));
+ }
+}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index 0116904291a..1438d942098 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.common.config;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index a8fb1156a7c..b022f32de72 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -21,17 +21,16 @@ import com.google.common.base.Strings;
import lombok.AccessLevel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -46,8 +45,8 @@ import org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import javax.sql.DataSource;
@@ -102,7 +101,8 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
log.info("Ignored because of already finished.");
return;
}
- PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName());
+ PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
+ dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName());
try (Connection connection = dataSource.getConnection()) {
dump(tableMetaData, connection);
} catch (final SQLException ex) {
@@ -156,8 +156,7 @@ public final class InventoryDumper extends AbstractLifecycleExecutor implements
if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
return dumperContext.getQuerySQL();
}
- LogicTableName logicTableName = new LogicTableName(dumperContext.getLogicTableName());
- String schemaName = dumperContext.getSchemaName(logicTableName);
+ String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
if (!dumperContext.hasUniqueKey()) {
return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, dumperContext.getActualTableName());
}
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index 6656cdcf9a2..f1e46de7bee 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -21,7 +21,6 @@ import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
@@ -52,7 +51,7 @@ public final class InventoryRecordsCountCalculator {
* @throws SplitPipelineJobByUniqueKeyException if there's exception from database
*/
public static long getTableRecordsCount(final InventoryDumperContext dumperContext, final PipelineDataSourceWrapper dataSource) {
- String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
+ String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
String actualTableName = dumperContext.getActualTableName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(dataSource.getDatabaseType());
Optional<String> sql = pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 2f1c6954860..97159392918 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -20,19 +20,18 @@ package org.apache.shardingsphere.data.pipeline.core.preparer;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.Range;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
import org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
import org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition;
import org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
import org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
@@ -111,7 +110,7 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperContext> splitByTable(final InventoryDumperContext dumperContext) {
Collection<InventoryDumperContext> result = new LinkedList<>();
- dumperContext.getTableNameMap().forEach((key, value) -> {
+ dumperContext.getTableNameMapper().getTableNameMap().forEach((key, value) -> {
InventoryDumperContext inventoryDumperContext = new InventoryDumperContext(dumperContext);
// use original table name, for metadata loader, since some database table name case-sensitive
inventoryDumperContext.setActualTableName(key.getOriginal());
@@ -127,7 +126,7 @@ public final class InventoryTaskSplitter {
private Collection<InventoryDumperContext> splitByPrimaryKey(final InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext jobItemContext,
final PipelineDataSourceWrapper dataSource) {
if (null == dumperContext.getUniqueKeyColumns()) {
- String schemaName = dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName()));
+ String schemaName = dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
String actualTableName = dumperContext.getActualTableName();
List<PipelineColumnMetaData> uniqueKeyColumns = PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, jobItemContext.getSourceMetaDataLoader());
dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
@@ -205,7 +204,7 @@ public final class InventoryTaskSplitter {
String uniqueKey = dumperContext.getUniqueKeyColumns().get(0).getName();
PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
- dumperContext.getSchemaName(new LogicTableName(dumperContext.getLogicTableName())), dumperContext.getActualTableName(), uniqueKey);
+ dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()), dumperContext.getActualTableName(), uniqueKey);
try (
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
index 7b76c512c85..d801148321c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index d74115d295d..0518f428115 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 84cf56db027..6d50d4950a0 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -19,18 +19,18 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest;
import com.google.common.base.Preconditions;
import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
import org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -132,7 +132,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
return Collections.singletonList(createPlaceholderRecord(event));
}
AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
- if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.containsTable(rowsEvent.getTableName())) {
+ if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) {
return Collections.singletonList(createPlaceholderRecord(event));
}
PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName());
@@ -155,7 +155,8 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
}
private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
- return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+ LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
+ return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}
private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
@@ -216,7 +217,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
}
private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
- String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+ String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
DataRecord result = new DataRecord(type, tableName, position, columnCount);
result.setActualTableName(rowsEvent.getTableName());
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 25bbaa33ff1..7d6ee11ad2f 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -17,9 +17,10 @@
package org.apache.shardingsphere.data.pipeline.mysql.ingest;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -91,7 +92,7 @@ class MySQLIncrementalDumperTest {
private IncrementalDumperContext createDumperContext() {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
- result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
+ result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index adfdf059666..cc2dca00d86 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -22,7 +22,7 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -80,7 +80,7 @@ public final class WALEventConverter {
private boolean filter(final AbstractWALEvent event) {
if (event instanceof AbstractRowEvent) {
AbstractRowEvent rowEvent = (AbstractRowEvent) event;
- return !dumperContext.containsTable(rowEvent.getTableName());
+ return !dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName());
}
return false;
}
@@ -90,7 +90,8 @@ public final class WALEventConverter {
}
private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
- return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+ LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
+ return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
}
private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
@@ -117,7 +118,7 @@ public final class WALEventConverter {
}
private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
- String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+ String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
result.setActualTableName(rowsEvent.getTableName());
result.setCsn(rowsEvent.getCsn());
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index e59c6a0f353..6cbb3233bac 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -17,9 +17,10 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -107,7 +108,7 @@ class PostgreSQLWALDumperTest {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setJobId("0101123456");
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
- result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
+ result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))));
result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 93a4e721074..15895cd370b 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -17,9 +17,10 @@
package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -86,7 +87,7 @@ class WALEventConverterTest {
private IncrementalDumperContext mockDumperContext() {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
- result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
+ result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
return result;
}
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index d6015da0a5e..85f2aea4206 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -21,13 +21,14 @@ import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
@@ -285,7 +286,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
result.setJobId(jobConfig.getJobId());
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(actualDataSourceConfig);
- result.setTableNameMap(tableNameMap);
+ result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
result.setDecodeWithTX(jobConfig.isDecodeWithTX());
return result;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 218d70427d9..0be975696ed 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 06ee39a408b..659b15995eb 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,7 +18,8 @@
package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -52,7 +53,7 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
result.setJobId(jobId);
result.setDataSourceName(dataSourceName);
result.setDataSourceConfig(sourceDataSource);
- result.setTableNameMap(tableNameMap);
+ result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
return result;
}
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 487df1e7d48..3fc0ace7fe6 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -17,7 +17,7 @@
package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
-import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;