This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 77a764e517c Extract consistency check methods to PipelineE2EDistSQLFacade (#37792)
77a764e517c is described below

commit 77a764e517ce0cb32f6fad4f4b96afe96f47ccd1
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Tue Jan 20 19:26:37 2026 +0800

    Extract consistency check methods to PipelineE2EDistSQLFacade (#37792)
    
    * Simplify algorithmProps type in E2E
    
    * Rename to startCheckAndVerify
    
    * Extract buildConsistencyCheckDistSQL to PipelineE2EDistSQLFacade
    
    * Unify PipelineE2EDistSQLFacade methods naming
    
    * Simplify PipelineE2EDistSQLFacade.waitJobIncrementalStageStarted params
    
    * Extract startCheck in AbstractMigrationE2EIT
    
    * Simplify AbstractMigrationE2EIT.startCheckAndVerify
    
    * Move AbstractMigrationE2EIT.startCheck and verifyCheck to PipelineE2EDistSQLFacade; improve
    
    * Move AbstractMigrationE2EIT.startCheckAndVerify to PipelineE2EDistSQLFacade; Refactor param
---
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  4 +-
 .../cases/migration/AbstractMigrationE2EIT.java    | 63 ---------------
 .../general/MySQLMigrationGeneralE2EIT.java        | 18 +++--
 .../general/MySQLTimeTypesMigrationE2EIT.java      |  6 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   | 12 ++-
 .../general/PostgreSQLToMySQLMigrationE2EIT.java   |  7 +-
 .../migration/general/RulesMigrationE2EIT.java     |  6 +-
 .../primarykey/IndexesMigrationE2EIT.java          |  6 +-
 .../primarykey/MariaDBMigrationE2EIT.java          |  6 +-
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |  4 +-
 .../pipeline/util/PipelineE2EDistSQLFacade.java    | 94 +++++++++++++++++++---
 11 files changed, 115 insertions(+), 111 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 98e9c66dfbc..c10acdde3bc 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -125,11 +125,11 @@ class CDCE2EIT {
             final CDCClient cdcClient = buildCDCClientAndStart(targetDataSource, containerComposer);
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
-            distSQLFacade.waitIncrementTaskFinished(jobId);
+            distSQLFacade.waitJobIncrementalStageFinished(jobId);
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
             String tableName = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
             new E2EIncrementalTask(sourceDataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
-            distSQLFacade.waitIncrementTaskFinished(jobId);
+            distSQLFacade.waitJobIncrementalStageFinished(jobId);
             for (int i = 1; i <= 4; i++) {
                 int orderId = 10000 + i;
                 containerComposer.proxyExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, i), 0);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 9d62d2ff4fd..c6077ece811 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -17,7 +17,6 @@
 
 package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration;
 
-import com.google.common.base.Strings;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
@@ -31,19 +30,8 @@ import org.opengauss.util.PSQLException;
 
 import javax.xml.bind.JAXB;
 import java.sql.SQLException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Getter
 @Slf4j
@@ -119,55 +107,4 @@ public abstract class AbstractMigrationE2EIT {
     protected void startMigrationWithSchema(final PipelineContainerComposer containerComposer, final String sourceTableName, final String targetTableName) throws SQLException {
         containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTableWithSchema(sourceTableName, targetTableName), 5);
     }
-    
-    protected void assertCheckMigrationSuccess(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType) throws SQLException {
-        assertCheckMigrationSuccess(containerComposer, jobId, algorithmType, new Properties());
-    }
-    
-    protected void assertCheckMigrationSuccess(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType, final Properties algorithmProps) throws SQLException {
-        containerComposer.proxyExecuteWithLog(buildConsistencyCheckDistSQL(jobId, algorithmType, algorithmProps), 0);
-        // TODO Need to add after the stop then to start, can continue the consistency check from the previous progress
-        List<Map<String, Object>> resultList = Collections.emptyList();
-        for (int i = 0; i < 30; i++) {
-            resultList = containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK STATUS '%s'", jobId));
-            if (resultList.isEmpty()) {
-                containerComposer.sleepSeconds(3);
-                continue;
-            }
-            List<String> checkEndTimeList = resultList.stream().map(map -> map.get("check_end_time").toString()).filter(each -> !Strings.isNullOrEmpty(each)).collect(Collectors.toList());
-            Set<String> finishedPercentages = resultList.stream().map(map -> map.get("inventory_finished_percentage").toString()).collect(Collectors.toSet());
-            if (checkEndTimeList.size() == resultList.size() && 1 == finishedPercentages.size() && finishedPercentages.contains("100")) {
-                break;
-            } else {
-                containerComposer.sleepSeconds(1);
-            }
-        }
-        log.info("check job results: {}", resultList);
-        assertFalse(resultList.isEmpty());
-        for (Map<String, Object> each : resultList) {
-            assertTrue(Boolean.parseBoolean(each.get("result").toString()), String.format("%s check result is false", each.get("tables")));
-            assertThat("inventory_finished_percentage is not 100", each.get("inventory_finished_percentage").toString(), is("100"));
-        }
-    }
-    
-    private String buildConsistencyCheckDistSQL(final String jobId, final String algorithmType, final Properties algorithmProps) {
-        if (null == algorithmProps || algorithmProps.isEmpty()) {
-            return String.format("CHECK MIGRATION '%s' BY TYPE (NAME='%s')", jobId, algorithmType);
-        }
-        String sql = "CHECK MIGRATION '%s' BY TYPE (NAME='%s', PROPERTIES("
-                + algorithmProps.entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))
-                + "))";
-        return String.format(sql, jobId, algorithmType);
-    }
-    
-    protected Properties convertToProperties(final Map<String, String> map) {
-        Properties result = new Properties();
-        if (null == map || map.isEmpty()) {
-            return result;
-        }
-        for (Map.Entry<String, String> entry : map.entrySet()) {
-            result.setProperty(entry.getKey(), entry.getValue());
-        }
-        return result;
-    }
 }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index d80cbbb2790..08311319194 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -41,9 +41,9 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
 import javax.sql.DataSource;
 import java.sql.SQLException;
 import java.time.LocalDateTime;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -80,7 +80,7 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
             startMigration(containerComposer, "t_order_item", "t_order_item");
             String orderJobId = distSQLFacade.getJobIdByTableName("ds_0." + SOURCE_TABLE_NAME);
-            distSQLFacade.waitJobPrepareSuccess(orderJobId);
+            distSQLFacade.waitJobPreparingStageFinished(orderJobId);
             containerComposer.startIncrementTask(
                     new E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
@@ -91,11 +91,11 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", 10000);
             containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order_item", 10000);
-            assertMigrationSuccessById(distSQLFacade, orderJobId, "DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", "streaming-range-type", "SMALL")));
+            assertMigrationSuccessById(distSQLFacade, orderJobId, "DATA_MATCH", ImmutableMap.of("chunk-size", "300", "streaming-range-type", "SMALL"));
             String orderItemJobId = distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
-            assertMigrationSuccessById(distSQLFacade, orderItemJobId, "DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", "streaming-range-type", "LARGE")));
+            assertMigrationSuccessById(distSQLFacade, orderItemJobId, "DATA_MATCH", ImmutableMap.of("chunk-size", "300", "streaming-range-type", "LARGE"));
             containerComposer.sleepSeconds(2);
-            assertMigrationSuccessById(distSQLFacade, orderItemJobId, "CRC32_MATCH", new Properties());
+            assertMigrationSuccessById(distSQLFacade, orderItemJobId, "CRC32_MATCH", Collections.emptyMap());
             for (String each : distSQLFacade.listJobIds()) {
                 distSQLFacade.commit(each);
             }
@@ -104,13 +104,15 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         }
     }
     
-    private void assertMigrationSuccessById(final PipelineE2EDistSQLFacade distSQLFacade, final String jobId, final String algorithmType, final Properties algorithmProps) throws SQLException {
-        List<Map<String, Object>> jobStatus = distSQLFacade.waitIncrementTaskFinished(jobId);
+    private void assertMigrationSuccessById(final PipelineE2EDistSQLFacade distSQLFacade, final String jobId,
+                                            final String algorithmType, final Map<String, String> algorithmProps) throws SQLException {
+        List<Map<String, Object>> jobStatus = distSQLFacade.waitJobIncrementalStageFinished(jobId);
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
             assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
         }
-        assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(), jobId, algorithmType, algorithmProps);
+        distSQLFacade.startCheck(jobId, algorithmType, algorithmProps);
+        distSQLFacade.verifyCheck(jobId);
     }
     
     private static boolean isEnabled(final ExtensionContext context) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index e3cf1a19b96..df2e9422607 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -55,11 +55,11 @@ class MySQLTimeTypesMigrationE2EIT extends AbstractMigrationE2EIT {
             startMigration(containerComposer, "time_e2e", "time_e2e");
             PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             String jobId = distSQLFacade.listJobIds().get(0);
-            distSQLFacade.waitJobPrepareSuccess(jobId);
+            distSQLFacade.waitJobPreparingStageFinished(jobId);
             insertOneRecordWithZeroValue(containerComposer, 2);
-            distSQLFacade.waitIncrementTaskFinished(jobId);
+            distSQLFacade.waitJobIncrementalStageFinished(jobId);
             distSQLFacade.loadAllSingleTables();
-            assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+            distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
         }
     }
     
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index a8f58477883..8ea80194942 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -19,7 +19,6 @@ package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
@@ -85,7 +84,7 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, "t_order");
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." + SOURCE_TABLE_NAME);
-            distSQLFacade.waitJobPrepareSuccess(jobId);
+            distSQLFacade.waitJobPreparingStageFinished(jobId);
             String qualifiedTableName = String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
             containerComposer.startIncrementTask(new E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, new SnowflakeKeyGenerateAlgorithm(),
                     containerComposer.getDatabaseType(), 20));
@@ -107,17 +106,16 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     }
     
     private void checkOrderMigration(final PipelineE2EDistSQLFacade distSQLFacade, final String jobId) throws SQLException {
-        distSQLFacade.waitIncrementTaskFinished(jobId);
+        distSQLFacade.waitJobIncrementalStageFinished(jobId);
         distSQLFacade.pauseJob(jobId);
         distSQLFacade.resumeJob(jobId);
-        assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(), jobId, "DATA_MATCH");
+        distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
     }
    
     private void checkOrderItemMigration(final PipelineE2EDistSQLFacade distSQLFacade) throws SQLException {
         String jobId = distSQLFacade.getJobIdByTableName("ds_0.test.t_order_item");
-        PipelineContainerComposer containerComposer = distSQLFacade.getContainerComposer();
-        distSQLFacade.waitJobStatusReached(jobId, JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
-        assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+        distSQLFacade.waitJobIncrementalStageStarted(jobId);
+        distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
     }
    
     private int getReplicationSlotsCount(final PipelineContainerComposer containerComposer) throws SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index d43df9ad176..94e9ecd4341 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -20,7 +20,6 @@ package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.lang3.RandomUtils;
-import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
 import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
 import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
@@ -80,13 +79,13 @@ class PostgreSQLToMySQLMigrationE2EIT extends AbstractMigrationE2EIT {
             PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             Awaitility.await().ignoreExceptions().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
-            distSQLFacade.waitJobStatusReached(jobId, JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
+            distSQLFacade.waitJobIncrementalStageStarted(jobId);
             try (Connection connection = DriverManager.getConnection(jdbcUrl, "postgres", "postgres")) {
                 connection.createStatement().execute(String.format("INSERT INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, "incremental"));
                 connection.createStatement().execute(String.format("UPDATE t_order SET status='%s' WHERE order_id IN (1,2)", RandomStringUtils.randomAlphanumeric(10)));
             }
-            distSQLFacade.waitIncrementTaskFinished(jobId);
-            assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+            distSQLFacade.waitJobIncrementalStageFinished(jobId);
+            distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
             distSQLFacade.commit(jobId);
             assertTrue(distSQLFacade.listJobIds().isEmpty());
         } finally {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 8488c46b49e..c9a4c32c9b2 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -88,10 +88,10 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
         PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
         String jobId = distSQLFacade.listJobIds().get(0);
-        distSQLFacade.waitJobPrepareSuccess(jobId);
-        distSQLFacade.waitIncrementTaskFinished(jobId);
+        distSQLFacade.waitJobPreparingStageFinished(jobId);
+        distSQLFacade.waitJobIncrementalStageFinished(jobId);
         distSQLFacade.loadAllSingleTables();
-        assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+        distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
         distSQLFacade.commit(jobId);
         assertThat(containerComposer.getTargetTableRecordsCount(containerComposer.getProxyDataSource(), SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 586dae287b8..022b9882dae 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -242,12 +242,12 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT, shardingColumn), 2);
         startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
         String jobId = distSQLFacade.listJobIds().get(0);
-        distSQLFacade.waitJobPrepareSuccess(jobId);
+        distSQLFacade.waitJobPreparingStageFinished(jobId);
         DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
         incrementalTaskFn.accept(jdbcDataSource);
-        distSQLFacade.waitIncrementTaskFinished(jobId);
+        distSQLFacade.waitJobIncrementalStageFinished(jobId);
         if (null != consistencyCheckAlgorithmType) {
-            assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
+            distSQLFacade.startCheckAndVerify(jobId, consistencyCheckAlgorithmType);
         }
         distSQLFacade.commit(jobId);
         assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 97192a5a66e..b39b782e606 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -70,12 +70,12 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             createTargetOrderTableRule(containerComposer);
             startMigration(containerComposer, SOURCE_TABLE_NAME, TARGET_TABLE_NAME);
             String jobId = distSQLFacade.listJobIds().get(0);
-            distSQLFacade.waitJobPrepareSuccess(jobId);
+            distSQLFacade.waitJobPreparingStageFinished(jobId);
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", "a1");
-            distSQLFacade.waitIncrementTaskFinished(jobId);
-            assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");
+            distSQLFacade.waitJobIncrementalStageFinished(jobId);
+            distSQLFacade.startCheckAndVerify(jobId, "CRC32_MATCH");
             distSQLFacade.commit(jobId);
             assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
             assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index c3d1225d810..206a7652819 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -66,8 +66,8 @@ class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.sourceExecuteWithLog(
                     String.format("INSERT INTO %s (order_id,user_id,status) VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1, "afterStop"));
-            distSQLFacade.waitIncrementTaskFinished(jobId);
-            assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+            distSQLFacade.waitJobIncrementalStageFinished(jobId);
+            distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
             distSQLFacade.commit(jobId);
             assertTrue(distSQLFacade.listJobIds().isEmpty());
         }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 18ee6da4845..e5acd6631ff 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -35,6 +35,9 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Getter
@@ -165,15 +168,15 @@ public final class PipelineE2EDistSQLFacade {
     }
     
     /**
-     * Wait job prepare success.
+     * Wait job preparing stage finished.
      *
      * @param jobId job id
      */
-    public void waitJobPrepareSuccess(final String jobId) {
+    public void waitJobPreparingStageFinished(final String jobId) {
         String sql = buildShowJobStatusDistSQL(jobId);
         for (int i = 0; i < 5; i++) {
             List<Map<String, Object>> jobStatusRecords = containerComposer.queryForListWithLog(sql);
-            log.info("Wait job prepare success, job status records: {}", jobStatusRecords);
+            log.info("Wait job preparing stage finished, job status records: {}", jobStatusRecords);
             Set<String> statusSet = jobStatusRecords.stream().map(each -> String.valueOf(each.get("status"))).collect(Collectors.toSet());
             if (statusSet.contains(JobStatus.PREPARING.name()) || statusSet.contains(JobStatus.RUNNING.name())) {
                 containerComposer.sleepSeconds(2);
@@ -184,18 +187,17 @@ public final class PipelineE2EDistSQLFacade {
     }
     
     /**
-     * Wait job status reached.
+     * Wait job incremental stage started.
      *
      * @param jobId job id
-     * @param jobStatus job status
-     * @param maxSleepSeconds max sleep seconds
-     * @throws IllegalStateException if job status not reached
+     * @throws IllegalStateException if job incremental stage not started
      */
-    public void waitJobStatusReached(final String jobId, final JobStatus jobStatus, final int maxSleepSeconds) {
+    public void waitJobIncrementalStageStarted(final String jobId) {
         String sql = buildShowJobStatusDistSQL(jobId);
-        for (int i = 0, count = maxSleepSeconds / 2 + (0 == maxSleepSeconds % 2 ? 0 : 1); i < count; i++) {
+        JobStatus jobStatus = JobStatus.EXECUTE_INCREMENTAL_TASK;
+        for (int i = 0; i < 10; i++) {
             List<Map<String, Object>> jobStatusRecords = containerComposer.queryForListWithLog(sql);
-            log.info("Wait job status reached, job status records: {}", jobStatusRecords);
+            log.info("Wait job Incremental stage started, job status records: {}", jobStatusRecords);
             List<String> statusList = jobStatusRecords.stream().map(each -> String.valueOf(each.get("status"))).collect(Collectors.toList());
             if (statusList.stream().allMatch(each -> each.equals(jobStatus.name()))) {
                 return;
@@ -206,16 +208,16 @@ public final class PipelineE2EDistSQLFacade {
     }
     
     /**
-     * Wait increment task finished.
+     * Wait job incremental stage finished.
      *
      * @param jobId job id
      * @return result
      */
-    public List<Map<String, Object>> waitIncrementTaskFinished(final String jobId) {
+    public List<Map<String, Object>> waitJobIncrementalStageFinished(final String jobId) {
         String sql = buildShowJobStatusDistSQL(jobId);
         for (int i = 0; i < 10; i++) {
             List<Map<String, Object>> jobStatusRecords = containerComposer.queryForListWithLog(sql);
-            log.info("Wait incremental task finished, job status records: {}", jobStatusRecords);
+            log.info("Wait job incremental stage finished, job status records: {}", jobStatusRecords);
             Set<String> statusSet = new HashSet<>(jobStatusRecords.size(), 1F);
             List<Integer> incrementalIdleSecondsList = new LinkedList<>();
             for (Map<String, Object> each : jobStatusRecords) {
@@ -238,4 +240,70 @@ public final class PipelineE2EDistSQLFacade {
     private String buildShowJobStatusDistSQL(final String jobId) {
         return String.format("SHOW %s STATUS %s", jobTypeName, jobId);
     }
+    
+    /**
+     * Start check and verify.
+     *
+     * @param jobId job id
+     * @param algorithmType algorithm type
+     * @throws SQLException SQL exception
+     */
+    public void startCheckAndVerify(final String jobId, final String algorithmType) throws SQLException {
+        startCheck(jobId, algorithmType, Collections.emptyMap());
+        verifyCheck(jobId);
+    }
+    
+    /**
+     * Start check.
+     *
+     * @param jobId job id
+     * @param algorithmType algorithm type
+     * @param algorithmProps algorithm properties
+     * @throws SQLException SQL exception
+     */
+    public void startCheck(final String jobId, final String algorithmType, final Map<String, String> algorithmProps) throws SQLException {
+        containerComposer.proxyExecuteWithLog(buildConsistencyCheckDistSQL(jobId, algorithmType, algorithmProps), 0);
+    }
+    
+    private String buildConsistencyCheckDistSQL(final String jobId, final String algorithmType, final Map<String, String> algorithmProps) {
+        if (null == algorithmProps || algorithmProps.isEmpty()) {
+            return String.format("CHECK %s %s BY TYPE (NAME='%s')", jobTypeName, jobId, algorithmType);
+        }
+        String sqlTemplate = "CHECK %s %s BY TYPE (NAME='%s', PROPERTIES("
+                + algorithmProps.entrySet().stream().map(entry -> String.format("'%s'='%s'", entry.getKey(), entry.getValue())).collect(Collectors.joining(","))
+                + "))";
+        return String.format(sqlTemplate, jobTypeName, jobId, algorithmType);
+    }
+    
+    /**
+     * Verify check.
+     *
+     * @param jobId job id
+     */
+    public void verifyCheck(final String jobId) {
+        List<Map<String, Object>> checkStatusRecords = Collections.emptyList();
+        for (int i = 0; i < 10; i++) {
+            checkStatusRecords = containerComposer.queryForListWithLog(buildShowCheckJobStatusDistSQL(jobId));
+            if (checkStatusRecords.isEmpty()) {
+                containerComposer.sleepSeconds(3);
+                continue;
+            }
+            List<String> checkEndTimeList = checkStatusRecords.stream().map(map -> map.get("check_end_time").toString()).filter(each -> !Strings.isNullOrEmpty(each)).collect(Collectors.toList());
+            if (checkEndTimeList.size() == checkStatusRecords.size()) {
+                break;
+            } else {
+                containerComposer.sleepSeconds(3);
+            }
+        }
+        log.info("Verify check, results: {}", checkStatusRecords);
+        assertFalse(checkStatusRecords.isEmpty());
+        for (Map<String, Object> entry : checkStatusRecords) {
+            assertThat(entry.get("inventory_finished_percentage").toString(), is("100"));
+            assertTrue(Boolean.parseBoolean(entry.get("result").toString()));
+        }
+    }
+    
+    private String buildShowCheckJobStatusDistSQL(final String jobId) {
+        return String.format("SHOW %s CHECK STATUS %s", jobTypeName, jobId);
+    }
 }
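
For reference, a minimal usage sketch of the refactored facade, assuming a PipelineContainerComposer already prepared by the E2E harness and an existing migration job id. The wrapper class and method below are illustrative only and not part of this commit, while the constructor and facade calls (waitJobPreparingStageFinished, waitJobIncrementalStageFinished, startCheckAndVerify, startCheck, verifyCheck, commit) match the API introduced or renamed above:

    import java.sql.SQLException;
    import java.util.Collections;
    
    import org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
    import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
    import org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
    
    // Illustrative sketch only; class and method names here are hypothetical.
    final class ConsistencyCheckUsageSketch {
        
        void runCheckFlow(final PipelineContainerComposer containerComposer, final String jobId) throws SQLException {
            PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
            // Wait until the job has left the preparing stage, then until its incremental stage is idle.
            distSQLFacade.waitJobPreparingStageFinished(jobId);
            distSQLFacade.waitJobIncrementalStageFinished(jobId);
            // One-shot helper: issues a CHECK <jobType> ... BY TYPE (NAME='DATA_MATCH') statement and verifies the result.
            distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
            // Or pass algorithm properties explicitly and verify in a separate step.
            distSQLFacade.startCheck(jobId, "DATA_MATCH", Collections.singletonMap("chunk-size", "300"));
            distSQLFacade.verifyCheck(jobId);
            distSQLFacade.commit(jobId);
        }
    }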

