This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new dbf81f2fd73 Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper (#28939)
dbf81f2fd73 is described below

commit dbf81f2fd738060246e75036ed524b6ac172112e
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 4 19:00:57 2023 +0800

    Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper (#28939)
    
    * Refactor ImporterConfiguration.findSchemaName()
    
    * Add more constructor on TableNameSchemaNameMapping
    
    * Rename TableNameSchemaNameMapping to TableAndSchemaNameMapper
---
 ...eMapping.java => TableAndSchemaNameMapper.java} | 13 +++++++---
 .../ingest/dumper/context/DumperCommonContext.java | 10 ++++----
 .../dumper/context/InventoryDumperContext.java     |  2 +-
 .../context/TableNameSchemaNameMappingTest.java    | 13 +++++++---
 .../common/config/ImporterConfiguration.java       | 15 +++++------
 .../core/importer/sink/PipelineDataSourceSink.java | 11 +++-----
 .../core/preparer/PipelineJobPreparerUtils.java    |  2 +-
 .../preparer/datasource/DataSourceCheckEngine.java |  8 +++---
 .../datasource/DataSourceCheckEngineTest.java      |  6 ++---
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  4 +--
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  4 +--
 .../ingest/wal/WALEventConverterTest.java          |  4 +--
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      | 29 +++++++---------------
 .../migration/api/impl/MigrationJobAPI.java        | 16 ++++++------
 .../MigrationIncrementalDumperContextCreator.java  | 10 ++++----
 .../core/importer/PipelineDataSourceSinkTest.java  |  4 +--
 16 files changed, 75 insertions(+), 76 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
similarity index 80%
rename from kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
rename to kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
index ea56984a4b2..3999329ae63 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMapping.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
@@ -20,23 +20,30 @@ package org.apache.shardingsphere.data.pipeline.api.context;
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.stream.Collectors;
 
 /**
- * Table name and schema name mapping.
+ * Table and schema name mapper.
  */
 @ToString
-public final class TableNameSchemaNameMapping {
+public final class TableAndSchemaNameMapper {
     
     private final Map<LogicTableName, String> mapping;
     
-    public TableNameSchemaNameMapping(final Map<String, String> tableSchemaMap) {
+    public TableAndSchemaNameMapper(final Map<String, String> tableSchemaMap) {
         mapping = null == tableSchemaMap ? Collections.emptyMap() : getLogicTableNameMap(tableSchemaMap);
     }
     
+    public TableAndSchemaNameMapper(final Collection<String> tableNames) {
+        Map<String, String> tableNameSchemaMap = tableNames.stream().map(each -> each.split("\\.")).filter(split -> split.length > 1).collect(Collectors.toMap(split -> split[1], split -> split[0]));
+        mapping = getLogicTableNameMap(tableNameSchemaMap);
+    }
+    
     private Map<LogicTableName, String> getLogicTableNameMap(final Map<String, String> tableSchemaMap) {
         Map<LogicTableName, String> result = new HashMap<>(tableSchemaMap.size(), 1F);
         for (Entry<String, String> entry : tableSchemaMap.entrySet()) {
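
For reference, a minimal sketch of how the new collection-based constructor behaves (illustrative only; it mirrors the assertConstructFromCollection test added further down and is not part of the patch itself):

    // Each entry is split on ".": "public.t_order" maps t_order -> public,
    // while an unqualified "t_order_item" contributes no schema mapping.
    TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item"));
    String orderSchema = mapper.getSchemaName("t_order");          // "public"
    String orderItemSchema = mapper.getSchemaName("t_order_item"); // null, no schema qualifier was given
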
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 69491ddcc9a..4a640fdb7bf 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -33,7 +33,7 @@ import java.util.Map;
  */
 @Getter
 @Setter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public abstract class DumperCommonContext {
     
     private String dataSourceName;
@@ -42,7 +42,7 @@ public abstract class DumperCommonContext {
     
     private Map<ActualTableName, LogicTableName> tableNameMap;
     
-    private TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private TableAndSchemaNameMapper tableAndSchemaNameMapper;
     
     private IngestPosition position;
     
@@ -77,7 +77,7 @@ public abstract class DumperCommonContext {
      * @return schema name. nullable
      */
     public String getSchemaName(final LogicTableName logicTableName) {
-        return tableNameSchemaNameMapping.getSchemaName(logicTableName);
+        return tableAndSchemaNameMapper.getSchemaName(logicTableName);
     }
     
     /**
@@ -87,6 +87,6 @@ public abstract class DumperCommonContext {
      * @return schema name, can be nullable 
      */
     public String getSchemaName(final ActualTableName actualTableName) {
-        return tableNameSchemaNameMapping.getSchemaName(getLogicTableName(actualTableName));
+        return tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
     }
 }
diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index 45eaf8e1ebe..d72f07c7eaf 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -55,7 +55,7 @@ public final class InventoryDumperContext extends DumperCommonContext {
         setDataSourceName(dumperContext.getDataSourceName());
         setDataSourceConfig(dumperContext.getDataSourceConfig());
         setTableNameMap(dumperContext.getTableNameMap());
-        setTableNameSchemaNameMapping(dumperContext.getTableNameSchemaNameMapping());
+        setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
     }
     
     /**
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
index a640ee35df0..9eb21b6148c 100644
--- a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/context/TableNameSchemaNameMappingTest.java
@@ -19,7 +19,9 @@ package org.apache.shardingsphere.data.pipeline.api.context;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.Map;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -30,16 +32,21 @@ class TableNameSchemaNameMappingTest {
     
     @Test
     void assertConstructFromNull() {
-        assertDoesNotThrow(() -> new TableNameSchemaNameMapping(null));
+        assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map<String, String>) null));
     }
     
     @Test
     void assertConstructFromValueNullMap() {
-        assertNull(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
+        assertNull(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
     }
     
     @Test
     void assertConstructFromMap() {
-        assertThat(new TableNameSchemaNameMapping(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+        assertThat(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+    }
+    
+    @Test
+    void assertConstructFromCollection() {
+        assertThat(new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public"));
     }
 }
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index a92bce38304..0116904291a 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.common.config;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
@@ -30,6 +30,7 @@ import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -38,7 +39,7 @@ import java.util.stream.Collectors;
  */
 @RequiredArgsConstructor
 @Getter
-@ToString(exclude = {"dataSourceConfig", "tableNameSchemaNameMapping"})
+@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
 public final class ImporterConfiguration {
     
     private final PipelineDataSourceConfiguration dataSourceConfig;
@@ -46,7 +47,7 @@ public final class ImporterConfiguration {
     // TODO columnName case-insensitive?
     private final Map<LogicTableName, Set<String>> shardingColumnsMap;
     
-    private final TableNameSchemaNameMapping tableNameSchemaNameMapping;
+    private final TableAndSchemaNameMapper tableAndSchemaNameMapper;
     
     private final int batchSize;
     
@@ -76,13 +77,13 @@ public final class ImporterConfiguration {
     }
     
     /**
-     * Get schema name.
+     * Find schema name.
      *
      * @param logicTableName logic table name
-     * @return schema name. nullable
+     * @return schema name
      */
-    public String getSchemaName(final LogicTableName logicTableName) {
+    public Optional<String> findSchemaName(final String logicTableName) {
         DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(dataSourceConfig.getDatabaseType()).getDialectDatabaseMetaData();
-        return dialectDatabaseMetaData.isSchemaAvailable() ? tableNameSchemaNameMapping.getSchemaName(logicTableName) : null;
+        return dialectDatabaseMetaData.isSchemaAvailable() ? Optional.of(tableAndSchemaNameMapper.getSchemaName(logicTableName)) : Optional.empty();
     }
 }
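
A hedged usage sketch of the new Optional-based accessor (the variable names follow the PipelineDataSourceSink caller updated below; the snippet itself is illustrative, not part of the patch):

    // Schema-less dialects make findSchemaName() return Optional.empty(), so the
    // SQL builder receives null and emits an unqualified table name.
    String schemaName = importerConfig.findSchemaName(dataRecord.getTableName()).orElse(null);
    String insertSql = importSQLBuilder.buildInsertSQL(schemaName, dataRecord);
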
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
index 0995292e0db..fe3134354bd 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/sink/PipelineDataSourceSink.java
@@ -26,7 +26,6 @@ import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.GroupedDataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceManager;
 import org.apache.shardingsphere.data.pipeline.common.ingest.IngestDataChangeType;
@@ -169,7 +168,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private void executeBatchInsert(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String insertSql = importSQLBuilder.buildInsertSQL(getSchemaName(dataRecord.getTableName()), dataRecord);
+        String insertSql = importSQLBuilder.buildInsertSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord);
         try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
             batchInsertStatement.set(preparedStatement);
             preparedStatement.setQueryTimeout(30);
@@ -185,10 +184,6 @@ public final class PipelineDataSourceSink implements PipelineSink {
         }
     }
     
-    private String getSchemaName(final String logicTableName) {
-        return getImporterConfig().getSchemaName(new LogicTableName(logicTableName));
-    }
-    
     private void executeUpdate(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         for (DataRecord each : dataRecords) {
             executeUpdate(connection, each);
@@ -199,7 +194,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
         Set<String> shardingColumns = importerConfig.getShardingColumns(dataRecord.getTableName());
         List<Column> conditionColumns = RecordUtils.extractConditionColumns(dataRecord, shardingColumns);
         List<Column> setColumns = dataRecord.getColumns().stream().filter(Column::isUpdated).collect(Collectors.toList());
-        String updateSql = importSQLBuilder.buildUpdateSQL(getSchemaName(dataRecord.getTableName()), dataRecord, conditionColumns);
+        String updateSql = importSQLBuilder.buildUpdateSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord, conditionColumns);
         try (PreparedStatement preparedStatement = connection.prepareStatement(updateSql)) {
             updateStatement.set(preparedStatement);
             for (int i = 0; i < setColumns.size(); i++) {
@@ -226,7 +221,7 @@ public final class PipelineDataSourceSink implements PipelineSink {
     
     private void executeBatchDelete(final Connection connection, final List<DataRecord> dataRecords) throws SQLException {
         DataRecord dataRecord = dataRecords.get(0);
-        String deleteSQL = importSQLBuilder.buildDeleteSQL(getSchemaName(dataRecord.getTableName()), dataRecord,
+        String deleteSQL = importSQLBuilder.buildDeleteSQL(getImporterConfig().findSchemaName(dataRecord.getTableName()).orElse(null), dataRecord,
                 RecordUtils.extractConditionColumns(dataRecord, importerConfig.getShardingColumns(dataRecord.getTableName())));
         try (PreparedStatement preparedStatement = connection.prepareStatement(deleteSQL)) {
             batchDeleteStatement.set(preparedStatement);
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
index e649878a0c7..2f38ea4c72c 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/PipelineJobPreparerUtils.java
@@ -166,7 +166,7 @@ public final class PipelineJobPreparerUtils {
         }
         DataSourceCheckEngine dataSourceCheckEngine = new DataSourceCheckEngine(databaseType);
         dataSourceCheckEngine.checkConnection(targetDataSources);
-        dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableNameSchemaNameMapping(), importerConfig.getLogicTableNames());
+        dataSourceCheckEngine.checkTargetTable(targetDataSources, importerConfig.getTableAndSchemaNameMapper(), importerConfig.getLogicTableNames());
     }
     
     /**
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
index f58f89bb8f4..7b76c512c85 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
@@ -66,17 +66,17 @@ public final class DataSourceCheckEngine {
      * Check table is empty.
      *
      * @param dataSources data sources
-     * @param tableNameSchemaNameMapping mapping
+     * @param tableAndSchemaNameMapper mapping
      * @param logicTableNames logic table names
      * @throws PrepareJobWithInvalidConnectionException prepare job with invalid connection exception
      */
     // TODO rename to common usage name
     // TODO Merge schemaName and tableNames
-    public void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableNameSchemaNameMapping tableNameSchemaNameMapping, final Collection<String> logicTableNames) {
+    public void checkTargetTable(final Collection<? extends DataSource> dataSources, final TableAndSchemaNameMapper tableAndSchemaNameMapper, final Collection<String> logicTableNames) {
         try {
             for (DataSource each : dataSources) {
                 for (String tableName : logicTableNames) {
-                    if (!checkEmpty(each, tableNameSchemaNameMapping.getSchemaName(tableName), tableName)) {
+                    if (!checkEmpty(each, tableAndSchemaNameMapper.getSchemaName(tableName), tableName)) {
                         throw new PrepareJobWithTargetTableNotEmptyException(tableName);
                     }
                 }
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index 6922b6642ae..d74115d295d 100644
--- a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
@@ -85,7 +85,7 @@ class DataSourceCheckEngineTest {
         when(dataSource.getConnection()).thenReturn(connection);
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
-        dataSourceCheckEngine.checkTargetTable(dataSources, new TableNameSchemaNameMapping(Collections.emptyMap()), Collections.singletonList("t_order"));
+        dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order"));
     }
     
     @Test
@@ -95,6 +95,6 @@ class DataSourceCheckEngineTest {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
         assertThrows(PrepareJobWithTargetTableNotEmptyException.class,
-                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableNameSchemaNameMapping(Collections.emptyMap()), Collections.singletonList("t_order")));
+                () -> dataSourceCheckEngine.checkTargetTable(dataSources, new TableAndSchemaNameMapper(Collections.emptyMap()), Collections.singletonList("t_order")));
     }
 }
diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 876dd1beacf..25bbaa33ff1 100644
--- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -92,7 +92,7 @@ class MySQLIncrementalDumperTest {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index c177dbb5963..e59c6a0f353 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -108,7 +108,7 @@ class PostgreSQLWALDumperTest {
         result.setJobId("0101123456");
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 4d5f9d35ffd..93a4e721074 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
@@ -87,7 +87,7 @@ class WALEventConverterTest {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
         result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
-        result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
+        result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
     
diff --git a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index 23035491e14..d6015da0a5e 100644
--- a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -21,7 +21,7 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
@@ -177,7 +177,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
                 if (getJobItemProgress(jobId, i).isPresent()) {
                     continue;
                 }
-                IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
+                IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, i, new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames()));
                 InventoryIncrementalJobItemProgress jobItemProgress = getInventoryIncrementalJobItemProgress(jobConfig, pipelineDataSourceManager, dumperContext);
                 PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineJobIdUtils.parseContextKey(jobId)).persistJobItemProgress(
                         jobId, i, YamlEngine.marshal(getJobItemProgressSwapper().swapToYamlConfiguration(jobItemProgress)));
@@ -267,26 +267,15 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
     @Override
     public CDCTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         CDCJobConfiguration jobConfig = (CDCJobConfiguration) pipelineJobConfig;
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames());
-        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableNameSchemaNameMapping);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableNameSchemaNameMapping);
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
+        IncrementalDumperContext dumperContext = buildDumperContext(jobConfig, jobShardingItem, tableAndSchemaNameMapper);
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, jobConfig.getSchemaTableNames(), tableAndSchemaNameMapper);
         CDCTaskConfiguration result = new CDCTaskConfiguration(dumperContext, importerConfig);
         log.debug("buildTaskConfiguration, result={}", result);
         return result;
     }
     
-    private TableNameSchemaNameMapping getTableNameSchemaNameMapping(final Collection<String> tableNames) {
-        Map<String, String> tableNameSchemaMap = new LinkedHashMap<>();
-        for (String each : tableNames) {
-            String[] split = each.split("\\.");
-            if (split.length > 1) {
-                tableNameSchemaMap.put(split[1], split[0]);
-            }
-        }
-        return new TableNameSchemaNameMapping(tableNameSchemaMap);
-    }
-    
-    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+    private IncrementalDumperContext buildDumperContext(final CDCJobConfiguration jobConfig, final int jobShardingItem, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
         Map<ActualTableName, LogicTableName> tableNameMap = new LinkedHashMap<>();
         dataNodeLine.getEntries().forEach(each -> each.getDataNodes().forEach(node -> tableNameMap.put(new ActualTableName(node.getTableName()), new LogicTableName(each.getLogicTableName()))));
@@ -297,13 +286,13 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(actualDataSourceConfig);
         result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         result.setDecodeWithTX(jobConfig.isDecodeWithTX());
         return result;
     }
     
     private ImporterConfiguration buildImporterConfiguration(final CDCJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig, final Collection<String> schemaTableNames,
-                                                             final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                             final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         PipelineDataSourceConfiguration dataSourceConfig = PipelineDataSourceConfigurationFactory.newInstance(jobConfig.getDataSourceConfig().getType(),
                 jobConfig.getDataSourceConfig().getParameter());
         CDCProcessContext processContext = new CDCProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
@@ -311,7 +300,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor()
                 .getShardingColumnsMap(jobConfig.getDataSourceConfig().getRootConfig().getRules(), schemaTableNames.stream().map(LogicTableName::new).collect(Collectors.toSet()));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, 0, 1);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, 0, 1);
     }
     
     @Override
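
The private getTableNameSchemaNameMapping() helper removed above is superseded by the new constructor, so call sites pass the schema-qualified table names straight through. A sketch of the equivalence (illustrative only, not part of the patch):

    // Before: buildDumperContext(jobConfig, i, getTableNameSchemaNameMapping(jobConfig.getSchemaTableNames()));
    // After:
    TableAndSchemaNameMapper mapper = new TableAndSchemaNameMapper(jobConfig.getSchemaTableNames());
    // One nuance: Collectors.toMap in the new constructor throws IllegalStateException on duplicate
    // table names, whereas the removed LinkedHashMap loop silently kept the last schema seen.
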
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 2c74e4a0d1a..218d70427d9 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -19,7 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
@@ -264,21 +264,21 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
         IncrementalDumperContext incrementalDumperContext = new MigrationIncrementalDumperContextCreator(
                 jobConfig).createDumperContext(jobConfig.getJobShardingDataNodes().get(jobShardingItem));
-        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableNameSchemaNameMapping());
+        CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, incrementalDumperContext.getTableAndSchemaNameMapper());
         Set<LogicTableName> targetTableNames = jobConfig.getTargetTableNames().stream().map(LogicTableName::new).collect(Collectors.toSet());
         Map<LogicTableName, Set<String>> shardingColumnsMap = new ShardingColumnsExtractor().getShardingColumnsMap(
                 ((ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget()).getRootConfig().getRules(), targetTableNames);
-        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableNameSchemaNameMapping());
+        ImporterConfiguration importerConfig = buildImporterConfiguration(jobConfig, pipelineProcessConfig, shardingColumnsMap, incrementalDumperContext.getTableAndSchemaNameMapper());
         MigrationTaskConfiguration result = new MigrationTaskConfiguration(incrementalDumperContext.getDataSourceName(), createTableConfig, incrementalDumperContext, importerConfig);
         log.info("buildTaskConfiguration, result={}", result);
         return result;
     }
     
     private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig,
-                                                                   final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                                   final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
         for (JobDataNodeEntry each : jobConfig.getTablesFirstDataNodes().getEntries()) {
-            String sourceSchemaName = tableNameSchemaNameMapping.getSchemaName(each.getLogicTableName());
+            String sourceSchemaName = tableAndSchemaNameMapper.getSchemaName(each.getLogicTableName());
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(jobConfig.getTargetDatabaseType()).getDialectDatabaseMetaData();
             String targetSchemaName = dialectDatabaseMetaData.isSchemaAvailable() ? sourceSchemaName : null;
             DataNode dataNode = each.getDataNodes().get(0);
@@ -294,13 +294,13 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     }
     
     private ImporterConfiguration buildImporterConfiguration(final MigrationJobConfiguration jobConfig, final PipelineProcessConfiguration pipelineProcessConfig,
-                                                             final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                             final Map<LogicTableName, Set<String>> shardingColumnsMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         MigrationProcessContext processContext = new MigrationProcessContext(jobConfig.getJobId(), pipelineProcessConfig);
         JobRateLimitAlgorithm writeRateLimitAlgorithm = processContext.getWriteRateLimitAlgorithm();
         int batchSize = pipelineProcessConfig.getWrite().getBatchSize();
         int retryTimes = jobConfig.getRetryTimes();
         int concurrency = jobConfig.getConcurrency();
-        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableNameSchemaNameMapping, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
+        return new ImporterConfiguration(jobConfig.getTarget(), shardingColumnsMap, tableAndSchemaNameMapper, batchSize, writeRateLimitAlgorithm, retryTimes, concurrency);
     }
     
     @Override
@@ -372,7 +372,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     private void cleanTempTableOnRollback(final String jobId) throws SQLException {
         MigrationJobConfiguration jobConfig = getJobConfiguration(jobId);
         PipelineCommonSQLBuilder pipelineSQLBuilder = new PipelineCommonSQLBuilder(jobConfig.getTargetDatabaseType());
-        TableNameSchemaNameMapping mapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+        TableAndSchemaNameMapper mapping = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         try (
                 PipelineDataSourceWrapper dataSource = PipelineDataSourceFactory.newInstance(jobConfig.getTarget());
                 Connection connection = dataSource.getConnection()) {
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index c63c5243c52..06ee39a408b 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,7 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
 
 import lombok.RequiredArgsConstructor;
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -41,19 +41,19 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
     @Override
     public IncrementalDumperContext createDumperContext(final JobDataNodeLine jobDataNodeLine) {
         Map<ActualTableName, LogicTableName> tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(jobDataNodeLine);
-        TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
+        TableAndSchemaNameMapper tableAndSchemaNameMapper = new TableAndSchemaNameMapper(jobConfig.getTargetTableSchemaMap());
         String dataSourceName = jobDataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
-        return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableNameSchemaNameMapping);
+        return buildDumperContext(jobConfig.getJobId(), dataSourceName, jobConfig.getSources().get(dataSourceName), tableNameMap, tableAndSchemaNameMapper);
     }
     
     private IncrementalDumperContext buildDumperContext(final String jobId, final String dataSourceName, final PipelineDataSourceConfiguration sourceDataSource,
-                                                        final Map<ActualTableName, LogicTableName> tableNameMap, final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
+                                                        final Map<ActualTableName, LogicTableName> tableNameMap, final TableAndSchemaNameMapper tableAndSchemaNameMapper) {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setJobId(jobId);
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(sourceDataSource);
         result.setTableNameMap(tableNameMap);
-        result.setTableNameSchemaNameMapping(tableNameSchemaNameMapping);
+        result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         return result;
     }
 }
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index cbeec358b0c..487df1e7d48 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
 
-import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
+import org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
@@ -193,6 +193,6 @@ class PipelineDataSourceSinkTest {
     
     private ImporterConfiguration mockImporterConfiguration() {
         Map<LogicTableName, Set<String>> shardingColumnsMap = Collections.singletonMap(new LogicTableName("test_table"), Collections.singleton("user"));
-        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableNameSchemaNameMapping(Collections.emptyMap()), 1000, null, 3, 3);
+        return new ImporterConfiguration(dataSourceConfig, shardingColumnsMap, new TableAndSchemaNameMapper(Collections.emptyMap()), 1000, null, 3, 3);
     }
 }

