This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new e6d9d73f4da Extract PipelineE2EDistSQLFacade (#37753)
e6d9d73f4da is described below

commit e6d9d73f4da72ac0a07b878fa8697c21459d87ee
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 16 15:59:27 2026 +0800

    Extract PipelineE2EDistSQLFacade (#37753)
---
 .../pipeline/cases/PipelineContainerComposer.java  |  35 +++--
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  26 ++--
 .../cases/migration/AbstractMigrationE2EIT.java    |  34 +----
 .../general/MySQLMigrationGeneralE2EIT.java        |  21 +--
 .../general/MySQLTimeTypesMigrationE2EIT.java      |   6 +-
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  31 +++--
 .../general/PostgreSQLToMySQLMigrationE2EIT.java   |  12 +-
 .../migration/general/RollbackMigrationE2EIT.java  |   8 +-
 .../migration/general/RulesMigrationE2EIT.java     |   8 +-
 .../primarykey/IndexesMigrationE2EIT.java          |  12 +-
 .../primarykey/MariaDBMigrationE2EIT.java          |  12 +-
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |  12 +-
 .../pipeline/command/MigrationDistSQLCommand.java  |   4 -
 .../pipeline/util/PipelineE2EDistSQLFacade.java    | 145 +++++++++++++++++++++
 .../resources/env/common/migration-command.xml     |  16 ---
 15 files changed, 248 insertions(+), 134 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 440ca60a805..1ebcf787b41 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -118,6 +118,8 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     
     private final DataSource proxyDataSource;
     
+    private final PipelineJobType<?> jobType;
+    
     private Thread increaseTaskThread;
     
     public PipelineContainerComposer(final PipelineTestParameter testParam, 
final PipelineJobType<?> jobType) {
@@ -138,6 +140,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         sourceDataSource = 
StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), 
username, password, 2);
         proxyDataSource = StorageContainerUtils.generateDataSource(
                 
appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)), 
ProxyContainerConstants.USER, ProxyContainerConstants.PASSWORD, 2);
+        this.jobType = jobType;
         init(jobType);
     }
     
@@ -205,7 +208,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         } catch (final SQLException ex) {
             log.warn("Drop proxy database failed, error={}", ex.getMessage());
         }
-        Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
+        sleepSeconds(2);
     }
     
     private void createProxyDatabase(final Connection connection) throws 
SQLException {
@@ -213,7 +216,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         log.info("Create proxy database {}", PROXY_DATABASE);
         try (Statement statement = connection.createStatement()) {
             statement.execute(sql);
-            Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
+            sleepSeconds(2);
         }
     }
     
@@ -227,6 +230,18 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         }
     }
     
+    /**
+     * Sleep seconds.
+     *
+     * @param seconds seconds
+     */
+    public void sleepSeconds(final int seconds) {
+        if (seconds <= 0) {
+            return;
+        }
+        Awaitility.await().pollDelay(seconds, TimeUnit.SECONDS).until(() -> 
true);
+    }
+    
     /**
      * Append extra parameter.
      *
@@ -316,19 +331,17 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      * Create schema.
      *
      * @param connection connection
-     * @param sleepSeconds sleep seconds
+     * @param seconds sleep seconds
      * @throws SQLException SQL exception
      */
-    public void createSchema(final Connection connection, final int 
sleepSeconds) throws SQLException {
+    public void createSchema(final Connection connection, final int seconds) 
throws SQLException {
         if (!new 
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable())
 {
             return;
         }
         try (Statement statement = connection.createStatement()) {
             statement.execute(String.format("CREATE SCHEMA %s", SCHEMA_NAME));
         }
-        if (sleepSeconds > 0) {
-            Awaitility.await().pollDelay(sleepSeconds, 
TimeUnit.SECONDS).until(() -> true);
-        }
+        sleepSeconds(seconds);
     }
     
     /**
@@ -432,7 +445,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
             List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
             Set<String> statusSet = jobStatus.stream().map(each -> 
String.valueOf(each.get("status"))).collect(Collectors.toSet());
             if (statusSet.contains(JobStatus.PREPARING.name()) || 
statusSet.contains(JobStatus.RUNNING.name())) {
-                Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
+                sleepSeconds(2);
                 continue;
             }
             break;
@@ -455,7 +468,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
             if (statusSet.stream().allMatch(each -> 
each.equals(jobStatus.name()))) {
                 return;
             }
-            Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
+            sleepSeconds(2);
         }
         throw new IllegalStateException("Job status not reached: " + 
jobStatus);
     }
@@ -542,7 +555,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
                 
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ? 
0 : Integer.parseInt(incrementalIdleSeconds));
             }
             if (Collections.min(incrementalIdleSecondsList) <= 5) {
-                Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> 
true);
+                sleepSeconds(3);
                 continue;
             }
             if (actualStatus.size() == 1 && 
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
@@ -583,7 +596,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
             if (recordExist) {
                 break;
             }
-            Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
+            sleepSeconds(2);
         }
         assertTrue(recordExist, "Order record does not exist");
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index a6b39b1ee03..d5d6c94a85e 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -52,6 +52,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -105,7 +106,8 @@ class CDCE2EIT {
                 containerComposer.registerStorageUnit(each);
             }
             createOrderTableRule(containerComposer);
-            createBroadcastRule(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            distSQLFacade.createBroadcastRule("t_address");
             try (Connection connection = 
containerComposer.getProxyDataSource().getConnection()) {
                 initSchemaAndTable(containerComposer, connection, 3);
             }
@@ -123,8 +125,8 @@ class CDCE2EIT {
             }
             PipelineDataSource targetDataSource = 
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
             final CDCClient cdcClient = 
buildCDCClientAndStart(targetDataSource, containerComposer);
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
STREAMING LIST").isEmpty());
-            String jobId = containerComposer.queryForListWithLog("SHOW 
STREAMING LIST").get(0).get("id").toString();
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+            String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
STREAMING STATUS '%s'", jobId));
             DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
             String tableName = 
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ? 
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) : 
SOURCE_TABLE_NAME;
@@ -142,10 +144,10 @@ class CDCE2EIT {
             assertDataMatched(sourceDataSource, targetDataSource, new 
QualifiedTable(null, "t_address"));
             assertDataMatched(sourceDataSource, targetDataSource, new 
QualifiedTable(null, "t_single"));
             cdcClient.close();
-            Awaitility.await().atMost(10L, 
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() -> 
containerComposer.queryForListWithLog("SHOW STREAMING LIST")
-                    .stream().noneMatch(each -> 
Boolean.parseBoolean(each.get("active").toString())));
+            Awaitility.await().atMost(10L, 
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+                    .until(() -> 
distSQLFacade.listJobs().stream().noneMatch(each -> 
Boolean.parseBoolean(each.get("active").toString())));
             containerComposer.proxyExecuteWithLog(String.format("DROP 
STREAMING '%s'", jobId), 0);
-            assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING 
LIST").isEmpty());
+            assertTrue(distSQLFacade.listJobs().isEmpty());
         }
     }
     
@@ -154,20 +156,14 @@ class CDCE2EIT {
         Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
     }
     
-    private void createBroadcastRule(final PipelineContainerComposer 
containerComposer) throws SQLException {
-        containerComposer.proxyExecuteWithLog("CREATE BROADCAST TABLE RULE 
t_address", 2);
-    }
-    
-    private void initSchemaAndTable(final PipelineContainerComposer 
containerComposer, final Connection connection, final int sleepSeconds) throws 
SQLException {
-        containerComposer.createSchema(connection, sleepSeconds);
+    private void initSchemaAndTable(final PipelineContainerComposer 
containerComposer, final Connection connection, final int seconds) throws 
SQLException {
+        containerComposer.createSchema(connection, seconds);
         String sql = 
containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
         log.info("Create table sql: {}", sql);
         connection.createStatement().execute(sql);
         connection.createStatement().execute("CREATE TABLE t_address(id 
integer primary key, address_name varchar(255))");
         connection.createStatement().execute("CREATE TABLE t_single(id integer 
primary key)");
-        if (sleepSeconds > 0) {
-            Awaitility.await().pollDelay(sleepSeconds, 
TimeUnit.SECONDS).until(() -> true);
-        }
+        containerComposer.sleepSeconds(seconds);
     }
     
     private PipelineDataSource createStandardDataSource(final 
PipelineContainerComposer containerComposer, final String storageUnitName) {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 537883f5d52..9d62d2ff4fd 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -98,10 +98,6 @@ public abstract class AbstractMigrationE2EIT {
         }
     }
     
-    protected void loadAllSingleTables(final PipelineContainerComposer 
containerComposer) throws SQLException {
-        containerComposer.proxyExecuteWithLog("LOAD SINGLE TABLE *.*", 5);
-    }
-    
     protected void createTargetOrderTableRule(final PipelineContainerComposer 
containerComposer) throws SQLException {
         
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
 0);
         Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
@@ -124,32 +120,6 @@ public abstract class AbstractMigrationE2EIT {
         
containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTableWithSchema(sourceTableName,
 targetTableName), 5);
     }
     
-    protected void addMigrationProcessConfig(final PipelineContainerComposer 
containerComposer) throws SQLException {
-        
containerComposer.proxyExecuteWithLog(migrationDistSQL.getAlterMigrationRule(), 
0);
-    }
-    
-    protected void stopMigrationByJobId(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("STOP MIGRATION 
'%s'", jobId), 1);
-    }
-    
-    protected void startMigrationByJobId(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("START MIGRATION 
'%s'", jobId), 4);
-    }
-    
-    protected void commitMigrationByJobId(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("COMMIT MIGRATION 
'%s'", jobId), 1);
-    }
-    
-    protected List<String> listJobId(final PipelineContainerComposer 
containerComposer) {
-        List<Map<String, Object>> jobList = 
containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
-        return jobList.stream().map(a -> 
a.get("id").toString()).collect(Collectors.toList());
-    }
-    
-    protected String getJobIdByTableName(final PipelineContainerComposer 
containerComposer, final String tableName) {
-        List<Map<String, Object>> jobList = 
containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
-        return jobList.stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new 
RuntimeException("not find " + tableName + " table")).get("id").toString();
-    }
-    
     protected void assertCheckMigrationSuccess(final PipelineContainerComposer 
containerComposer, final String jobId, final String algorithmType) throws 
SQLException {
         assertCheckMigrationSuccess(containerComposer, jobId, algorithmType, 
new Properties());
     }
@@ -161,7 +131,7 @@ public abstract class AbstractMigrationE2EIT {
         for (int i = 0; i < 30; i++) {
             resultList = 
containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK 
STATUS '%s'", jobId));
             if (resultList.isEmpty()) {
-                Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() -> 
true);
+                containerComposer.sleepSeconds(3);
                 continue;
             }
             List<String> checkEndTimeList = resultList.stream().map(map -> 
map.get("check_end_time").toString()).filter(each -> 
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
@@ -169,7 +139,7 @@ public abstract class AbstractMigrationE2EIT {
             if (checkEndTimeList.size() == resultList.size() && 1 == 
finishedPercentages.size() && finishedPercentages.contains("100")) {
                 break;
             } else {
-                Awaitility.await().pollDelay(1L, TimeUnit.SECONDS).until(() -> 
true);
+                containerComposer.sleepSeconds(1);
             }
         }
         log.info("check job results: {}", resultList);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 8c7bad4323b..8650306176c 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -32,7 +32,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
-import org.awaitility.Awaitility;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -63,7 +63,8 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException, InterruptedException {
         try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
-            addMigrationProcessConfig(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            distSQLFacade.alterPipelineRule();
             containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
             containerComposer.createSourceOrderItemTable();
             addMigrationSourceResource(containerComposer);
@@ -78,27 +79,27 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             log.info("init data end: {}", LocalDateTime.now());
             startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
             startMigration(containerComposer, "t_order_item", "t_order_item");
-            String orderJobId = getJobIdByTableName(containerComposer, "ds_0." 
+ SOURCE_TABLE_NAME);
+            String orderJobId = distSQLFacade.getJobIdByTableName("ds_0." + 
SOURCE_TABLE_NAME);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", orderJobId));
             containerComposer.startIncrementTask(
                     new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME, 
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
             
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item 
(item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
-            stopMigrationByJobId(containerComposer, orderJobId);
-            startMigrationByJobId(containerComposer, orderJobId);
+            distSQLFacade.pauseJob(orderJobId);
+            distSQLFacade.resumeJob(orderJobId);
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", 10000);
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order_item", 10000);
             assertMigrationSuccessById(containerComposer, orderJobId, 
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", 
"streaming-range-type", "SMALL")));
-            String orderItemJobId = getJobIdByTableName(containerComposer, 
"ds_0.t_order_item");
+            String orderItemJobId = 
distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
             assertMigrationSuccessById(containerComposer, orderItemJobId, 
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300", 
"streaming-range-type", "LARGE")));
-            Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> 
true);
+            containerComposer.sleepSeconds(2);
             assertMigrationSuccessById(containerComposer, orderItemJobId, 
"CRC32_MATCH", new Properties());
-            for (String each : listJobId(containerComposer)) {
-                commitMigrationByJobId(containerComposer, each);
+            for (String each : distSQLFacade.listJobIds()) {
+                distSQLFacade.commit(each);
             }
-            assertTrue(listJobId(containerComposer).isEmpty());
+            assertTrue(distSQLFacade.listJobIds().isEmpty());
             
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index df9c720d60a..ed5812ff1cd 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -24,6 +24,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -52,11 +53,12 @@ class MySQLTimeTypesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             startMigration(containerComposer, "time_e2e", "time_e2e");
-            String jobId = listJobId(containerComposer).get(0);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             insertOneRecordWithZeroValue(containerComposer, 2);
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
-            loadAllSingleTables(containerComposer);
+            distSQLFacade.loadAllSingleTables();
             assertCheckMigrationSuccess(containerComposer, jobId, 
"DATA_MATCH");
         }
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d816381c8f5..0fbfdf2d732 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -32,6 +32,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -64,7 +65,8 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException, InterruptedException {
         try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
-            addMigrationProcessConfig(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            distSQLFacade.alterPipelineRule();
             createSourceSchema(containerComposer, 
PipelineContainerComposer.SCHEMA_NAME);
             containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
             containerComposer.createSourceOrderItemTable();
@@ -81,8 +83,8 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             int replicationSlotsCount = 
getReplicationSlotsCount(containerComposer);
             log.info("init data end: {}, replication slots count: {}", 
LocalDateTime.now(), replicationSlotsCount);
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, 
"t_order");
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
-            String jobId = getJobIdByTableName(containerComposer, "ds_0.test." 
+ SOURCE_TABLE_NAME);
+            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+            String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." + 
SOURCE_TABLE_NAME);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             String qualifiedTableName = String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
             containerComposer.startIncrementTask(new 
E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName, 
new SnowflakeKeyGenerateAlgorithm(),
@@ -91,29 +93,30 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             containerComposer.sourceExecuteWithLog(String.format("INSERT INTO 
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
qualifiedTableName, 10000);
-            checkOrderMigration(containerComposer, jobId);
+            checkOrderMigration(distSQLFacade, jobId);
             startMigrationWithSchema(containerComposer, "t_order_item", 
"t_order_item");
-            checkOrderItemMigration(containerComposer);
-            for (String each : listJobId(containerComposer)) {
-                commitMigrationByJobId(containerComposer, each);
+            checkOrderItemMigration(distSQLFacade);
+            for (String each : distSQLFacade.listJobIds()) {
+                distSQLFacade.commit(each);
             }
-            List<String> lastJobIds = listJobId(containerComposer);
-            assertTrue(lastJobIds.isEmpty());
+            assertTrue(distSQLFacade.listJobIds().isEmpty());
             
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1, 
PipelineContainerComposer.SCHEMA_NAME);
             assertThat("Replication slots count doesn't match, it might be not 
cleaned, run `SELECT * FROM pg_replication_slots;` in PostgreSQL to verify",
                     getReplicationSlotsCount(containerComposer), 
is(replicationSlotsCount));
         }
     }
     
-    private void checkOrderMigration(final PipelineContainerComposer 
containerComposer, final String jobId) throws SQLException {
+    private void checkOrderMigration(final PipelineE2EDistSQLFacade 
distSQLFacade, final String jobId) throws SQLException {
+        PipelineContainerComposer containerComposer = 
distSQLFacade.getContainerComposer();
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
-        stopMigrationByJobId(containerComposer, jobId);
-        startMigrationByJobId(containerComposer, jobId);
+        distSQLFacade.pauseJob(jobId);
+        distSQLFacade.resumeJob(jobId);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
     
-    private void checkOrderItemMigration(final PipelineContainerComposer 
containerComposer) throws SQLException {
-        String jobId = getJobIdByTableName(containerComposer, 
"ds_0.test.t_order_item");
+    private void checkOrderItemMigration(final PipelineE2EDistSQLFacade 
distSQLFacade) throws SQLException {
+        String jobId = 
distSQLFacade.getJobIdByTableName("ds_0.test.t_order_item");
+        PipelineContainerComposer containerComposer = 
distSQLFacade.getContainerComposer();
         containerComposer.waitJobStatusReached(String.format("SHOW MIGRATION 
STATUS '%s'", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
     }
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index e0c3f9967d7..a41d0137748 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -31,6 +31,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
 import org.awaitility.core.ConditionTimeoutException;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -48,7 +49,6 @@ import java.sql.SQLException;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -77,8 +77,9 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
                     + "KEY_GENERATE_STRATEGY(COLUMN=order_id, 
TYPE(NAME='snowflake')))", 2);
             initTargetTable(containerComposer);
             containerComposer.proxyExecuteWithLog("MIGRATE TABLE 
source_ds.t_order INTO t_order", 2);
-            Awaitility.await().ignoreExceptions().atMost(10L, 
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 
!listJobId(containerComposer).isEmpty());
-            String jobId = listJobId(containerComposer).get(0);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            Awaitility.await().ignoreExceptions().atMost(10L, 
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 
!distSQLFacade.listJobIds().isEmpty());
+            String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitJobStatusReached(String.format("SHOW 
MIGRATION STATUS %s", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
             try (Connection connection = DriverManager.getConnection(jdbcUrl, 
"postgres", "postgres")) {
                 connection.createStatement().execute(String.format("INSERT 
INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1, 
"incremental"));
@@ -86,9 +87,8 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
             }
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, 
"DATA_MATCH");
-            commitMigrationByJobId(containerComposer, jobId);
-            List<String> lastJobIds = listJobId(containerComposer);
-            assertTrue(lastJobIds.isEmpty());
+            distSQLFacade.commit(jobId);
+            assertTrue(distSQLFacade.listJobIds().isEmpty());
         } finally {
             if (null != postgresqlContainer) {
                 postgresqlContainer.close();
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
index 9449d394237..ec1c742467d 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
@@ -26,6 +26,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -51,9 +52,10 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             startMigration(containerComposer, "t_order", "t_order");
-            String jobId = listJobId(containerComposer).get(0);
-            containerComposer.proxyExecuteWithLog(String.format("ROLLBACK 
MIGRATION %s", jobId), 2);
-            assertTrue(listJobId(containerComposer).isEmpty());
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            String jobId = distSQLFacade.listJobIds().get(0);
+            distSQLFacade.rollback(jobId);
+            assertTrue(distSQLFacade.listJobIds().isEmpty());
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 591d7f5323f..4486edb8899 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -27,6 +27,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -85,12 +86,13 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
             addRuleFn.call();
         }
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
-        String jobId = listJobId(containerComposer).get(0);
+        PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+        String jobId = distSQLFacade.listJobIds().get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
-        loadAllSingleTables(containerComposer);
+        distSQLFacade.loadAllSingleTables();
         assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
-        commitMigrationByJobId(containerComposer, jobId);
+        distSQLFacade.commit(jobId);
         
assertThat(containerComposer.getTargetTableRecordsCount(containerComposer.getProxyDataSource(),
 SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index f53d2633a68..ef8d02a8d7e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -35,6 +35,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -44,7 +45,6 @@ import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.List;
 import java.util.function.Consumer;
 
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -235,12 +235,13 @@ class IndexesMigrationE2EIT extends 
AbstractMigrationE2EIT {
         try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
             
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
-        addMigrationProcessConfig(containerComposer);
+        PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+        distSQLFacade.alterPipelineRule();
         addMigrationSourceResource(containerComposer);
         addMigrationTargetResource(containerComposer);
         
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
 shardingColumn), 2);
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
-        String jobId = listJobId(containerComposer).get(0);
+        String jobId = distSQLFacade.listJobIds().get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
         DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
         incrementalTaskFn.accept(jdbcDataSource);
@@ -248,10 +249,9 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT 
{
         if (null != consistencyCheckAlgorithmType) {
             assertCheckMigrationSuccess(containerComposer, jobId, 
consistencyCheckAlgorithmType);
         }
-        commitMigrationByJobId(containerComposer, jobId);
+        distSQLFacade.commit(jobId);
         
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, 
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
-        List<String> lastJobIds = listJobId(containerComposer);
-        assertTrue(lastJobIds.isEmpty());
+        assertTrue(distSQLFacade.listJobIds().isEmpty());
     }
     
     private static boolean isEnabled(final ExtensionContext context) {
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 41e81f4b0ab..f3cf694686e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
@@ -37,7 +38,6 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
 import javax.sql.DataSource;
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.List;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -63,22 +63,22 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
                 KeyGenerateAlgorithm generateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
                 
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             }
-            addMigrationProcessConfig(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            distSQLFacade.alterPipelineRule();
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             createTargetOrderTableRule(containerComposer);
             startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
-            String jobId = listJobId(containerComposer).get(0);
+            String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             containerComposer.sourceExecuteWithLog("INSERT INTO t_order 
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
             DataSource jdbcDataSource = 
containerComposer.generateShardingSphereDataSourceFromProxy();
             containerComposer.assertOrderRecordExist(jdbcDataSource, 
"t_order", "a1");
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, 
"CRC32_MATCH");
-            commitMigrationByJobId(containerComposer, jobId);
+            distSQLFacade.commit(jobId);
             
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource, 
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
-            List<String> lastJobIds = listJobId(containerComposer);
-            assertTrue(lastJobIds.isEmpty());
+            assertTrue(distSQLFacade.listJobIds().isEmpty());
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index d48b91ddfe5..311cf6fa05e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -28,6 +28,7 @@ import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.junit.jupiter.api.condition.EnabledIf;
 import org.junit.jupiter.api.extension.ExtensionContext;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -35,7 +36,6 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
 
 import java.sql.Connection;
 import java.sql.SQLException;
-import java.util.List;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -57,19 +57,19 @@ class TextPrimaryKeyMigrationE2EIT extends 
AbstractMigrationE2EIT {
                 UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
                 
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, getSourceTableName(containerComposer), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             }
-            addMigrationProcessConfig(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            distSQLFacade.alterPipelineRule();
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             createTargetOrderTableRule(containerComposer);
             startMigration(containerComposer, 
getSourceTableName(containerComposer), TARGET_TABLE_NAME);
-            String jobId = listJobId(containerComposer).get(0);
+            String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.sourceExecuteWithLog(
                     String.format("INSERT INTO %s (order_id,user_id,status) 
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1, 
"afterStop"));
             containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             assertCheckMigrationSuccess(containerComposer, jobId, 
"DATA_MATCH");
-            commitMigrationByJobId(containerComposer, jobId);
-            List<String> lastJobIds = listJobId(containerComposer);
-            assertTrue(lastJobIds.isEmpty());
+            distSQLFacade.commit(jobId);
+            assertTrue(distSQLFacade.listJobIds().isEmpty());
         }
     }
     
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
index 8687b4c65e5..b8d094e5c79 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
@@ -30,10 +30,6 @@ import javax.xml.bind.annotation.XmlRootElement;
 @Setter
 public final class MigrationDistSQLCommand {
     
-    @XmlElement(name = "alter-migration-rule")
-    @Getter
-    private String alterMigrationRule;
-    
     @XmlElement(name = "create-target-order-table-encrypt-rule")
     @Getter
     private String createTargetOrderTableEncryptRule;
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
new file mode 100644
index 00000000000..d1b1b882342
--- /dev/null
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
+import org.awaitility.Awaitility;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@RequiredArgsConstructor
+@Getter
+public final class PipelineE2EDistSQLFacade {
+    
+    private static final String PIPELINE_RULE_SQL_TEMPLATE = "ALTER %s RULE(\n"
+            + "READ(WORKER_THREAD=20, BATCH_SIZE=1000, SHARDING_SIZE=100000, 
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))),\n"
+            + "WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER 
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),\n"
+            + "STREAM_CHANNEL(TYPE(NAME='MEMORY', 
PROPERTIES('block-queue-size'=1000))))";
+    
+    private final String jobTypeName;
+    
+    private final PipelineContainerComposer containerComposer;
+    
+    public PipelineE2EDistSQLFacade(final PipelineContainerComposer 
containerComposer) {
+        this(containerComposer.getJobType().getType(), containerComposer);
+    }
+    
+    /**
+     * Load all single tables.
+     *
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void loadAllSingleTables() throws SQLException {
+        containerComposer.proxyExecuteWithLog("LOAD SINGLE TABLE *.*", 5);
+    }
+    
+    /**
+     * Create broadcast rule.
+     *
+     * @param tableName table name
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void createBroadcastRule(final String tableName) throws 
SQLException {
+        containerComposer.proxyExecuteWithLog(String.format("CREATE BROADCAST 
TABLE RULE %s", tableName), 2);
+    }
+    
+    /**
+     * Alter pipeline rule of the job type, using the built-in rule template.
+     *
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void alterPipelineRule() throws SQLException {
+        
containerComposer.proxyExecuteWithLog(String.format(PIPELINE_RULE_SQL_TEMPLATE, 
jobTypeName), 2);
+    }
+    
+    /**
+     * List job ids.
+     *
+     * @return job ids
+     */
+    public List<String> listJobIds() {
+        return listJobs().stream().map(a -> 
a.get("id").toString()).collect(Collectors.toList());
+    }
+    
+    /**
+     * List jobs of the job type.
+     *
+     * @return jobs
+     */
+    public List<Map<String, Object>> listJobs() {
+        return containerComposer.queryForListWithLog(String.format("SHOW %s 
LIST", jobTypeName));
+    }
+    
+    /**
+     * Get job id by table name, or fail if no listed job matches exactly.
+     *
+     * @param tableName table name
+     * @return job id
+     */
+    public String getJobIdByTableName(final String tableName) {
+        return listJobs().stream().filter(a -> 
a.get("tables").toString().equals(tableName)).findFirst()
+                .orElseThrow(() -> new IllegalStateException("Could not find job by 
table name: `" + tableName + "`")).get("id").toString();
+    }
+    
+    /**
+     * Pause job.
+     *
+     * @param jobId job id
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void pauseJob(final String jobId) throws SQLException {
+        containerComposer.proxyExecuteWithLog(String.format("STOP %s %s", 
jobTypeName, jobId), 1);
+    }
+    
+    /**
+     * Resume job.
+     *
+     * @param jobId job id
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void resumeJob(final String jobId) throws SQLException {
+        containerComposer.proxyExecuteWithLog(String.format("START %s %s", 
jobTypeName, jobId), 5);
+    }
+    
+    /**
+     * Rollback job.
+     *
+     * @param jobId job id
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void rollback(final String jobId) throws SQLException {
+        containerComposer.proxyExecuteWithLog(String.format("ROLLBACK %s %s", 
jobTypeName, jobId), 2);
+    }
+    
+    /**
+     * Commit job, then wait until it's no longer in the job list.
+     *
+     * @param jobId job id
+     * @throws SQLException if there's DistSQL execution failure
+     */
+    public void commit(final String jobId) throws SQLException {
+        containerComposer.proxyExecuteWithLog(String.format("COMMIT %s %s", 
jobTypeName, jobId), 2);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() -> 
!listJobIds().contains(jobId));
+    }
+}
diff --git 
a/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
 
b/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
index 1659848eba4..ee965154163 100644
--- 
a/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
+++ 
b/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
@@ -16,22 +16,6 @@
   -->
 
 <command>
-    <alter-migration-rule>
-        ALTER MIGRATION RULE (
-        READ(
-        WORKER_THREAD=20,
-        BATCH_SIZE=1000,
-        SHARDING_SIZE=10000000,
-        RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
-        ),
-        WRITE(
-        WORKER_THREAD=20,
-        BATCH_SIZE=1000,
-        RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
-        ),
-        STREAM_CHANNEL 
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
-        );
-    </alter-migration-rule>
     
     <register-migration-source-storage-unit-template>
         REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (

Reply via email to