This is an automated email from the ASF dual-hosted git repository.

zhaojinchao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 50b94625c18 Add ActualAndLogicTableNameMapper (#28941)
50b94625c18 is described below

commit 50b94625c18d3cc254b3262907bb37b6ebf14843
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Nov 4 22:32:45 2023 +0800

    Add ActualAndLogicTableNameMapper (#28941)
    
    * Rename TableAndSchemaNameMapperTest
    
    * Move TableAndSchemaNameMapper
    
    * Add ActualAndLogicTableNameMapper
    
    * Add ActualAndLogicTableNameMapper
    
    * Add ActualAndLogicTableNameMapper
    
    * Add ActualAndLogicTableNameMapper
    
    * Refactor DumperCommonContext
---
 .../ingest/dumper/context/DumperCommonContext.java | 55 ++--------------------
 .../dumper/context/InventoryDumperContext.java     |  2 +-
 .../ActualAndLogicTableNameMapper.java}            | 49 +++----------------
 .../context/mapper}/TableAndSchemaNameMapper.java  |  2 +-
 .../mapper/TableAndSchemaNameMapperTest.java       | 52 ++++++++++++++++++++
 .../common/config/ImporterConfiguration.java       |  2 +-
 .../data/pipeline/core/dumper/InventoryDumper.java | 11 ++---
 .../preparer/InventoryRecordsCountCalculator.java  |  3 +-
 .../core/preparer/InventoryTaskSplitter.java       | 11 ++---
 .../preparer/datasource/DataSourceCheckEngine.java |  2 +-
 .../datasource/DataSourceCheckEngineTest.java      |  2 +-
 .../mysql/ingest/MySQLIncrementalDumper.java       | 11 +++--
 .../mysql/ingest/MySQLIncrementalDumperTest.java   |  7 +--
 .../postgresql/ingest/wal/WALEventConverter.java   |  9 ++--
 .../postgresql/ingest/PostgreSQLWALDumperTest.java |  7 +--
 .../ingest/wal/WALEventConverterTest.java          |  7 +--
 .../data/pipeline/cdc/api/impl/CDCJobAPI.java      |  7 +--
 .../migration/api/impl/MigrationJobAPI.java        |  2 +-
 .../MigrationIncrementalDumperContextCreator.java  |  5 +-
 .../core/importer/PipelineDataSourceSinkTest.java  |  2 +-
 20 files changed, 111 insertions(+), 137 deletions(-)

diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
index 4a640fdb7bf..9166f609df9 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
@@ -20,16 +20,13 @@ package 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
-
-import java.util.Map;
 
 /**
- * Base dumper context.
+ * Dumper common context.
  */
 @Getter
 @Setter
@@ -40,53 +37,9 @@ public abstract class DumperCommonContext {
     
     private PipelineDataSourceConfiguration dataSourceConfig;
     
-    private Map<ActualTableName, LogicTableName> tableNameMap;
+    private ActualAndLogicTableNameMapper tableNameMapper;
     
     private TableAndSchemaNameMapper tableAndSchemaNameMapper;
     
     private IngestPosition position;
-    
-    /**
-     * Get logic table name.
-     *
-     * @param actualTableName actual table name
-     * @return logic table name
-     */
-    public LogicTableName getLogicTableName(final String actualTableName) {
-        return tableNameMap.get(new ActualTableName(actualTableName));
-    }
-    
-    private LogicTableName getLogicTableName(final ActualTableName 
actualTableName) {
-        return tableNameMap.get(actualTableName);
-    }
-    
-    /**
-     * Whether contains table.
-     *
-     * @param actualTableName actual table name
-     * @return contains or not
-     */
-    public boolean containsTable(final String actualTableName) {
-        return tableNameMap.containsKey(new ActualTableName(actualTableName));
-    }
-    
-    /**
-     * Get schema name.
-     *
-     * @param logicTableName logic table name
-     * @return schema name. nullable
-     */
-    public String getSchemaName(final LogicTableName logicTableName) {
-        return tableAndSchemaNameMapper.getSchemaName(logicTableName);
-    }
-    
-    /**
-     * Get schema name.
-     *
-     * @param actualTableName actual table name
-     * @return schema name, can be nullable 
-     */
-    public String getSchemaName(final ActualTableName actualTableName) {
-        return 
tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
-    }
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
index d72f07c7eaf..e96375d8c6e 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/InventoryDumperContext.java
@@ -54,7 +54,7 @@ public final class InventoryDumperContext extends 
DumperCommonContext {
     public InventoryDumperContext(final DumperCommonContext dumperContext) {
         setDataSourceName(dumperContext.getDataSourceName());
         setDataSourceConfig(dumperContext.getDataSourceConfig());
-        setTableNameMap(dumperContext.getTableNameMap());
+        setTableNameMapper(dumperContext.getTableNameMapper());
         
setTableAndSchemaNameMapper(dumperContext.getTableAndSchemaNameMapper());
     }
     
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
similarity index 53%
copy from 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
copy to 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
index 4a640fdb7bf..475c1742d5f 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/DumperCommonContext.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/ActualAndLogicTableNameMapper.java
@@ -15,36 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context;
+package 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;
 
 import lombok.Getter;
-import lombok.Setter;
+import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 
 import java.util.Map;
 
 /**
- * Base dumper context.
+ * Actual table name and logic table name mapper.
  */
+@RequiredArgsConstructor
 @Getter
-@Setter
-@ToString(exclude = {"dataSourceConfig", "tableAndSchemaNameMapper"})
-public abstract class DumperCommonContext {
+@ToString
+public final class ActualAndLogicTableNameMapper {
     
-    private String dataSourceName;
-    
-    private PipelineDataSourceConfiguration dataSourceConfig;
-    
-    private Map<ActualTableName, LogicTableName> tableNameMap;
-    
-    private TableAndSchemaNameMapper tableAndSchemaNameMapper;
-    
-    private IngestPosition position;
+    private final Map<ActualTableName, LogicTableName> tableNameMap;
     
     /**
      * Get logic table name.
@@ -56,10 +45,6 @@ public abstract class DumperCommonContext {
         return tableNameMap.get(new ActualTableName(actualTableName));
     }
     
-    private LogicTableName getLogicTableName(final ActualTableName 
actualTableName) {
-        return tableNameMap.get(actualTableName);
-    }
-    
     /**
      * Whether contains table.
      *
@@ -69,24 +54,4 @@ public abstract class DumperCommonContext {
     public boolean containsTable(final String actualTableName) {
         return tableNameMap.containsKey(new ActualTableName(actualTableName));
     }
-    
-    /**
-     * Get schema name.
-     *
-     * @param logicTableName logic table name
-     * @return schema name. nullable
-     */
-    public String getSchemaName(final LogicTableName logicTableName) {
-        return tableAndSchemaNameMapper.getSchemaName(logicTableName);
-    }
-    
-    /**
-     * Get schema name.
-     *
-     * @param actualTableName actual table name
-     * @return schema name, can be nullable 
-     */
-    public String getSchemaName(final ActualTableName actualTableName) {
-        return 
tableAndSchemaNameMapper.getSchemaName(getLogicTableName(actualTableName));
-    }
 }
diff --git 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
similarity index 97%
rename from 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
rename to 
kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
index 3999329ae63..329a24d3a1e 100644
--- 
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/context/TableAndSchemaNameMapper.java
+++ 
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapper.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.api.context;
+package 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;
 
 import lombok.ToString;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
diff --git a/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java
new file mode 100644
index 00000000000..465ee71c7ea
--- /dev/null
+++ b/kernel/data-pipeline/api/src/test/java/org/apache/shardingsphere/data/pipeline/api/ingest/dumper/context/mapper/TableAndSchemaNameMapperTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+class TableAndSchemaNameMapperTest {
+    
+    @Test
+    void assertConstructFromNull() {
+        assertDoesNotThrow(() -> new TableAndSchemaNameMapper((Map<String, String>) null));
+    }
+    
+    @Test
+    void assertConstructFromValueNullMap() {
+        assertNull(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", null)).getSchemaName("t_order"));
+    }
+    
+    @Test
+    void assertConstructFromMap() {
+        assertThat(new TableAndSchemaNameMapper(Collections.singletonMap("t_order", "public")).getSchemaName("t_order"), is("public"));
+    }
+    
+    @Test
+    void assertConstructFromCollection() {
+        assertThat(new TableAndSchemaNameMapper(Arrays.asList("public.t_order", "t_order_item")).getSchemaName("t_order"), is("public"));
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
index 0116904291a..1438d942098 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/config/ImporterConfiguration.java
@@ -20,7 +20,7 @@ package org.apache.shardingsphere.data.pipeline.common.config;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.ToString;
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
index a8fb1156a7c..b022f32de72 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/dumper/InventoryDumper.java
@@ -21,17 +21,16 @@ import com.google.common.base.Strings;
 import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.FinishedRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.job.JobOperationType;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -46,8 +45,8 @@ import 
org.apache.shardingsphere.data.pipeline.common.util.PipelineJdbcUtils;
 import org.apache.shardingsphere.data.pipeline.core.exception.IngestException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import 
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
 import javax.sql.DataSource;
@@ -102,7 +101,8 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
             log.info("Ignored because of already finished.");
             return;
         }
-        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new 
LogicTableName(dumperContext.getLogicTableName())), 
dumperContext.getActualTableName());
+        PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(
+                
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
 dumperContext.getActualTableName());
         try (Connection connection = dataSource.getConnection()) {
             dump(tableMetaData, connection);
         } catch (final SQLException ex) {
@@ -156,8 +156,7 @@ public final class InventoryDumper extends 
AbstractLifecycleExecutor implements
         if (!Strings.isNullOrEmpty(dumperContext.getQuerySQL())) {
             return dumperContext.getQuerySQL();
         }
-        LogicTableName logicTableName = new 
LogicTableName(dumperContext.getLogicTableName());
-        String schemaName = dumperContext.getSchemaName(logicTableName);
+        String schemaName = 
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         if (!dumperContext.hasUniqueKey()) {
             return inventoryDumpSQLBuilder.buildFetchAllSQL(schemaName, 
dumperContext.getActualTableName());
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
index 6656cdcf9a2..f1e46de7bee 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryRecordsCountCalculator.java
@@ -21,7 +21,6 @@ import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.SplitPipelineJobByUniqueKeyException;
@@ -52,7 +51,7 @@ public final class InventoryRecordsCountCalculator {
      * @throws SplitPipelineJobByUniqueKeyException if there's exception from 
database
      */
     public static long getTableRecordsCount(final InventoryDumperContext 
dumperContext, final PipelineDataSourceWrapper dataSource) {
-        String schemaName = dumperContext.getSchemaName(new 
LogicTableName(dumperContext.getLogicTableName()));
+        String schemaName = 
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
         String actualTableName = dumperContext.getActualTableName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(dataSource.getDatabaseType());
         Optional<String> sql = 
pipelineSQLBuilder.buildEstimatedCountSQL(schemaName, actualTableName);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
index 2f1c6954860..97159392918 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/InventoryTaskSplitter.java
@@ -20,19 +20,18 @@ package 
org.apache.shardingsphere.data.pipeline.core.preparer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.Range;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import org.apache.shardingsphere.data.pipeline.api.ingest.dumper.Dumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
-import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.common.config.ImporterConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.config.process.PipelineReadConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.common.context.InventoryIncrementalProcessContext;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSourceWrapper;
-import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.PlaceholderPosition;
+import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.IntegerPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.StringPrimaryKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.ingest.position.pk.type.UnsupportedKeyPosition;
 import 
org.apache.shardingsphere.data.pipeline.common.job.progress.InventoryIncrementalJobItemProgress;
@@ -111,7 +110,7 @@ public final class InventoryTaskSplitter {
     
     private Collection<InventoryDumperContext> splitByTable(final 
InventoryDumperContext dumperContext) {
         Collection<InventoryDumperContext> result = new LinkedList<>();
-        dumperContext.getTableNameMap().forEach((key, value) -> {
+        dumperContext.getTableNameMapper().getTableNameMap().forEach((key, 
value) -> {
             InventoryDumperContext inventoryDumperContext = new 
InventoryDumperContext(dumperContext);
             // use original table name, for metadata loader, since some 
database table name case-sensitive
             inventoryDumperContext.setActualTableName(key.getOriginal());
@@ -127,7 +126,7 @@ public final class InventoryTaskSplitter {
     private Collection<InventoryDumperContext> splitByPrimaryKey(final 
InventoryDumperContext dumperContext, final InventoryIncrementalJobItemContext 
jobItemContext,
                                                                  final 
PipelineDataSourceWrapper dataSource) {
         if (null == dumperContext.getUniqueKeyColumns()) {
-            String schemaName = dumperContext.getSchemaName(new 
LogicTableName(dumperContext.getLogicTableName()));
+            String schemaName = 
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName());
             String actualTableName = dumperContext.getActualTableName();
             List<PipelineColumnMetaData> uniqueKeyColumns = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(schemaName, actualTableName, 
jobItemContext.getSourceMetaDataLoader());
             dumperContext.setUniqueKeyColumns(uniqueKeyColumns);
@@ -205,7 +204,7 @@ public final class InventoryTaskSplitter {
         String uniqueKey = 
dumperContext.getUniqueKeyColumns().get(0).getName();
         PipelineCommonSQLBuilder pipelineSQLBuilder = new 
PipelineCommonSQLBuilder(jobItemContext.getJobConfig().getSourceDatabaseType());
         String sql = pipelineSQLBuilder.buildUniqueKeyMinMaxValuesSQL(
-                dumperContext.getSchemaName(new 
LogicTableName(dumperContext.getLogicTableName())), 
dumperContext.getActualTableName(), uniqueKey);
+                
dumperContext.getTableAndSchemaNameMapper().getSchemaName(dumperContext.getLogicTableName()),
 dumperContext.getActualTableName(), uniqueKey);
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement();
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
index 7b76c512c85..d801148321c 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngine.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.common.sqlbuilder.PipelineCommonSQLBuilder;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
index d74115d295d..0518f428115 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/preparer/datasource/DataSourceCheckEngineTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.data.pipeline.core.preparer.datasource;
 
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithInvalidConnectionException;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWithTargetTableNotEmptyException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 84cf56db027..6d50d4950a0 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -19,18 +19,18 @@ package 
org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
 import com.google.common.base.Preconditions;
 import lombok.extern.slf4j.Slf4j;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlJdbcConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.executor.AbstractLifecycleExecutor;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.IncrementalDumper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.position.IngestPosition;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -132,7 +132,7 @@ public final class MySQLIncrementalDumper extends 
AbstractLifecycleExecutor impl
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event;
-        if (!rowsEvent.getDatabaseName().equals(catalog) || 
!dumperContext.containsTable(rowsEvent.getTableName())) {
+        if (!rowsEvent.getDatabaseName().equals(catalog) || 
!dumperContext.getTableNameMapper().containsTable(rowsEvent.getTableName())) {
             return Collections.singletonList(createPlaceholderRecord(event));
         }
         PipelineTableMetaData tableMetaData = 
getPipelineTableMetaData(rowsEvent.getTableName());
@@ -155,7 +155,8 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+        LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
+        return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
     }
     
     private List<DataRecord> handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) {
@@ -216,7 +217,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
     }
     
     private DataRecord createDataRecord(final String type, final AbstractRowsEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
         IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
         DataRecord result = new DataRecord(type, tableName, position, columnCount);
         result.setActualTableName(rowsEvent.getTableName());
diff --git 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
index 25bbaa33ff1..7d6ee11ad2f 100644
--- 
a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumperTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.mysql.ingest;
 
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -91,7 +92,7 @@ class MySQLIncrementalDumperTest {
     private IncrementalDumperContext createDumperContext() {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
-        result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
+        result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
         result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index adfdf059666..cc2dca00d86 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -22,7 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.Column;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
-import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineTableMetaData;
@@ -80,7 +80,7 @@ public final class WALEventConverter {
     private boolean filter(final AbstractWALEvent event) {
         if (event instanceof AbstractRowEvent) {
             AbstractRowEvent rowEvent = (AbstractRowEvent) event;
-            return !dumperContext.containsTable(rowEvent.getTableName());
+            return !dumperContext.getTableNameMapper().containsTable(rowEvent.getTableName());
         }
         return false;
     }
@@ -90,7 +90,8 @@ public final class WALEventConverter {
     }
     
     private PipelineTableMetaData getPipelineTableMetaData(final String actualTableName) {
-        return metaDataLoader.getTableMetaData(dumperContext.getSchemaName(new ActualTableName(actualTableName)), actualTableName);
+        LogicTableName logicTableName = dumperContext.getTableNameMapper().getLogicTableName(actualTableName);
+        return metaDataLoader.getTableMetaData(dumperContext.getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName);
     }
     
     private DataRecord handleWriteRowEvent(final WriteRowEvent writeRowEvent, final PipelineTableMetaData tableMetaData) {
@@ -117,7 +118,7 @@ public final class WALEventConverter {
     }
     
     private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
-        String tableName = dumperContext.getLogicTableName(rowsEvent.getTableName()).getOriginal();
+        String tableName = dumperContext.getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).getOriginal();
         DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
         result.setActualTableName(rowsEvent.getTableName());
         result.setCsn(rowsEvent.getCsn());
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
index e59c6a0f353..6cbb3233bac 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/PostgreSQLWALDumperTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest;
 
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import 
org.apache.shardingsphere.data.pipeline.common.datasource.DefaultPipelineDataSourceManager;
@@ -107,7 +108,7 @@ class PostgreSQLWALDumperTest {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setJobId("0101123456");
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration(jdbcUrl, username, password));
-        result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order")));
+        result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order_0"), new LogicTableName("t_order"))));
         result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
diff --git 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 93a4e721074..15895cd370b 100644
--- 
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ 
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -17,9 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;
 
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.record.PlaceholderRecord;
 import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
@@ -86,7 +87,7 @@ class WALEventConverterTest {
     private IncrementalDumperContext mockDumperContext() {
         IncrementalDumperContext result = new IncrementalDumperContext();
         result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=PostgreSQL", "root", "root"));
-        result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
+        result.setTableNameMapper(new ActualAndLogicTableNameMapper(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order"))));
         result.setTableAndSchemaNameMapper(new TableAndSchemaNameMapper(Collections.emptyMap()));
         return result;
     }
diff --git 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
index d6015da0a5e..85f2aea4206 100644
--- 
a/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/cdc/core/src/main/java/org/apache/shardingsphere/data/pipeline/cdc/api/impl/CDCJobAPI.java
@@ -21,13 +21,14 @@ import com.google.common.base.Joiner;
 import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
-import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfigurationFactory;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.yaml.YamlPipelineDataSourceConfigurationSwapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.data.pipeline.cdc.api.job.type.CDCJobType;
@@ -285,7 +286,7 @@ public final class CDCJobAPI extends AbstractInventoryIncrementalJobAPIImpl {
         result.setJobId(jobConfig.getJobId());
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(actualDataSourceConfig);
-        result.setTableNameMap(tableNameMap);
+        result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
         result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         result.setDecodeWithTX(jobConfig.isDecodeWithTX());
         return result;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index 218d70427d9..0be975696ed 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -19,7 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.codec.digest.DigestUtils;
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.ShardingSpherePipelineDataSourceConfiguration;
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
index 06ee39a408b..659b15995eb 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/config/ingest/MigrationIncrementalDumperContextCreator.java
@@ -18,7 +18,8 @@
 package 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.ingest;
 
 import lombok.RequiredArgsConstructor;
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.ActualAndLogicTableNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.IncrementalDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
@@ -52,7 +53,7 @@ public final class MigrationIncrementalDumperContextCreator implements Increment
         result.setJobId(jobId);
         result.setDataSourceName(dataSourceName);
         result.setDataSourceConfig(sourceDataSource);
-        result.setTableNameMap(tableNameMap);
+        result.setTableNameMapper(new ActualAndLogicTableNameMapper(tableNameMap));
         result.setTableAndSchemaNameMapper(tableAndSchemaNameMapper);
         return result;
     }
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
index 487df1e7d48..3fc0ace7fe6 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/importer/PipelineDataSourceSinkTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.shardingsphere.test.it.data.pipeline.core.importer;
 
-import 
org.apache.shardingsphere.data.pipeline.api.context.TableAndSchemaNameMapper;
+import 
org.apache.shardingsphere.data.pipeline.api.ingest.dumper.context.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.api.ingest.channel.PipelineChannel;

Reply via email to