This is an automated email from the ASF dual-hosted git repository.

sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 957e8e74a75 Review and refactor pipeline (#28266)
957e8e74a75 is described below

commit 957e8e74a75e935275c4e251cb401bca15c65469
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 26 21:50:17 2023 +0800

    Review and refactor pipeline (#28266)
    
    * Verify consistency check algorithm and database type before starting consistency check
    
    * Isolate SingleTableInventoryCalculator for MatchingTableDataConsistencyChecker
    
    * Clean code
    
    * Improve DataMatchTableDataConsistencyChecker.getChunkSize
    
    * Refactor MigrationDataConsistencyChecker
    
    * Rename TableDataConsistencyCheckParameter to TableInventoryCheckParameter
    
    * Extract JobDataNodeLineConvertUtils.buildTableNameMap
    
    * Add TableDataConsistencyChecker.isBreakOnInventoryCheckNotMatched
    
    * Refactor MigrationDataConsistencyChecker
    
    * Add schemaName in DataRecord
    
    * Add actualTableName in DataRecord
---
 .../pipeline/api/ingest/record/DataRecord.java     | 11 ++-
 .../datanode/JobDataNodeLineConvertUtils.java      | 18 +++++
 .../CRC32MatchTableDataConsistencyChecker.java     |  6 +-
 .../DataMatchTableDataConsistencyChecker.java      | 22 +++---
 .../table/MatchingTableDataConsistencyChecker.java | 29 +++++---
 .../table/TableDataConsistencyChecker.java         | 11 ++-
 ...eter.java => TableInventoryCheckParameter.java} |  4 +-
 .../DataMatchTableDataConsistencyCheckerTest.java  | 51 ++++++++++++++
 .../mysql/ingest/MySQLIncrementalDumper.java       |  1 +
 .../ingest/wal/decode/MppdbDecodingPlugin.java     |  2 +-
 .../postgresql/ingest/wal/WALEventConverter.java   |  3 +-
 .../ingest/wal/decode/TestDecodingPlugin.java      |  2 +-
 .../ingest/wal/event/AbstractRowEvent.java         |  2 +-
 .../ingest/wal/WALEventConverterTest.java          | 12 ++--
 .../handler/update/CheckMigrationJobUpdater.java   | 10 ++-
 .../api/impl/ConsistencyCheckJobAPI.java           | 10 +++
 .../pojo/CreateConsistencyCheckJobParameter.java   |  5 ++
 .../migration/api/impl/MigrationJobAPI.java        | 12 +---
 .../MigrationDataConsistencyChecker.java           | 78 +++++++++-------------
 .../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java |  4 +-
 .../primarykey/IndexesMigrationE2EIT.java          |  6 +-
 .../TableDataConsistencyCheckerFixture.java        |  4 +-
 .../api/impl/ConsistencyCheckJobAPITest.java       | 28 +++++---
 23 files changed, 218 insertions(+), 113 deletions(-)

diff --git a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index b0f0fa5622b..2072dd6d169 100644
--- a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++ b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,12 +33,14 @@ import java.util.List;
  */
 @Getter
 @Setter
-@EqualsAndHashCode(of = "tableName", callSuper = false)
+@EqualsAndHashCode(of = {"schemaName", "tableName"}, callSuper = false)
 @ToString
 public final class DataRecord extends Record {
     
     private final String type;
     
+    private final String schemaName;
+    
     private final String tableName;
     
     private final List<Column> columns;
@@ -47,11 +49,18 @@ public final class DataRecord extends Record {
     
     private final List<Object> oldUniqueKeyValues = new ArrayList<>();
     
+    private String actualTableName;
+    
     private Long csn;
     
     public DataRecord(final String type, final String tableName, final IngestPosition position, final int columnCount) {
+        this(type, null, tableName, position, columnCount);
+    }
+    
+    public DataRecord(final String type, final String schemaName, final String tableName, final IngestPosition position, final int columnCount) {
         super(position);
         this.type = type;
+        this.schemaName = schemaName;
         this.tableName = tableName;
         columns = new ArrayList<>(columnCount);
     }
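
For readers skimming the patch: the four-argument constructor now delegates with a null schemaName, so existing dialects keep working while schema-aware dialects (see the PostgreSQL WALEventConverter change below) pass the schema explicitly. A rough usage sketch, with placeholder change type, schema, table names and column count:

    // Sketch only: "INSERT" stands in for the ingest change type constant,
    // and `position` is whatever IngestPosition the dumper is currently at.
    DataRecord record = new DataRecord("INSERT", "public", "t_order", position, 3);
    // The physical table behind the logic table, now carried on the record.
    record.setActualTableName("t_order_0");
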
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
index 6285d3a23b7..5cf249adc30 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
@@ -20,6 +20,8 @@ package org.apache.shardingsphere.data.pipeline.common.datanode;
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 
 import java.util.LinkedHashMap;
@@ -65,4 +67,20 @@ public final class JobDataNodeLineConvertUtils {
         }
         return result;
     }
+    
+    /**
+     * Build table name map.
+     *
+     * @param dataNodeLine data node line
+     * @return actual table and logic table map
+     */
+    public static Map<ActualTableName, LogicTableName> buildTableNameMap(final JobDataNodeLine dataNodeLine) {
+        Map<ActualTableName, LogicTableName> result = new LinkedHashMap<>();
+        for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
+            for (DataNode dataNode : each.getDataNodes()) {
+                result.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName()));
+            }
+        }
+        return result;
+    }
 }
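
The helper is extracted verbatim from MigrationJobAPI (its private copy is removed below). As a rough illustration, assuming a line whose single entry maps logic table t_order to data nodes ds_0.t_order_0 and ds_0.t_order_1:

    // Hypothetical values; keys are actual (physical) tables, values the logic table.
    Map<ActualTableName, LogicTableName> tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(dataNodeLine);
    // tableNameMap == {t_order_0=t_order, t_order_1=t_order}
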
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index eef9d20fb0a..11e2479a393 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -33,11 +33,9 @@ import java.util.LinkedList;
 @SPIDescription("Match CRC32 of records.")
 public final class CRC32MatchTableDataConsistencyChecker extends MatchingTableDataConsistencyChecker {
     
-    private final SingleTableInventoryCalculator calculator = new CRC32SingleTableInventoryCalculator();
-    
     @Override
-    protected SingleTableInventoryCalculator getSingleTableInventoryCalculator() {
-        return calculator;
+    protected SingleTableInventoryCalculator buildSingleTableInventoryCalculator() {
+        return new CRC32SingleTableInventoryCalculator();
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 88589b64bcd..f40ff4609a5 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -17,9 +17,11 @@
 
 package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
 
+import com.google.common.base.Strings;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -38,31 +40,33 @@ public final class DataMatchTableDataConsistencyChecker extends MatchingTableDat
     
     private static final int DEFAULT_CHUNK_SIZE = 1000;
     
-    private SingleTableInventoryCalculator calculator;
+    private int chunkSize;
     
     @Override
     public void init(final Properties props) {
-        calculator = new RecordSingleTableInventoryCalculator(getChunkSize(props));
+        chunkSize = getChunkSize(props);
     }
     
     private int getChunkSize(final Properties props) {
+        String chunkSizeText = props.getProperty(CHUNK_SIZE_KEY);
+        if (Strings.isNullOrEmpty(chunkSizeText)) {
+            return DEFAULT_CHUNK_SIZE;
+        }
         int result;
         try {
-            result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY, Integer.toString(DEFAULT_CHUNK_SIZE)));
+            result = Integer.parseInt(chunkSizeText);
         } catch (final NumberFormatException ignore) {
-            log.warn("'chunk-size' is not a valid number, use default value 
{}", DEFAULT_CHUNK_SIZE);
-            return DEFAULT_CHUNK_SIZE;
+            throw new PipelineInvalidParameterException("'chunk-size' is not a 
valid number: `" + chunkSizeText + "`");
         }
         if (result <= 0) {
-            log.warn("Invalid 'chunk-size': {}, use default value {}", result, 
DEFAULT_CHUNK_SIZE);
-            return DEFAULT_CHUNK_SIZE;
+            throw new PipelineInvalidParameterException("Invalid 'chunk-size': 
" + result);
         }
         return result;
     }
     
     @Override
-    protected SingleTableInventoryCalculator getSingleTableInventoryCalculator() {
-        return calculator;
+    protected SingleTableInventoryCalculator buildSingleTableInventoryCalculator() {
+        return new RecordSingleTableInventoryCalculator(chunkSize);
     }
     
     @Override
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
index 79518e3edb4..496e3a75174 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
@@ -35,8 +35,10 @@ import org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFact
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
 import java.sql.SQLException;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -51,8 +53,10 @@ import java.util.concurrent.TimeUnit;
 @RequiredArgsConstructor
 public abstract class MatchingTableDataConsistencyChecker implements TableDataConsistencyChecker {
     
+    private final Set<SingleTableInventoryCalculator> calculators = new HashSet<>();
+    
     @Override
-    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableDataConsistencyCheckParameter param) {
+    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param) {
         ThreadFactory threadFactory = ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) + "-check-%d");
         ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
         try {
@@ -63,25 +67,30 @@ public abstract class MatchingTableDataConsistencyChecker implements TableDataCo
         }
     }
     
-    private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableDataConsistencyCheckParameter param, final ThreadPoolExecutor executor) {
+    private TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
         SingleTableInventoryCalculateParameter sourceParam = new SingleTableInventoryCalculateParameter(param.getSourceDataSource(), param.getSourceTable(),
                 param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getOriginal()));
         SingleTableInventoryCalculateParameter targetParam = new SingleTableInventoryCalculateParameter(param.getTargetDataSource(), param.getTargetTable(),
                 param.getColumnNames(), param.getUniqueKeys(), param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getOriginal()));
-        SingleTableInventoryCalculator calculator = getSingleTableInventoryCalculator();
-        Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> calculator.calculate(sourceParam))).iterator();
-        Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> calculator.calculate(targetParam))).iterator();
+        SingleTableInventoryCalculator sourceCalculator = buildSingleTableInventoryCalculator();
+        calculators.add(sourceCalculator);
+        SingleTableInventoryCalculator targetCalculator = buildSingleTableInventoryCalculator();
+        calculators.add(targetCalculator);
+        Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults = waitFuture(executor.submit(() -> sourceCalculator.calculate(sourceParam))).iterator();
+        Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults = waitFuture(executor.submit(() -> targetCalculator.calculate(targetParam))).iterator();
         try {
             return checkSingleTableInventoryData(sourceCalculatedResults, targetCalculatedResults, param, executor);
         } finally {
             QuietlyCloser.close(sourceParam.getCalculationContext());
             QuietlyCloser.close(targetParam.getCalculationContext());
+            calculators.remove(sourceCalculator);
+            calculators.remove(targetCalculator);
         }
     }
     
     private TableDataConsistencyCheckResult checkSingleTableInventoryData(final Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
                                                                           final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
-                                                                          final TableDataConsistencyCheckParameter param, final ThreadPoolExecutor executor) {
+                                                                          final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
         YamlTableDataConsistencyCheckResult checkResult = new YamlTableDataConsistencyCheckResult(new YamlTableDataConsistencyCountCheckResult(), new YamlTableDataConsistencyContentCheckResult(true));
         while (sourceCalculatedResults.hasNext() && targetCalculatedResults.hasNext()) {
             if (null != param.getReadRateLimitAlgorithm()) {
@@ -137,15 +146,17 @@ public abstract class MatchingTableDataConsistencyChecker implements TableDataCo
         }
     }
     
-    protected abstract SingleTableInventoryCalculator getSingleTableInventoryCalculator();
+    protected abstract SingleTableInventoryCalculator buildSingleTableInventoryCalculator();
     
     @Override
     public void cancel() {
-        getSingleTableInventoryCalculator().cancel();
+        for (SingleTableInventoryCalculator each : calculators) {
+            each.cancel();
+        }
     }
     
     @Override
     public boolean isCanceling() {
-        return getSingleTableInventoryCalculator().isCanceling();
+        return calculators.stream().anyMatch(SingleTableInventoryCalculator::isCanceling);
     }
 }
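
Because calculators are now built per check run and tracked in the shared set, cancel() and isCanceling() cover the concurrent source and target calculations instead of a single shared instance. A subclass only supplies the factory method; a minimal sketch (hypothetical class; other SPI methods such as getType() are omitted here but would still be required):

    public final class ExampleMatchingTableDataConsistencyChecker extends MatchingTableDataConsistencyChecker {
        
        @Override
        protected SingleTableInventoryCalculator buildSingleTableInventoryCalculator() {
            // A fresh calculator per call, so the source and target sides never share state.
            return new CRC32SingleTableInventoryCalculator();
        }
    }
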
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
index a46aff91391..2a126301f25 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
@@ -35,7 +35,16 @@ public interface TableDataConsistencyChecker extends ShardingSphereAlgorithm, Pi
      * @param param check parameter
      * @return check result
      */
-    TableDataConsistencyCheckResult checkSingleTableInventoryData(TableDataConsistencyCheckParameter param);
+    TableDataConsistencyCheckResult checkSingleTableInventoryData(TableInventoryCheckParameter param);
+    
+    /**
+     * Is break on inventory check not matched.
+     *
+     * @return break or not
+     */
+    default boolean isBreakOnInventoryCheckNotMatched() {
+        return true;
+    }
     
     /**
      * Get supported database types.
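
The new default fails fast: MigrationDataConsistencyChecker (below) stops after the first unmatched table when this returns true. An implementation preferring to report every unmatched table could override it; a hypothetical sketch:

    // Hypothetical override: keep checking remaining tables instead of breaking early.
    @Override
    public boolean isBreakOnInventoryCheckNotMatched() {
        return false;
    }
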
diff --git a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckParameter.java b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
similarity index 95%
rename from kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckParameter.java
rename to kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 54b58311571..c89fe51a300 100644
--- a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckParameter.java
+++ b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -28,11 +28,11 @@ import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
 import java.util.List;
 
 /**
- * Table data consistency check parameter.
+ * Table inventory check parameter.
  */
 @RequiredArgsConstructor
 @Getter
-public final class TableDataConsistencyCheckParameter {
+public final class TableInventoryCheckParameter {
     
     private final String jobId;
     
diff --git a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java
new file mode 100644
index 00000000000..01a3dddc40e
--- /dev/null
+++ b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.DataMatchTableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class DataMatchTableDataConsistencyCheckerTest {
+    
+    @Test
+    void assertInitSuccess() {
+        for (String each : Arrays.asList("1", "1000")) {
+            new DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties(each));
+        }
+    }
+    
+    @Test
+    void assertInitFailure() {
+        assertThrows(PipelineInvalidParameterException.class, () -> new DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties("xyz")));
+        for (String each : Arrays.asList("0", "-1")) {
+            assertThrows(PipelineInvalidParameterException.class, () -> new DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties(each)));
+        }
+    }
+    
+    private Properties buildAlgorithmProperties(final String chunkSize) {
+        Properties result = new Properties();
+        result.put("chunk-size", chunkSize);
+        return result;
+    }
+}
diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 0814054fa88..0a0eeb75081 100644
--- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -237,6 +237,7 @@ public final class MySQLIncrementalDumper extends AbstractLifecycleExecutor impl
         String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
         IngestPosition position = new BinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition(), rowsEvent.getServerId());
         DataRecord result = new DataRecord(type, tableName, position, columnCount);
+        result.setActualTableName(rowsEvent.getTableName());
         result.setCommitTime(rowsEvent.getTimestamp() * 1000);
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 8203f1245c5..19fca8ed5a9 100644
--- a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -113,7 +113,7 @@ public final class MppdbDecodingPlugin implements DecodingPlugin {
                 throw new IngestException("Unknown rowEventType: " + rowEventType);
         }
         String[] tableMetaData = mppTableData.getTableName().split("\\.");
-        result.setDatabaseName(tableMetaData[0]);
+        result.setSchemaName(tableMetaData[0]);
         result.setTableName(tableMetaData[1]);
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index fc909f1eec0..bd261596024 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -121,7 +121,8 @@ public final class WALEventConverter {
     
     private DataRecord createDataRecord(final String type, final AbstractRowEvent rowsEvent, final int columnCount) {
         String tableName = dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
-        DataRecord result = new DataRecord(type, tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
+        DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(), tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
+        result.setActualTableName(rowsEvent.getTableName());
         result.setCsn(rowsEvent.getCsn());
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 07bf1e43384..a8c5499c658 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -81,7 +81,7 @@ public final class TestDecodingPlugin implements DecodingPlugin {
                 throw new IngestException("Unknown rowEventType: " + rowEventType);
         }
         String[] tableMetaData = tableName.split("\\.");
-        result.setDatabaseName(tableMetaData[0]);
+        result.setSchemaName(tableMetaData[0]);
         result.setTableName(tableMetaData[1].substring(0, tableMetaData[1].length() - 1));
         return result;
     }
diff --git a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
index 6cba665d907..bc57fbf4c18 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
@@ -29,7 +29,7 @@ import lombok.ToString;
 @ToString(callSuper = true)
 public abstract class AbstractRowEvent extends AbstractWALEvent {
     
-    private String databaseName;
+    private String schemaName;
     
     private String tableName;
     
diff --git a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 9269cf5113e..1cd12fdb8b8 100644
--- a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++ b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -134,7 +134,7 @@ class WALEventConverterTest {
     private void assertWriteRowEvent0(final Map<LogicTableName, Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws ReflectiveOperationException {
         dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
         WriteRowEvent rowsEvent = new WriteRowEvent();
-        rowsEvent.setDatabaseName("");
+        rowsEvent.setSchemaName("");
         rowsEvent.setTableName("t_order");
         rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK"));
         Method method = WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent", WriteRowEvent.class, PipelineTableMetaData.class);
@@ -201,14 +201,14 @@ class WALEventConverterTest {
     void assertConvertFailure() {
         AbstractRowEvent event = new AbstractRowEvent() {
         };
-        event.setDatabaseName("");
+        event.setSchemaName("");
         event.setTableName("t_order");
         assertThrows(UnsupportedSQLOperationException.class, () -> walEventConverter.convert(event));
     }
     
     private AbstractRowEvent mockWriteRowEvent() {
         WriteRowEvent result = new WriteRowEvent();
-        result.setDatabaseName("");
+        result.setSchemaName("");
         result.setTableName("t_order");
         result.setAfterRow(Arrays.asList("id", "user_id"));
         return result;
@@ -216,7 +216,7 @@ class WALEventConverterTest {
     
     private AbstractRowEvent mockUpdateRowEvent() {
         UpdateRowEvent result = new UpdateRowEvent();
-        result.setDatabaseName("");
+        result.setSchemaName("");
         result.setTableName("t_order");
         result.setAfterRow(Arrays.asList("id", "user_id"));
         return result;
@@ -224,7 +224,7 @@ class WALEventConverterTest {
     
     private AbstractRowEvent mockDeleteRowEvent() {
         DeleteRowEvent result = new DeleteRowEvent();
-        result.setDatabaseName("");
+        result.setSchemaName("");
         result.setTableName("t_order");
         result.setPrimaryKeys(Collections.singletonList("id"));
         return result;
@@ -232,7 +232,7 @@ class WALEventConverterTest {
     
     private AbstractRowEvent mockUnknownTableEvent() {
         WriteRowEvent result = new WriteRowEvent();
-        result.setDatabaseName("");
+        result.setSchemaName("");
         result.setTableName("t_other");
         return result;
     }
diff --git a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index b18055ded38..35e684178e3 100644
--- a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++ b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -19,6 +19,8 @@ package org.apache.shardingsphere.migration.distsql.handler.update;
 
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
 import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
 import org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
@@ -31,14 +33,18 @@ import java.util.Properties;
  */
 public final class CheckMigrationJobUpdater implements RALUpdater<CheckMigrationStatement> {
     
-    private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+    private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI();
+    
+    private final MigrationJobAPI migrationJobAPI = new MigrationJobAPI();
     
     @Override
     public void executeUpdate(final String databaseName, final CheckMigrationStatement sqlStatement) throws SQLException {
         AlgorithmSegment typeStrategy = sqlStatement.getTypeStrategy();
         String algorithmTypeName = null == typeStrategy ? null : typeStrategy.getName();
         Properties algorithmProps = null == typeStrategy ? null : typeStrategy.getProps();
-        jobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(sqlStatement.getJobId(), algorithmTypeName, algorithmProps));
+        String jobId = sqlStatement.getJobId();
+        MigrationJobConfiguration jobConfig = migrationJobAPI.getJobConfiguration(jobId);
+        checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps, jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
     }
     
     @Override
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6473566fe4f..e5162fab517 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -38,6 +38,8 @@ import org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
 import org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
 import org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
 import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -55,6 +57,7 @@ import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
 import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -108,6 +111,7 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
                 throw new UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
             }
         }
+        verifyPipelineDatabaseType(param);
         PipelineContextKey contextKey = PipelineJobIdUtils.parseContextKey(parentJobId);
         String result = marshalJobId(latestCheckJobId.map(s -> new ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new ConsistencyCheckJobId(contextKey, parentJobId)));
         repositoryAPI.persistLatestCheckJobId(parentJobId, result);
@@ -122,6 +126,12 @@ public final class ConsistencyCheckJobAPI extends AbstractPipelineJobAPIImpl {
         return result;
     }
     
+    private void verifyPipelineDatabaseType(final CreateConsistencyCheckJobParameter param) {
+        Collection<DatabaseType> supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(), param.getAlgorithmProps()).getSupportedDatabaseTypes();
+        ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getSourceDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getSourceDatabaseType()));
+        ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()), () -> new UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
+    }
+    
     /**
      * Get latest data consistency check result.
      *
diff --git a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
index f48442b2ada..92413dad079 100644
--- a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
+++ b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
@@ -19,6 +19,7 @@ package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.po
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 
 import java.util.Properties;
 
@@ -34,4 +35,8 @@ public final class CreateConsistencyCheckJobParameter {
     private final String algorithmTypeName;
     
     private final Properties algorithmProps;
+    
+    private final DatabaseType sourceDatabaseType;
+    
+    private final DatabaseType targetDatabaseType;
 }
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index b91a8ea0af7..ef86896f22d 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -265,7 +265,7 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
     public MigrationTaskConfiguration buildTaskConfiguration(final PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final PipelineProcessConfiguration pipelineProcessConfig) {
         MigrationJobConfiguration jobConfig = (MigrationJobConfiguration) pipelineJobConfig;
         JobDataNodeLine dataNodeLine = jobConfig.getJobShardingDataNodes().get(jobShardingItem);
-        Map<ActualTableName, LogicTableName> tableNameMap = buildTableNameMap(dataNodeLine);
+        Map<ActualTableName, LogicTableName> tableNameMap = JobDataNodeLineConvertUtils.buildTableNameMap(dataNodeLine);
         TableNameSchemaNameMapping tableNameSchemaNameMapping = new TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
         CreateTableConfiguration createTableConfig = buildCreateTableConfiguration(jobConfig, tableNameSchemaNameMapping);
         String dataSourceName = dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
@@ -279,16 +279,6 @@ public final class MigrationJobAPI extends AbstractInventoryIncrementalJobAPIImp
         return result;
     }
     
-    private Map<ActualTableName, LogicTableName> buildTableNameMap(final JobDataNodeLine dataNodeLine) {
-        Map<ActualTableName, LogicTableName> result = new LinkedHashMap<>();
-        for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
-            for (DataNode dataNode : each.getDataNodes()) {
-                result.put(new ActualTableName(dataNode.getTableName()), new LogicTableName(each.getLogicTableName()));
-            }
-        }
-        return result;
-    }
-    
     private CreateTableConfiguration buildCreateTableConfiguration(final MigrationJobConfiguration jobConfig,
                                                                    final TableNameSchemaNameMapping tableNameSchemaNameMapping) {
         Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
diff --git a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index c8dbbc1f6eb..c01a6d404d7 100644
--- a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
 import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -37,27 +36,24 @@ import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPi
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.datanode.DataNode;
 import org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
 
-import java.util.Collection;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
 
 /**
  * Data consistency checker for migration job.
@@ -71,7 +67,7 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     private final ConsistencyCheckJobItemProgressContext progressContext;
     
-    private final AtomicReference<TableDataConsistencyChecker> currentTableChecker = new AtomicReference<>();
+    private final Set<TableDataConsistencyChecker> tableCheckers = new HashSet<>();
     
     public MigrationDataConsistencyChecker(final MigrationJobConfiguration jobConfig, final InventoryIncrementalProcessContext processContext,
                                            final ConsistencyCheckJobItemProgressContext progressContext) {
@@ -82,9 +78,6 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
     
     @Override
     public Map<String, TableDataConsistencyCheckResult> check(final String algorithmType, final Properties algorithmProps) {
-        Collection<DatabaseType> supportedDatabaseTypes = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps).getSupportedDatabaseTypes();
-        verifyPipelineDatabaseType(supportedDatabaseTypes, jobConfig.getSources().values().iterator().next());
-        verifyPipelineDatabaseType(supportedDatabaseTypes, jobConfig.getTarget());
         List<String> sourceTableNames = new LinkedList<>();
         jobConfig.getJobShardingDataNodes().forEach(each -> each.getEntries().forEach(entry -> entry.getDataNodes()
                 .forEach(dataNode -> sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
@@ -93,33 +86,37 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         progressContext.onProgressUpdated(new PipelineJobProgressUpdatedParameter(0));
         Map<String, TableDataConsistencyCheckResult> result = new LinkedHashMap<>();
         try (PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager()) {
-            AtomicBoolean checkFailed = new AtomicBoolean(false);
             for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
-                each.getEntries().forEach(entry -> entry.getDataNodes().forEach(dataNode -> {
-                    TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
-                    currentTableChecker.set(tableChecker);
-                    check(tableChecker, result, dataSourceManager, checkFailed, each, entry, dataNode);
-                }));
+                checkTableInventoryData(each, algorithmType, algorithmProps, result, dataSourceManager);
             }
         }
         return result;
     }
     
-    private void check(final TableDataConsistencyChecker tableChecker, final Map<String, TableDataConsistencyCheckResult> checkResults, final PipelineDataSourceManager dataSourceManager,
-                       final AtomicBoolean checkFailed, final JobDataNodeLine jobDataNodeLine, final JobDataNodeEntry entry, final DataNode dataNode) {
-        if (checkFailed.get()) {
-            return;
-        }
-        TableDataConsistencyCheckResult checkResult = checkSingleTable(entry.getLogicTableName(), dataNode, tableChecker, dataSourceManager);
-        checkResults.put(DataNodeUtils.formatWithSchema(dataNode), checkResult);
-        if (!checkResult.isMatched()) {
-            log.info("unmatched on table '{}', ignore left tables", jobDataNodeLine);
-            checkFailed.set(true);
+    private long getRecordsCount() {
+        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new MigrationJobAPI().getJobProgress(jobConfig);
+        return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+    }
+    
+    private void checkTableInventoryData(final JobDataNodeLine jobDataNodeLine, final String algorithmType, final Properties algorithmProps,
+                                         final Map<String, TableDataConsistencyCheckResult> checkResults, final PipelineDataSourceManager dataSourceManager) {
+        for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
+            for (DataNode each : entry.getDataNodes()) {
+                TableDataConsistencyChecker tableChecker = TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
+                tableCheckers.add(tableChecker);
+                TableDataConsistencyCheckResult checkResult = checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, dataSourceManager);
+                checkResults.put(DataNodeUtils.formatWithSchema(each), checkResult);
+                tableCheckers.remove(tableChecker);
+                if (!checkResult.isMatched() && tableChecker.isBreakOnInventoryCheckNotMatched()) {
+                    log.info("Unmatched on table '{}', ignore left tables", DataNodeUtils.formatWithSchema(each));
+                    return;
+                }
+            }
         }
     }
     
-    private TableDataConsistencyCheckResult checkSingleTable(final String targetTableName, final DataNode dataNode,
-                                                             final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager dataSourceManager) {
+    private TableDataConsistencyCheckResult checkSingleTableInventoryData(final String targetTableName, final DataNode dataNode,
+                                                                          final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager dataSourceManager) {
         SchemaTableName sourceTable = new SchemaTableName(dataNode.getSchemaName(), dataNode.getTableName());
         SchemaTableName targetTable = new SchemaTableName(dataNode.getSchemaName(), targetTableName);
         PipelineDataSourceWrapper sourceDataSource = dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
@@ -130,35 +127,20 @@ public final class MigrationDataConsistencyChecker implements PipelineDataConsis
         List<String> columnNames = tableMetaData.getColumnNames();
         List<PipelineColumnMetaData> uniqueKeys = PipelineTableMetaDataUtils.getUniqueKeyColumns(
                 sourceTable.getSchemaName().getOriginal(), sourceTable.getTableName().getOriginal(), metaDataLoader);
-        TableDataConsistencyCheckParameter param = new TableDataConsistencyCheckParameter(
+        TableInventoryCheckParameter param = new TableInventoryCheckParameter(
                 jobConfig.getJobId(), sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext);
         return tableChecker.checkSingleTableInventoryData(param);
     }
     
-    private void verifyPipelineDatabaseType(final Collection<DatabaseType> supportedDatabaseTypes, final PipelineDataSourceConfiguration dataSourceConfig) {
-        ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(dataSourceConfig.getDatabaseType()),
-                () -> new UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
-    }
-    
-    private long getRecordsCount() {
-        Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new MigrationJobAPI().getJobProgress(jobConfig);
-        return jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
-    }
-    
     @Override
     public void cancel() {
-        TableDataConsistencyChecker tableChecker = currentTableChecker.get();
-        if (null != tableChecker) {
-            tableChecker.cancel();
+        for (TableDataConsistencyChecker each : tableCheckers) {
+            each.cancel();
         }
     }
     
     @Override
     public boolean isCanceling() {
-        TableDataConsistencyChecker tableChecker = currentTableChecker.get();
-        if (null != tableChecker) {
-            return tableChecker.isCanceling();
-        }
-        return false;
+        return tableCheckers.stream().anyMatch(TableDataConsistencyChecker::isCanceling);
     }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index d9824cfaf00..52aa0d515a6 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -35,8 +35,8 @@ import org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
 import org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -202,7 +202,7 @@ class CDCE2EIT {
         PipelineTableMetaData tableMetaData = metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(), schemaTableName.getTableName().getOriginal());
         List<PipelineColumnMetaData> uniqueKeys = Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
         ConsistencyCheckJobItemProgressContext progressContext = new ConsistencyCheckJobItemProgressContext("", 0);
-        TableDataConsistencyCheckParameter param = new TableDataConsistencyCheckParameter("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
+        TableInventoryCheckParameter param = new TableInventoryCheckParameter("", sourceDataSource, targetDataSource, schemaTableName, schemaTableName,
                 tableMetaData.getColumnNames(), uniqueKeys, null, progressContext);
         TableDataConsistencyChecker tableChecker = TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new Properties());
         TableDataConsistencyCheckResult checkResult = tableChecker.checkSingleTableInventoryData(param);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 1fadda3e073..d4b01eee705 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,12 +20,11 @@ package org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
 import lombok.SneakyThrows;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
 import org.apache.shardingsphere.infra.database.postgresql.type.PostgreSQLDatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
 import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
@@ -36,6 +35,7 @@ import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -193,7 +193,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
             } else {
                 return;
             }
-            KeyGenerateAlgorithm keyGenerateAlgorithm = new SnowflakeKeyGenerateAlgorithm();
+            KeyGenerateAlgorithm keyGenerateAlgorithm = new AutoIncrementKeyGenerateAlgorithm();
             Object uniqueKey = keyGenerateAlgorithm.generateKey();
             assertMigrationSuccess(containerComposer, sql, "user_id", keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
                 insertOneOrder(containerComposer, uniqueKey);
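
The test swaps the Snowflake generator for a test-util auto-increment generator, presumably for deterministic keys. A minimal sketch of what such a generator can look like, assuming the single-value generateKey() SPI method called above; the actual AutoIncrementKeyGenerateAlgorithm source is not part of this diff, and the type name below is hypothetical:

    import java.util.concurrent.atomic.AtomicLong;
    
    import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
    
    // Hypothetical sketch: monotonically increasing keys for test runs
    // instead of time-based Snowflake IDs.
    public final class AutoIncrementKeyGenerateAlgorithm implements KeyGenerateAlgorithm {
        
        private final AtomicLong counter = new AtomicLong(0L);
        
        @Override
        public Comparable<?> generateKey() {
            return counter.incrementAndGet();
        }
        
        @Override
        public String getType() {
            // Assumed SPI type name for illustration only.
            return "AUTO_INCREMENT.FIXTURE";
        }
    }
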
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
index 022a9c5b6b2..b0270f10613 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
@@ -20,8 +20,8 @@ package org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
 import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -32,7 +32,7 @@ import java.util.Collection;
 public final class TableDataConsistencyCheckerFixture implements TableDataConsistencyChecker {
     
     @Override
-    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableDataConsistencyCheckParameter param) {
+    public TableDataConsistencyCheckResult checkSingleTableInventoryData(final TableInventoryCheckParameter param) {
         return new TableDataConsistencyCheckResult(new TableDataConsistencyCountCheckResult(2, 2), new TableDataConsistencyContentCheckResult(true));
     }
     
diff --git a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index a9d9f8c9d11..c6f78a04282 100644
--- a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++ b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -30,6 +30,8 @@ import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.poj
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
 import org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
+import org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
 import org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
 import org.junit.jupiter.api.BeforeAll;
@@ -49,6 +51,8 @@ class ConsistencyCheckJobAPITest {
     
     private final ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI();
     
+    private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new YamlMigrationJobConfigurationSwapper();
+    
     @BeforeAll
     public static void beforeClass() {
         PipelineContextUtils.mockModeConfigAndContextManager();
@@ -56,21 +60,25 @@ class ConsistencyCheckJobAPITest {
     
     @Test
     void assertCreateJobConfig() {
-        String parentJobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
-        String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
-        ConsistencyCheckJobConfiguration jobConfig = checkJobAPI.getJobConfiguration(checkJobId);
+        MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
+        String parentJobId = parentJobConfig.getJobId();
+        String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
+                parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
+        ConsistencyCheckJobConfiguration checkJobConfig = checkJobAPI.getJobConfiguration(checkJobId);
         int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
         String expectCheckJobId = checkJobAPI.marshalJobId(new ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId), parentJobId, expectedSequence));
-        assertThat(jobConfig.getJobId(), is(expectCheckJobId));
-        assertNull(jobConfig.getAlgorithmTypeName());
+        assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
+        assertNull(checkJobConfig.getAlgorithmTypeName());
         int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
         assertThat(sequence, is(expectedSequence));
     }
     
     @Test
     void assertGetLatestDataConsistencyCheckResult() {
-        String parentJobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
-        String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
+        MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
+        String parentJobId = parentJobConfig.getJobId();
+        String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
+                parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
         GovernanceRepositoryAPI governanceRepositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         governanceRepositoryAPI.persistLatestCheckJobId(parentJobId, checkJobId);
         Map<String, TableDataConsistencyCheckResult> expectedCheckResult = Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(new TableDataConsistencyCountCheckResult(1, 1),
@@ -83,11 +91,13 @@ class ConsistencyCheckJobAPITest {
     
     @Test
     void assertDropByParentJobId() {
-        String parentJobId = JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+        MigrationJobConfiguration parentJobConfig = jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
+        String parentJobId = parentJobConfig.getJobId();
         GovernanceRepositoryAPI repositoryAPI = PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
         int expectedSequence = 1;
         for (int i = 0; i < 3; i++) {
-            String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null));
+            String checkJobId = checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(parentJobId, null, null,
+                    parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
             ConsistencyCheckJobItemContext checkJobItemContext = new ConsistencyCheckJobItemContext(
                     new ConsistencyCheckJobConfiguration(checkJobId, parentJobId, null, null), 0, JobStatus.FINISHED, null);
             checkJobAPI.persistJobItemProgress(checkJobItemContext);
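
With the updated parameter shape, callers now supply the parent job's source and target database types up front, so they can be verified against the configured check algorithm before the check job starts. A minimal usage sketch, assuming parentJobConfig is a MigrationJobConfiguration obtained as in the tests above; the wrapper method is hypothetical, and the null algorithm name/properties follow the tests (the default check algorithm is then presumably used):

    // Sketch: create and start a consistency check job for an existing migration job.
    private String startCheckJob(final MigrationJobConfiguration parentJobConfig) {
        ConsistencyCheckJobAPI checkJobAPI = new ConsistencyCheckJobAPI();
        return checkJobAPI.createJobAndStart(new CreateConsistencyCheckJobParameter(
                parentJobConfig.getJobId(), null, null,
                parentJobConfig.getSourceDatabaseType(), parentJobConfig.getTargetDatabaseType()));
    }
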
