This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new a533f1e1e77 Refactor PipelineContainerComposer and PipelineE2EDistSQLFacade to support running several types of jobs in any E2E (#37758)
a533f1e1e77 is described below

commit a533f1e1e77574e1b96a77d818edbda95abfe9af
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 16 20:39:21 2026 +0800

    Refactor PipelineContainerComposer and PipelineE2EDistSQLFacade to support running several types of jobs in any E2E (#37758)
    
    * Improve PipelineContainerComposer.cleanUpPipelineJobs: clean up jobs of all types and remove the jobType param
    
    * Add jobType param in PipelineE2EDistSQLFacade constructor
    
    * Remove PipelineContainerComposer constructor jobType param
---
 .../pipeline/cases/PipelineContainerComposer.java  | 23 ++++++++++++++--------
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  8 +++-----
 .../general/MySQLMigrationGeneralE2EIT.java        |  4 ++--
 .../general/MySQLTimeTypesMigrationE2EIT.java      |  4 ++--
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  4 ++--
 .../general/PostgreSQLToMySQLMigrationE2EIT.java   |  4 ++--
 .../migration/general/RollbackMigrationE2EIT.java  |  4 ++--
 .../migration/general/RulesMigrationE2EIT.java     |  6 +++---
 .../primarykey/IndexesMigrationE2EIT.java          | 10 +++++-----
 .../primarykey/MariaDBMigrationE2EIT.java          |  4 ++--
 .../primarykey/TextPrimaryKeyMigrationE2EIT.java   |  4 ++--
 .../pipeline/util/PipelineE2EDistSQLFacade.java    |  9 +++++----
 12 files changed, 45 insertions(+), 39 deletions(-)

diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 1ebcf787b41..28f0bdfc838 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -35,6 +35,7 @@ import 
org.apache.shardingsphere.database.connector.mysql.type.MySQLDatabaseType
 import 
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
 import 
org.apache.shardingsphere.database.connector.postgresql.type.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
 import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
 import org.apache.shardingsphere.infra.util.yaml.YamlEngine;
@@ -118,11 +119,9 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     
     private final DataSource proxyDataSource;
     
-    private final PipelineJobType<?> jobType;
-    
     private Thread increaseTaskThread;
     
-    public PipelineContainerComposer(final PipelineTestParameter testParam, 
final PipelineJobType<?> jobType) {
+    public PipelineContainerComposer(final PipelineTestParameter testParam) {
         databaseType = testParam.getDatabaseType();
         Type type = 
E2ETestEnvironment.getInstance().getRunEnvironment().getType();
         containerComposer = Type.DOCKER == type
@@ -140,15 +139,14 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         sourceDataSource = 
StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false), 
username, password, 2);
         proxyDataSource = StorageContainerUtils.generateDataSource(
                 
appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)), 
ProxyContainerConstants.USER, ProxyContainerConstants.PASSWORD, 2);
-        this.jobType = jobType;
-        init(jobType);
+        init();
     }
     
     @SneakyThrows(SQLException.class)
-    private void init(final PipelineJobType<?> jobType) {
+    private void init() {
         String jdbcUrl = containerComposer.getProxyJdbcUrl(databaseType 
instanceof PostgreSQLDatabaseType || databaseType instanceof 
OpenGaussDatabaseType ? "postgres" : "");
         try (Connection connection = DriverManager.getConnection(jdbcUrl, 
ProxyContainerConstants.USER, ProxyContainerConstants.PASSWORD)) {
-            cleanUpPipelineJobs(connection, jobType);
+            cleanUpPipelineJobs(connection);
             cleanUpProxyDatabase(connection);
             // Compatible with "drop database if exists sharding_db;" failed 
for now
             cleanUpProxyDatabase(connection);
@@ -157,10 +155,19 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         cleanUpDataSource();
     }
     
-    private void cleanUpPipelineJobs(final Connection connection, final 
PipelineJobType<?> jobType) throws SQLException {
+    private void cleanUpPipelineJobs(final Connection connection) throws 
SQLException {
         if (Type.NATIVE != 
E2ETestEnvironment.getInstance().getRunEnvironment().getType()) {
             return;
         }
+        for (PipelineJobType<?> each : 
ShardingSphereServiceLoader.getServiceInstances(PipelineJobType.class)) {
+            if (!each.getOption().isTransmissionJob()) {
+                continue;
+            }
+            cleanUpPipelineJobsWithType(connection, each);
+        }
+    }
+    
+    private void cleanUpPipelineJobsWithType(final Connection connection, 
final PipelineJobType<?> jobType) throws SQLException {
         String jobTypeName = jobType.getType();
         for (Map<String, Object> each : queryJobs(connection, jobTypeName)) {
             String jobId = each.get("id").toString();
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index d5d6c94a85e..99d27883a9f 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -98,15 +98,13 @@ class CDCE2EIT {
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertCDCDataImportSuccess(final PipelineTestParameter testParam) 
throws SQLException {
         TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new CDCJobType())) {
-            String alterStreamingRule = "ALTER STREAMING RULE 
(READ(WORKER_THREAD=20,BATCH_SIZE=1000,SHARDING_SIZE=10000000,RATE_LIMITER(TYPE(NAME='QPS',PROPERTIES('qps'='10000')))),"
-                    + "WRITE(WORKER_THREAD=20,BATCH_SIZE=1000, 
RATE_LIMITER(TYPE(NAME='TPS',PROPERTIES('tps'='10000')))),STREAM_CHANNEL(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000'))))";
-            containerComposer.proxyExecuteWithLog(alterStreamingRule, 0);
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new CDCJobType());
+            distSQLFacade.alterPipelineRule();
             for (String each : Arrays.asList(PipelineContainerComposer.DS_0, 
PipelineContainerComposer.DS_1)) {
                 containerComposer.registerStorageUnit(each);
             }
             createOrderTableRule(containerComposer);
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
             distSQLFacade.createBroadcastRule("t_address");
             try (Connection connection = 
containerComposer.getProxyDataSource().getConnection()) {
                 initSchemaAndTable(containerComposer, connection, 3);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 8650306176c..1f3a9cf2429 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -62,8 +62,8 @@ class MySQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException, InterruptedException {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
             containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
             containerComposer.createSourceOrderItemTable();
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index ed5812ff1cd..96da7255b1e 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -45,7 +45,7 @@ class MySQLTimeTypesMigrationE2EIT extends 
AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertIllegalTimeTypesValueMigrationSuccess(final 
PipelineTestParameter testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sql = "CREATE TABLE `time_e2e` ( `id` int NOT NULL, 
`t_timestamp` timestamp NULL DEFAULT NULL, `t_datetime` datetime DEFAULT NULL, 
`t_date` date DEFAULT NULL, "
                     + "`t_year` year DEFAULT NULL, PRIMARY KEY (`id`)) 
ENGINE=InnoDB;";
             containerComposer.sourceExecuteWithLog(sql);
@@ -53,7 +53,7 @@ class MySQLTimeTypesMigrationE2EIT extends 
AbstractMigrationE2EIT {
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             startMigration(containerComposer, "time_e2e", "time_e2e");
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitJobPrepareSuccess(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
             insertOneRecordWithZeroValue(containerComposer, 2);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 0fbfdf2d732..b61b4adbc29 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -64,8 +64,8 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException, InterruptedException {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
             createSourceSchema(containerComposer, 
PipelineContainerComposer.SCHEMA_NAME);
             containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index a41d0137748..fdf78950802 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -63,7 +63,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
     void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException {
         PostgreSQLContainer<?> postgresqlContainer = null;
         Type type = 
E2ETestEnvironment.getInstance().getRunEnvironment().getType();
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             if (Type.DOCKER == type) {
                 postgresqlContainer = new PostgreSQLContainer<>("postgres:13");
                 
postgresqlContainer.withNetwork(containerComposer.getContainerComposer().getContainers().getNetwork()).withNetworkAliases("postgresql.host")
@@ -77,7 +77,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
                     + "KEY_GENERATE_STRATEGY(COLUMN=order_id, 
TYPE(NAME='snowflake')))", 2);
             initTargetTable(containerComposer);
             containerComposer.proxyExecuteWithLog("MIGRATE TABLE 
source_ds.t_order INTO t_order", 2);
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             Awaitility.await().ignoreExceptions().atMost(10L, 
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 
!distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
             containerComposer.waitJobStatusReached(String.format("SHOW 
MIGRATION STATUS %s", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
index ec1c742467d..d173d3547c2 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
@@ -43,7 +43,7 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertIllegalTimeTypesValueMigrationSuccess(final 
PipelineTestParameter testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sql = "CREATE TABLE t_order (order_id BIGINT PRIMARY KEY, 
user_id INT, status VARCHAR(50))";
             containerComposer.sourceExecuteWithLog(sql);
             try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
@@ -52,7 +52,7 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
             startMigration(containerComposer, "t_order", "t_order");
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             String jobId = distSQLFacade.listJobIds().get(0);
             distSQLFacade.rollback(jobId);
             assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 4486edb8899..952c7493ca6 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -58,7 +58,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertNoRuleMigrationSuccess(final PipelineTestParameter testParam) 
throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             assertMigrationSuccess(containerComposer, null);
         }
     }
@@ -67,7 +67,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertOnlyEncryptRuleMigrationSuccess(final PipelineTestParameter 
testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             assertMigrationSuccess(containerComposer, () -> {
                 createTargetOrderTableEncryptRule(containerComposer);
                 return null;
@@ -86,7 +86,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
             addRuleFn.call();
         }
         startMigration(containerComposer, SOURCE_TABLE_NAME, 
TARGET_TABLE_NAME);
-        PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+        PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
         String jobId = distSQLFacade.listJobIds().get(0);
         containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION 
STATUS '%s'", jobId));
         containerComposer.waitIncrementTaskFinished(String.format("SHOW 
MIGRATION STATUS '%s'", jobId));
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index ef8d02a8d7e..28934494e74 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -78,7 +78,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertNoUniqueKeyMigrationSuccess(final PipelineTestParameter 
testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sql;
             String consistencyCheckAlgorithmType;
             if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
@@ -162,7 +162,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMultiPrimaryKeyMigrationSuccess(final PipelineTestParameter 
testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sql;
             String consistencyCheckAlgorithmType;
             if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
@@ -185,7 +185,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMultiUniqueKeyMigrationSuccess(final PipelineTestParameter 
testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sql;
             String consistencyCheckAlgorithmType;
             if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
@@ -208,7 +208,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertSpecialTypeSingleColumnUniqueKeyMigrationSuccess(final 
PipelineTestParameter testParam) throws Exception {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sql;
             String consistencyCheckAlgorithmType;
             if (containerComposer.getDatabaseType() instanceof 
MySQLDatabaseType) {
@@ -235,7 +235,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
         try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
             
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
         }
-        PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+        PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
         distSQLFacade.alterPipelineRule();
         addMigrationSourceResource(containerComposer);
         addMigrationTargetResource(containerComposer);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index f3cf694686e..df23e276776 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -56,14 +56,14 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertMigrationSuccess(final PipelineTestParameter testParam) throws 
SQLException {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             String sqlPattern = "CREATE TABLE `%s` (`order_id` VARCHAR(64) NOT 
NULL, `user_id` INT NOT NULL, `status` varchar(255), PRIMARY KEY (`order_id`)) 
ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
             containerComposer.sourceExecuteWithLog(String.format(sqlPattern, 
SOURCE_TABLE_NAME));
             try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
                 KeyGenerateAlgorithm generateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
                 
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
generateAlgorithm, SOURCE_TABLE_NAME, 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             }
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 311cf6fa05e..f29f3b64a31 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -51,13 +51,13 @@ class TextPrimaryKeyMigrationE2EIT extends 
AbstractMigrationE2EIT {
     @EnabledIf("isEnabled")
     @ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
     void assertTextPrimaryMigrationSuccess(final PipelineTestParameter 
testParam) throws SQLException {
-        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam, new MigrationJobType())) {
+        try (PipelineContainerComposer containerComposer = new 
PipelineContainerComposer(testParam)) {
             
containerComposer.createSourceOrderTable(getSourceTableName(containerComposer));
             try (Connection connection = 
containerComposer.getSourceDataSource().getConnection()) {
                 UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new 
UUIDKeyGenerateAlgorithm();
                 
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection, 
keyGenerateAlgorithm, getSourceTableName(containerComposer), 
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             }
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
             distSQLFacade.alterPipelineRule();
             addMigrationSourceResource(containerComposer);
             addMigrationTargetResource(containerComposer);
diff --git 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index d1b1b882342..27081b183fa 100644
--- 
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ 
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -19,6 +19,7 @@ package 
org.apache.shardingsphere.test.e2e.operation.pipeline.util;
 
 import lombok.Getter;
 import lombok.RequiredArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.awaitility.Awaitility;
 
@@ -37,12 +38,12 @@ public final class PipelineE2EDistSQLFacade {
             + "WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER 
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),\n"
             + "STREAM_CHANNEL(TYPE(NAME='MEMORY', 
PROPERTIES('block-queue-size'=1000))))";
     
-    private final String jobTypeName;
-    
     private final PipelineContainerComposer containerComposer;
     
-    public PipelineE2EDistSQLFacade(final PipelineContainerComposer 
containerComposer) {
-        this(containerComposer.getJobType().getType(), containerComposer);
+    private final String jobTypeName;
+    
+    public PipelineE2EDistSQLFacade(final PipelineContainerComposer 
containerComposer, final PipelineJobType<?> jobType) {
+        this(containerComposer, jobType.getType());
     }
     
     /**

Reply via email to