This is an automated email from the ASF dual-hosted git repository.

menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bb94ec3be6 Refactor pipeline consistency check range and position 
(#36606)
7bb94ec3be6 is described below

commit 7bb94ec3be6b84bd9157256f94e4b5f5add0e59d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 18 11:30:02 2025 +0800

    Refactor pipeline consistency check range and position (#36606)
    
    * Simplify MatchingTableInventoryChecker
    
    * Add TableCheckRangePosition
    
    * Fix CDCE2EIT missing TableCheckRangePosition
---
 .../ConsistencyCheckJobItemProgressContext.java    |  9 ++--
 .../position/TableCheckRangePosition.java}         | 35 +++++++------
 .../yaml/YamlTableCheckRangePosition.java}         | 31 ++++++------
 .../yaml/YamlTableCheckRangePositionSwapper.java   | 51 +++++++++++++++++++
 .../table/MatchingTableInventoryChecker.java       | 55 ++++++++------------
 .../table/TableInventoryCheckParameter.java        | 12 +++++
 .../RecordSingleTableInventoryCalculator.java      |  2 +-
 .../SingleTableInventoryCalculateParameter.java    |  2 +
 .../progress/ConsistencyCheckJobItemProgress.java  | 14 +++--
 .../YamlConsistencyCheckJobItemProgress.java       |  7 ++-
 ...YamlConsistencyCheckJobItemProgressSwapper.java | 15 +++---
 .../data/pipeline/core/task/PipelineTaskUtils.java | 28 ++++++++++
 .../CRC32SingleTableInventoryCalculatorTest.java   |  3 +-
 .../context/ConsistencyCheckJobItemContext.java    |  5 +-
 .../ConsistencyCheckJobItemContextTest.java        | 42 ++++++++++++---
 .../MigrationDataConsistencyChecker.java           | 59 ++++++++++++++--------
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  4 ++
 .../RecordSingleTableInventoryCalculatorTest.java  | 26 +++++-----
 .../ConsistencyCheckJobExecutorCallbackTest.java   | 36 ++++++++++---
 19 files changed, 291 insertions(+), 145 deletions(-)

diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index 0a986c4768a..cfd2810435b 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -20,13 +20,14 @@ package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
 
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -54,9 +55,7 @@ public final class ConsistencyCheckJobItemProgressContext 
implements PipelineJob
     
     private volatile Long checkEndTimeMillis;
     
-    private final Map<String, Object> sourceTableCheckPositions = new 
ConcurrentHashMap<>();
-    
-    private final Map<String, Object> targetTableCheckPositions = new 
ConcurrentHashMap<>();
+    private final List<TableCheckRangePosition> tableCheckRangePositions = new 
ArrayList<>();
     
     private final String sourceDatabaseType;
     
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
similarity index 51%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
index 49f7580454f..430bc6f4e4e 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
@@ -15,37 +15,42 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config;
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position;
 
+import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.RequiredArgsConstructor;
 import lombok.Setter;
-
-import java.util.Map;
+import lombok.ToString;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
 
 /**
- * Yaml data consistency check job item progress.
+ * Table check range position.
  */
+@RequiredArgsConstructor
+@AllArgsConstructor
 @Getter
 @Setter
-public final class YamlConsistencyCheckJobItemProgress implements 
YamlPipelineJobItemProgressConfiguration {
+@ToString
+public final class TableCheckRangePosition {
     
-    private String status;
+    private final Integer splittingItem;
     
-    private String tableNames;
+    private final String sourceDataNode;
     
-    private String ignoredTableNames;
+    private final String logicTableName;
     
-    private Long checkedRecordsCount;
+    private final PrimaryKeyIngestPosition<?> sourceRange;
     
-    private Long recordsCount;
+    private final PrimaryKeyIngestPosition<?> targetRange;
     
-    private Long checkBeginTimeMillis;
+    private final String queryCondition;
     
-    private Long checkEndTimeMillis;
+    private volatile Object sourcePosition;
     
-    private Map<String, Object> sourceTableCheckPositions;
+    private volatile Object targetPosition;
     
-    private Map<String, Object> targetTableCheckPositions;
+    private volatile boolean finished;
     
-    private String sourceDatabaseType;
+    private volatile Boolean matched;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java
similarity index 58%
copy from 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
copy to 
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java
index 49f7580454f..7dcb5a7f007 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java
@@ -15,37 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config;
+package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;
 
+import lombok.EqualsAndHashCode;
 import lombok.Getter;
 import lombok.Setter;
-
-import java.util.Map;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
 
 /**
- * Yaml data consistency check job item progress.
+ * Yaml table check range position.
  */
 @Getter
 @Setter
-public final class YamlConsistencyCheckJobItemProgress implements 
YamlPipelineJobItemProgressConfiguration {
+@EqualsAndHashCode
+public final class YamlTableCheckRangePosition implements YamlConfiguration {
     
-    private String status;
+    private Integer splittingItem;
     
-    private String tableNames;
+    private String sourceDataNode;
     
-    private String ignoredTableNames;
+    private String logicTableName;
     
-    private Long checkedRecordsCount;
+    private String sourceRange;
     
-    private Long recordsCount;
+    private String targetRange;
     
-    private Long checkBeginTimeMillis;
+    private String queryCondition;
     
-    private Long checkEndTimeMillis;
+    private Object sourcePosition;
     
-    private Map<String, Object> sourceTableCheckPositions;
+    private Object targetPosition;
     
-    private Map<String, Object> targetTableCheckPositions;
+    private boolean finished;
     
-    private String sourceDatabaseType;
+    private Boolean matched;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
new file mode 100644
index 00000000000..1236cd785fc
--- /dev/null
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;
+
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import 
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * Yaml table check range position swapper.
+ */
+public final class YamlTableCheckRangePositionSwapper implements 
YamlConfigurationSwapper<YamlTableCheckRangePosition, TableCheckRangePosition> {
+    
+    @Override
+    public YamlTableCheckRangePosition swapToYamlConfiguration(final 
TableCheckRangePosition data) {
+        YamlTableCheckRangePosition result = new YamlTableCheckRangePosition();
+        result.setSplittingItem(data.getSplittingItem());
+        result.setSourceDataNode(data.getSourceDataNode());
+        result.setLogicTableName(data.getLogicTableName());
+        result.setSourceRange(data.getSourceRange().toString());
+        result.setTargetRange(data.getTargetRange().toString());
+        result.setQueryCondition(data.getQueryCondition());
+        result.setSourcePosition(data.getSourcePosition());
+        result.setTargetPosition(data.getTargetPosition());
+        result.setFinished(data.isFinished());
+        result.setMatched(data.getMatched());
+        return result;
+    }
+    
+    @Override
+    public TableCheckRangePosition swapToObject(final 
YamlTableCheckRangePosition yamlConfig) {
+        return new TableCheckRangePosition(yamlConfig.getSplittingItem(), 
yamlConfig.getSourceDataNode(), yamlConfig.getLogicTableName(),
+                
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getSourceRange()), 
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getTargetRange()),
+                yamlConfig.getQueryCondition(), 
yamlConfig.getSourcePosition(), yamlConfig.getTargetPosition(), 
yamlConfig.isFinished(), yamlConfig.getMatched());
+    }
+}
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 4198729ac46..bcd622ee34a 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -31,18 +32,14 @@ import 
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
-import 
org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
-import 
org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
 import 
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
 import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
 
-import java.sql.SQLException;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -78,18 +75,21 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
     
     private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final 
ThreadPoolExecutor executor) {
         SingleTableInventoryCalculateParameter sourceParam = new 
SingleTableInventoryCalculateParameter(param.getSourceDataSource(), 
param.getSourceTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
QueryType.RANGE_QUERY);
-        sourceParam.setQueryRange(new 
QueryRange(param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()),
 true, null));
+                param.getColumnNames(), param.getUniqueKeys(), 
QueryType.RANGE_QUERY, param.getQueryCondition());
+        TableCheckRangePosition checkRangePosition = 
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
+        sourceParam.setQueryRange(new QueryRange(null != 
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition() 
: checkRangePosition.getSourceRange().getBeginValue(),
+                true, checkRangePosition.getSourceRange().getEndValue()));
         SingleTableInventoryCalculateParameter targetParam = new 
SingleTableInventoryCalculateParameter(param.getTargetDataSource(), 
param.getTargetTable(),
-                param.getColumnNames(), param.getUniqueKeys(), 
QueryType.RANGE_QUERY);
-        targetParam.setQueryRange(new 
QueryRange(param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()),
 true, null));
+                param.getColumnNames(), param.getUniqueKeys(), 
QueryType.RANGE_QUERY, param.getQueryCondition());
+        targetParam.setQueryRange(new QueryRange(null != 
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition() 
: checkRangePosition.getTargetRange().getBeginValue(),
+                true, checkRangePosition.getTargetRange().getEndValue()));
         SingleTableInventoryCalculator sourceCalculator = 
buildSingleTableInventoryCalculator();
         this.sourceCalculator = sourceCalculator;
         SingleTableInventoryCalculator targetCalculator = 
buildSingleTableInventoryCalculator();
         this.targetCalculator = targetCalculator;
         try {
-            Iterator<SingleTableInventoryCalculatedResult> 
sourceCalculatedResults = waitFuture(executor.submit(() -> 
sourceCalculator.calculate(sourceParam))).iterator();
-            Iterator<SingleTableInventoryCalculatedResult> 
targetCalculatedResults = waitFuture(executor.submit(() -> 
targetCalculator.calculate(targetParam))).iterator();
+            Iterator<SingleTableInventoryCalculatedResult> 
sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> 
sourceCalculator.calculate(sourceParam))).iterator();
+            Iterator<SingleTableInventoryCalculatedResult> 
targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() -> 
targetCalculator.calculate(targetParam))).iterator();
             return checkSingleTableInventoryData(sourceCalculatedResults, 
targetCalculatedResults, param, executor);
         } finally {
             QuietlyCloser.close(sourceParam.getCalculationContext());
@@ -107,46 +107,31 @@ public abstract class MatchingTableInventoryChecker 
implements TableInventoryChe
             if (null != param.getReadRateLimitAlgorithm()) {
                 
param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
             }
-            SingleTableInventoryCalculatedResult sourceCalculatedResult = 
waitFuture(executor.submit(sourceCalculatedResults::next));
-            SingleTableInventoryCalculatedResult targetCalculatedResult = 
waitFuture(executor.submit(targetCalculatedResults::next));
+            SingleTableInventoryCalculatedResult sourceCalculatedResult = 
PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
+            SingleTableInventoryCalculatedResult targetCalculatedResult = 
PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
             if (!Objects.equals(sourceCalculatedResult, 
targetCalculatedResult)) {
                 checkResult.setMatched(false);
                 log.info("content matched false, jobId={}, sourceTable={}, 
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(), 
param.getTargetTable(), param.getUniqueKeys());
                 break;
             }
+            TableCheckRangePosition checkRangePosition = 
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
             if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(),
 sourceCalculatedResult.getMaxUniqueKeyValue().get());
+                
checkRangePosition.setSourcePosition(sourceCalculatedResult.getMaxUniqueKeyValue().get());
             }
             if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-                
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(),
 targetCalculatedResult.getMaxUniqueKeyValue().get());
+                
checkRangePosition.setTargetPosition(targetCalculatedResult.getMaxUniqueKeyValue().get());
             }
             param.getProgressContext().onProgressUpdated(new 
PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
         }
-        if (sourceCalculatedResults.hasNext()) {
+        TableCheckRangePosition checkRangePosition = 
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
+        checkRangePosition.setFinished(true);
+        if (sourceCalculatedResults.hasNext() || 
targetCalculatedResults.hasNext()) {
             checkResult.setMatched(false);
-            return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
-        }
-        if (targetCalculatedResults.hasNext()) {
-            checkResult.setMatched(false);
-            return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
         }
+        checkRangePosition.setMatched(checkResult.isMatched());
         return new 
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
     }
     
-    private <T> T waitFuture(final Future<T> future) {
-        try {
-            return future.get();
-        } catch (final InterruptedException ex) {
-            Thread.currentThread().interrupt();
-            throw new SQLWrapperException(new SQLException(ex));
-        } catch (final ExecutionException ex) {
-            if (ex.getCause() instanceof PipelineSQLException) {
-                throw (PipelineSQLException) ex.getCause();
-            }
-            throw new SQLWrapperException(new SQLException(ex));
-        }
-    }
-    
     protected abstract SingleTableInventoryCalculator 
buildSingleTableInventoryCalculator();
     
     @Override
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 4419b643093..3cf1f8f37f2 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -36,6 +36,8 @@ public final class TableInventoryCheckParameter {
     
     private final String jobId;
     
+    private final int splittingItem;
+    
     private final PipelineDataSource sourceDataSource;
     
     private final PipelineDataSource targetDataSource;
@@ -51,4 +53,14 @@ public final class TableInventoryCheckParameter {
     private final JobRateLimitAlgorithm readRateLimitAlgorithm;
     
     private final ConsistencyCheckJobItemProgressContext progressContext;
+    
+    private final String queryCondition;
+    
+    public TableInventoryCheckParameter(final String jobId, final 
PipelineDataSource sourceDataSource, final PipelineDataSource targetDataSource,
+                                        final QualifiedTable sourceTable, 
final QualifiedTable targetTable,
+                                        final List<String> columnNames, final 
List<PipelineColumnMetaData> uniqueKeys,
+                                        final JobRateLimitAlgorithm 
readRateLimitAlgorithm, final ConsistencyCheckJobItemProgressContext 
progressContext) {
+        this(jobId, 0, sourceDataSource, targetDataSource, sourceTable, 
targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext,
+                null);
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 9c07cc5965d..d1a29f02502 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -311,7 +311,7 @@ public final class RecordSingleTableInventoryCalculator 
extends AbstractStreamin
     
     private SingleTableInventoryCalculateParameter 
buildPointRangeQueryCalculateParameter(final 
SingleTableInventoryCalculateParameter param, final Object uniqueKeyValue) {
         SingleTableInventoryCalculateParameter result = new 
SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(), 
param.getColumnNames(),
-                Collections.singletonList(param.getFirstUniqueKey()), 
QueryType.POINT_QUERY);
+                Collections.singletonList(param.getFirstUniqueKey()), 
QueryType.POINT_QUERY, param.getQueryCondition());
         result.setUniqueKeysValues(Collections.singletonList(uniqueKeyValue));
         result.setShardingColumnsNames(param.getShardingColumnsNames());
         result.setShardingColumnsValues(param.getShardingColumnsValues());
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 103ebba3d97..8fe9e813bcc 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -68,6 +68,8 @@ public final class SingleTableInventoryCalculateParameter {
     
     private final QueryType queryType;
     
+    private final String queryCondition;
+    
     /**
      * Get database type.
      *
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
index 4607d4c8226..e21b7c3a711 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
@@ -21,11 +21,12 @@ import lombok.Getter;
 import lombok.RequiredArgsConstructor;
 import lombok.Setter;
 import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * Data consistency check job item progress.
@@ -48,9 +49,7 @@ public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemPro
     
     private final Long checkEndTimeMillis;
     
-    private final Map<String, Object> sourceTableCheckPositions = new 
LinkedHashMap<>();
-    
-    private final Map<String, Object> targetTableCheckPositions = new 
LinkedHashMap<>();
+    private final List<TableCheckRangePosition> tableCheckRangePositions = new 
ArrayList<>();
     
     private final String sourceDatabaseType;
     
@@ -64,8 +63,7 @@ public final class ConsistencyCheckJobItemProgress implements 
PipelineJobItemPro
         recordsCount = context.getRecordsCount();
         checkBeginTimeMillis = context.getCheckBeginTimeMillis();
         checkEndTimeMillis = context.getCheckEndTimeMillis();
-        
sourceTableCheckPositions.putAll(context.getSourceTableCheckPositions());
-        
targetTableCheckPositions.putAll(context.getTargetTableCheckPositions());
+        tableCheckRangePositions.addAll(context.getTableCheckRangePositions());
         sourceDatabaseType = context.getSourceDatabaseType();
     }
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
index 49f7580454f..a0df0d7ae13 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
@@ -19,8 +19,9 @@ package 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config;
 
 import lombok.Getter;
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition;
 
-import java.util.Map;
+import java.util.List;
 
 /**
  * Yaml data consistency check job item progress.
@@ -43,9 +44,7 @@ public final class YamlConsistencyCheckJobItemProgress 
implements YamlPipelineJo
     
     private Long checkEndTimeMillis;
     
-    private Map<String, Object> sourceTableCheckPositions;
-    
-    private Map<String, Object> targetTableCheckPositions;
+    private List<YamlTableCheckRangePosition> tableCheckRangePositions;
     
     private String sourceDatabaseType;
 }
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
index 7f4e8d22156..2c8ce5fccfd 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -17,15 +17,20 @@
 
 package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper;
 
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress;
 
+import java.util.stream.Collectors;
+
 /**
  * YAML data check job item progress swapper.
  */
 public final class YamlConsistencyCheckJobItemProgressSwapper implements 
YamlPipelineJobItemProgressSwapper<YamlConsistencyCheckJobItemProgress, 
ConsistencyCheckJobItemProgress> {
     
+    private final YamlTableCheckRangePositionSwapper tableCheckPositionSwapper 
= new YamlTableCheckRangePositionSwapper();
+    
     @Override
     public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final 
ConsistencyCheckJobItemProgress data) {
         YamlConsistencyCheckJobItemProgress result = new 
YamlConsistencyCheckJobItemProgress();
@@ -36,8 +41,7 @@ public final class YamlConsistencyCheckJobItemProgressSwapper 
implements YamlPip
         result.setRecordsCount(data.getRecordsCount());
         result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
         result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
-        
result.setSourceTableCheckPositions(data.getSourceTableCheckPositions());
-        
result.setTargetTableCheckPositions(data.getTargetTableCheckPositions());
+        
result.setTableCheckRangePositions(data.getTableCheckRangePositions().stream().map(tableCheckPositionSwapper::swapToYamlConfiguration).collect(Collectors.toList()));
         result.setSourceDatabaseType(data.getSourceDatabaseType());
         return result;
     }
@@ -46,11 +50,8 @@ public final class 
YamlConsistencyCheckJobItemProgressSwapper implements YamlPip
     public ConsistencyCheckJobItemProgress swapToObject(final 
YamlConsistencyCheckJobItemProgress yamlConfig) {
         ConsistencyCheckJobItemProgress result = new 
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(), 
yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(),
                 yamlConfig.getRecordsCount(), 
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(), 
yamlConfig.getSourceDatabaseType());
-        if (null != yamlConfig.getSourceTableCheckPositions()) {
-            
result.getSourceTableCheckPositions().putAll(yamlConfig.getSourceTableCheckPositions());
-        }
-        if (null != yamlConfig.getTargetTableCheckPositions()) {
-            
result.getTargetTableCheckPositions().putAll(yamlConfig.getTargetTableCheckPositions());
+        if (null != yamlConfig.getTableCheckRangePositions()) {
+            
result.getTableCheckRangePositions().addAll(yamlConfig.getTableCheckRangePositions().stream().map(tableCheckPositionSwapper::swapToObject).collect(Collectors.toList()));
         }
         result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
         return result;
diff --git 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index a4ae090d786..acf9205e648 100644
--- 
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++ 
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -19,12 +19,18 @@ package org.apache.shardingsphere.data.pipeline.core.task;
 
 import lombok.AccessLevel;
 import lombok.NoArgsConstructor;
+import 
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
 import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import 
org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
+import 
org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
 
+import java.sql.SQLException;
 import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 /**
  * Pipeline task utilities.
@@ -57,4 +63,26 @@ public final class PipelineTaskUtils {
         }
         return result;
     }
+    
+    /**
+     * Wait future.
+     *
+     * @param future future to wait
+     * @param <T> result type
+     * @return execution result
+     * @throws SQLWrapperException if the future execution fails
+     */
+    public static <T> T waitFuture(final Future<T> future) {
+        try {
+            return future.get();
+        } catch (final InterruptedException ex) {
+            Thread.currentThread().interrupt();
+            throw new SQLWrapperException(new SQLException(ex));
+        } catch (final ExecutionException ex) {
+            if (ex.getCause() instanceof PipelineSQLException || ex.getCause() 
instanceof PipelineJobCancelingException) {
+                throw (PipelineSQLException) ex.getCause();
+            }
+            throw new SQLWrapperException(new SQLException(ex));
+        }
+    }
 }
diff --git 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
index e2b78e3733b..597b1e215ad 100644
--- 
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++ 
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
@@ -66,7 +66,8 @@ class CRC32SingleTableInventoryCalculatorTest {
     void setUp() throws SQLException {
         DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER, 
"integer", false, true, true));
-        parameter = new 
SingleTableInventoryCalculateParameter(pipelineDataSource, new 
QualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"), 
uniqueKeys, QueryType.RANGE_QUERY);
+        parameter = new 
SingleTableInventoryCalculateParameter(pipelineDataSource, new 
QualifiedTable(null, "foo_tbl"),
+                Arrays.asList("foo_col", "bar_col"), uniqueKeys, 
QueryType.RANGE_QUERY, null);
         when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType);
         when(pipelineDataSource.getConnection()).thenReturn(connection);
     }
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 39a88d0bdc0..bb56b31df97 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -19,11 +19,11 @@ package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.contex
 
 import lombok.Getter;
 import lombok.Setter;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
 import 
org.apache.shardingsphere.data.pipeline.core.context.PipelineProcessContext;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
 
 import java.util.Optional;
@@ -59,8 +59,7 @@ public final class ConsistencyCheckJobItemContext implements 
PipelineJobItemCont
         progressContext = new ConsistencyCheckJobItemProgressContext(jobId, 
shardingItem, jobConfig.getSourceDatabaseType().getType());
         if (null != jobItemProgress) {
             
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
-            
Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll);
-            
Optional.ofNullable(jobItemProgress.getTargetTableCheckPositions()).ifPresent(progressContext.getTargetTableCheckPositions()::putAll);
+            
progressContext.getTableCheckRangePositions().addAll(jobItemProgress.getTableCheckRangePositions());
         }
         processContext = new ConsistencyCheckProcessContext(jobId);
     }
diff --git 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 1b95d6c53b1..aae4d9512b8 100644
--- 
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++ 
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -17,6 +17,9 @@
 
 package 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
 
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -25,10 +28,13 @@ import 
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
 import org.junit.jupiter.api.Test;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 
 class ConsistencyCheckJobItemContextTest {
     
+    private static final String DATA_NODE = "ds_0.t_order";
+    
     private static final String TABLE = "t_order";
     
     private final DatabaseType databaseType = 
TypedSPILoader.getService(DatabaseType.class, "H2");
@@ -38,20 +44,40 @@ class ConsistencyCheckJobItemContextTest {
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2");
         ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null, databaseType),
                 0, JobStatus.RUNNING, jobItemProgress);
-        
assertThat(actual.getProgressContext().getSourceTableCheckPositions().size(), 
is(0));
-        
assertThat(actual.getProgressContext().getTargetTableCheckPositions().size(), 
is(0));
+        
assertThat(actual.getProgressContext().getTableCheckRangePositions().size(), 
is(0));
     }
     
     @Test
     void assertConstructWithNonEmptyValues() {
         ConsistencyCheckJobItemProgress jobItemProgress = new 
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2");
-        jobItemProgress.getSourceTableCheckPositions().put(TABLE, 6);
-        jobItemProgress.getTargetTableCheckPositions().put(TABLE, 5);
+        jobItemProgress.getTableCheckRangePositions().add(new 
TableCheckRangePosition(0, DATA_NODE, TABLE, new 
IntegerPrimaryKeyIngestPosition(1, 100),
+                new IntegerPrimaryKeyIngestPosition(1, 101), null, 11, 11, 
false, null));
+        jobItemProgress.getTableCheckRangePositions().add(new 
TableCheckRangePosition(1, DATA_NODE, TABLE, new 
IntegerPrimaryKeyIngestPosition(101, 200),
+                new IntegerPrimaryKeyIngestPosition(101, 203), null, 132, 132, 
false, null));
         ConsistencyCheckJobItemContext actual = new 
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "", 
"DATA_MATCH", null, databaseType),
                 0, JobStatus.RUNNING, jobItemProgress);
-        
assertThat(actual.getProgressContext().getSourceTableCheckPositions().size(), 
is(1));
-        
assertThat(actual.getProgressContext().getTargetTableCheckPositions().size(), 
is(1));
-        
assertThat(actual.getProgressContext().getSourceTableCheckPositions().get(TABLE),
 is(6));
-        
assertThat(actual.getProgressContext().getTargetTableCheckPositions().get(TABLE),
 is(5));
+        
assertThat(actual.getProgressContext().getTableCheckRangePositions().size(), 
is(2));
+        
assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(0),
+                new TableCheckRangePosition(0, DATA_NODE, TABLE, new 
IntegerPrimaryKeyIngestPosition(1, 100), new IntegerPrimaryKeyIngestPosition(1, 
101), null, 11, 11, false, null));
+        
assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(1),
+                new TableCheckRangePosition(1, DATA_NODE, TABLE, new 
IntegerPrimaryKeyIngestPosition(101, 200), new 
IntegerPrimaryKeyIngestPosition(101, 203), null, 132, 132, false, null));
+    }
+    
+    private void assertTableCheckRangePosition(final TableCheckRangePosition 
actual, final TableCheckRangePosition expected) {
+        assertRange(actual.getSourceRange(), expected.getSourceRange());
+        assertRange(actual.getTargetRange(), expected.getTargetRange());
+        assertThat(actual.getSplittingItem(), is(expected.getSplittingItem()));
+        assertThat(actual.getSourceDataNode(), 
is(expected.getSourceDataNode()));
+        assertThat(actual.getLogicTableName(), 
is(expected.getLogicTableName()));
+        assertThat(actual.getSourcePosition(), 
is(expected.getSourcePosition()));
+        assertThat(actual.getTargetPosition(), 
is(expected.getTargetPosition()));
+        assertThat(actual.isFinished(), is(expected.isFinished()));
+    }
+    
+    private void assertRange(final PrimaryKeyIngestPosition<?> actual, final 
PrimaryKeyIngestPosition<?> expected) {
+        assertThat(actual.getClass(), is(expected.getClass()));
+        assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
+        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(), 
is(((IntegerPrimaryKeyIngestPosition) expected).getBeginValue()));
+        assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(), 
is(((IntegerPrimaryKeyIngestPosition) expected).getEndValue()));
     }
 }
diff --git 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index d97f6f6bf2b..bc9b35f1b90 100644
--- 
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++ 
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -21,17 +21,20 @@ import lombok.extern.slf4j.Slf4j;
 import 
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
 import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
 import 
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
 import 
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -96,12 +99,28 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
                 PipelineDataSourceManager dataSourceManager = new 
PipelineDataSourceManager();
                 TableDataConsistencyChecker tableChecker = 
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
             
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
 (ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget());
-            for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
-                if (checkTableInventoryDataUnmatchedAndBreak(each, 
tableChecker, checkResultMap, dataSourceManager)) {
-                    return 
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().format(), Entry::getValue));
+            if (progressContext.getTableCheckRangePositions().isEmpty()) {
+                
progressContext.getTableCheckRangePositions().addAll(splitCrossTables());
+            }
+            for (TableCheckRangePosition each : 
progressContext.getTableCheckRangePositions()) {
+                TableDataConsistencyCheckResult checkResult = 
checkSingleTableInventoryData(each, tableChecker, dataSourceManager);
+                log.info("checkResult: {}, table: {}, checkRangePosition: {}", 
checkResult, each.getSourceDataNode(), each);
+                DataNode dataNode = 
DataNodeUtils.parseWithSchema(each.getSourceDataNode());
+                QualifiedTable sourceTable = new 
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
+                checkResultMap.put(sourceTable, checkResult);
+                if (checkResult.isIgnored()) {
+                    
progressContext.getIgnoredTableNames().add(sourceTable.format());
+                    log.info("Table '{}' is ignored, ignore type: {}", 
each.getSourceDataNode(), checkResult.getIgnoredType());
+                    continue;
+                }
+                if (!checkResult.isMatched() && 
tableChecker.isBreakOnInventoryCheckNotMatched()) {
+                    log.info("Unmatched on table '{}', ignore left tables", 
each.getSourceDataNode());
+                    cancel();
+                    return 
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().toString(), Entry::getValue));
                 }
             }
         }
+        log.info("check done, jobId={}", jobConfig.getJobId());
         return 
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry -> 
entry.getKey().format(), Entry::getValue));
     }
     
@@ -110,42 +129,38 @@ public final class MigrationDataConsistencyChecker 
implements PipelineDataConsis
         return 
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum();
     }
     
-    private boolean checkTableInventoryDataUnmatchedAndBreak(final 
JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
-                                                             final 
Map<QualifiedTable, TableDataConsistencyCheckResult> checkResultMap,
-                                                             final 
PipelineDataSourceManager dataSourceManager) {
-        for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
-            for (DataNode each : entry.getDataNodes()) {
-                TableDataConsistencyCheckResult checkResult = 
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker, 
dataSourceManager);
-                QualifiedTable sourceTable = new 
QualifiedTable(each.getSchemaName(), each.getTableName());
-                checkResultMap.put(sourceTable, checkResult);
-                if (checkResult.isIgnored()) {
-                    
progressContext.getIgnoredTableNames().add(sourceTable.format());
-                    log.info("Table '{}' is ignored, ignore type: {}", 
each.format(), checkResult.getIgnoredType());
-                    continue;
-                }
-                if (!checkResult.isMatched() && 
tableChecker.isBreakOnInventoryCheckNotMatched()) {
-                    log.info("Unmatched on table '{}', ignore left tables", 
each.format());
-                    return true;
+    private List<TableCheckRangePosition> splitCrossTables() {
+        List<TableCheckRangePosition> result = new LinkedList<>();
+        int splittingItem = 0;
+        for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
+            for (JobDataNodeEntry entry : each.getEntries()) {
+                for (DataNode dataNode : entry.getDataNodes()) {
+                    result.add(new TableCheckRangePosition(splittingItem++, 
dataNode.format(), entry.getLogicTableName(),
+                            new UnsupportedKeyIngestPosition(), new 
UnsupportedKeyIngestPosition(), null));
                 }
             }
         }
-        return false;
+        return result;
     }
     
-    private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final String targetTableName, final DataNode 
dataNode,
+    private TableDataConsistencyCheckResult 
checkSingleTableInventoryData(final TableCheckRangePosition checkRangePosition,
                                                                           
final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager 
dataSourceManager) {
+        log.info("checkSingleTableInventoryData, jobId: {}, 
checkRangePosition: {}", jobConfig.getJobId(), checkRangePosition);
+        DataNode dataNode = 
DataNodeUtils.parseWithSchema(checkRangePosition.getSourceDataNode());
         QualifiedTable sourceTable = new 
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
         PipelineDataSource sourceDataSource = 
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
         PipelineTableMetaDataLoader metaDataLoader = new 
StandardPipelineTableMetaDataLoader(sourceDataSource);
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(dataNode.getSchemaName(), 
dataNode.getTableName());
         ShardingSpherePreconditions.checkNotNull(tableMetaData,
                 () -> new 
PipelineTableDataConsistencyCheckLoadingFailedException(new 
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName())));
+        String targetTableName = checkRangePosition.getLogicTableName();
         List<String> columnNames = tableMetaData.getColumnNames();
         List<PipelineColumnMetaData> uniqueKeys = 
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName(), 
sourceTable.getTableName(), metaDataLoader);
         QualifiedTable targetTable = new 
QualifiedTable(dataNode.getSchemaName(), targetTableName);
         PipelineDataSource targetDataSource = 
dataSourceManager.getDataSource(jobConfig.getTarget());
         TableInventoryCheckParameter param = new TableInventoryCheckParameter(
-                jobConfig.getJobId(), sourceDataSource, targetDataSource, 
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, 
progressContext);
+                jobConfig.getJobId(), checkRangePosition.getSplittingItem(), 
sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames, 
uniqueKeys,
+                readRateLimitAlgorithm, progressContext, 
checkRangePosition.getQueryCondition());
         TableInventoryChecker tableInventoryChecker = 
tableChecker.buildTableInventoryChecker(param);
         currentTableInventoryChecker.set(tableInventoryChecker);
         Optional<TableDataConsistencyCheckResult> preCheckResult = 
tableInventoryChecker.preCheck();
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index ea5d170683d..72c02e9bf76 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -28,10 +28,12 @@ import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginPara
 import 
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
 import 
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
 import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
 import 
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import 
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -191,6 +193,8 @@ class CDCE2EIT {
         PipelineTableMetaData tableMetaData = 
metaDataLoader.getTableMetaData(qualifiedTable.getSchemaName(), 
qualifiedTable.getTableName());
         List<PipelineColumnMetaData> uniqueKeys = 
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
         ConsistencyCheckJobItemProgressContext progressContext = new 
ConsistencyCheckJobItemProgressContext("", 0, 
sourceDataSource.getDatabaseType().getType());
+        progressContext.getTableCheckRangePositions().add(new 
TableCheckRangePosition(0, null, qualifiedTable.getTableName(),
+                new UnsupportedKeyIngestPosition(), new 
UnsupportedKeyIngestPosition(), null));
         TableInventoryCheckParameter param = new 
TableInventoryCheckParameter("", sourceDataSource, targetDataSource, 
qualifiedTable, qualifiedTable,
                 tableMetaData.getColumnNames(), uniqueKeys, null, 
progressContext);
         TableDataConsistencyChecker tableChecker = 
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new 
Properties());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index fb999a36a4c..2b397612e4d 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -107,7 +107,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String 
streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(4, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(0, 
false, null), 4, 4);
     }
     
@@ -116,7 +116,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final 
String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(4, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(4, 
false, null), 4, 8);
     }
     
@@ -125,7 +125,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys(final String 
streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, 
true, 6), 8, 6);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, 
false, 6), 3, 6);
     }
@@ -153,7 +153,7 @@ class RecordSingleTableInventoryCalculatorTest {
         }
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"test3"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3, 
true, 4), 4, 4);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5, 
true, 6), 4, 6);
         assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5, 
true, 7), 5, 7);
@@ -164,7 +164,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfReservedRangeQueryWithMultiColumnUniqueKeys(final 
String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(1000, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(3, true, 2));
         Optional<SingleTableInventoryCalculatedResult> calculatedResult = 
calculator.calculateChunk(param);
         QuietlyCloser.close(param.getCalculationContext());
@@ -179,7 +179,7 @@ class RecordSingleTableInventoryCalculatorTest {
             connection.createStatement().execute("CREATE TABLE test1 (user_id 
INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id))");
         }
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"test1"),
-                Collections.emptyList(), buildUserIdUniqueKey(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildUserIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(0, false, null));
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(5, 
StreamingRangeType.valueOf(streamingRangeType));
         Optional<SingleTableInventoryCalculatedResult> calculateResult = 
calculator.calculateChunk(param);
@@ -195,7 +195,7 @@ class RecordSingleTableInventoryCalculatorTest {
             connection.createStatement().execute("CREATE TABLE test2 (user_id 
INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id, 
order_id))");
         }
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"test2"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(5, 
StreamingRangeType.valueOf(streamingRangeType));
         Optional<SingleTableInventoryCalculatedResult> calculateResult = 
calculator.calculateChunk(param);
@@ -208,7 +208,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int 
streamingChunkCount, final String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildOrderIdUniqueKey(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
         Iterator<SingleTableInventoryCalculatedResult> resultIterator = 
calculator.calculate(param).iterator();
         RecordSingleTableInventoryCalculatedResult actual = 
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -246,7 +246,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void 
assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final String 
streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(50, 100, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
         Iterator<SingleTableInventoryCalculatedResult> resultIterator = 
calculator.calculate(param).iterator();
         RecordSingleTableInventoryCalculatedResult actual = 
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -272,7 +272,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final 
int streamingChunkCount, final String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
         Iterator<SingleTableInventoryCalculatedResult> resultIterator = 
calculator.calculate(param).iterator();
         RecordSingleTableInventoryCalculatedResult actual = 
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -310,7 +310,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith2x(final 
int streamingChunkCount, final String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(2, streamingChunkCount, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.RANGE_QUERY, null);
         param.setQueryRange(new QueryRange(null, false, null));
         Iterator<SingleTableInventoryCalculatedResult> resultIterator = 
calculator.calculate(param).iterator();
         RecordSingleTableInventoryCalculatedResult actual = 
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -357,7 +357,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfPointQuery(final String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.POINT_QUERY);
+                Collections.emptyList(), buildMultiColumnUniqueKeys(), 
QueryType.POINT_QUERY, null);
         param.setUniqueKeysValues(Arrays.asList(3, 3));
         Optional<SingleTableInventoryCalculatedResult> calculatedResult = 
calculator.calculateChunk(param);
         QuietlyCloser.close(param.getCalculationContext());
@@ -373,7 +373,7 @@ class RecordSingleTableInventoryCalculatorTest {
     void assertCalculateOfPointRangeQuery(final String streamingRangeType) {
         RecordSingleTableInventoryCalculator calculator = new 
RecordSingleTableInventoryCalculator(3, 
StreamingRangeType.valueOf(streamingRangeType));
         SingleTableInventoryCalculateParameter param = new 
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null, 
"t_order"),
-                Collections.emptyList(), buildUserIdUniqueKey(), 
QueryType.POINT_QUERY);
+                Collections.emptyList(), buildUserIdUniqueKey(), 
QueryType.POINT_QUERY, null);
         param.setUniqueKeysValues(Collections.singleton(3));
         Optional<SingleTableInventoryCalculatedResult> calculatedResult = 
calculator.calculateChunk(param);
         QuietlyCloser.close(param.getCalculationContext());
diff --git 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index d34d41400ae..32c4db66fd4 100644
--- 
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++ 
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -17,7 +17,10 @@
 
 package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
 
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition;
+import 
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper;
 import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import 
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
 import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -36,8 +39,9 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 
 import java.util.Collections;
-import java.util.Map;
+import java.util.List;
 import java.util.Optional;
+import java.util.stream.Collectors;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -53,23 +57,39 @@ class ConsistencyCheckJobExecutorCallbackTest {
     void assertBuildPipelineJobItemContext() {
         ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new 
PipelineContextKey(InstanceType.PROXY), 
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
         String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
-        Map<String, Object> expectTableCheckPosition = 
Collections.singletonMap("t_order", 100);
+        List<YamlTableCheckRangePosition> expectedYamlTableCheckRangePositions 
= Collections.singletonList(createYamlTableCheckRangePosition());
         
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
 0,
-                
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
+                
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
         ConsistencyCheckJobExecutorCallback callback = new 
ConsistencyCheckJobExecutorCallback();
         ConsistencyCheckJobConfiguration jobConfig = new 
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
         PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager 
= new PipelineJobItemManager<>(new 
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
         Optional<ConsistencyCheckJobItemProgress> jobItemProgress = 
jobItemManager.getProgress(jobConfig.getJobId(), 0);
         ConsistencyCheckJobItemContext actual = 
callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null, 
null);
-        assertThat(actual.getProgressContext().getSourceTableCheckPositions(), 
is(expectTableCheckPosition));
-        assertThat(actual.getProgressContext().getTargetTableCheckPositions(), 
is(expectTableCheckPosition));
+        YamlTableCheckRangePositionSwapper tableCheckPositionSwapper = new 
YamlTableCheckRangePositionSwapper();
+        List<YamlTableCheckRangePosition> actualYamlTableCheckPositions = 
actual.getProgressContext().getTableCheckRangePositions().stream()
+                
.map(tableCheckPositionSwapper::swapToYamlConfiguration).collect(Collectors.toList());
+        assertThat(actualYamlTableCheckPositions.size(), 
is(expectedYamlTableCheckRangePositions.size()));
+        for (int i = 0; i < actualYamlTableCheckPositions.size(); i++) {
+            assertThat(actualYamlTableCheckPositions.get(i), 
is(expectedYamlTableCheckRangePositions.get(i)));
+        }
     }
     
-    private YamlConsistencyCheckJobItemProgress 
createYamlConsistencyCheckJobItemProgress(final Map<String, Object> 
expectTableCheckPosition) {
+    private YamlTableCheckRangePosition createYamlTableCheckRangePosition() {
+        YamlTableCheckRangePosition result = new YamlTableCheckRangePosition();
+        result.setSplittingItem(0);
+        result.setSourceDataNode("ds_0.t_order");
+        result.setLogicTableName("t_order");
+        result.setSourceRange(new UnsupportedKeyIngestPosition().toString());
+        result.setTargetRange(new UnsupportedKeyIngestPosition().toString());
+        result.setSourcePosition(100);
+        result.setTargetPosition(100);
+        return result;
+    }
+    
+    private YamlConsistencyCheckJobItemProgress 
createYamlConsistencyCheckJobItemProgress(final 
List<YamlTableCheckRangePosition> yamlTableCheckRangePositions) {
         YamlConsistencyCheckJobItemProgress result = new 
YamlConsistencyCheckJobItemProgress();
         result.setStatus(JobStatus.RUNNING.name());
-        result.setSourceTableCheckPositions(expectTableCheckPosition);
-        result.setTargetTableCheckPositions(expectTableCheckPosition);
+        result.setTableCheckRangePositions(yamlTableCheckRangePositions);
         return result;
     }
     

Reply via email to