This is an automated email from the ASF dual-hosted git repository.

duanzhengqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new bc2c1e66e05 Refactor SingleTableInventoryCalculateParameter (#33955)
bc2c1e66e05 is described below

commit bc2c1e66e05dcaa9f38180499b427ca726b117b2
Author: Liang Zhang <[email protected]>
AuthorDate: Sat Dec 7 19:56:31 2024 +0800

    Refactor SingleTableInventoryCalculateParameter (#33955)
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
    
    * Refactor SingleTableInventoryCalculateParameter
---
 .../metadata/database/schema/QualifiedTable.java   |  7 ++++++
 .../checker/PipelineDataSourceCheckEngine.java     | 10 ++++----
 .../table/MatchingTableInventoryChecker.java       |  8 +++----
 .../table/TableInventoryCheckParameter.java        |  6 ++---
 .../CRC32SingleTableInventoryCalculator.java       |  4 ++--
 .../RecordSingleTableInventoryCalculator.java      | 21 ++++++++--------
 .../SingleTableInventoryCalculateParameter.java    | 28 ++++------------------
 ...DataConsistencyCheckLoadingFailedException.java | 11 ++++-----
 .../core/importer/ImporterConfiguration.java       |  6 ++---
 ...PipelineDataConsistencyCalculateSQLBuilder.java | 20 ++++++++--------
 .../checker/PipelineDataSourceCheckEngineTest.java |  6 ++---
 .../CRC32SingleTableInventoryCalculatorTest.java   |  9 ++++---
 .../core/importer/ImporterConfigurationTest.java   |  9 ++++---
 ...lineDataConsistencyCalculateSQLBuilderTest.java | 13 +++++-----
 .../MigrationDataConsistencyChecker.java           | 10 ++++----
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 22 ++++++++---------
 .../RecordSingleTableInventoryCalculatorTest.java  | 18 +++++++-------
 17 files changed, 98 insertions(+), 110 deletions(-)

diff --git 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
index 3276b5e3c48..6ea51c77116 100644
--- 
a/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
+++ 
b/infra/common/src/main/java/org/apache/shardingsphere/infra/metadata/database/schema/QualifiedTable.java
@@ -17,6 +17,7 @@
 
 package org.apache.shardingsphere.infra.metadata.database.schema;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 
@@ -25,9 +26,15 @@ import lombok.RequiredArgsConstructor;
  */
 @RequiredArgsConstructor
 @Getter
+@EqualsAndHashCode
 public final class QualifiedTable {
     
     private final String schemaName;
     
     private final String tableName;
+    
+    @Override
+    public String toString() {
+        return null == schemaName ? tableName : String.join(".", schemaName, 
tableName);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
index 03bfdfafe9c..7440198ea08 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngine.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import javax.sql.DataSource;
 import java.sql.Connection;
@@ -90,8 +90,8 @@ public final class PipelineDataSourceCheckEngine {
     private void checkEmptyTable(final Collection<DataSource> dataSources, 
final ImporterConfiguration importerConfig) {
         try {
             for (DataSource each : dataSources) {
-                for (CaseInsensitiveQualifiedTable qualifiedTable : 
importerConfig.getQualifiedTables()) {
-                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), 
() -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName().getValue()));
+                for (QualifiedTable qualifiedTable : 
importerConfig.getQualifiedTables()) {
+                    
ShardingSpherePreconditions.checkState(checkEmptyTable(each, qualifiedTable), 
() -> new 
PrepareJobWithTargetTableNotEmptyException(qualifiedTable.getTableName()));
                 }
             }
         } catch (final SQLException ex) {
@@ -107,8 +107,8 @@ public final class PipelineDataSourceCheckEngine {
      * @return empty or not
      * @throws SQLException if there's database operation failure
      */
-    public boolean checkEmptyTable(final DataSource dataSource, final 
CaseInsensitiveQualifiedTable qualifiedTable) throws SQLException {
-        String sql = 
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName().getValue(), 
qualifiedTable.getTableName().getValue());
+    public boolean checkEmptyTable(final DataSource dataSource, final 
QualifiedTable qualifiedTable) throws SQLException {
+        String sql = 
sqlBuilder.buildCheckEmptyTableSQL(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName());
         try (
                 Connection connection = dataSource.getConnection();
                 PreparedStatement preparedStatement = 
connection.prepareStatement(sql);
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index b26358e8b50..690ce1e89b0 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -73,9 +73,9 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
     
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final 
ThreadPoolExecutor executor) {
         SingleTableInventoryCalculateParameter sourceParam = new 
SingleTableInventoryCalculateParameter(param.getSourceDataSource(), 
param.getSourceTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getValue()));
+                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()));
         SingleTableInventoryCalculateParameter targetParam = new 
SingleTableInventoryCalculateParameter(param.getTargetDataSource(), 
param.getTargetTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getValue()));
+                param.getColumnNames(), param.getUniqueKeys(), 
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()));
         SingleTableInventoryCalculator sourceCalculator = 
buildSingleTableInventoryCalculator();
         this.sourceCalculator = sourceCalculator;
         SingleTableInventoryCalculator targetCalculator = 
buildSingleTableInventoryCalculator();
@@ -108,10 +108,10 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
                 break;
             }
             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName().getValue(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
+                
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName().getValue(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
+                
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
             param.getProgressContext().onProgressUpdated(new 
PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
         }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 8c5d674eeb2..4419b643093 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -23,7 +23,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.Consistency
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import java.util.List;
 
@@ -40,9 +40,9 @@ public final class TableInventoryCheckParameter {
     
     private final PipelineDataSource targetDataSource;
     
-    private final CaseInsensitiveQualifiedTable sourceTable;
+    private final QualifiedTable sourceTable;
     
-    private final CaseInsensitiveQualifiedTable targetTable;
+    private final QualifiedTable targetTable;
     
     private final List<String> columnNames;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
index e967f236345..dc198b0c04d 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculator.java
@@ -49,7 +49,7 @@ public final class CRC32SingleTableInventoryCalculator 
extends AbstractSingleTab
     }
     
     private CalculatedItem calculateCRC32(final 
PipelineDataConsistencyCalculateSQLBuilder pipelineSQLBuilder, final 
SingleTableInventoryCalculateParameter param, final String columnName) {
-        String sql = pipelineSQLBuilder.buildCRC32SQL(param.getSchemaName(), 
param.getLogicTableName(), columnName)
+        String sql = pipelineSQLBuilder.buildCRC32SQL(param.getTable(), 
columnName)
                 .orElseThrow(() -> new 
UnsupportedAlgorithmOnDatabaseTypeException("DataConsistencyCalculate", 
"CRC32", param.getDatabaseType()));
         try (
                 Connection connection = param.getDataSource().getConnection();
@@ -61,7 +61,7 @@ public final class CRC32SingleTableInventoryCalculator 
extends AbstractSingleTab
             int recordsCount = resultSet.getInt(2);
             return new CalculatedItem(crc32, recordsCount);
         } catch (final SQLException ex) {
-            throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName(), ex);
+            throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), ex);
         }
     }
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 0ad0862d142..f38a17ba1bf 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -92,8 +92,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
             ResultSet resultSet = calculationContext.getResultSet();
             ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
             while (resultSet.next()) {
-                ShardingSpherePreconditions.checkState(!isCanceling(), () -> 
new PipelineJobCancelingException(
-                        "Calculate chunk canceled, schema name: %s, table 
name: %s", param.getSchemaName(), param.getLogicTableName()));
+                ShardingSpherePreconditions.checkState(!isCanceling(), () -> 
new PipelineJobCancelingException("Calculate chunk canceled, qualified table: 
%s", param.getTable()));
                 Map<String, Object> columnRecord = new LinkedHashMap<>();
                 for (int columnIndex = 1, columnCount = 
resultSetMetaData.getColumnCount(); columnIndex <= columnCount; columnIndex++) {
                     
columnRecord.put(resultSetMetaData.getColumnLabel(columnIndex), 
columnValueReaderEngine.read(resultSet, resultSetMetaData, columnIndex));
@@ -109,7 +108,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
             // CHECKSTYLE:OFF
         } catch (final SQLException | RuntimeException ex) {
             // CHECKSTYLE:ON
-            throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName(), ex);
+            throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), ex);
         }
     }
     
@@ -125,7 +124,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         } catch (final SQLException | RuntimeException ex) {
             // CHECKSTYLE:ON
             QuietlyCloser.close(result);
-            throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName(), ex);
+            throw new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), ex);
         }
         return result;
     }
@@ -155,10 +154,10 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         Collection<String> columnNames = param.getColumnNames().isEmpty() ? 
Collections.singleton("*") : param.getColumnNames();
         switch (param.getQueryType()) {
             case RANGE_QUERY:
-                return 
pipelineSQLBuilder.buildQueryRangeOrderingSQL(param.getSchemaName(), 
param.getLogicTableName(),
-                        columnNames, param.getUniqueKeysNames(), 
param.getQueryRange(), param.getShardingColumnsNames());
+                return 
pipelineSQLBuilder.buildQueryRangeOrderingSQL(param.getTable(), columnNames, 
param.getUniqueKeysNames(), param.getQueryRange(), 
param.getShardingColumnsNames());
             case POINT_QUERY:
-                return 
pipelineSQLBuilder.buildPointQuerySQL(param.getSchemaName(), 
param.getLogicTableName(), columnNames, param.getUniqueKeysNames(), 
param.getShardingColumnsNames());
+                return pipelineSQLBuilder.buildPointQuerySQL(
+                        param.getTable().getSchemaName(), 
param.getTable().getTableName(), columnNames, param.getUniqueKeysNames(), 
param.getShardingColumnsNames());
             default:
                 throw new UnsupportedOperationException("Query type: " + 
param.getQueryType());
         }
@@ -169,7 +168,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         if (queryType == QueryType.RANGE_QUERY) {
             QueryRange queryRange = param.getQueryRange();
             ShardingSpherePreconditions.checkNotNull(queryRange,
-                    () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName(), new RuntimeException("Unique keys values range is 
null.")));
+                    () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new 
RuntimeException("Unique keys values range is null.")));
             int parameterIndex = 1;
             if (null != queryRange.getLower()) {
                 preparedStatement.setObject(parameterIndex++, 
queryRange.getLower());
@@ -181,15 +180,15 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
         } else if (queryType == QueryType.POINT_QUERY) {
             Collection<Object> uniqueKeysValues = param.getUniqueKeysValues();
             ShardingSpherePreconditions.checkNotNull(uniqueKeysValues,
-                    () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getSchemaName(), 
param.getLogicTableName(), new RuntimeException("Unique keys values is 
null.")));
+                    () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new 
RuntimeException("Unique keys values is null.")));
             int parameterIndex = 1;
             for (Object each : uniqueKeysValues) {
                 preparedStatement.setObject(parameterIndex++, each);
             }
             if (null != param.getShardingColumnsNames() && 
!param.getShardingColumnsNames().isEmpty()) {
                 List<Object> shardingColumnsValues = 
param.getShardingColumnsValues();
-                
ShardingSpherePreconditions.checkNotNull(shardingColumnsValues, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(
-                        param.getSchemaName(), param.getLogicTableName(), new 
RuntimeException("Sharding columns values is null when names not empty.")));
+                ShardingSpherePreconditions.checkNotNull(shardingColumnsValues,
+                        () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(param.getTable(), new 
RuntimeException("Sharding columns values is null when names not empty.")));
                 for (Object each : shardingColumnsValues) {
                     preparedStatement.setObject(parameterIndex++, each);
                 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 62d34286768..8b75761e472 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -20,11 +20,11 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calc
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import javax.annotation.Nullable;
 import java.util.Collection;
@@ -45,7 +45,7 @@ public final class SingleTableInventoryCalculateParameter {
      */
     private final PipelineDataSource dataSource;
     
-    private final CaseInsensitiveQualifiedTable table;
+    private final QualifiedTable table;
     
     private final List<String> columnNames;
     
@@ -67,8 +67,8 @@ public final class SingleTableInventoryCalculateParameter {
     
     private final QueryType queryType;
     
-    public SingleTableInventoryCalculateParameter(final PipelineDataSource 
dataSource, final CaseInsensitiveQualifiedTable table, final List<String> 
columnNames,
-                                                  final 
List<PipelineColumnMetaData> uniqueKeys, final Object tableCheckPosition) {
+    public SingleTableInventoryCalculateParameter(final PipelineDataSource 
dataSource,
+                                                  final QualifiedTable table, 
final List<String> columnNames, final List<PipelineColumnMetaData> uniqueKeys, 
final Object tableCheckPosition) {
         this.dataSource = dataSource;
         this.table = table;
         this.columnNames = columnNames;
@@ -86,24 +86,6 @@ public final class SingleTableInventoryCalculateParameter {
         return dataSource.getDatabaseType();
     }
     
-    /**
-     * Get schema name.
-     *
-     * @return schema name
-     */
-    public String getSchemaName() {
-        return table.getSchemaName().getValue();
-    }
-    
-    /**
-     * Get logic table name.
-     *
-     * @return logic table name
-     */
-    public String getLogicTableName() {
-        return table.getTableName().getValue();
-    }
-    
     /**
      * Get first unique key.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
index 6641476adf8..6dee838a265 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/exception/data/PipelineTableDataConsistencyCheckLoadingFailedException.java
@@ -18,8 +18,7 @@
 package org.apache.shardingsphere.data.pipeline.core.exception.data;
 
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.sqlstate.XOpenSQLState;
-
-import javax.annotation.Nullable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 /**
  * Pipeline table data consistency check loading failed exception.
@@ -28,11 +27,11 @@ public final class 
PipelineTableDataConsistencyCheckLoadingFailedException exten
     
     private static final long serialVersionUID = 8965231249677009738L;
     
-    public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable 
final String schemaName, final String tableName) {
-        this(schemaName, tableName, null);
+    public PipelineTableDataConsistencyCheckLoadingFailedException(final 
QualifiedTable qualifiedTable) {
+        this(qualifiedTable, null);
     }
     
-    public PipelineTableDataConsistencyCheckLoadingFailedException(@Nullable 
final String schemaName, final String tableName, final Exception cause) {
-        super(XOpenSQLState.CONNECTION_EXCEPTION, 1, String.format("Data check 
table '%s' failed.", null != schemaName ? schemaName + "." + tableName : 
tableName), cause);
+    public PipelineTableDataConsistencyCheckLoadingFailedException(final 
QualifiedTable qualifiedTable, final Exception cause) {
+        super(XOpenSQLState.CONNECTION_EXCEPTION, 1, String.format("Data check 
table '%s' failed.", qualifiedTable), cause);
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
index bb0ebca58cd..00f8efaa8ec 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfiguration.java
@@ -26,7 +26,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAn
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 
 import java.util.Collection;
@@ -85,8 +85,8 @@ public final class ImporterConfiguration {
      *
      * @return qualified tables
      */
-    public Collection<CaseInsensitiveQualifiedTable> getQualifiedTables() {
+    public Collection<QualifiedTable> getQualifiedTables() {
         return shardingColumnsMap.keySet().stream()
-                .map(ShardingSphereIdentifier::getValue).map(each -> new 
CaseInsensitiveQualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), 
each)).collect(Collectors.toList());
+                .map(ShardingSphereIdentifier::getValue).map(each -> new 
QualifiedTable(tableAndSchemaNameMapper.getSchemaName(each), 
each)).collect(Collectors.toList());
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
index c5a51e3f570..c3edc165fe1 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilder.java
@@ -22,6 +22,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.dialect.DialectPi
 import 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.segment.PipelineSQLSegmentBuilder;
 import 
org.apache.shardingsphere.infra.database.core.spi.DatabaseTypedSPILoader;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import javax.annotation.Nullable;
 import java.util.ArrayList;
@@ -47,22 +48,21 @@ public final class 
PipelineDataConsistencyCalculateSQLBuilder {
     /**
      * Build query range ordering SQL.
      *
-     * @param schemaName schema name
-     * @param tableName table name
+     * @param qualifiedTable qualified table
      * @param columnNames column names
      * @param uniqueKeys unique keys, it may be primary key, not null
      * @param queryRange query range
      * @param shardingColumnsNames sharding columns names
      * @return built SQL
      */
-    public String buildQueryRangeOrderingSQL(final String schemaName, final 
String tableName, final Collection<String> columnNames, final List<String> 
uniqueKeys, final QueryRange queryRange,
+    public String buildQueryRangeOrderingSQL(final QualifiedTable 
qualifiedTable, final Collection<String> columnNames, final List<String> 
uniqueKeys, final QueryRange queryRange,
                                              @Nullable final List<String> 
shardingColumnsNames) {
-        return 
dialectSQLBuilder.wrapWithPageQuery(buildQueryRangeOrderingSQL0(schemaName, 
tableName, columnNames, uniqueKeys, queryRange, shardingColumnsNames));
+        return 
dialectSQLBuilder.wrapWithPageQuery(buildQueryRangeOrderingSQL0(qualifiedTable, 
columnNames, uniqueKeys, queryRange, shardingColumnsNames));
     }
     
-    private String buildQueryRangeOrderingSQL0(final String schemaName, final 
String tableName, final Collection<String> columnNames, final List<String> 
uniqueKeys, final QueryRange queryRange,
+    private String buildQueryRangeOrderingSQL0(final QualifiedTable 
qualifiedTable, final Collection<String> columnNames, final List<String> 
uniqueKeys, final QueryRange queryRange,
                                                @Nullable final List<String> 
shardingColumnsNames) {
-        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(schemaName, tableName);
+        String qualifiedTableName = 
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName());
         String queryColumns = 
columnNames.stream().map(sqlSegmentBuilder::getEscapedIdentifier).collect(Collectors.joining(","));
         String firstUniqueKey = uniqueKeys.get(0);
         String orderByColumns = joinColumns(uniqueKeys, 
shardingColumnsNames).stream().map(each -> 
sqlSegmentBuilder.getEscapedIdentifier(each) + " 
ASC").collect(Collectors.joining(", "));
@@ -121,12 +121,12 @@ public final class 
PipelineDataConsistencyCalculateSQLBuilder {
     /**
      * Build CRC32 SQL.
      *
-     * @param schemaName schema name
-     * @param tableName table name
+     * @param qualifiedTable qualified table
      * @param columnName column name
      * @return built SQL
      */
-    public Optional<String> buildCRC32SQL(final String schemaName, final 
String tableName, final String columnName) {
-        return 
dialectSQLBuilder.buildCRC32SQL(sqlSegmentBuilder.getQualifiedTableName(schemaName,
 tableName), sqlSegmentBuilder.getEscapedIdentifier(columnName));
+    public Optional<String> buildCRC32SQL(final QualifiedTable qualifiedTable, 
final String columnName) {
+        return dialectSQLBuilder.buildCRC32SQL(
+                
sqlSegmentBuilder.getQualifiedTableName(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName()), 
sqlSegmentBuilder.getEscapedIdentifier(columnName));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
index 1c513940779..7637ad634da 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/checker/PipelineDataSourceCheckEngineTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.core.exception.job.PrepareJobWith
 import 
org.apache.shardingsphere.data.pipeline.core.importer.ImporterConfiguration;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import 
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -96,7 +96,7 @@ class PipelineDataSourceCheckEngineTest {
         when(connection.prepareStatement("SELECT * FROM t_order LIMIT 
1")).thenReturn(preparedStatement);
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class);
-        
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new 
CaseInsensitiveQualifiedTable(null, "t_order")));
+        
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new 
QualifiedTable(null, "t_order")));
         assertDoesNotThrow(() -> 
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig));
     }
     
@@ -107,7 +107,7 @@ class PipelineDataSourceCheckEngineTest {
         when(preparedStatement.executeQuery()).thenReturn(resultSet);
         when(resultSet.next()).thenReturn(true);
         ImporterConfiguration importerConfig = 
mock(ImporterConfiguration.class);
-        
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new 
CaseInsensitiveQualifiedTable(null, "t_order")));
+        
when(importerConfig.getQualifiedTables()).thenReturn(Collections.singleton(new 
QualifiedTable(null, "t_order")));
         assertThrows(PrepareJobWithTargetTableNotEmptyException.class, () -> 
pipelineDataSourceCheckEngine.checkTargetDataSources(dataSources, 
importerConfig));
     }
     
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
index 61f6c800bd0..c23537e161d 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
@@ -17,12 +17,12 @@
 
 package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
 
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
-import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
-import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
+import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -65,8 +65,7 @@ class CRC32SingleTableInventoryCalculatorTest {
     void setUp() throws SQLException {
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER, 
"integer", false, true, true));
-        parameter = new 
SingleTableInventoryCalculateParameter(pipelineDataSource,
-                new CaseInsensitiveQualifiedTable(null, "foo_tbl"), 
Arrays.asList("foo_col", "bar_col"), uniqueKeys, Collections.emptyMap());
+        parameter = new 
SingleTableInventoryCalculateParameter(pipelineDataSource, new 
QualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), 
uniqueKeys, Collections.emptyMap());
         when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType);
         when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
index d0bbb7024aa..321884ab03d 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/importer/ImporterConfigurationTest.java
@@ -21,7 +21,7 @@ import 
org.apache.shardingsphere.data.pipeline.api.PipelineDataSourceConfigurati
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.mapper.TableAndSchemaNameMapper;
 import 
org.apache.shardingsphere.data.pipeline.core.ratelimit.JobRateLimitAlgorithm;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import 
org.apache.shardingsphere.infra.metadata.identifier.ShardingSphereIdentifier;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
@@ -56,9 +56,8 @@ class ImporterConfigurationTest {
     void assertGetQualifiedTables() {
         TableAndSchemaNameMapper tableAndSchemaNameMapper = 
mock(TableAndSchemaNameMapper.class);
         
when(tableAndSchemaNameMapper.getSchemaName("foo_tbl")).thenReturn("foo_schema");
-        ImporterConfiguration importerConfig = new ImporterConfiguration(
-                mock(PipelineDataSourceConfiguration.class), 
Collections.singletonMap(new ShardingSphereIdentifier("foo_tbl"), 
Collections.singleton("foo_col")),
-                tableAndSchemaNameMapper, 1, 
mock(JobRateLimitAlgorithm.class), 1, 1);
-        assertThat(importerConfig.getQualifiedTables(), 
is(Collections.singletonList(new CaseInsensitiveQualifiedTable("foo_schema", 
"foo_tbl"))));
+        ImporterConfiguration importerConfig = new 
ImporterConfiguration(mock(PipelineDataSourceConfiguration.class),
+                Collections.singletonMap(new 
ShardingSphereIdentifier("foo_tbl"), Collections.singleton("foo_col")), 
tableAndSchemaNameMapper, 1, mock(JobRateLimitAlgorithm.class), 1, 1);
+        assertThat(importerConfig.getQualifiedTables(), 
is(Collections.singletonList(new QualifiedTable("foo_schema", "foo_tbl"))));
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
index 352b1f1cd96..06ab86fae9f 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/sqlbuilder/sql/PipelineDataConsistencyCalculateSQLBuilderTest.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.data.pipeline.core.sqlbuilder.sql;
 
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
@@ -43,19 +44,19 @@ class PipelineDataConsistencyCalculateSQLBuilderTest {
     
     @Test
     void assertBuildQueryRangeOrderingSQLWithoutQueryCondition() {
-        String actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order", 
COLUMN_NAMES, UNIQUE_KEYS,
+        String actual = sqlBuilder.buildQueryRangeOrderingSQL(new 
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
                 new QueryRange(1, true, 5), SHARDING_COLUMNS_NAMES);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>=? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id 
ASC"));
-        actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order", 
COLUMN_NAMES, UNIQUE_KEYS,
+        actual = sqlBuilder.buildQueryRangeOrderingSQL(new 
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
                 new QueryRange(1, false, 5), SHARDING_COLUMNS_NAMES);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>? AND order_id<=? ORDER BY order_id ASC, status ASC, user_id 
ASC"));
-        actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order", 
COLUMN_NAMES, UNIQUE_KEYS,
+        actual = sqlBuilder.buildQueryRangeOrderingSQL(new 
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
                 new QueryRange(1, false, null), SHARDING_COLUMNS_NAMES);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id>? ORDER BY order_id ASC, status ASC, user_id ASC"));
-        actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order", 
COLUMN_NAMES, UNIQUE_KEYS,
+        actual = sqlBuilder.buildQueryRangeOrderingSQL(new 
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
                 new QueryRange(null, false, 5), SHARDING_COLUMNS_NAMES);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
WHERE order_id<=? ORDER BY order_id ASC, status ASC, user_id ASC"));
-        actual = sqlBuilder.buildQueryRangeOrderingSQL(null, "t_order", 
COLUMN_NAMES, UNIQUE_KEYS,
+        actual = sqlBuilder.buildQueryRangeOrderingSQL(new 
QualifiedTable(null, "t_order"), COLUMN_NAMES, UNIQUE_KEYS,
                 new QueryRange(null, false, null), SHARDING_COLUMNS_NAMES);
         assertThat(actual, is("SELECT order_id,user_id,status FROM t_order 
ORDER BY order_id ASC, status ASC, user_id ASC"));
     }
@@ -72,7 +73,7 @@ class PipelineDataConsistencyCalculateSQLBuilderTest {
     
     @Test
     void assertBuildCRC32SQL() {
-        Optional<String> actual = sqlBuilder.buildCRC32SQL("foo_schema", 
"foo_tbl", "foo_col");
+        Optional<String> actual = sqlBuilder.buildCRC32SQL(new 
QualifiedTable("foo_schema", "foo_tbl"), "foo_col");
         assertThat(actual, is(Optional.of("SELECT CRC32(foo_col) FROM 
foo_tbl")));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index b8722ff9ca3..dc1ab7df67e 100644
--- 
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -45,6 +45,7 @@ import 
org.apache.shardingsphere.data.pipeline.scenario.migration.config.Migrati
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import 
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
@@ -124,15 +125,16 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
     
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final String targetTableName, final DataNode 
dataNode,
                                                                           
final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager 
dataSourceManager) {
-        CaseInsensitiveQualifiedTable sourceTable = new 
CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), 
dataNode.getTableName());
-        CaseInsensitiveQualifiedTable targetTable = new 
CaseInsensitiveQualifiedTable(dataNode.getSchemaName(), targetTableName);
+        QualifiedTable sourceTable = new 
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
+        QualifiedTable targetTable = new 
QualifiedTable(dataNode.getSchemaName(), targetTableName);
         PipelineDataSource sourceDataSource = 
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
         PipelineDataSource targetDataSource = 
dataSourceManager.getDataSource(jobConfig.getTarget());
         PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(sourceDataSource);
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(dataNode.getSchemaName(), 
dataNode.getTableName());
-        ShardingSpherePreconditions.checkNotNull(tableMetaData, () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(dataNode.getSchemaName(),
 dataNode.getTableName()));
+        ShardingSpherePreconditions.checkNotNull(tableMetaData,
+                () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(new 
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName())));
         List<String> columnNames = tableMetaData.getColumnNames();
-        List<PipelineColumnMetaData> uniqueKeys = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName().getValue(),
 sourceTable.getTableName().getValue(), metaDataLoader);
+        List<PipelineColumnMetaData> uniqueKeys = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName(), 
sourceTable.getTableName(), metaDataLoader);
         TableInventoryCheckParameter param = new TableInventoryCheckParameter(
                 jobConfig.getJobId(), sourceDataSource, targetDataSource, 
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, 
progressContext);
         TableInventoryChecker tableInventoryChecker = 
tableChecker.buildTableInventoryChecker(param);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index 0cd02fe9260..8297becec5e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -32,15 +32,15 @@ import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.Tabl
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
+import 
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import 
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.cases.task.E2EIncrementalTask;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.helper.PipelineCaseHelper;
@@ -133,12 +133,12 @@ class CDCE2EIT {
                 containerComposer.proxyExecuteWithLog(String.format("INSERT 
INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, 
i), 0);
                 containerComposer.assertOrderRecordExist(targetDataSource, 
tableName, orderId);
             }
-            CaseInsensitiveQualifiedTable orderSchemaTableName = 
dialectDatabaseMetaData.isSchemaAvailable()
-                    ? new 
CaseInsensitiveQualifiedTable(PipelineContainerComposer.SCHEMA_NAME, 
SOURCE_TABLE_NAME)
-                    : new CaseInsensitiveQualifiedTable(null, 
SOURCE_TABLE_NAME);
-            assertDataMatched(sourceDataSource, targetDataSource, 
orderSchemaTableName);
-            assertDataMatched(sourceDataSource, targetDataSource, new 
CaseInsensitiveQualifiedTable(null, "t_address"));
-            assertDataMatched(sourceDataSource, targetDataSource, new 
CaseInsensitiveQualifiedTable(null, "t_single"));
+            QualifiedTable orderQualifiedTable = 
dialectDatabaseMetaData.isSchemaAvailable()
+                    ? new 
QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
+                    : new QualifiedTable(null, SOURCE_TABLE_NAME);
+            assertDataMatched(sourceDataSource, targetDataSource, 
orderQualifiedTable);
+            assertDataMatched(sourceDataSource, targetDataSource, new 
QualifiedTable(null, "t_address"));
+            assertDataMatched(sourceDataSource, targetDataSource, new 
QualifiedTable(null, "t_single"));
             cdcClient.close();
             Awaitility.await().atMost(10L, 
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> 
containerComposer.queryForListWithLog("SHOW STREAMING LIST")
                     .stream().noneMatch(each -> 
Boolean.parseBoolean(each.get("active").toString())));
@@ -186,12 +186,12 @@ class CDCE2EIT {
         return result;
     }
     
-    private void assertDataMatched(final PipelineDataSource sourceDataSource, 
final PipelineDataSource targetDataSource, final CaseInsensitiveQualifiedTable 
schemaTableName) {
+    private void assertDataMatched(final PipelineDataSource sourceDataSource, 
final PipelineDataSource targetDataSource, final QualifiedTable qualifiedTable) 
{
         StandardPipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(targetDataSource);
-        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getValue(), 
schemaTableName.getTableName().getValue());
+        PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName());
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
         ConsistencyCheckJobItemProgressContext progressContext = new 
ConsistencyCheckJobItemProgressContext("", 0, 
sourceDataSource.getDatabaseType().getType());
-        TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
schemaTableName, schemaTableName,
+        TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
qualifiedTable, qualifiedTable,
                 tableMetaData.getColumnNames(), uniqueKeys, null, 
progressContext);
         TableDataConsistencyChecker tableChecker = 
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new 
Properties());
         TableDataConsistencyCheckResult checkResult = 
tableChecker.buildTableInventoryChecker(param).checkSingleTableInventoryData();
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index ac74e3ac676..f0956e547a2 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -22,11 +22,11 @@ import org.apache.commons.lang3.RandomStringUtils;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.RecordSingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
-import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
-import 
org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveQualifiedTable;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -123,14 +123,14 @@ class RecordSingleTableInventoryCalculatorTest {
     
     private SingleTableInventoryCalculateParameter generateParameter(final 
PipelineDataSource dataSource, final Object dataCheckPosition) {
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(new PipelineColumnMetaData(1, "order_id", 
Types.INTEGER, "integer", false, true, true));
-        return new SingleTableInventoryCalculateParameter(dataSource, new 
CaseInsensitiveQualifiedTable(null, "t_order"), Collections.emptyList(), 
uniqueKeys, dataCheckPosition);
+        return new SingleTableInventoryCalculateParameter(dataSource, new 
QualifiedTable(null, "t_order"), Collections.emptyList(), uniqueKeys, 
dataCheckPosition);
     }
     
     @Test
     void assertCalculateOfRangeQuery() {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000);
-        SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new 
CaseInsensitiveQualifiedTable(null, "t_order"),
-                Collections.emptyList(), buildUniqueKeys(), 
QueryType.RANGE_QUERY);
+        SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(
+                dataSource, new QualifiedTable(null, "t_order"), 
Collections.emptyList(), buildUniqueKeys(), QueryType.RANGE_QUERY);
         param.setQueryRange(new QueryRange(3, true, 7));
         Optional<SingleTableInventoryCalculatedResult> calculatedResult = 
calculator.calculateChunk(param);
         assertTrue(calculatedResult.isPresent());
@@ -143,8 +143,8 @@ class RecordSingleTableInventoryCalculatorTest {
     @Test
     void assertCalculateOfRangeQueryAll() {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3);
-        SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new 
CaseInsensitiveQualifiedTable(null, "t_order"),
-                Collections.emptyList(), buildUniqueKeys(), 
QueryType.RANGE_QUERY);
+        SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource,
+                new QualifiedTable(null, "t_order"), Collections.emptyList(), 
buildUniqueKeys(), QueryType.RANGE_QUERY);
         param.setQueryRange(new QueryRange(null, false, null));
         Iterator<SingleTableInventoryCalculatedResult> resultIterator = 
calculator.calculate(param).iterator();
         RecordSingleTableInventoryCalculatedResult actual = 
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -183,8 +183,8 @@ class RecordSingleTableInventoryCalculatorTest {
     @Test
     void assertCalculateOfPointQuery() {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3);
-        SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new 
CaseInsensitiveQualifiedTable(null, "t_order"),
-                Collections.emptyList(), buildUniqueKeys(), 
QueryType.POINT_QUERY);
+        SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource,
+                new QualifiedTable(null, "t_order"), Collections.emptyList(), 
buildUniqueKeys(), QueryType.POINT_QUERY);
         param.setUniqueKeysValues(Arrays.asList(3, 3));
         Optional<SingleTableInventoryCalculatedResult> calculatedResult = 
calculator.calculateChunk(param);
         assertTrue(calculatedResult.isPresent());

Reply via email to