This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new de5eeded580 Extract drop and waitIncrementTaskFinished to PipelineE2EDistSQLFacade (#37773)
de5eeded580 is described below

commit de5eeded580b502f356c5b55e0660a48e6030c53
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 19 19:38:37 2026 +0800

    Extract drop and waitIncrementTaskFinished to PipelineE2EDistSQLFacade (#37773)
    
    * Improve PipelineE2EDistSQLFacade.commit
    
    * Add PipelineE2EDistSQLFacade.drop
    
    * Move PipelineContainerComposer.waitIncrementTaskFinished to PipelineE2EDistSQLFacade
    
    * Simplify PipelineE2EDistSQLFacade.waitIncrementTaskFinished param
    
    * Improve PipelineE2EDistSQLFacade.waitIncrementTaskFinished
---
 .../pipeline/cases/PipelineContainerComposer.java  | 33 ------------
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  6 +--
 .../general/MySQLMigrationGeneralE2EIT.java        | 12 ++---
 .../general/MySQLTimeTypesMigrationE2EIT.java      |  2 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  5 +-
 .../general/PostgreSQLToMySQLMigrationE2EIT.java   |  2 +-
 .../migration/general/RulesMigrationE2EIT.java     |  2 +-
 .../primarykey/IndexesMigrationE2EIT.java          |  2 +-
 .../primarykey/MariaDBMigrationE2EIT.java          |  2 +-
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |  2 +-
 .../pipeline/util/PipelineE2EDistSQLFacade.java    | 60 +++++++++++++++++++++-
 11 files changed, 75 insertions(+), 53 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 975902d9144..0c81d57effa 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -67,11 +67,8 @@ import java.sql.Statement;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -537,36 +534,6 @@ public final class PipelineContainerComposer implements AutoCloseable {
         increaseTaskThread.start();
     }
     
-    /**
-     * Wait increment task finished.
-     *
-     * @param distSQL dist SQL
-     * @return result
-     */
-    // TODO use DAO to query via DistSQL
-    public List<Map<String, Object>> waitIncrementTaskFinished(final String distSQL) {
-        for (int i = 0; i < 10; i++) {
-            List<Map<String, Object>> listJobStatus = queryForListWithLog(distSQL);
-            log.info("show status result: {}", listJobStatus);
-            Set<String> actualStatus = new HashSet<>(listJobStatus.size(), 1F);
-            Collection<Integer> incrementalIdleSecondsList = new LinkedList<>();
-            for (Map<String, Object> each : listJobStatus) {
-                assertTrue(Strings.isNullOrEmpty((String) each.get("error_message")), "error_message: `" + each.get("error_message") + "`");
-                actualStatus.add(each.get("status").toString());
-                String incrementalIdleSeconds = (String) each.get("incremental_idle_seconds");
-                incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
-            }
-            if (Collections.min(incrementalIdleSecondsList) <= 5) {
-                sleepSeconds(3);
-                continue;
-            }
-            if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
-                return listJobStatus;
-            }
-        }
-        return Collections.emptyList();
-    }
-    
     /**
      * Assert order record exists in proxy.
      *
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 99d27883a9f..98e9c66dfbc 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -125,11 +125,11 @@ class CDCE2EIT {
             final CDCClient cdcClient = buildCDCClientAndStart(targetDataSource, containerComposer);
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+            distSQLFacade.waitIncrementTaskFinished(jobId);
             DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
             String tableName = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ? String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : SOURCE_TABLE_NAME;
             new E2EIncrementalTask(sourceDataSource, tableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW STREAMING STATUS '%s'", jobId));
+            distSQLFacade.waitIncrementTaskFinished(jobId);
             for (int i = 1; i <= 4; i++) {
                 int orderId = 10000 + i;
                 containerComposer.proxyExecuteWithLog(String.format("INSERT INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId, i), 0);
@@ -144,7 +144,7 @@ class CDCE2EIT {
             cdcClient.close();
             Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
                     .until(() -> distSQLFacade.listJobs().stream().noneMatch(each -> Boolean.parseBoolean(each.get("active").toString())));
-            containerComposer.proxyExecuteWithLog(String.format("DROP STREAMING '%s'", jobId), 0);
+            distSQLFacade.drop(jobId);
             assertTrue(distSQLFacade.listJobs().isEmpty());
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 1f3a9cf2429..51609b79c2e 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -91,11 +91,11 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", 10000);
             containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order_item", 10000);
-            assertMigrationSuccessById(containerComposer, orderJobId, "DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", "streaming-range-type", "SMALL")));
+            assertMigrationSuccessById(distSQLFacade, orderJobId, "DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", "streaming-range-type", "SMALL")));
             String orderItemJobId = distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
-            assertMigrationSuccessById(containerComposer, orderItemJobId, "DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", "streaming-range-type", "LARGE")));
+            assertMigrationSuccessById(distSQLFacade, orderItemJobId, "DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", "streaming-range-type", "LARGE")));
             containerComposer.sleepSeconds(2);
-            assertMigrationSuccessById(containerComposer, orderItemJobId, "CRC32_MATCH", new Properties());
+            assertMigrationSuccessById(distSQLFacade, orderItemJobId, "CRC32_MATCH", new Properties());
             for (String each : distSQLFacade.listJobIds()) {
                 distSQLFacade.commit(each);
             }
@@ -104,13 +104,13 @@ class MySQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
         }
     }
     
-    private void assertMigrationSuccessById(final PipelineContainerComposer containerComposer, final String jobId, final String algorithmType, final Properties algorithmProps) throws SQLException {
-        List<Map<String, Object>> jobStatus = containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+    private void assertMigrationSuccessById(final PipelineE2EDistSQLFacade distSQLFacade, final String jobId, final String algorithmType, final Properties algorithmProps) throws SQLException {
+        List<Map<String, Object>> jobStatus = distSQLFacade.waitIncrementTaskFinished(jobId);
         for (Map<String, Object> each : jobStatus) {
             assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) > 0);
             assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()), is(100));
         }
-        assertCheckMigrationSuccess(containerComposer, jobId, algorithmType, algorithmProps);
+        assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(), jobId, algorithmType, algorithmProps);
     }
     
     private static boolean isEnabled(final ExtensionContext context) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index 96da7255b1e..2e5be76e0cf 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -57,7 +57,7 @@ class MySQLTimeTypesMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
             insertOneRecordWithZeroValue(containerComposer, 2);
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitIncrementTaskFinished(jobId);
             distSQLFacade.loadAllSingleTables();
             assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
         }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index b61b4adbc29..f841cde414b 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -107,11 +107,10 @@ class PostgreSQLMigrationGeneralE2EIT extends AbstractMigrationE2EIT {
     }
     
     private void checkOrderMigration(final PipelineE2EDistSQLFacade distSQLFacade, final String jobId) throws SQLException {
-        PipelineContainerComposer containerComposer = distSQLFacade.getContainerComposer();
-        containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        distSQLFacade.waitIncrementTaskFinished(jobId);
         distSQLFacade.pauseJob(jobId);
         distSQLFacade.resumeJob(jobId);
-        assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+        assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(), jobId, "DATA_MATCH");
     }
     
     private void checkOrderItemMigration(final PipelineE2EDistSQLFacade distSQLFacade) throws SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index fdf78950802..775c1fb9134 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -85,7 +85,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends AbstractMigrationE2EIT {
                 connection.createStatement().execute(String.format("INSERT INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, "incremental"));
                 connection.createStatement().execute(String.format("UPDATE t_order SET status='%s' WHERE order_id IN (1,2)", RandomStringUtils.randomAlphanumeric(10)));
             }
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitIncrementTaskFinished(jobId);
             assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
             distSQLFacade.commit(jobId);
             assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 952c7493ca6..9599287c1ef 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -89,7 +89,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
         PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
         String jobId = distSQLFacade.listJobIds().get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
-        containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        distSQLFacade.waitIncrementTaskFinished(jobId);
         distSQLFacade.loadAllSingleTables();
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
         distSQLFacade.commit(jobId);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 28934494e74..7acd479caf6 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -245,7 +245,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION STATUS '%s'", jobId));
         DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
         incrementalTaskFn.accept(jdbcDataSource);
-        containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+        distSQLFacade.waitIncrementTaskFinished(jobId);
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(containerComposer, jobId, consistencyCheckAlgorithmType);
         }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index df23e276776..ab566c1ef5b 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -74,7 +74,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order (order_id, user_id, status) VALUES ('a1', 1, 'OK')");
             DataSource jdbcDataSource = containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, "t_order", "a1");
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitIncrementTaskFinished(jobId);
             assertCheckMigrationSuccess(containerComposer, jobId, "CRC32_MATCH");
             distSQLFacade.commit(jobId);
             assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index f29f3b64a31..c3d1225d810 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -66,7 +66,7 @@ class TextPrimaryKeyMigrationE2EIT extends AbstractMigrationE2EIT {
             String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.sourceExecuteWithLog(
                     String.format("INSERT INTO %s (order_id,user_id,status) 
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1, 
"afterStop"));
-            containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
+            distSQLFacade.waitIncrementTaskFinished(jobId);
             assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
             distSQLFacade.commit(jobId);
             assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 2b7c1b5c693..cd19137fff9 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -17,18 +17,29 @@
 
 package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
 
+import com.google.common.base.Strings;
 import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.awaitility.Awaitility;
 
 import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
 @Getter
+@Slf4j
 public final class PipelineE2EDistSQLFacade {
     
     private static final String PIPELINE_RULE_SQL_TEMPLATE = "ALTER %s RULE(\n"
@@ -139,7 +150,52 @@ public final class PipelineE2EDistSQLFacade {
      * @throws SQLException SQL exception
      */
     public void commit(final String jobId) throws SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("COMMIT %s %s", jobTypeName, jobId), 2);
-        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> !listJobIds().contains(jobId));
+        String sql = String.format("COMMIT %s %s", jobTypeName, jobId);
+        containerComposer.proxyExecuteWithLog(sql, 0);
+        Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() -> !listJobIds().contains(jobId));
+    }
+    
+    /**
+     * Drop job.
+     *
+     * @param jobId job id
+     * @throws SQLException SQL exception
+     */
+    public void drop(final String jobId) throws SQLException {
+        containerComposer.proxyExecuteWithLog(String.format("DROP %s %s", jobTypeName, jobId), 0);
+    }
+    
+    /**
+     * Wait increment task finished.
+     *
+     * @param jobId job id
+     * @return result
+     */
+    public List<Map<String, Object>> waitIncrementTaskFinished(final String jobId) {
+        String distSQL = buildShowJobStatusDistSQL(jobId);
+        for (int i = 0; i < 10; i++) {
+            List<Map<String, Object>> jobStatusRecords = containerComposer.queryForListWithLog(distSQL);
+            log.info("Show status result: {}", jobStatusRecords);
+            Set<String> actualStatus = new HashSet<>(jobStatusRecords.size(), 1F);
+            Collection<Integer> incrementalIdleSecondsList = new LinkedList<>();
+            for (Map<String, Object> each : jobStatusRecords) {
+                assertTrue(Strings.isNullOrEmpty((String) each.get("error_message")), "error_message: `" + each.get("error_message") + "`");
+                actualStatus.add(each.get("status").toString());
+                String incrementalIdleSeconds = (String) each.get("incremental_idle_seconds");
+                incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 0 : Integer.parseInt(incrementalIdleSeconds));
+            }
+            if (Collections.min(incrementalIdleSecondsList) <= 5) {
+                containerComposer.sleepSeconds(3);
+                continue;
+            }
+            if (actualStatus.size() == 1 && actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+                return jobStatusRecords;
+            }
+        }
+        return Collections.emptyList();
+    }
+    
+    private String buildShowJobStatusDistSQL(final String jobId) {
+        return String.format("SHOW %s STATUS %s", jobTypeName, jobId);
     }
 }
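
For reference, a minimal sketch (not part of the commit) of how a migration test now drives the extracted facade instead of sending SHOW/DROP DistSQL through PipelineContainerComposer. It assumes the containerComposer already built by the E2E harness and a test method that declares throws SQLException, as in the tests above; the combination of calls is illustrative only.

    // Facade construction as shown in RulesMigrationE2EIT above.
    PipelineE2EDistSQLFacade distSQLFacade = new PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
    String jobId = distSQLFacade.listJobIds().get(0);
    // Polls the job status DistSQL up to 10 times until every record reports
    // EXECUTE_INCREMENTAL_TASK with more than 5 seconds of incremental idle time;
    // returns the matching status rows, or an empty list on timeout.
    List<Map<String, Object>> jobStatus = distSQLFacade.waitIncrementTaskFinished(jobId);
    // Migration tests then finish the job with COMMIT; the CDC test instead calls
    // distSQLFacade.drop(jobId) once the streaming job is no longer active.
    distSQLFacade.commit(jobId);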
