This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 77a764e517c Extract consistency check methods to
PipelineE2EDistSQLFacade (#37792)
77a764e517c is described below
commit 77a764e517ce0cb32f6fad4f4b96afe96f47ccd1
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 20 19:26:37 2026 +0800
Extract consistency check methods to PipelineE2EDistSQLFacade (#37792)
* Simplify algorithmProps type in E2E
* Rename to startCheckAndVerify
* Extract buildConsistencyCheckDistSQL to PipelineE2EDistSQLFacade
* Unify PipelineE2EDistSQLFacade methods naming
* Simplify PipelineE2EDistSQLFacade.waitJobIncrementalStageStarted params
* Extract startCheck in AbstractMigrationE2EIT
* Simplify AbstractMigrationE2EIT.startCheckAndVerify
* Move AbstractMigrationE2EIT.startCheck and verifyCheck to
PipelineE2EDistSQLFacade; improve
* Move AbstractMigrationE2EIT.startCheckAndVerify to
PipelineE2EDistSQLFacade; Refactor param
---
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 4 +-
.../cases/migration/AbstractMigrationE2EIT.java | 63 ---------------
.../general/MySQLMigrationGeneralE2EIT.java | 18 +++--
.../general/MySQLTimeTypesMigrationE2EIT.java | 6 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 12 ++-
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 7 +-
.../migration/general/RulesMigrationE2EIT.java | 6 +-
.../primarykey/IndexesMigrationE2EIT.java | 6 +-
.../primarykey/MariaDBMigrationE2EIT.java | 6 +-
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 4 +-
.../pipeline/util/PipelineE2EDistSQLFacade.java | 94 +++++++++++++++++++---
11 files changed, 115 insertions(+), 111 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 98e9c66dfbc..c10acdde3bc 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -125,11 +125,11 @@ class CDCE2EIT {
final CDCClient cdcClient =
buildCDCClientAndStart(targetDataSource, containerComposer);
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
- distSQLFacade.waitIncrementTaskFinished(jobId);
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
String tableName =
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ?
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) :
SOURCE_TABLE_NAME;
new E2EIncrementalTask(sourceDataSource, tableName, new
SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
- distSQLFacade.waitIncrementTaskFinished(jobId);
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
for (int i = 1; i <= 4; i++) {
int orderId = 10000 + i;
containerComposer.proxyExecuteWithLog(String.format("INSERT
INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId,
i), 0);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 9d62d2ff4fd..c6077ece811 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -17,7 +17,6 @@
package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration;
-import com.google.common.base.Strings;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
@@ -31,19 +30,8 @@ import org.opengauss.util.PSQLException;
import javax.xml.bind.JAXB;
import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
@Getter
@Slf4j
@@ -119,55 +107,4 @@ public abstract class AbstractMigrationE2EIT {
protected void startMigrationWithSchema(final PipelineContainerComposer
containerComposer, final String sourceTableName, final String targetTableName)
throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTableWithSchema(sourceTableName,
targetTableName), 5);
}
-
- protected void assertCheckMigrationSuccess(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException {
- assertCheckMigrationSuccess(containerComposer, jobId, algorithmType,
new Properties());
- }
-
- protected void assertCheckMigrationSuccess(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType, final
Properties algorithmProps) throws SQLException {
-
containerComposer.proxyExecuteWithLog(buildConsistencyCheckDistSQL(jobId,
algorithmType, algorithmProps), 0);
- // TODO Need to add after the stop then to start, can continue the
consistency check from the previous progress
- List<Map<String, Object>> resultList = Collections.emptyList();
- for (int i = 0; i < 30; i++) {
- resultList =
containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK
STATUS '%s'", jobId));
- if (resultList.isEmpty()) {
- containerComposer.sleepSeconds(3);
- continue;
- }
- List<String> checkEndTimeList = resultList.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
- Set<String> finishedPercentages = resultList.stream().map(map ->
map.get("inventory_finished_percentage").toString()).collect(Collectors.toSet());
- if (checkEndTimeList.size() == resultList.size() && 1 ==
finishedPercentages.size() && finishedPercentages.contains("100")) {
- break;
- } else {
- containerComposer.sleepSeconds(1);
- }
- }
- log.info("check job results: {}", resultList);
- assertFalse(resultList.isEmpty());
- for (Map<String, Object> each : resultList) {
- assertTrue(Boolean.parseBoolean(each.get("result").toString()),
String.format("%s check result is false", each.get("tables")));
- assertThat("inventory_finished_percentage is not 100",
each.get("inventory_finished_percentage").toString(), is("100"));
- }
- }
-
- private String buildConsistencyCheckDistSQL(final String jobId, final
String algorithmType, final Properties algorithmProps) {
- if (null == algorithmProps || algorithmProps.isEmpty()) {
- return String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')",
jobId, algorithmType);
- }
- String sql = "CHECK MIGRATION '%s' BY TYPE (NAME='%s', PROPERTIES("
- + algorithmProps.entrySet().stream().map(entry ->
String.format("'%s'='%s'", entry.getKey(),
entry.getValue())).collect(Collectors.joining(","))
- + "))";
- return String.format(sql, jobId, algorithmType);
- }
-
- protected Properties convertToProperties(final Map<String, String> map) {
- Properties result = new Properties();
- if (null == map || map.isEmpty()) {
- return result;
- }
- for (Map.Entry<String, String> entry : map.entrySet()) {
- result.setProperty(entry.getKey(), entry.getValue());
- }
- return result;
- }
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index d80cbbb2790..08311319194 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -41,9 +41,9 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import javax.sql.DataSource;
import java.sql.SQLException;
import java.time.LocalDateTime;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
@@ -80,7 +80,7 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
startMigration(containerComposer, "t_order_item", "t_order_item");
String orderJobId = distSQLFacade.getJobIdByTableName("ds_0." +
SOURCE_TABLE_NAME);
- distSQLFacade.waitJobPrepareSuccess(orderJobId);
+ distSQLFacade.waitJobPreparingStageFinished(orderJobId);
containerComposer.startIncrementTask(
new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
@@ -91,11 +91,11 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", 10000);
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order_item", 10000);
- assertMigrationSuccessById(distSQLFacade, orderJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "SMALL")));
+ assertMigrationSuccessById(distSQLFacade, orderJobId,
"DATA_MATCH", ImmutableMap.of("chunk-size", "300", "streaming-range-type",
"SMALL"));
String orderItemJobId =
distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
- assertMigrationSuccessById(distSQLFacade, orderItemJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "LARGE")));
+ assertMigrationSuccessById(distSQLFacade, orderItemJobId,
"DATA_MATCH", ImmutableMap.of("chunk-size", "300", "streaming-range-type",
"LARGE"));
containerComposer.sleepSeconds(2);
- assertMigrationSuccessById(distSQLFacade, orderItemJobId,
"CRC32_MATCH", new Properties());
+ assertMigrationSuccessById(distSQLFacade, orderItemJobId,
"CRC32_MATCH", Collections.emptyMap());
for (String each : distSQLFacade.listJobIds()) {
distSQLFacade.commit(each);
}
@@ -104,13 +104,15 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
}
- private void assertMigrationSuccessById(final PipelineE2EDistSQLFacade
distSQLFacade, final String jobId, final String algorithmType, final Properties
algorithmProps) throws SQLException {
- List<Map<String, Object>> jobStatus =
distSQLFacade.waitIncrementTaskFinished(jobId);
+ private void assertMigrationSuccessById(final PipelineE2EDistSQLFacade
distSQLFacade, final String jobId,
+ final String algorithmType, final
Map<String, String> algorithmProps) throws SQLException {
+ List<Map<String, Object>> jobStatus =
distSQLFacade.waitJobIncrementalStageFinished(jobId);
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()),
is(100));
}
- assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(),
jobId, algorithmType, algorithmProps);
+ distSQLFacade.startCheck(jobId, algorithmType, algorithmProps);
+ distSQLFacade.verifyCheck(jobId);
}
private static boolean isEnabled(final ExtensionContext context) {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index e3cf1a19b96..df2e9422607 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -55,11 +55,11 @@ class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
startMigration(containerComposer, "time_e2e", "time_e2e");
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
- distSQLFacade.waitJobPrepareSuccess(jobId);
+ distSQLFacade.waitJobPreparingStageFinished(jobId);
insertOneRecordWithZeroValue(containerComposer, 2);
- distSQLFacade.waitIncrementTaskFinished(jobId);
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
distSQLFacade.loadAllSingleTables();
- assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
+ distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index a8f58477883..8ea80194942 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -19,7 +19,6 @@ package
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
@@ -85,7 +84,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." +
SOURCE_TABLE_NAME);
- distSQLFacade.waitJobPrepareSuccess(jobId);
+ distSQLFacade.waitJobPreparingStageFinished(jobId);
String qualifiedTableName = String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
containerComposer.startIncrementTask(new
E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName,
new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
@@ -107,17 +106,16 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
private void checkOrderMigration(final PipelineE2EDistSQLFacade
distSQLFacade, final String jobId) throws SQLException {
- distSQLFacade.waitIncrementTaskFinished(jobId);
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
distSQLFacade.pauseJob(jobId);
distSQLFacade.resumeJob(jobId);
- assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(),
jobId, "DATA_MATCH");
+ distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
}
private void checkOrderItemMigration(final PipelineE2EDistSQLFacade
distSQLFacade) throws SQLException {
String jobId =
distSQLFacade.getJobIdByTableName("ds_0.test.t_order_item");
- PipelineContainerComposer containerComposer =
distSQLFacade.getContainerComposer();
- distSQLFacade.waitJobStatusReached(jobId,
JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
- assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+ distSQLFacade.waitJobIncrementalStageStarted(jobId);
+ distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
}
private int getReplicationSlotsCount(final PipelineContainerComposer
containerComposer) throws SQLException {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index d43df9ad176..94e9ecd4341 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -20,7 +20,6 @@ package
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
@@ -80,13 +79,13 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() ->
!distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
- distSQLFacade.waitJobStatusReached(jobId,
JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
+ distSQLFacade.waitJobIncrementalStageStarted(jobId);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
"postgres", "postgres")) {
connection.createStatement().execute(String.format("INSERT
INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1,
"incremental"));
connection.createStatement().execute(String.format("UPDATE
t_order SET status='%s' WHERE order_id IN (1,2)",
RandomStringUtils.randomAlphanumeric(10)));
}
- distSQLFacade.waitIncrementTaskFinished(jobId);
- assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
+ distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
distSQLFacade.commit(jobId);
assertTrue(distSQLFacade.listJobIds().isEmpty());
} finally {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 8488c46b49e..c9a4c32c9b2 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -88,10 +88,10 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
- distSQLFacade.waitJobPrepareSuccess(jobId);
- distSQLFacade.waitIncrementTaskFinished(jobId);
+ distSQLFacade.waitJobPreparingStageFinished(jobId);
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
distSQLFacade.loadAllSingleTables();
- assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+ distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(containerComposer.getProxyDataSource(),
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 586dae287b8..022b9882dae 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -242,12 +242,12 @@ class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
shardingColumn), 2);
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
String jobId = distSQLFacade.listJobIds().get(0);
- distSQLFacade.waitJobPrepareSuccess(jobId);
+ distSQLFacade.waitJobPreparingStageFinished(jobId);
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
incrementalTaskFn.accept(jdbcDataSource);
- distSQLFacade.waitIncrementTaskFinished(jobId);
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
if (null != consistencyCheckAlgorithmType) {
- assertCheckMigrationSuccess(containerComposer, jobId,
consistencyCheckAlgorithmType);
+ distSQLFacade.startCheckAndVerify(jobId,
consistencyCheckAlgorithmType);
}
distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource,
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 97192a5a66e..b39b782e606 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -70,12 +70,12 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
createTargetOrderTableRule(containerComposer);
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
String jobId = distSQLFacade.listJobIds().get(0);
- distSQLFacade.waitJobPrepareSuccess(jobId);
+ distSQLFacade.waitJobPreparingStageFinished(jobId);
containerComposer.sourceExecuteWithLog("INSERT INTO t_order
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", "a1");
- distSQLFacade.waitIncrementTaskFinished(jobId);
- assertCheckMigrationSuccess(containerComposer, jobId,
"CRC32_MATCH");
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
+ distSQLFacade.startCheckAndVerify(jobId, "CRC32_MATCH");
distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource,
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index c3d1225d810..206a7652819 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -66,8 +66,8 @@ class TextPrimaryKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.sourceExecuteWithLog(
String.format("INSERT INTO %s (order_id,user_id,status)
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1,
"afterStop"));
- distSQLFacade.waitIncrementTaskFinished(jobId);
- assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
+ distSQLFacade.waitJobIncrementalStageFinished(jobId);
+ distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
distSQLFacade.commit(jobId);
assertTrue(distSQLFacade.listJobIds().isEmpty());
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 18ee6da4845..e5acd6631ff 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -35,6 +35,9 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Getter
@@ -165,15 +168,15 @@ public final class PipelineE2EDistSQLFacade {
}
/**
- * Wait job prepare success.
+ * Wait job preparing stage finished.
*
* @param jobId job id
*/
- public void waitJobPrepareSuccess(final String jobId) {
+ public void waitJobPreparingStageFinished(final String jobId) {
String sql = buildShowJobStatusDistSQL(jobId);
for (int i = 0; i < 5; i++) {
List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
- log.info("Wait job prepare success, job status records: {}",
jobStatusRecords);
+ log.info("Wait job preparing stage finished, job status records:
{}", jobStatusRecords);
Set<String> statusSet = jobStatusRecords.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toSet());
if (statusSet.contains(JobStatus.PREPARING.name()) ||
statusSet.contains(JobStatus.RUNNING.name())) {
containerComposer.sleepSeconds(2);
@@ -184,18 +187,17 @@ public final class PipelineE2EDistSQLFacade {
}
/**
- * Wait job status reached.
+ * Wait job incremental stage started.
*
* @param jobId job id
- * @param jobStatus job status
- * @param maxSleepSeconds max sleep seconds
- * @throws IllegalStateException if job status not reached
+ * @throws IllegalStateException if job incremental stage not started
*/
- public void waitJobStatusReached(final String jobId, final JobStatus
jobStatus, final int maxSleepSeconds) {
+ public void waitJobIncrementalStageStarted(final String jobId) {
String sql = buildShowJobStatusDistSQL(jobId);
- for (int i = 0, count = maxSleepSeconds / 2 + (0 == maxSleepSeconds %
2 ? 0 : 1); i < count; i++) {
+ JobStatus jobStatus = JobStatus.EXECUTE_INCREMENTAL_TASK;
+ for (int i = 0; i < 10; i++) {
List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
- log.info("Wait job status reached, job status records: {}",
jobStatusRecords);
+ log.info("Wait job Incremental stage started, job status records:
{}", jobStatusRecords);
List<String> statusList = jobStatusRecords.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toList());
if (statusList.stream().allMatch(each ->
each.equals(jobStatus.name()))) {
return;
@@ -206,16 +208,16 @@ public final class PipelineE2EDistSQLFacade {
}
/**
- * Wait increment task finished.
+ * Wait job incremental stage finished.
*
* @param jobId job id
* @return result
*/
- public List<Map<String, Object>> waitIncrementTaskFinished(final String
jobId) {
+ public List<Map<String, Object>> waitJobIncrementalStageFinished(final
String jobId) {
String sql = buildShowJobStatusDistSQL(jobId);
for (int i = 0; i < 10; i++) {
List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(sql);
- log.info("Wait incremental task finished, job status records: {}",
jobStatusRecords);
+ log.info("Wait job incremental stage finished, job status records:
{}", jobStatusRecords);
Set<String> statusSet = new HashSet<>(jobStatusRecords.size(), 1F);
List<Integer> incrementalIdleSecondsList = new LinkedList<>();
for (Map<String, Object> each : jobStatusRecords) {
@@ -238,4 +240,70 @@ public final class PipelineE2EDistSQLFacade {
private String buildShowJobStatusDistSQL(final String jobId) {
return String.format("SHOW %s STATUS %s", jobTypeName, jobId);
}
+
+ /**
+ * Start check and verify.
+ *
+ * @param jobId job id
+ * @param algorithmType algorithm type
+ * @throws SQLException SQL exception
+ */
+ public void startCheckAndVerify(final String jobId, final String
algorithmType) throws SQLException {
+ startCheck(jobId, algorithmType, Collections.emptyMap());
+ verifyCheck(jobId);
+ }
+
+ /**
+ * Start check.
+ *
+ * @param jobId job id
+ * @param algorithmType algorithm type
+ * @param algorithmProps algorithm properties
+ * @throws SQLException SQL exception
+ */
+ public void startCheck(final String jobId, final String algorithmType,
final Map<String, String> algorithmProps) throws SQLException {
+
containerComposer.proxyExecuteWithLog(buildConsistencyCheckDistSQL(jobId,
algorithmType, algorithmProps), 0);
+ }
+
+ private String buildConsistencyCheckDistSQL(final String jobId, final
String algorithmType, final Map<String, String> algorithmProps) {
+ if (null == algorithmProps || algorithmProps.isEmpty()) {
+ return String.format("CHECK %s %s BY TYPE (NAME='%s')",
jobTypeName, jobId, algorithmType);
+ }
+ String sqlTemplate = "CHECK %s %s BY TYPE (NAME='%s', PROPERTIES("
+ + algorithmProps.entrySet().stream().map(entry ->
String.format("'%s'='%s'", entry.getKey(),
entry.getValue())).collect(Collectors.joining(","))
+ + "))";
+ return String.format(sqlTemplate, jobTypeName, jobId, algorithmType);
+ }
+
+ /**
+ * Verify check.
+ *
+ * @param jobId job id
+ */
+ public void verifyCheck(final String jobId) {
+ List<Map<String, Object>> checkStatusRecords = Collections.emptyList();
+ for (int i = 0; i < 10; i++) {
+ checkStatusRecords =
containerComposer.queryForListWithLog(buildShowCheckJobStatusDistSQL(jobId));
+ if (checkStatusRecords.isEmpty()) {
+ containerComposer.sleepSeconds(3);
+ continue;
+ }
+ List<String> checkEndTimeList =
checkStatusRecords.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
+ if (checkEndTimeList.size() == checkStatusRecords.size()) {
+ break;
+ } else {
+ containerComposer.sleepSeconds(3);
+ }
+ }
+ log.info("Verify check, results: {}", checkStatusRecords);
+ assertFalse(checkStatusRecords.isEmpty());
+ for (Map<String, Object> entry : checkStatusRecords) {
+ assertThat(entry.get("inventory_finished_percentage").toString(),
is("100"));
+ assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
+ }
+ }
+
+ private String buildShowCheckJobStatusDistSQL(final String jobId) {
+ return String.format("SHOW %s CHECK STATUS %s", jobTypeName, jobId);
+ }
}