This is an automated email from the ASF dual-hosted git repository.
menghaoran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 7bb94ec3be6 Refactor pipeline consistency check range and position
(#36606)
7bb94ec3be6 is described below
commit 7bb94ec3be6b84bd9157256f94e4b5f5add0e59d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Thu Sep 18 11:30:02 2025 +0800
Refactor pipeline consistency check range and position (#36606)
* Simplify MatchingTableInventoryChecker
* Add TableCheckRangePosition
* Fix CDCE2EIT missing TableCheckRangePosition
---
.../ConsistencyCheckJobItemProgressContext.java | 9 ++--
.../position/TableCheckRangePosition.java} | 35 +++++++------
.../yaml/YamlTableCheckRangePosition.java} | 31 ++++++------
.../yaml/YamlTableCheckRangePositionSwapper.java | 51 +++++++++++++++++++
.../table/MatchingTableInventoryChecker.java | 55 ++++++++------------
.../table/TableInventoryCheckParameter.java | 12 +++++
.../RecordSingleTableInventoryCalculator.java | 2 +-
.../SingleTableInventoryCalculateParameter.java | 2 +
.../progress/ConsistencyCheckJobItemProgress.java | 14 +++--
.../YamlConsistencyCheckJobItemProgress.java | 7 ++-
...YamlConsistencyCheckJobItemProgressSwapper.java | 15 +++---
.../data/pipeline/core/task/PipelineTaskUtils.java | 28 ++++++++++
.../CRC32SingleTableInventoryCalculatorTest.java | 3 +-
.../context/ConsistencyCheckJobItemContext.java | 5 +-
.../ConsistencyCheckJobItemContextTest.java | 42 ++++++++++++---
.../MigrationDataConsistencyChecker.java | 59 ++++++++++++++--------
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 4 ++
.../RecordSingleTableInventoryCalculatorTest.java | 26 +++++-----
.../ConsistencyCheckJobExecutorCallbackTest.java | 36 ++++++++++---
19 files changed, 291 insertions(+), 145 deletions(-)
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
index 0a986c4768a..cfd2810435b 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/ConsistencyCheckJobItemProgressContext.java
@@ -20,13 +20,14 @@ package
org.apache.shardingsphere.data.pipeline.core.consistencycheck;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobProgressListener;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.persist.PipelineJobProgressPersistService;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicLong;
@@ -54,9 +55,7 @@ public final class ConsistencyCheckJobItemProgressContext
implements PipelineJob
private volatile Long checkEndTimeMillis;
- private final Map<String, Object> sourceTableCheckPositions = new
ConcurrentHashMap<>();
-
- private final Map<String, Object> targetTableCheckPositions = new
ConcurrentHashMap<>();
+ private final List<TableCheckRangePosition> tableCheckRangePositions = new
ArrayList<>();
private final String sourceDatabaseType;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
similarity index 51%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
index 49f7580454f..430bc6f4e4e 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/TableCheckRangePosition.java
@@ -15,37 +15,42 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config;
+package org.apache.shardingsphere.data.pipeline.core.consistencycheck.position;
+import lombok.AllArgsConstructor;
import lombok.Getter;
+import lombok.RequiredArgsConstructor;
import lombok.Setter;
-
-import java.util.Map;
+import lombok.ToString;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
/**
- * Yaml data consistency check job item progress.
+ * Table check range position.
*/
+@RequiredArgsConstructor
+@AllArgsConstructor
@Getter
@Setter
-public final class YamlConsistencyCheckJobItemProgress implements
YamlPipelineJobItemProgressConfiguration {
+@ToString
+public final class TableCheckRangePosition {
- private String status;
+ private final Integer splittingItem;
- private String tableNames;
+ private final String sourceDataNode;
- private String ignoredTableNames;
+ private final String logicTableName;
- private Long checkedRecordsCount;
+ private final PrimaryKeyIngestPosition<?> sourceRange;
- private Long recordsCount;
+ private final PrimaryKeyIngestPosition<?> targetRange;
- private Long checkBeginTimeMillis;
+ private final String queryCondition;
- private Long checkEndTimeMillis;
+ private volatile Object sourcePosition;
- private Map<String, Object> sourceTableCheckPositions;
+ private volatile Object targetPosition;
- private Map<String, Object> targetTableCheckPositions;
+ private volatile boolean finished;
- private String sourceDatabaseType;
+ private volatile Boolean matched;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java
similarity index 58%
copy from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
copy to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java
index 49f7580454f..7dcb5a7f007 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePosition.java
@@ -15,37 +15,38 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config;
+package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;
+import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
-
-import java.util.Map;
+import org.apache.shardingsphere.infra.util.yaml.YamlConfiguration;
/**
- * Yaml data consistency check job item progress.
+ * Yaml table check range position.
*/
@Getter
@Setter
-public final class YamlConsistencyCheckJobItemProgress implements
YamlPipelineJobItemProgressConfiguration {
+@EqualsAndHashCode
+public final class YamlTableCheckRangePosition implements YamlConfiguration {
- private String status;
+ private Integer splittingItem;
- private String tableNames;
+ private String sourceDataNode;
- private String ignoredTableNames;
+ private String logicTableName;
- private Long checkedRecordsCount;
+ private String sourceRange;
- private Long recordsCount;
+ private String targetRange;
- private Long checkBeginTimeMillis;
+ private String queryCondition;
- private Long checkEndTimeMillis;
+ private Object sourcePosition;
- private Map<String, Object> sourceTableCheckPositions;
+ private Object targetPosition;
- private Map<String, Object> targetTableCheckPositions;
+ private boolean finished;
- private String sourceDatabaseType;
+ private Boolean matched;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
new file mode 100644
index 00000000000..1236cd785fc
--- /dev/null
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/position/yaml/YamlTableCheckRangePositionSwapper.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml;
+
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPositionFactory;
+import
org.apache.shardingsphere.infra.util.yaml.swapper.YamlConfigurationSwapper;
+
+/**
+ * Yaml table check range position swapper.
+ */
+public final class YamlTableCheckRangePositionSwapper implements
YamlConfigurationSwapper<YamlTableCheckRangePosition, TableCheckRangePosition> {
+
+ @Override
+ public YamlTableCheckRangePosition swapToYamlConfiguration(final
TableCheckRangePosition data) {
+ YamlTableCheckRangePosition result = new YamlTableCheckRangePosition();
+ result.setSplittingItem(data.getSplittingItem());
+ result.setSourceDataNode(data.getSourceDataNode());
+ result.setLogicTableName(data.getLogicTableName());
+ result.setSourceRange(data.getSourceRange().toString());
+ result.setTargetRange(data.getTargetRange().toString());
+ result.setQueryCondition(data.getQueryCondition());
+ result.setSourcePosition(data.getSourcePosition());
+ result.setTargetPosition(data.getTargetPosition());
+ result.setFinished(data.isFinished());
+ result.setMatched(data.getMatched());
+ return result;
+ }
+
+ @Override
+ public TableCheckRangePosition swapToObject(final
YamlTableCheckRangePosition yamlConfig) {
+ return new TableCheckRangePosition(yamlConfig.getSplittingItem(),
yamlConfig.getSourceDataNode(), yamlConfig.getLogicTableName(),
+
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getSourceRange()),
PrimaryKeyIngestPositionFactory.newInstance(yamlConfig.getTargetRange()),
+ yamlConfig.getQueryCondition(),
yamlConfig.getSourcePosition(), yamlConfig.getTargetPosition(),
yamlConfig.isFinished(), yamlConfig.getMatched());
+ }
+}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
index 4198729ac46..bcd622ee34a 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableInventoryChecker.java
@@ -21,6 +21,7 @@ import lombok.AccessLevel;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.SingleTableInventoryCalculatedResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.yaml.YamlTableDataConsistencyCheckResult;
@@ -31,18 +32,14 @@ import
org.apache.shardingsphere.data.pipeline.core.constant.PipelineSQLOperatio
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.QueryType;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.query.range.QueryRange;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
-import
org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
-import
org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
+import org.apache.shardingsphere.data.pipeline.core.task.PipelineTaskUtils;
import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFactoryBuilder;
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
-import java.sql.SQLException;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -78,18 +75,21 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final
ThreadPoolExecutor executor) {
SingleTableInventoryCalculateParameter sourceParam = new
SingleTableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
- param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY);
- sourceParam.setQueryRange(new
QueryRange(param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName()),
true, null));
+ param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
+ TableCheckRangePosition checkRangePosition =
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
+ sourceParam.setQueryRange(new QueryRange(null !=
checkRangePosition.getSourcePosition() ? checkRangePosition.getSourcePosition()
: checkRangePosition.getSourceRange().getBeginValue(),
+ true, checkRangePosition.getSourceRange().getEndValue()));
SingleTableInventoryCalculateParameter targetParam = new
SingleTableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
- param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY);
- targetParam.setQueryRange(new
QueryRange(param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName()),
true, null));
+ param.getColumnNames(), param.getUniqueKeys(),
QueryType.RANGE_QUERY, param.getQueryCondition());
+ targetParam.setQueryRange(new QueryRange(null !=
checkRangePosition.getTargetPosition() ? checkRangePosition.getTargetPosition()
: checkRangePosition.getTargetRange().getBeginValue(),
+ true, checkRangePosition.getTargetRange().getEndValue()));
SingleTableInventoryCalculator sourceCalculator =
buildSingleTableInventoryCalculator();
this.sourceCalculator = sourceCalculator;
SingleTableInventoryCalculator targetCalculator =
buildSingleTableInventoryCalculator();
this.targetCalculator = targetCalculator;
try {
- Iterator<SingleTableInventoryCalculatedResult>
sourceCalculatedResults = waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
- Iterator<SingleTableInventoryCalculatedResult>
targetCalculatedResults = waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
+ Iterator<SingleTableInventoryCalculatedResult>
sourceCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
+ Iterator<SingleTableInventoryCalculatedResult>
targetCalculatedResults = PipelineTaskUtils.waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
return checkSingleTableInventoryData(sourceCalculatedResults,
targetCalculatedResults, param, executor);
} finally {
QuietlyCloser.close(sourceParam.getCalculationContext());
@@ -107,46 +107,31 @@ public abstract class MatchingTableInventoryChecker
implements TableInventoryChe
if (null != param.getReadRateLimitAlgorithm()) {
param.getReadRateLimitAlgorithm().intercept(PipelineSQLOperationType.SELECT, 1);
}
- SingleTableInventoryCalculatedResult sourceCalculatedResult =
waitFuture(executor.submit(sourceCalculatedResults::next));
- SingleTableInventoryCalculatedResult targetCalculatedResult =
waitFuture(executor.submit(targetCalculatedResults::next));
+ SingleTableInventoryCalculatedResult sourceCalculatedResult =
PipelineTaskUtils.waitFuture(executor.submit(sourceCalculatedResults::next));
+ SingleTableInventoryCalculatedResult targetCalculatedResult =
PipelineTaskUtils.waitFuture(executor.submit(targetCalculatedResults::next));
if (!Objects.equals(sourceCalculatedResult,
targetCalculatedResult)) {
checkResult.setMatched(false);
log.info("content matched false, jobId={}, sourceTable={},
targetTable={}, uniqueKeys={}", param.getJobId(), param.getSourceTable(),
param.getTargetTable(), param.getUniqueKeys());
break;
}
+ TableCheckRangePosition checkRangePosition =
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
if (sourceCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-
param.getProgressContext().getSourceTableCheckPositions().put(param.getSourceTable().getTableName(),
sourceCalculatedResult.getMaxUniqueKeyValue().get());
+
checkRangePosition.setSourcePosition(sourceCalculatedResult.getMaxUniqueKeyValue().get());
}
if (targetCalculatedResult.getMaxUniqueKeyValue().isPresent()) {
-
param.getProgressContext().getTargetTableCheckPositions().put(param.getTargetTable().getTableName(),
targetCalculatedResult.getMaxUniqueKeyValue().get());
+
checkRangePosition.setTargetPosition(targetCalculatedResult.getMaxUniqueKeyValue().get());
}
param.getProgressContext().onProgressUpdated(new
PipelineJobUpdateProgress(sourceCalculatedResult.getRecordsCount()));
}
- if (sourceCalculatedResults.hasNext()) {
+ TableCheckRangePosition checkRangePosition =
param.getProgressContext().getTableCheckRangePositions().get(param.getSplittingItem());
+ checkRangePosition.setFinished(true);
+ if (sourceCalculatedResults.hasNext() ||
targetCalculatedResults.hasNext()) {
checkResult.setMatched(false);
- return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
- }
- if (targetCalculatedResults.hasNext()) {
- checkResult.setMatched(false);
- return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
+ checkRangePosition.setMatched(checkResult.isMatched());
return new
YamlTableDataConsistencyCheckResultSwapper().swapToObject(checkResult);
}
- private <T> T waitFuture(final Future<T> future) {
- try {
- return future.get();
- } catch (final InterruptedException ex) {
- Thread.currentThread().interrupt();
- throw new SQLWrapperException(new SQLException(ex));
- } catch (final ExecutionException ex) {
- if (ex.getCause() instanceof PipelineSQLException) {
- throw (PipelineSQLException) ex.getCause();
- }
- throw new SQLWrapperException(new SQLException(ex));
- }
- }
-
protected abstract SingleTableInventoryCalculator
buildSingleTableInventoryCalculator();
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 4419b643093..3cf1f8f37f2 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -36,6 +36,8 @@ public final class TableInventoryCheckParameter {
private final String jobId;
+ private final int splittingItem;
+
private final PipelineDataSource sourceDataSource;
private final PipelineDataSource targetDataSource;
@@ -51,4 +53,14 @@ public final class TableInventoryCheckParameter {
private final JobRateLimitAlgorithm readRateLimitAlgorithm;
private final ConsistencyCheckJobItemProgressContext progressContext;
+
+ private final String queryCondition;
+
+ public TableInventoryCheckParameter(final String jobId, final
PipelineDataSource sourceDataSource, final PipelineDataSource targetDataSource,
+ final QualifiedTable sourceTable,
final QualifiedTable targetTable,
+ final List<String> columnNames, final
List<PipelineColumnMetaData> uniqueKeys,
+ final JobRateLimitAlgorithm
readRateLimitAlgorithm, final ConsistencyCheckJobItemProgressContext
progressContext) {
+ this(jobId, 0, sourceDataSource, targetDataSource, sourceTable,
targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm, progressContext,
+ null);
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
index 9c07cc5965d..d1a29f02502 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculator.java
@@ -311,7 +311,7 @@ public final class RecordSingleTableInventoryCalculator
extends AbstractStreamin
private SingleTableInventoryCalculateParameter
buildPointRangeQueryCalculateParameter(final
SingleTableInventoryCalculateParameter param, final Object uniqueKeyValue) {
SingleTableInventoryCalculateParameter result = new
SingleTableInventoryCalculateParameter(param.getDataSource(), param.getTable(),
param.getColumnNames(),
- Collections.singletonList(param.getFirstUniqueKey()),
QueryType.POINT_QUERY);
+ Collections.singletonList(param.getFirstUniqueKey()),
QueryType.POINT_QUERY, param.getQueryCondition());
result.setUniqueKeysValues(Collections.singletonList(uniqueKeyValue));
result.setShardingColumnsNames(param.getShardingColumnsNames());
result.setShardingColumnsValues(param.getShardingColumnsValues());
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
index 103ebba3d97..8fe9e813bcc 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/SingleTableInventoryCalculateParameter.java
@@ -68,6 +68,8 @@ public final class SingleTableInventoryCalculateParameter {
private final QueryType queryType;
+ private final String queryCondition;
+
/**
* Get database type.
*
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
index 4607d4c8226..e21b7c3a711 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/ConsistencyCheckJobItemProgress.java
@@ -21,11 +21,12 @@ import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import lombok.ToString;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
-import java.util.LinkedHashMap;
-import java.util.Map;
+import java.util.ArrayList;
+import java.util.List;
/**
* Data consistency check job item progress.
@@ -48,9 +49,7 @@ public final class ConsistencyCheckJobItemProgress implements
PipelineJobItemPro
private final Long checkEndTimeMillis;
- private final Map<String, Object> sourceTableCheckPositions = new
LinkedHashMap<>();
-
- private final Map<String, Object> targetTableCheckPositions = new
LinkedHashMap<>();
+ private final List<TableCheckRangePosition> tableCheckRangePositions = new
ArrayList<>();
private final String sourceDatabaseType;
@@ -64,8 +63,7 @@ public final class ConsistencyCheckJobItemProgress implements
PipelineJobItemPro
recordsCount = context.getRecordsCount();
checkBeginTimeMillis = context.getCheckBeginTimeMillis();
checkEndTimeMillis = context.getCheckEndTimeMillis();
-
sourceTableCheckPositions.putAll(context.getSourceTableCheckPositions());
-
targetTableCheckPositions.putAll(context.getTargetTableCheckPositions());
+ tableCheckRangePositions.addAll(context.getTableCheckRangePositions());
sourceDatabaseType = context.getSourceDatabaseType();
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
index 49f7580454f..a0df0d7ae13 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/config/YamlConsistencyCheckJobItemProgress.java
@@ -19,8 +19,9 @@ package
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config;
import lombok.Getter;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition;
-import java.util.Map;
+import java.util.List;
/**
* Yaml data consistency check job item progress.
@@ -43,9 +44,7 @@ public final class YamlConsistencyCheckJobItemProgress
implements YamlPipelineJo
private Long checkEndTimeMillis;
- private Map<String, Object> sourceTableCheckPositions;
-
- private Map<String, Object> targetTableCheckPositions;
+ private List<YamlTableCheckRangePosition> tableCheckRangePositions;
private String sourceDatabaseType;
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
index 7f4e8d22156..2c8ce5fccfd 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/job/progress/yaml/swapper/YamlConsistencyCheckJobItemProgressSwapper.java
@@ -17,15 +17,20 @@
package org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.swapper;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.yaml.config.YamlConsistencyCheckJobItemProgress;
+import java.util.stream.Collectors;
+
/**
* YAML data check job item progress swapper.
*/
public final class YamlConsistencyCheckJobItemProgressSwapper implements
YamlPipelineJobItemProgressSwapper<YamlConsistencyCheckJobItemProgress,
ConsistencyCheckJobItemProgress> {
+ private final YamlTableCheckRangePositionSwapper tableCheckPositionSwapper
= new YamlTableCheckRangePositionSwapper();
+
@Override
public YamlConsistencyCheckJobItemProgress swapToYamlConfiguration(final
ConsistencyCheckJobItemProgress data) {
YamlConsistencyCheckJobItemProgress result = new
YamlConsistencyCheckJobItemProgress();
@@ -36,8 +41,7 @@ public final class YamlConsistencyCheckJobItemProgressSwapper
implements YamlPip
result.setRecordsCount(data.getRecordsCount());
result.setCheckBeginTimeMillis(data.getCheckBeginTimeMillis());
result.setCheckEndTimeMillis(data.getCheckEndTimeMillis());
-
result.setSourceTableCheckPositions(data.getSourceTableCheckPositions());
-
result.setTargetTableCheckPositions(data.getTargetTableCheckPositions());
+
result.setTableCheckRangePositions(data.getTableCheckRangePositions().stream().map(tableCheckPositionSwapper::swapToYamlConfiguration).collect(Collectors.toList()));
result.setSourceDatabaseType(data.getSourceDatabaseType());
return result;
}
@@ -46,11 +50,8 @@ public final class
YamlConsistencyCheckJobItemProgressSwapper implements YamlPip
public ConsistencyCheckJobItemProgress swapToObject(final
YamlConsistencyCheckJobItemProgress yamlConfig) {
ConsistencyCheckJobItemProgress result = new
ConsistencyCheckJobItemProgress(yamlConfig.getTableNames(),
yamlConfig.getIgnoredTableNames(), yamlConfig.getCheckedRecordsCount(),
yamlConfig.getRecordsCount(),
yamlConfig.getCheckBeginTimeMillis(), yamlConfig.getCheckEndTimeMillis(),
yamlConfig.getSourceDatabaseType());
- if (null != yamlConfig.getSourceTableCheckPositions()) {
-
result.getSourceTableCheckPositions().putAll(yamlConfig.getSourceTableCheckPositions());
- }
- if (null != yamlConfig.getTargetTableCheckPositions()) {
-
result.getTargetTableCheckPositions().putAll(yamlConfig.getTargetTableCheckPositions());
+ if (null != yamlConfig.getTableCheckRangePositions()) {
+
result.getTableCheckRangePositions().addAll(yamlConfig.getTableCheckRangePositions().stream().map(tableCheckPositionSwapper::swapToObject).collect(Collectors.toList()));
}
result.setStatus(JobStatus.valueOf(yamlConfig.getStatus()));
return result;
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
index a4ae090d786..acf9205e648 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/task/PipelineTaskUtils.java
@@ -19,12 +19,18 @@ package org.apache.shardingsphere.data.pipeline.core.task;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
+import
org.apache.shardingsphere.data.pipeline.core.exception.PipelineJobCancelingException;
import
org.apache.shardingsphere.data.pipeline.core.ingest.dumper.inventory.InventoryDumperContext;
import
org.apache.shardingsphere.data.pipeline.core.ingest.position.IngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.task.progress.IncrementalTaskProgress;
+import
org.apache.shardingsphere.infra.exception.external.sql.type.kernel.category.PipelineSQLException;
+import
org.apache.shardingsphere.infra.exception.external.sql.type.wrapper.SQLWrapperException;
+import java.sql.SQLException;
import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
/**
* Pipeline task utilities.
@@ -57,4 +63,26 @@ public final class PipelineTaskUtils {
}
return result;
}
+
+ /**
+ * Wait future.
+ *
+ * @param future future to wait
+ * @param <T> result type
+ * @return execution result
+ * @throws SQLWrapperException if interrupted, or if execution fails with a non-pipeline cause
+ */
+ public static <T> T waitFuture(final Future<T> future) {
+ try {
+ return future.get();
+ } catch (final InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ throw new SQLWrapperException(new SQLException(ex));
+ } catch (final ExecutionException ex) {
+ if (ex.getCause() instanceof PipelineSQLException || ex.getCause()
instanceof PipelineJobCancelingException) {
+ throw (RuntimeException) ex.getCause();
+ }
+ throw new SQLWrapperException(new SQLException(ex));
+ }
+ }
}
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
index e2b78e3733b..597b1e215ad 100644
---
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/CRC32SingleTableInventoryCalculatorTest.java
@@ -66,7 +66,8 @@ class CRC32SingleTableInventoryCalculatorTest {
void setUp() throws SQLException {
DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "FIXTURE");
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(new PipelineColumnMetaData(1, "id", Types.INTEGER,
"integer", false, true, true));
- parameter = new
SingleTableInventoryCalculateParameter(pipelineDataSource, new
QualifiedTable(null, "foo_tbl"), Arrays.asList("foo_col", "bar_col"),
uniqueKeys, QueryType.RANGE_QUERY);
+ parameter = new
SingleTableInventoryCalculateParameter(pipelineDataSource, new
QualifiedTable(null, "foo_tbl"),
+ Arrays.asList("foo_col", "bar_col"), uniqueKeys,
QueryType.RANGE_QUERY, null);
when(pipelineDataSource.getDatabaseType()).thenReturn(databaseType);
when(pipelineDataSource.getConnection()).thenReturn(connection);
}
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
index 39a88d0bdc0..bb56b31df97 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContext.java
@@ -19,11 +19,11 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.contex
import lombok.Getter;
import lombok.Setter;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineJobItemContext;
import
org.apache.shardingsphere.data.pipeline.core.context.PipelineProcessContext;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import java.util.Optional;
@@ -59,8 +59,7 @@ public final class ConsistencyCheckJobItemContext implements
PipelineJobItemCont
progressContext = new ConsistencyCheckJobItemProgressContext(jobId,
shardingItem, jobConfig.getSourceDatabaseType().getType());
if (null != jobItemProgress) {
progressContext.getCheckedRecordsCount().set(Optional.ofNullable(jobItemProgress.getCheckedRecordsCount()).orElse(0L));
-
Optional.ofNullable(jobItemProgress.getSourceTableCheckPositions()).ifPresent(progressContext.getSourceTableCheckPositions()::putAll);
-
Optional.ofNullable(jobItemProgress.getTargetTableCheckPositions()).ifPresent(progressContext.getTargetTableCheckPositions()::putAll);
+
progressContext.getTableCheckRangePositions().addAll(jobItemProgress.getTableCheckRangePositions());
}
processContext = new ConsistencyCheckProcessContext(jobId);
}
diff --git
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
index 1b95d6c53b1..aae4d9512b8 100644
---
a/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
+++
b/kernel/data-pipeline/scenario/consistency-check/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/context/ConsistencyCheckJobItemContextTest.java
@@ -17,6 +17,9 @@
package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.PrimaryKeyIngestPosition;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.IntegerPrimaryKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.ConsistencyCheckJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
@@ -25,10 +28,13 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
class ConsistencyCheckJobItemContextTest {
+ private static final String DATA_NODE = "ds_0.t_order";
+
private static final String TABLE = "t_order";
private final DatabaseType databaseType =
TypedSPILoader.getService(DatabaseType.class, "H2");
@@ -38,20 +44,40 @@ class ConsistencyCheckJobItemContextTest {
ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2");
ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null, databaseType),
0, JobStatus.RUNNING, jobItemProgress);
-
assertThat(actual.getProgressContext().getSourceTableCheckPositions().size(),
is(0));
-
assertThat(actual.getProgressContext().getTargetTableCheckPositions().size(),
is(0));
+
assertThat(actual.getProgressContext().getTableCheckRangePositions().size(),
is(0));
}
@Test
void assertConstructWithNonEmptyValues() {
ConsistencyCheckJobItemProgress jobItemProgress = new
ConsistencyCheckJobItemProgress(TABLE, null, 0L, 10L, null, null, "H2");
- jobItemProgress.getSourceTableCheckPositions().put(TABLE, 6);
- jobItemProgress.getTargetTableCheckPositions().put(TABLE, 5);
+ jobItemProgress.getTableCheckRangePositions().add(new
TableCheckRangePosition(0, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(1, 100),
+ new IntegerPrimaryKeyIngestPosition(1, 101), null, 11, 11,
false, null));
+ jobItemProgress.getTableCheckRangePositions().add(new
TableCheckRangePosition(1, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(101, 200),
+ new IntegerPrimaryKeyIngestPosition(101, 203), null, 132, 132,
false, null));
ConsistencyCheckJobItemContext actual = new
ConsistencyCheckJobItemContext(new ConsistencyCheckJobConfiguration("", "",
"DATA_MATCH", null, databaseType),
0, JobStatus.RUNNING, jobItemProgress);
-
assertThat(actual.getProgressContext().getSourceTableCheckPositions().size(),
is(1));
-
assertThat(actual.getProgressContext().getTargetTableCheckPositions().size(),
is(1));
-
assertThat(actual.getProgressContext().getSourceTableCheckPositions().get(TABLE),
is(6));
-
assertThat(actual.getProgressContext().getTargetTableCheckPositions().get(TABLE),
is(5));
+
assertThat(actual.getProgressContext().getTableCheckRangePositions().size(),
is(2));
+
assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(0),
+ new TableCheckRangePosition(0, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(1, 100), new IntegerPrimaryKeyIngestPosition(1,
101), null, 11, 11, false, null));
+
assertTableCheckRangePosition(actual.getProgressContext().getTableCheckRangePositions().get(1),
+ new TableCheckRangePosition(1, DATA_NODE, TABLE, new
IntegerPrimaryKeyIngestPosition(101, 200), new
IntegerPrimaryKeyIngestPosition(101, 203), null, 132, 132, false, null));
+ }
+
+ private void assertTableCheckRangePosition(final TableCheckRangePosition
actual, final TableCheckRangePosition expected) {
+ assertRange(actual.getSourceRange(), expected.getSourceRange());
+ assertRange(actual.getTargetRange(), expected.getTargetRange());
+ assertThat(actual.getSplittingItem(), is(expected.getSplittingItem()));
+ assertThat(actual.getSourceDataNode(),
is(expected.getSourceDataNode()));
+ assertThat(actual.getLogicTableName(),
is(expected.getLogicTableName()));
+ assertThat(actual.getSourcePosition(),
is(expected.getSourcePosition()));
+ assertThat(actual.getTargetPosition(),
is(expected.getTargetPosition()));
+ assertThat(actual.isFinished(), is(expected.isFinished()));
+ }
+
+ private void assertRange(final PrimaryKeyIngestPosition<?> actual, final
PrimaryKeyIngestPosition<?> expected) {
+ assertThat(actual.getClass(), is(expected.getClass()));
+ assertThat(actual, instanceOf(IntegerPrimaryKeyIngestPosition.class));
+ assertThat(((IntegerPrimaryKeyIngestPosition) actual).getBeginValue(),
is(((IntegerPrimaryKeyIngestPosition) expected).getBeginValue()));
+ assertThat(((IntegerPrimaryKeyIngestPosition) actual).getEndValue(),
is(((IntegerPrimaryKeyIngestPosition) expected).getEndValue()));
}
}
diff --git
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index d97f6f6bf2b..bc9b35f1b90 100644
---
a/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/core/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -21,17 +21,20 @@ import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.api.type.ShardingSpherePipelineDataSourceConfiguration;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryChecker;
import
org.apache.shardingsphere.data.pipeline.core.context.TransmissionProcessContext;
+import org.apache.shardingsphere.data.pipeline.core.datanode.DataNodeUtils;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeEntry;
import org.apache.shardingsphere.data.pipeline.core.datanode.JobDataNodeLine;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSourceManager;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.TransmissionJobItemProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.progress.listener.PipelineJobUpdateProgress;
import
org.apache.shardingsphere.data.pipeline.core.job.service.TransmissionJobManager;
@@ -96,12 +99,28 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
PipelineDataSourceManager dataSourceManager = new
PipelineDataSourceManager();
TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps)) {
PipelineDataSourceConfigurationUtils.transformPipelineDataSourceConfiguration(jobConfig.getJobId(),
(ShardingSpherePipelineDataSourceConfiguration) jobConfig.getTarget());
- for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
- if (checkTableInventoryDataUnmatchedAndBreak(each,
tableChecker, checkResultMap, dataSourceManager)) {
- return
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry ->
entry.getKey().format(), Entry::getValue));
+ if (progressContext.getTableCheckRangePositions().isEmpty()) {
+
progressContext.getTableCheckRangePositions().addAll(splitCrossTables());
+ }
+ for (TableCheckRangePosition each :
progressContext.getTableCheckRangePositions()) {
+ TableDataConsistencyCheckResult checkResult =
checkSingleTableInventoryData(each, tableChecker, dataSourceManager);
+ log.info("checkResult: {}, table: {}, checkRangePosition: {}",
checkResult, each.getSourceDataNode(), each);
+ DataNode dataNode =
DataNodeUtils.parseWithSchema(each.getSourceDataNode());
+ QualifiedTable sourceTable = new
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
+ checkResultMap.put(sourceTable, checkResult);
+ if (checkResult.isIgnored()) {
+
progressContext.getIgnoredTableNames().add(sourceTable.format());
+ log.info("Table '{}' is ignored, ignore type: {}",
each.getSourceDataNode(), checkResult.getIgnoredType());
+ continue;
+ }
+ if (!checkResult.isMatched() &&
tableChecker.isBreakOnInventoryCheckNotMatched()) {
+ log.info("Unmatched on table '{}', ignore left tables",
each.getSourceDataNode());
+ cancel();
+ return
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry ->
entry.getKey().format(), Entry::getValue));
}
}
}
+ log.info("check done, jobId={}", jobConfig.getJobId());
return
checkResultMap.entrySet().stream().collect(Collectors.toMap(entry ->
entry.getKey().format(), Entry::getValue));
}
@@ -110,42 +129,38 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(TransmissionJobItemProgress::getInventoryRecordsCount).sum();
}
- private boolean checkTableInventoryDataUnmatchedAndBreak(final
JobDataNodeLine jobDataNodeLine, final TableDataConsistencyChecker tableChecker,
- final
Map<QualifiedTable, TableDataConsistencyCheckResult> checkResultMap,
- final
PipelineDataSourceManager dataSourceManager) {
- for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
- for (DataNode each : entry.getDataNodes()) {
- TableDataConsistencyCheckResult checkResult =
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker,
dataSourceManager);
- QualifiedTable sourceTable = new
QualifiedTable(each.getSchemaName(), each.getTableName());
- checkResultMap.put(sourceTable, checkResult);
- if (checkResult.isIgnored()) {
-
progressContext.getIgnoredTableNames().add(sourceTable.format());
- log.info("Table '{}' is ignored, ignore type: {}",
each.format(), checkResult.getIgnoredType());
- continue;
- }
- if (!checkResult.isMatched() &&
tableChecker.isBreakOnInventoryCheckNotMatched()) {
- log.info("Unmatched on table '{}', ignore left tables",
each.format());
- return true;
+ private List<TableCheckRangePosition> splitCrossTables() {
+ List<TableCheckRangePosition> result = new LinkedList<>();
+ int splittingItem = 0;
+ for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
+ for (JobDataNodeEntry entry : each.getEntries()) {
+ for (DataNode dataNode : entry.getDataNodes()) {
+ result.add(new TableCheckRangePosition(splittingItem++,
dataNode.format(), entry.getLogicTableName(),
+ new UnsupportedKeyIngestPosition(), new
UnsupportedKeyIngestPosition(), null));
}
}
}
- return false;
+ return result;
}
- private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final String targetTableName, final DataNode
dataNode,
+ private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableCheckRangePosition checkRangePosition,
final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager
dataSourceManager) {
+ log.info("checkSingleTableInventoryData, jobId: {},
checkRangePosition: {}", jobConfig.getJobId(), checkRangePosition);
+ DataNode dataNode =
DataNodeUtils.parseWithSchema(checkRangePosition.getSourceDataNode());
QualifiedTable sourceTable = new
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName());
PipelineDataSource sourceDataSource =
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
PipelineTableMetaDataLoader metaDataLoader = new
StandardPipelineTableMetaDataLoader(sourceDataSource);
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(dataNode.getSchemaName(),
dataNode.getTableName());
ShardingSpherePreconditions.checkNotNull(tableMetaData,
() -> new
PipelineTableDataConsistencyCheckLoadingFailedException(new
QualifiedTable(dataNode.getSchemaName(), dataNode.getTableName())));
+ String targetTableName = checkRangePosition.getLogicTableName();
List<String> columnNames = tableMetaData.getColumnNames();
List<PipelineColumnMetaData> uniqueKeys =
PipelineTableMetaDataUtils.getUniqueKeyColumns(sourceTable.getSchemaName(),
sourceTable.getTableName(), metaDataLoader);
QualifiedTable targetTable = new
QualifiedTable(dataNode.getSchemaName(), targetTableName);
PipelineDataSource targetDataSource =
dataSourceManager.getDataSource(jobConfig.getTarget());
TableInventoryCheckParameter param = new TableInventoryCheckParameter(
- jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm,
progressContext);
+ jobConfig.getJobId(), checkRangePosition.getSplittingItem(),
sourceDataSource, targetDataSource, sourceTable, targetTable, columnNames,
uniqueKeys,
+ readRateLimitAlgorithm, progressContext,
checkRangePosition.getQueryCondition());
TableInventoryChecker tableInventoryChecker =
tableChecker.buildTableInventoryChecker(param);
currentTableInventoryChecker.set(tableInventoryChecker);
Optional<TableDataConsistencyCheckResult> preCheckResult =
tableInventoryChecker.preCheck();
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index ea5d170683d..72c02e9bf76 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -28,10 +28,12 @@ import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.CDCLoginPara
import
org.apache.shardingsphere.data.pipeline.cdc.client.parameter.StartStreamingParameter;
import
org.apache.shardingsphere.data.pipeline.cdc.protocol.request.StreamDataRequestBody.SchemaTable;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.TableCheckRangePosition;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import
org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
import
org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
@@ -191,6 +193,8 @@ class CDCE2EIT {
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(qualifiedTable.getSchemaName(),
qualifiedTable.getTableName());
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0,
sourceDataSource.getDatabaseType().getType());
+ progressContext.getTableCheckRangePositions().add(new
TableCheckRangePosition(0, null, qualifiedTable.getTableName(),
+ new UnsupportedKeyIngestPosition(), new
UnsupportedKeyIngestPosition(), null));
TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
qualifiedTable, qualifiedTable,
tableMetaData.getColumnNames(), uniqueKeys, null,
progressContext);
TableDataConsistencyChecker tableChecker =
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new
Properties());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
index fb999a36a4c..2b397612e4d 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/RecordSingleTableInventoryCalculatorTest.java
@@ -107,7 +107,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfRangeQueryFromBeginWithOrderIdUniqueKey(final String
streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(0,
false, null), 4, 4);
}
@@ -116,7 +116,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfRangeQueryFromMiddleWithOrderIdUniqueKey(final
String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(4,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(4,
false, null), 4, 8);
}
@@ -125,7 +125,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfRangeQueryWithMultiColumnUniqueKeys(final String
streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
true, 6), 8, 6);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
false, 6), 3, 6);
}
@@ -153,7 +153,7 @@ class RecordSingleTableInventoryCalculatorTest {
}
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test3"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(3,
true, 4), 4, 4);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5,
true, 6), 4, 6);
assertQueryRangeCalculatedResult(calculator, param, new QueryRange(5,
true, 7), 5, 7);
@@ -164,7 +164,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfReservedRangeQueryWithMultiColumnUniqueKeys(final
String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(1000,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(3, true, 2));
Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
@@ -179,7 +179,7 @@ class RecordSingleTableInventoryCalculatorTest {
connection.createStatement().execute("CREATE TABLE test1 (user_id
INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id))");
}
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test1"),
- Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(0, false, null));
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
@@ -195,7 +195,7 @@ class RecordSingleTableInventoryCalculatorTest {
connection.createStatement().execute("CREATE TABLE test2 (user_id
INT NOT NULL, order_id INT, status VARCHAR(12), PRIMARY KEY (user_id,
order_id))");
}
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"test2"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(5,
StreamingRangeType.valueOf(streamingRangeType));
Optional<SingleTableInventoryCalculatedResult> calculateResult =
calculator.calculateChunk(param);
@@ -208,7 +208,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfRangeQueryAllWithOrderIdUniqueKeyWith3x(final int
streamingChunkCount, final String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildOrderIdUniqueKey(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -246,7 +246,7 @@ class RecordSingleTableInventoryCalculatorTest {
void
assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith50x100(final String
streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(50, 100,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -272,7 +272,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith3x(final
int streamingChunkCount, final String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -310,7 +310,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfRangeQueryAllWithMultiColumnUniqueKeysWith2x(final
int streamingChunkCount, final String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(2, streamingChunkCount,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.RANGE_QUERY, null);
param.setQueryRange(new QueryRange(null, false, null));
Iterator<SingleTableInventoryCalculatedResult> resultIterator =
calculator.calculate(param).iterator();
RecordSingleTableInventoryCalculatedResult actual =
(RecordSingleTableInventoryCalculatedResult) resultIterator.next();
@@ -357,7 +357,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfPointQuery(final String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.POINT_QUERY);
+ Collections.emptyList(), buildMultiColumnUniqueKeys(),
QueryType.POINT_QUERY, null);
param.setUniqueKeysValues(Arrays.asList(3, 3));
Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
@@ -373,7 +373,7 @@ class RecordSingleTableInventoryCalculatorTest {
void assertCalculateOfPointRangeQuery(final String streamingRangeType) {
RecordSingleTableInventoryCalculator calculator = new
RecordSingleTableInventoryCalculator(3,
StreamingRangeType.valueOf(streamingRangeType));
SingleTableInventoryCalculateParameter param = new
SingleTableInventoryCalculateParameter(dataSource, new QualifiedTable(null,
"t_order"),
- Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.POINT_QUERY);
+ Collections.emptyList(), buildUserIdUniqueKey(),
QueryType.POINT_QUERY, null);
param.setUniqueKeysValues(Collections.singleton(3));
Optional<SingleTableInventoryCalculatedResult> calculatedResult =
calculator.calculateChunk(param);
QuietlyCloser.close(param.getCalculationContext());
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
index d34d41400ae..32c4db66fd4 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/ConsistencyCheckJobExecutorCallbackTest.java
@@ -17,7 +17,10 @@
package org.apache.shardingsphere.data.pipeline.scenario.consistencycheck;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePosition;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.position.yaml.YamlTableCheckRangePositionSwapper;
import org.apache.shardingsphere.data.pipeline.core.context.PipelineContextKey;
+import
org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.type.UnsupportedKeyIngestPosition;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.api.PipelineAPIFactory;
import org.apache.shardingsphere.data.pipeline.core.job.id.PipelineJobIdUtils;
@@ -36,8 +39,9 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Collections;
-import java.util.Map;
+import java.util.List;
import java.util.Optional;
+import java.util.stream.Collectors;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -53,23 +57,39 @@ class ConsistencyCheckJobExecutorCallbackTest {
void assertBuildPipelineJobItemContext() {
ConsistencyCheckJobId pipelineJobId = new ConsistencyCheckJobId(new
PipelineContextKey(InstanceType.PROXY),
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId());
String checkJobId = PipelineJobIdUtils.marshal(pipelineJobId);
- Map<String, Object> expectTableCheckPosition =
Collections.singletonMap("t_order", 100);
+ List<YamlTableCheckRangePosition> expectedYamlTableCheckRangePositions
= Collections.singletonList(createYamlTableCheckRangePosition());
PipelineAPIFactory.getPipelineGovernanceFacade(PipelineContextUtils.getContextKey()).getJobItemFacade().getProcess().persist(checkJobId,
0,
-
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectTableCheckPosition)));
+
YamlEngine.marshal(createYamlConsistencyCheckJobItemProgress(expectedYamlTableCheckRangePositions)));
ConsistencyCheckJobExecutorCallback callback = new
ConsistencyCheckJobExecutorCallback();
ConsistencyCheckJobConfiguration jobConfig = new
YamlConsistencyCheckJobConfigurationSwapper().swapToObject(createYamlConsistencyCheckJobConfiguration(checkJobId));
PipelineJobItemManager<ConsistencyCheckJobItemProgress> jobItemManager
= new PipelineJobItemManager<>(new
ConsistencyCheckJobType().getYamlJobItemProgressSwapper());
Optional<ConsistencyCheckJobItemProgress> jobItemProgress =
jobItemManager.getProgress(jobConfig.getJobId(), 0);
ConsistencyCheckJobItemContext actual =
callback.buildJobItemContext(jobConfig, 0, jobItemProgress.orElse(null), null,
null);
- assertThat(actual.getProgressContext().getSourceTableCheckPositions(),
is(expectTableCheckPosition));
- assertThat(actual.getProgressContext().getTargetTableCheckPositions(),
is(expectTableCheckPosition));
+ YamlTableCheckRangePositionSwapper tableCheckPositionSwapper = new
YamlTableCheckRangePositionSwapper();
+ List<YamlTableCheckRangePosition> actualYamlTableCheckPositions =
actual.getProgressContext().getTableCheckRangePositions().stream()
+
.map(tableCheckPositionSwapper::swapToYamlConfiguration).collect(Collectors.toList());
+ assertThat(actualYamlTableCheckPositions.size(),
is(expectedYamlTableCheckRangePositions.size()));
+ for (int i = 0; i < actualYamlTableCheckPositions.size(); i++) {
+ assertThat(actualYamlTableCheckPositions.get(i),
is(expectedYamlTableCheckRangePositions.get(i)));
+ }
}
- private YamlConsistencyCheckJobItemProgress
createYamlConsistencyCheckJobItemProgress(final Map<String, Object>
expectTableCheckPosition) {
+ private YamlTableCheckRangePosition createYamlTableCheckRangePosition() {
+ YamlTableCheckRangePosition result = new YamlTableCheckRangePosition();
+ result.setSplittingItem(0);
+ result.setSourceDataNode("ds_0.t_order");
+ result.setLogicTableName("t_order");
+ result.setSourceRange(new UnsupportedKeyIngestPosition().toString());
+ result.setTargetRange(new UnsupportedKeyIngestPosition().toString());
+ result.setSourcePosition(100);
+ result.setTargetPosition(100);
+ return result;
+ }
+
+ private YamlConsistencyCheckJobItemProgress
createYamlConsistencyCheckJobItemProgress(final
List<YamlTableCheckRangePosition> yamlTableCheckRangePositions) {
YamlConsistencyCheckJobItemProgress result = new
YamlConsistencyCheckJobItemProgress();
result.setStatus(JobStatus.RUNNING.name());
- result.setSourceTableCheckPositions(expectTableCheckPosition);
- result.setTargetTableCheckPositions(expectTableCheckPosition);
+ result.setTableCheckRangePositions(yamlTableCheckRangePositions);
return result;
}