This is an automated email from the ASF dual-hosted git repository.
sunnianjun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 957e8e74a75 Review and refactor pipeline (#28266)
957e8e74a75 is described below
commit 957e8e74a75e935275c4e251cb401bca15c65469
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Sat Aug 26 21:50:17 2023 +0800
Review and refactor pipeline (#28266)
* Verify consistency check algorithm and database type before starting
consistency check
* Isolate SingleTableInventoryCalculator for
MatchingTableDataConsistencyChecker
* Clean code
* Improve DataMatchTableDataConsistencyChecker.getChunkSize
* Refactor MigrationDataConsistencyChecker
* Rename TableDataConsistencyCheckParameter to TableInventoryCheckParameter
* Extract JobDataNodeLineConvertUtils.buildTableNameMap
* Add TableDataConsistencyChecker.isBreakOnInventoryCheckNotMatched
* Refactor MigrationDataConsistencyChecker
* Add schemaName in DataRecord
* Add actualTableName in DataRecord
---
.../pipeline/api/ingest/record/DataRecord.java | 11 ++-
.../datanode/JobDataNodeLineConvertUtils.java | 18 +++++
.../CRC32MatchTableDataConsistencyChecker.java | 6 +-
.../DataMatchTableDataConsistencyChecker.java | 22 +++---
.../table/MatchingTableDataConsistencyChecker.java | 29 +++++---
.../table/TableDataConsistencyChecker.java | 11 ++-
...eter.java => TableInventoryCheckParameter.java} | 4 +-
.../DataMatchTableDataConsistencyCheckerTest.java | 51 ++++++++++++++
.../mysql/ingest/MySQLIncrementalDumper.java | 1 +
.../ingest/wal/decode/MppdbDecodingPlugin.java | 2 +-
.../postgresql/ingest/wal/WALEventConverter.java | 3 +-
.../ingest/wal/decode/TestDecodingPlugin.java | 2 +-
.../ingest/wal/event/AbstractRowEvent.java | 2 +-
.../ingest/wal/WALEventConverterTest.java | 12 ++--
.../handler/update/CheckMigrationJobUpdater.java | 10 ++-
.../api/impl/ConsistencyCheckJobAPI.java | 10 +++
.../pojo/CreateConsistencyCheckJobParameter.java | 5 ++
.../migration/api/impl/MigrationJobAPI.java | 12 +---
.../MigrationDataConsistencyChecker.java | 78 +++++++++-------------
.../test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java | 4 +-
.../primarykey/IndexesMigrationE2EIT.java | 6 +-
.../TableDataConsistencyCheckerFixture.java | 4 +-
.../api/impl/ConsistencyCheckJobAPITest.java | 28 +++++---
23 files changed, 218 insertions(+), 113 deletions(-)
diff --git
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
index b0f0fa5622b..2072dd6d169 100644
---
a/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
+++
b/kernel/data-pipeline/api/src/main/java/org/apache/shardingsphere/data/pipeline/api/ingest/record/DataRecord.java
@@ -33,12 +33,14 @@ import java.util.List;
*/
@Getter
@Setter
-@EqualsAndHashCode(of = "tableName", callSuper = false)
+@EqualsAndHashCode(of = {"schemaName", "tableName"}, callSuper = false)
@ToString
public final class DataRecord extends Record {
private final String type;
+ private final String schemaName;
+
private final String tableName;
private final List<Column> columns;
@@ -47,11 +49,18 @@ public final class DataRecord extends Record {
private final List<Object> oldUniqueKeyValues = new ArrayList<>();
+ private String actualTableName;
+
private Long csn;
public DataRecord(final String type, final String tableName, final
IngestPosition position, final int columnCount) {
+ this(type, null, tableName, position, columnCount);
+ }
+
+ public DataRecord(final String type, final String schemaName, final String
tableName, final IngestPosition position, final int columnCount) {
super(position);
this.type = type;
+ this.schemaName = schemaName;
this.tableName = tableName;
columns = new ArrayList<>(columnCount);
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
index 6285d3a23b7..5cf249adc30 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/common/datanode/JobDataNodeLineConvertUtils.java
@@ -20,6 +20,8 @@ package
org.apache.shardingsphere.data.pipeline.common.datanode;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
+import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.infra.datanode.DataNode;
import java.util.LinkedHashMap;
@@ -65,4 +67,20 @@ public final class JobDataNodeLineConvertUtils {
}
return result;
}
+
+ /**
+ * Build table name map.
+ *
+ * @param dataNodeLine data node line
+ * @return actual table and logic table map
+ */
+ public static Map<ActualTableName, LogicTableName> buildTableNameMap(final
JobDataNodeLine dataNodeLine) {
+ Map<ActualTableName, LogicTableName> result = new LinkedHashMap<>();
+ for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
+ for (DataNode dataNode : each.getDataNodes()) {
+ result.put(new ActualTableName(dataNode.getTableName()), new
LogicTableName(each.getLogicTableName()));
+ }
+ }
+ return result;
+ }
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
index eef9d20fb0a..11e2479a393 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/CRC32MatchTableDataConsistencyChecker.java
@@ -33,11 +33,9 @@ import java.util.LinkedList;
@SPIDescription("Match CRC32 of records.")
public final class CRC32MatchTableDataConsistencyChecker extends
MatchingTableDataConsistencyChecker {
- private final SingleTableInventoryCalculator calculator = new
CRC32SingleTableInventoryCalculator();
-
@Override
- protected SingleTableInventoryCalculator
getSingleTableInventoryCalculator() {
- return calculator;
+ protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
+ return new CRC32SingleTableInventoryCalculator();
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
index 88589b64bcd..f40ff4609a5 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/DataMatchTableDataConsistencyChecker.java
@@ -17,9 +17,11 @@
package org.apache.shardingsphere.data.pipeline.core.consistencycheck.table;
+import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.RecordSingleTableInventoryCalculator;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator.SingleTableInventoryCalculator;
+import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -38,31 +40,33 @@ public final class DataMatchTableDataConsistencyChecker
extends MatchingTableDat
private static final int DEFAULT_CHUNK_SIZE = 1000;
- private SingleTableInventoryCalculator calculator;
+ private int chunkSize;
@Override
public void init(final Properties props) {
- calculator = new
RecordSingleTableInventoryCalculator(getChunkSize(props));
+ chunkSize = getChunkSize(props);
}
private int getChunkSize(final Properties props) {
+ String chunkSizeText = props.getProperty(CHUNK_SIZE_KEY);
+ if (Strings.isNullOrEmpty(chunkSizeText)) {
+ return DEFAULT_CHUNK_SIZE;
+ }
int result;
try {
- result = Integer.parseInt(props.getProperty(CHUNK_SIZE_KEY,
Integer.toString(DEFAULT_CHUNK_SIZE)));
+ result = Integer.parseInt(chunkSizeText);
} catch (final NumberFormatException ignore) {
- log.warn("'chunk-size' is not a valid number, use default value
{}", DEFAULT_CHUNK_SIZE);
- return DEFAULT_CHUNK_SIZE;
+ throw new PipelineInvalidParameterException("'chunk-size' is not a
valid number: `" + chunkSizeText + "`");
}
if (result <= 0) {
- log.warn("Invalid 'chunk-size': {}, use default value {}", result,
DEFAULT_CHUNK_SIZE);
- return DEFAULT_CHUNK_SIZE;
+ throw new PipelineInvalidParameterException("Invalid 'chunk-size':
" + result);
}
return result;
}
@Override
- protected SingleTableInventoryCalculator
getSingleTableInventoryCalculator() {
- return calculator;
+ protected SingleTableInventoryCalculator
buildSingleTableInventoryCalculator() {
+ return new RecordSingleTableInventoryCalculator(chunkSize);
}
@Override
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
index 79518e3edb4..496e3a75174 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/MatchingTableDataConsistencyChecker.java
@@ -35,8 +35,10 @@ import
org.apache.shardingsphere.infra.executor.kernel.thread.ExecutorThreadFact
import org.apache.shardingsphere.infra.util.close.QuietlyCloser;
import java.sql.SQLException;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
@@ -51,8 +53,10 @@ import java.util.concurrent.TimeUnit;
@RequiredArgsConstructor
public abstract class MatchingTableDataConsistencyChecker implements
TableDataConsistencyChecker {
+ private final Set<SingleTableInventoryCalculator> calculators = new
HashSet<>();
+
@Override
- public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableDataConsistencyCheckParameter param) {
+ public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableInventoryCheckParameter param) {
ThreadFactory threadFactory =
ExecutorThreadFactoryBuilder.build("job-" + getJobIdDigest(param.getJobId()) +
"-check-%d");
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 60,
TimeUnit.SECONDS, new ArrayBlockingQueue<>(2), threadFactory);
try {
@@ -63,25 +67,30 @@ public abstract class MatchingTableDataConsistencyChecker
implements TableDataCo
}
}
- private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableDataConsistencyCheckParameter param,
final ThreadPoolExecutor executor) {
+ private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final TableInventoryCheckParameter param, final
ThreadPoolExecutor executor) {
SingleTableInventoryCalculateParameter sourceParam = new
SingleTableInventoryCalculateParameter(param.getSourceDataSource(),
param.getSourceTable(),
param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getSourceTableCheckPositions().get(param.getSourceTable().getTableName().getOriginal()));
SingleTableInventoryCalculateParameter targetParam = new
SingleTableInventoryCalculateParameter(param.getTargetDataSource(),
param.getTargetTable(),
param.getColumnNames(), param.getUniqueKeys(),
param.getProgressContext().getTargetTableCheckPositions().get(param.getTargetTable().getTableName().getOriginal()));
- SingleTableInventoryCalculator calculator =
getSingleTableInventoryCalculator();
- Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults
= waitFuture(executor.submit(() ->
calculator.calculate(sourceParam))).iterator();
- Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults
= waitFuture(executor.submit(() ->
calculator.calculate(targetParam))).iterator();
+ SingleTableInventoryCalculator sourceCalculator =
buildSingleTableInventoryCalculator();
+ calculators.add(sourceCalculator);
+ SingleTableInventoryCalculator targetCalculator =
buildSingleTableInventoryCalculator();
+ calculators.add(targetCalculator);
+ Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults
= waitFuture(executor.submit(() ->
sourceCalculator.calculate(sourceParam))).iterator();
+ Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults
= waitFuture(executor.submit(() ->
targetCalculator.calculate(targetParam))).iterator();
try {
return checkSingleTableInventoryData(sourceCalculatedResults,
targetCalculatedResults, param, executor);
} finally {
QuietlyCloser.close(sourceParam.getCalculationContext());
QuietlyCloser.close(targetParam.getCalculationContext());
+ calculators.remove(sourceCalculator);
+ calculators.remove(targetCalculator);
}
}
private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final
Iterator<SingleTableInventoryCalculatedResult> sourceCalculatedResults,
final Iterator<SingleTableInventoryCalculatedResult> targetCalculatedResults,
-
final TableDataConsistencyCheckParameter param, final ThreadPoolExecutor
executor) {
+
final TableInventoryCheckParameter param, final ThreadPoolExecutor executor) {
YamlTableDataConsistencyCheckResult checkResult = new
YamlTableDataConsistencyCheckResult(new
YamlTableDataConsistencyCountCheckResult(), new
YamlTableDataConsistencyContentCheckResult(true));
while (sourceCalculatedResults.hasNext() &&
targetCalculatedResults.hasNext()) {
if (null != param.getReadRateLimitAlgorithm()) {
@@ -137,15 +146,17 @@ public abstract class MatchingTableDataConsistencyChecker
implements TableDataCo
}
}
- protected abstract SingleTableInventoryCalculator
getSingleTableInventoryCalculator();
+ protected abstract SingleTableInventoryCalculator
buildSingleTableInventoryCalculator();
@Override
public void cancel() {
- getSingleTableInventoryCalculator().cancel();
+ for (SingleTableInventoryCalculator each : calculators) {
+ each.cancel();
+ }
}
@Override
public boolean isCanceling() {
- return getSingleTableInventoryCalculator().isCanceling();
+ return
calculators.stream().anyMatch(SingleTableInventoryCalculator::isCanceling);
}
}
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
index a46aff91391..2a126301f25 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyChecker.java
@@ -35,7 +35,16 @@ public interface TableDataConsistencyChecker extends
ShardingSphereAlgorithm, Pi
* @param param check parameter
* @return check result
*/
- TableDataConsistencyCheckResult
checkSingleTableInventoryData(TableDataConsistencyCheckParameter param);
+ TableDataConsistencyCheckResult
checkSingleTableInventoryData(TableInventoryCheckParameter param);
+
+ /**
+ * Is break on inventory check not matched.
+ *
+ * @return break or not
+ */
+ default boolean isBreakOnInventoryCheckNotMatched() {
+ return true;
+ }
/**
* Get supported database types.
diff --git
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckParameter.java
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
similarity index 95%
rename from
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckParameter.java
rename to
kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
index 54b58311571..c89fe51a300 100644
---
a/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableDataConsistencyCheckParameter.java
+++
b/kernel/data-pipeline/core/src/main/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/TableInventoryCheckParameter.java
@@ -28,11 +28,11 @@ import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorit
import java.util.List;
/**
- * Table data consistency check parameter.
+ * Table inventory check parameter.
*/
@RequiredArgsConstructor
@Getter
-public final class TableDataConsistencyCheckParameter {
+public final class TableInventoryCheckParameter {
private final String jobId;
diff --git
a/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java
new file mode 100644
index 00000000000..01a3dddc40e
--- /dev/null
+++
b/kernel/data-pipeline/core/src/test/java/org/apache/shardingsphere/data/pipeline/core/consistencycheck/table/calculator/DataMatchTableDataConsistencyCheckerTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.calculator;
+
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.DataMatchTableDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.exception.param.PipelineInvalidParameterException;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+class DataMatchTableDataConsistencyCheckerTest {
+
+ @Test
+ void assertInitSuccess() {
+ for (String each : Arrays.asList("1", "1000")) {
+ new
DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties(each));
+ }
+ }
+
+ @Test
+ void assertInitFailure() {
+ assertThrows(PipelineInvalidParameterException.class, () -> new
DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties("xyz")));
+ for (String each : Arrays.asList("0", "-1")) {
+ assertThrows(PipelineInvalidParameterException.class, () -> new
DataMatchTableDataConsistencyChecker().init(buildAlgorithmProperties(each)));
+ }
+ }
+
+ private Properties buildAlgorithmProperties(final String chunkSize) {
+ Properties result = new Properties();
+ result.put("chunk-size", chunkSize);
+ return result;
+ }
+}
diff --git
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
index 0814054fa88..0a0eeb75081 100644
---
a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
+++
b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/MySQLIncrementalDumper.java
@@ -237,6 +237,7 @@ public final class MySQLIncrementalDumper extends
AbstractLifecycleExecutor impl
String tableName =
dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
IngestPosition position = new BinlogPosition(rowsEvent.getFileName(),
rowsEvent.getPosition(), rowsEvent.getServerId());
DataRecord result = new DataRecord(type, tableName, position,
columnCount);
+ result.setActualTableName(rowsEvent.getTableName());
result.setCommitTime(rowsEvent.getTimestamp() * 1000);
return result;
}
diff --git
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
index 8203f1245c5..19fca8ed5a9 100644
---
a/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/opengauss/src/main/java/org/apache/shardingsphere/data/pipeline/opengauss/ingest/wal/decode/MppdbDecodingPlugin.java
@@ -113,7 +113,7 @@ public final class MppdbDecodingPlugin implements
DecodingPlugin {
throw new IngestException("Unknown rowEventType: " +
rowEventType);
}
String[] tableMetaData = mppTableData.getTableName().split("\\.");
- result.setDatabaseName(tableMetaData[0]);
+ result.setSchemaName(tableMetaData[0]);
result.setTableName(tableMetaData[1]);
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
index fc909f1eec0..bd261596024 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverter.java
@@ -121,7 +121,8 @@ public final class WALEventConverter {
private DataRecord createDataRecord(final String type, final
AbstractRowEvent rowsEvent, final int columnCount) {
String tableName =
dumperConfig.getLogicTableName(rowsEvent.getTableName()).getOriginal();
- DataRecord result = new DataRecord(type, tableName, new
WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
+ DataRecord result = new DataRecord(type, rowsEvent.getSchemaName(),
tableName, new WALPosition(rowsEvent.getLogSequenceNumber()), columnCount);
+ result.setActualTableName(rowsEvent.getTableName());
result.setCsn(rowsEvent.getCsn());
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
index 07bf1e43384..a8c5499c658 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/decode/TestDecodingPlugin.java
@@ -81,7 +81,7 @@ public final class TestDecodingPlugin implements
DecodingPlugin {
throw new IngestException("Unknown rowEventType: " +
rowEventType);
}
String[] tableMetaData = tableName.split("\\.");
- result.setDatabaseName(tableMetaData[0]);
+ result.setSchemaName(tableMetaData[0]);
result.setTableName(tableMetaData[1].substring(0,
tableMetaData[1].length() - 1));
return result;
}
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
index 6cba665d907..bc57fbf4c18 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/main/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/event/AbstractRowEvent.java
@@ -29,7 +29,7 @@ import lombok.ToString;
@ToString(callSuper = true)
public abstract class AbstractRowEvent extends AbstractWALEvent {
- private String databaseName;
+ private String schemaName;
private String tableName;
diff --git
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
index 9269cf5113e..1cd12fdb8b8 100644
---
a/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
+++
b/kernel/data-pipeline/dialect/postgresql/src/test/java/org/apache/shardingsphere/data/pipeline/postgresql/ingest/wal/WALEventConverterTest.java
@@ -134,7 +134,7 @@ class WALEventConverterTest {
private void assertWriteRowEvent0(final Map<LogicTableName,
Set<ColumnName>> targetTableColumnsMap, final int expectedColumnCount) throws
ReflectiveOperationException {
dumperConfig.setTargetTableColumnsMap(targetTableColumnsMap);
WriteRowEvent rowsEvent = new WriteRowEvent();
- rowsEvent.setDatabaseName("");
+ rowsEvent.setSchemaName("");
rowsEvent.setTableName("t_order");
rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK"));
Method method =
WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent",
WriteRowEvent.class, PipelineTableMetaData.class);
@@ -201,14 +201,14 @@ class WALEventConverterTest {
void assertConvertFailure() {
AbstractRowEvent event = new AbstractRowEvent() {
};
- event.setDatabaseName("");
+ event.setSchemaName("");
event.setTableName("t_order");
assertThrows(UnsupportedSQLOperationException.class, () ->
walEventConverter.convert(event));
}
private AbstractRowEvent mockWriteRowEvent() {
WriteRowEvent result = new WriteRowEvent();
- result.setDatabaseName("");
+ result.setSchemaName("");
result.setTableName("t_order");
result.setAfterRow(Arrays.asList("id", "user_id"));
return result;
@@ -216,7 +216,7 @@ class WALEventConverterTest {
private AbstractRowEvent mockUpdateRowEvent() {
UpdateRowEvent result = new UpdateRowEvent();
- result.setDatabaseName("");
+ result.setSchemaName("");
result.setTableName("t_order");
result.setAfterRow(Arrays.asList("id", "user_id"));
return result;
@@ -224,7 +224,7 @@ class WALEventConverterTest {
private AbstractRowEvent mockDeleteRowEvent() {
DeleteRowEvent result = new DeleteRowEvent();
- result.setDatabaseName("");
+ result.setSchemaName("");
result.setTableName("t_order");
result.setPrimaryKeys(Collections.singletonList("id"));
return result;
@@ -232,7 +232,7 @@ class WALEventConverterTest {
private AbstractRowEvent mockUnknownTableEvent() {
WriteRowEvent result = new WriteRowEvent();
- result.setDatabaseName("");
+ result.setSchemaName("");
result.setTableName("t_other");
return result;
}
diff --git
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
index b18055ded38..35e684178e3 100644
---
a/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
+++
b/kernel/data-pipeline/distsql/handler/src/main/java/org/apache/shardingsphere/migration/distsql/handler/update/CheckMigrationJobUpdater.java
@@ -19,6 +19,8 @@ package
org.apache.shardingsphere.migration.distsql.handler.update;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.impl.ConsistencyCheckJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.pojo.CreateConsistencyCheckJobParameter;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import org.apache.shardingsphere.distsql.handler.ral.update.RALUpdater;
import org.apache.shardingsphere.distsql.parser.segment.AlgorithmSegment;
import
org.apache.shardingsphere.migration.distsql.statement.CheckMigrationStatement;
@@ -31,14 +33,18 @@ import java.util.Properties;
*/
public final class CheckMigrationJobUpdater implements
RALUpdater<CheckMigrationStatement> {
- private final ConsistencyCheckJobAPI jobAPI = new ConsistencyCheckJobAPI();
+ private final ConsistencyCheckJobAPI checkJobAPI = new
ConsistencyCheckJobAPI();
+
+ private final MigrationJobAPI migrationJobAPI = new MigrationJobAPI();
@Override
public void executeUpdate(final String databaseName, final
CheckMigrationStatement sqlStatement) throws SQLException {
AlgorithmSegment typeStrategy = sqlStatement.getTypeStrategy();
String algorithmTypeName = null == typeStrategy ? null :
typeStrategy.getName();
Properties algorithmProps = null == typeStrategy ? null :
typeStrategy.getProps();
- jobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(sqlStatement.getJobId(), algorithmTypeName,
algorithmProps));
+ String jobId = sqlStatement.getJobId();
+ MigrationJobConfiguration jobConfig =
migrationJobAPI.getJobConfiguration(jobId);
+ checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(jobId, algorithmTypeName, algorithmProps,
jobConfig.getSourceDatabaseType(), jobConfig.getTargetDatabaseType()));
}
@Override
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
index 6473566fe4f..e5162fab517 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPI.java
@@ -38,6 +38,8 @@ import
org.apache.shardingsphere.data.pipeline.common.pojo.PipelineJobInfo;
import
org.apache.shardingsphere.data.pipeline.common.registrycenter.repository.GovernanceRepositoryAPI;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.ConsistencyCheckJobNotFoundException;
import
org.apache.shardingsphere.data.pipeline.core.exception.job.UncompletedConsistencyCheckJobExistsException;
import org.apache.shardingsphere.data.pipeline.core.job.PipelineJobIdUtils;
@@ -55,6 +57,7 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
import org.apache.shardingsphere.elasticjob.infra.pojo.JobConfigurationPOJO;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -108,6 +111,7 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
throw new
UncompletedConsistencyCheckJobExistsException(latestCheckJobId.get());
}
}
+ verifyPipelineDatabaseType(param);
PipelineContextKey contextKey =
PipelineJobIdUtils.parseContextKey(parentJobId);
String result = marshalJobId(latestCheckJobId.map(s -> new
ConsistencyCheckJobId(contextKey, parentJobId, s)).orElseGet(() -> new
ConsistencyCheckJobId(contextKey, parentJobId)));
repositoryAPI.persistLatestCheckJobId(parentJobId, result);
@@ -122,6 +126,12 @@ public final class ConsistencyCheckJobAPI extends
AbstractPipelineJobAPIImpl {
return result;
}
+ private void verifyPipelineDatabaseType(final
CreateConsistencyCheckJobParameter param) {
+ Collection<DatabaseType> supportedDatabaseTypes =
TableDataConsistencyCheckerFactory.newInstance(param.getAlgorithmTypeName(),
param.getAlgorithmProps()).getSupportedDatabaseTypes();
+
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getSourceDatabaseType()),
() -> new
UnsupportedPipelineDatabaseTypeException(param.getSourceDatabaseType()));
+
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(param.getTargetDatabaseType()),
() -> new
UnsupportedPipelineDatabaseTypeException(param.getTargetDatabaseType()));
+ }
+
/**
* Get latest data consistency check result.
*
diff --git
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
index f48442b2ada..92413dad079 100644
---
a/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
+++
b/kernel/data-pipeline/scenario/consistencycheck/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/consistencycheck/api/pojo/CreateConsistencyCheckJobParameter.java
@@ -19,6 +19,7 @@ package
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.po
import lombok.Getter;
import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import java.util.Properties;
@@ -34,4 +35,8 @@ public final class CreateConsistencyCheckJobParameter {
private final String algorithmTypeName;
private final Properties algorithmProps;
+
+ private final DatabaseType sourceDatabaseType;
+
+ private final DatabaseType targetDatabaseType;
}
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
index b91a8ea0af7..ef86896f22d 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/api/impl/MigrationJobAPI.java
@@ -265,7 +265,7 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
public MigrationTaskConfiguration buildTaskConfiguration(final
PipelineJobConfiguration pipelineJobConfig, final int jobShardingItem, final
PipelineProcessConfiguration pipelineProcessConfig) {
MigrationJobConfiguration jobConfig = (MigrationJobConfiguration)
pipelineJobConfig;
JobDataNodeLine dataNodeLine =
jobConfig.getJobShardingDataNodes().get(jobShardingItem);
- Map<ActualTableName, LogicTableName> tableNameMap =
buildTableNameMap(dataNodeLine);
+ Map<ActualTableName, LogicTableName> tableNameMap =
JobDataNodeLineConvertUtils.buildTableNameMap(dataNodeLine);
TableNameSchemaNameMapping tableNameSchemaNameMapping = new
TableNameSchemaNameMapping(jobConfig.getTargetTableSchemaMap());
CreateTableConfiguration createTableConfig =
buildCreateTableConfiguration(jobConfig, tableNameSchemaNameMapping);
String dataSourceName =
dataNodeLine.getEntries().get(0).getDataNodes().get(0).getDataSourceName();
@@ -279,16 +279,6 @@ public final class MigrationJobAPI extends
AbstractInventoryIncrementalJobAPIImp
return result;
}
- private Map<ActualTableName, LogicTableName> buildTableNameMap(final
JobDataNodeLine dataNodeLine) {
- Map<ActualTableName, LogicTableName> result = new LinkedHashMap<>();
- for (JobDataNodeEntry each : dataNodeLine.getEntries()) {
- for (DataNode dataNode : each.getDataNodes()) {
- result.put(new ActualTableName(dataNode.getTableName()), new
LogicTableName(each.getLogicTableName()));
- }
- }
- return result;
- }
-
private CreateTableConfiguration buildCreateTableConfiguration(final
MigrationJobConfiguration jobConfig,
final
TableNameSchemaNameMapping tableNameSchemaNameMapping) {
Collection<CreateTableEntry> createTableEntries = new LinkedList<>();
diff --git
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
index c8dbbc1f6eb..c01a6d404d7 100644
---
a/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
+++
b/kernel/data-pipeline/scenario/migration/src/main/java/org/apache/shardingsphere/data/pipeline/scenario/migration/check/consistency/MigrationDataConsistencyChecker.java
@@ -18,7 +18,6 @@
package
org.apache.shardingsphere.data.pipeline.scenario.migration.check.consistency;
import lombok.extern.slf4j.Slf4j;
-import
org.apache.shardingsphere.data.pipeline.api.datasource.config.PipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.metadata.SchemaTableName;
import
org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -37,27 +36,24 @@ import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPi
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.PipelineDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckerFactory;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.exception.data.PipelineTableDataConsistencyCheckLoadingFailedException;
-import
org.apache.shardingsphere.data.pipeline.core.exception.data.UnsupportedPipelineDatabaseTypeException;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.api.impl.MigrationJobAPI;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.spi.ratelimit.JobRateLimitAlgorithm;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.datanode.DataNode;
import
org.apache.shardingsphere.infra.exception.core.ShardingSpherePreconditions;
-import java.util.Collection;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.Set;
/**
* Data consistency checker for migration job.
@@ -71,7 +67,7 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
private final ConsistencyCheckJobItemProgressContext progressContext;
- private final AtomicReference<TableDataConsistencyChecker>
currentTableChecker = new AtomicReference<>();
+ private final Set<TableDataConsistencyChecker> tableCheckers = new
HashSet<>();
public MigrationDataConsistencyChecker(final MigrationJobConfiguration
jobConfig, final InventoryIncrementalProcessContext processContext,
final
ConsistencyCheckJobItemProgressContext progressContext) {
@@ -82,9 +78,6 @@ public final class MigrationDataConsistencyChecker implements
PipelineDataConsis
@Override
public Map<String, TableDataConsistencyCheckResult> check(final String
algorithmType, final Properties algorithmProps) {
- Collection<DatabaseType> supportedDatabaseTypes =
TableDataConsistencyCheckerFactory.newInstance(algorithmType,
algorithmProps).getSupportedDatabaseTypes();
- verifyPipelineDatabaseType(supportedDatabaseTypes,
jobConfig.getSources().values().iterator().next());
- verifyPipelineDatabaseType(supportedDatabaseTypes,
jobConfig.getTarget());
List<String> sourceTableNames = new LinkedList<>();
jobConfig.getJobShardingDataNodes().forEach(each ->
each.getEntries().forEach(entry -> entry.getDataNodes()
.forEach(dataNode ->
sourceTableNames.add(DataNodeUtils.formatWithSchema(dataNode)))));
@@ -93,33 +86,37 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
progressContext.onProgressUpdated(new
PipelineJobProgressUpdatedParameter(0));
Map<String, TableDataConsistencyCheckResult> result = new
LinkedHashMap<>();
try (PipelineDataSourceManager dataSourceManager = new
DefaultPipelineDataSourceManager()) {
- AtomicBoolean checkFailed = new AtomicBoolean(false);
for (JobDataNodeLine each : jobConfig.getJobShardingDataNodes()) {
- each.getEntries().forEach(entry ->
entry.getDataNodes().forEach(dataNode -> {
- TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
- currentTableChecker.set(tableChecker);
- check(tableChecker, result, dataSourceManager,
checkFailed, each, entry, dataNode);
- }));
+ checkTableInventoryData(each, algorithmType, algorithmProps,
result, dataSourceManager);
}
}
return result;
}
- private void check(final TableDataConsistencyChecker tableChecker, final
Map<String, TableDataConsistencyCheckResult> checkResults, final
PipelineDataSourceManager dataSourceManager,
- final AtomicBoolean checkFailed, final JobDataNodeLine
jobDataNodeLine, final JobDataNodeEntry entry, final DataNode dataNode) {
- if (checkFailed.get()) {
- return;
- }
- TableDataConsistencyCheckResult checkResult =
checkSingleTable(entry.getLogicTableName(), dataNode, tableChecker,
dataSourceManager);
- checkResults.put(DataNodeUtils.formatWithSchema(dataNode),
checkResult);
- if (!checkResult.isMatched()) {
- log.info("unmatched on table '{}', ignore left tables",
jobDataNodeLine);
- checkFailed.set(true);
+ private long getRecordsCount() {
+ Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new
MigrationJobAPI().getJobProgress(jobConfig);
+ return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
+ }
+
+ /**
+ * Check inventory data of every data node in one job data node line.
+ *
+ * <p>A new table checker is created for each data node and kept in
+ * {@code tableCheckers} only while its check is running, so that
+ * {@code cancel()} and {@code isCanceling()} see in-flight checkers only.</p>
+ *
+ * @param jobDataNodeLine job data node line whose entries are checked
+ * @param algorithmType consistency check algorithm type
+ * @param algorithmProps consistency check algorithm properties
+ * @param checkResults result map, filled per data node (key is the
+ * schema-qualified data node name)
+ * @param dataSourceManager data source manager used to get data sources
+ */
+ private void checkTableInventoryData(final JobDataNodeLine
jobDataNodeLine, final String algorithmType, final Properties algorithmProps,
+ final Map<String,
TableDataConsistencyCheckResult> checkResults, final PipelineDataSourceManager
dataSourceManager) {
+ for (JobDataNodeEntry entry : jobDataNodeLine.getEntries()) {
+ for (DataNode each : entry.getDataNodes()) {
+ TableDataConsistencyChecker tableChecker =
TableDataConsistencyCheckerFactory.newInstance(algorithmType, algorithmProps);
+ tableCheckers.add(tableChecker);
+ TableDataConsistencyCheckResult checkResult =
checkSingleTableInventoryData(entry.getLogicTableName(), each, tableChecker,
dataSourceManager);
+ checkResults.put(DataNodeUtils.formatWithSchema(each),
checkResult);
+ // Deregister the finished checker itself; remove(null) never
+ // matched anything, so finished checkers piled up in the set.
+ tableCheckers.remove(tableChecker);
+ if (!checkResult.isMatched() &&
tableChecker.isBreakOnInventoryCheckNotMatched()) {
+ log.info("Unmatched on table '{}', ignore left tables",
DataNodeUtils.formatWithSchema(each));
+ // Only the rest of this data node line is skipped; the
+ // caller's loop still moves on to the next line.
+ return;
+ }
+ }
}
}
- private TableDataConsistencyCheckResult checkSingleTable(final String
targetTableName, final DataNode dataNode,
- final
TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager
dataSourceManager) {
+ private TableDataConsistencyCheckResult
checkSingleTableInventoryData(final String targetTableName, final DataNode
dataNode,
+
final TableDataConsistencyChecker tableChecker, final PipelineDataSourceManager
dataSourceManager) {
SchemaTableName sourceTable = new
SchemaTableName(dataNode.getSchemaName(), dataNode.getTableName());
SchemaTableName targetTable = new
SchemaTableName(dataNode.getSchemaName(), targetTableName);
PipelineDataSourceWrapper sourceDataSource =
dataSourceManager.getDataSource(jobConfig.getSources().get(dataNode.getDataSourceName()));
@@ -130,35 +127,20 @@ public final class MigrationDataConsistencyChecker
implements PipelineDataConsis
List<String> columnNames = tableMetaData.getColumnNames();
List<PipelineColumnMetaData> uniqueKeys =
PipelineTableMetaDataUtils.getUniqueKeyColumns(
sourceTable.getSchemaName().getOriginal(),
sourceTable.getTableName().getOriginal(), metaDataLoader);
- TableDataConsistencyCheckParameter param = new
TableDataConsistencyCheckParameter(
+ TableInventoryCheckParameter param = new TableInventoryCheckParameter(
jobConfig.getJobId(), sourceDataSource, targetDataSource,
sourceTable, targetTable, columnNames, uniqueKeys, readRateLimitAlgorithm,
progressContext);
return tableChecker.checkSingleTableInventoryData(param);
}
- private void verifyPipelineDatabaseType(final Collection<DatabaseType>
supportedDatabaseTypes, final PipelineDataSourceConfiguration dataSourceConfig)
{
-
ShardingSpherePreconditions.checkState(supportedDatabaseTypes.contains(dataSourceConfig.getDatabaseType()),
- () -> new
UnsupportedPipelineDatabaseTypeException(dataSourceConfig.getDatabaseType()));
- }
-
- private long getRecordsCount() {
- Map<Integer, InventoryIncrementalJobItemProgress> jobProgress = new
MigrationJobAPI().getJobProgress(jobConfig);
- return
jobProgress.values().stream().filter(Objects::nonNull).mapToLong(InventoryIncrementalJobItemProgress::getProcessedRecordsCount).sum();
- }
-
@Override
public void cancel() {
- TableDataConsistencyChecker tableChecker = currentTableChecker.get();
- if (null != tableChecker) {
- tableChecker.cancel();
+ for (TableDataConsistencyChecker each : tableCheckers) {
+ each.cancel();
}
}
@Override
public boolean isCanceling() {
- TableDataConsistencyChecker tableChecker = currentTableChecker.get();
- if (null != tableChecker) {
- return tableChecker.isCanceling();
- }
- return false;
+ return
tableCheckers.stream().anyMatch(TableDataConsistencyChecker::isCanceling);
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
index d9824cfaf00..52aa0d515a6 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/cdc/CDCE2EIT.java
@@ -35,8 +35,8 @@ import
org.apache.shardingsphere.data.pipeline.common.datasource.PipelineDataSou
import
org.apache.shardingsphere.data.pipeline.common.metadata.loader.StandardPipelineTableMetaDataLoader;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.ConsistencyCheckJobItemProgressContext;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import
org.apache.shardingsphere.infra.database.core.metadata.database.DialectDatabaseMetaData;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.core.type.DatabaseTypeRegistry;
@@ -202,7 +202,7 @@ class CDCE2EIT {
PipelineTableMetaData tableMetaData =
metaDataLoader.getTableMetaData(schemaTableName.getSchemaName().getOriginal(),
schemaTableName.getTableName().getOriginal());
List<PipelineColumnMetaData> uniqueKeys =
Collections.singletonList(tableMetaData.getColumnMetaData(tableMetaData.getPrimaryKeyColumns().get(0)));
ConsistencyCheckJobItemProgressContext progressContext = new
ConsistencyCheckJobItemProgressContext("", 0);
- TableDataConsistencyCheckParameter param = new
TableDataConsistencyCheckParameter("", sourceDataSource, targetDataSource,
schemaTableName, schemaTableName,
+ TableInventoryCheckParameter param = new
TableInventoryCheckParameter("", sourceDataSource, targetDataSource,
schemaTableName, schemaTableName,
tableMetaData.getColumnNames(), uniqueKeys, null,
progressContext);
TableDataConsistencyChecker tableChecker =
TypedSPILoader.getService(TableDataConsistencyChecker.class, "DATA_MATCH", new
Properties());
TableDataConsistencyCheckResult checkResult =
tableChecker.checkSingleTableInventoryData(param);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 1fadda3e073..d4b01eee705 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/data/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -20,12 +20,11 @@ package
org.apache.shardingsphere.test.e2e.data.pipeline.cases.migration.primary
import lombok.SneakyThrows;
import org.apache.commons.codec.binary.Hex;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
+import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.database.mysql.type.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.database.postgresql.type.PostgreSQLDatabaseType;
-import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import
org.apache.shardingsphere.infra.exception.core.external.sql.type.wrapper.SQLWrapperException;
import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
-import
org.apache.shardingsphere.sharding.algorithm.keygen.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.sharding.algorithm.keygen.UUIDKeyGenerateAlgorithm;
import org.apache.shardingsphere.sharding.spi.KeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.data.pipeline.cases.PipelineContainerComposer;
@@ -36,6 +35,7 @@ import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.Pipeline
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.data.pipeline.framework.param.PipelineTestParameter;
+import
org.apache.shardingsphere.test.e2e.data.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@@ -193,7 +193,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
} else {
return;
}
- KeyGenerateAlgorithm keyGenerateAlgorithm = new
SnowflakeKeyGenerateAlgorithm();
+ KeyGenerateAlgorithm keyGenerateAlgorithm = new
AutoIncrementKeyGenerateAlgorithm();
Object uniqueKey = keyGenerateAlgorithm.generateKey();
assertMigrationSuccess(containerComposer, sql, "user_id",
keyGenerateAlgorithm, consistencyCheckAlgorithmType, dataSource -> {
insertOneOrder(containerComposer, uniqueKey);
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
index 022a9c5b6b2..b0270f10613 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/core/fixture/TableDataConsistencyCheckerFixture.java
@@ -20,8 +20,8 @@ package
org.apache.shardingsphere.test.it.data.pipeline.core.fixture;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyContentCheckResult;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.result.TableDataConsistencyCountCheckResult;
-import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyCheckParameter;
import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableDataConsistencyChecker;
+import
org.apache.shardingsphere.data.pipeline.core.consistencycheck.table.TableInventoryCheckParameter;
import org.apache.shardingsphere.infra.database.core.type.DatabaseType;
import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
import org.apache.shardingsphere.infra.spi.annotation.SPIDescription;
@@ -32,7 +32,7 @@ import java.util.Collection;
public final class TableDataConsistencyCheckerFixture implements
TableDataConsistencyChecker {
@Override
- public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableDataConsistencyCheckParameter param) {
+ public TableDataConsistencyCheckResult checkSingleTableInventoryData(final
TableInventoryCheckParameter param) {
return new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(2, 2), new
TableDataConsistencyContentCheckResult(true));
}
diff --git
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
index a9d9f8c9d11..c6f78a04282 100644
---
a/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
+++
b/test/it/pipeline/src/test/java/org/apache/shardingsphere/test/it/data/pipeline/scenario/consistencycheck/api/impl/ConsistencyCheckJobAPITest.java
@@ -30,6 +30,8 @@ import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.api.poj
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.config.ConsistencyCheckJobConfiguration;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.context.ConsistencyCheckJobItemContext;
import
org.apache.shardingsphere.data.pipeline.scenario.consistencycheck.util.ConsistencyCheckSequence;
+import
org.apache.shardingsphere.data.pipeline.scenario.migration.config.MigrationJobConfiguration;
+import
org.apache.shardingsphere.data.pipeline.yaml.job.YamlMigrationJobConfigurationSwapper;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.JobConfigurationBuilder;
import
org.apache.shardingsphere.test.it.data.pipeline.core.util.PipelineContextUtils;
import org.junit.jupiter.api.BeforeAll;
@@ -49,6 +51,8 @@ class ConsistencyCheckJobAPITest {
private final ConsistencyCheckJobAPI checkJobAPI = new
ConsistencyCheckJobAPI();
+ private final YamlMigrationJobConfigurationSwapper jobConfigSwapper = new
YamlMigrationJobConfigurationSwapper();
+
@BeforeAll
public static void beforeClass() {
PipelineContextUtils.mockModeConfigAndContextManager();
@@ -56,21 +60,25 @@ class ConsistencyCheckJobAPITest {
@Test
void assertCreateJobConfig() {
- String parentJobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
- String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null));
- ConsistencyCheckJobConfiguration jobConfig =
checkJobAPI.getJobConfiguration(checkJobId);
+ MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
+ String parentJobId = parentJobConfig.getJobId();
+ String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+ parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
+ ConsistencyCheckJobConfiguration checkJobConfig =
checkJobAPI.getJobConfiguration(checkJobId);
int expectedSequence = ConsistencyCheckSequence.MIN_SEQUENCE;
String expectCheckJobId = checkJobAPI.marshalJobId(new
ConsistencyCheckJobId(PipelineJobIdUtils.parseContextKey(parentJobId),
parentJobId, expectedSequence));
- assertThat(jobConfig.getJobId(), is(expectCheckJobId));
- assertNull(jobConfig.getAlgorithmTypeName());
+ assertThat(checkJobConfig.getJobId(), is(expectCheckJobId));
+ assertNull(checkJobConfig.getAlgorithmTypeName());
int sequence = ConsistencyCheckJobId.parseSequence(expectCheckJobId);
assertThat(sequence, is(expectedSequence));
}
@Test
void assertGetLatestDataConsistencyCheckResult() {
- String parentJobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
- String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null));
+ MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
+ String parentJobId = parentJobConfig.getJobId();
+ String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+ parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
GovernanceRepositoryAPI governanceRepositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
governanceRepositoryAPI.persistLatestCheckJobId(parentJobId,
checkJobId);
Map<String, TableDataConsistencyCheckResult> expectedCheckResult =
Collections.singletonMap("t_order", new TableDataConsistencyCheckResult(new
TableDataConsistencyCountCheckResult(1, 1),
@@ -83,11 +91,13 @@ class ConsistencyCheckJobAPITest {
@Test
void assertDropByParentJobId() {
- String parentJobId =
JobConfigurationBuilder.createYamlMigrationJobConfiguration().getJobId();
+ MigrationJobConfiguration parentJobConfig =
jobConfigSwapper.swapToObject(JobConfigurationBuilder.createYamlMigrationJobConfiguration());
+ String parentJobId = parentJobConfig.getJobId();
GovernanceRepositoryAPI repositoryAPI =
PipelineAPIFactory.getGovernanceRepositoryAPI(PipelineContextUtils.getContextKey());
int expectedSequence = 1;
for (int i = 0; i < 3; i++) {
- String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null));
+ String checkJobId = checkJobAPI.createJobAndStart(new
CreateConsistencyCheckJobParameter(parentJobId, null, null,
+ parentJobConfig.getSourceDatabaseType(),
parentJobConfig.getTargetDatabaseType()));
ConsistencyCheckJobItemContext checkJobItemContext = new
ConsistencyCheckJobItemContext(
new ConsistencyCheckJobConfiguration(checkJobId,
parentJobId, null, null), 0, JobStatus.FINISHED, null);
checkJobAPI.persistJobItemProgress(checkJobItemContext);