This is an automated email from the ASF dual-hosted git repository.

zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git


The following commit(s) were added to refs/heads/master by this push:
     new 940b38e3d87 Extract registerStorageUnit to PipelineE2EDistSQLFacade (#37796)
940b38e3d87 is described below

commit 940b38e3d879dd520592da76cee12705ecb6b42d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 21 15:42:18 2026 +0800

    Extract registerStorageUnit to PipelineE2EDistSQLFacade (#37796)
    
    * Use Awaitility.waitAtMost
    
    * Extract registerStorageUnit to PipelineE2EDistSQLFacade
---
 .../pipeline/cases/PipelineContainerComposer.java  | 20 ++-----------------
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java |  8 ++++----
 .../cases/migration/AbstractMigrationE2EIT.java    |  6 +++---
 .../general/PostgreSQLMigrationGeneralE2EIT.java   |  2 +-
 .../general/PostgreSQLToMySQLMigrationE2EIT.java   |  8 ++++----
 .../pipeline/util/PipelineE2EDistSQLFacade.java    | 23 +++++++++++++++++++++-
 6 files changed, 36 insertions(+), 31 deletions(-)

diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index a9349dad669..634f98fd71e 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -87,6 +87,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
     
     public static final String SCHEMA_NAME = "test";
     
+    // TODO Refactor: storage unit name and actual data source name are different
     public static final String DS_0 = "pipeline_e2e_0";
     
     public static final String DS_1 = "pipeline_e2e_1";
@@ -257,23 +258,6 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
         return jdbcUrl;
     }
     
-    /**
-     * Register storage unit.
-     *
-     * @param storageUnitName storage unit name
-     * @throws SQLException SQL exception
-     */
-    public void registerStorageUnit(final String storageUnitName) throws 
SQLException {
-        String username = ProxyDatabaseTypeUtils.isOracleBranch(databaseType) 
? storageUnitName : getUsername();
-        String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} ( 
URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", 
storageUnitName)
-                .replace("${user}", username)
-                .replace("${password}", getPassword())
-                .replace("${url}", getActualJdbcUrlTemplate(storageUnitName, 
Type.DOCKER == E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
-        proxyExecuteWithLog(registerStorageUnitTemplate, 0);
-        int timeout = databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
-        Awaitility.await().ignoreExceptions().atMost(timeout, 
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> 
showStorageUnitsName().contains(storageUnitName));
-    }
-    
     /**
      * Show storage units names.
      *
@@ -564,7 +548,7 @@ public final class PipelineContainerComposer implements 
AutoCloseable {
      */
     // TODO proxy support for some fields still needs to be optimized, such as 
binary of MySQL, after these problems are optimized, Proxy dataSource can be 
used.
     public DataSource generateShardingSphereDataSourceFromProxy() {
-        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
+        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
         YamlRootConfiguration rootConfig = getYamlRootConfig();
         ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(), 
() -> new IllegalStateException("dataSources is null"));
         ShardingSpherePreconditions.checkNotNull(rootConfig.getRules(), () -> 
new IllegalStateException("rules is null"));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index c10acdde3bc..e8840ad589a 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -102,7 +102,7 @@ class CDCE2EIT {
             PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new CDCJobType());
             distSQLFacade.alterPipelineRule();
             for (String each : Arrays.asList(PipelineContainerComposer.DS_0, 
PipelineContainerComposer.DS_1)) {
-                containerComposer.registerStorageUnit(each);
+                distSQLFacade.registerStorageUnit(each);
             }
             createOrderTableRule(containerComposer);
             distSQLFacade.createBroadcastRule("t_address");
@@ -123,7 +123,7 @@ class CDCE2EIT {
             }
             PipelineDataSource targetDataSource = 
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
             final CDCClient cdcClient = 
buildCDCClientAndStart(targetDataSource, containerComposer);
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+            Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
             DialectDatabaseMetaData dialectDatabaseMetaData = new 
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
@@ -142,7 +142,7 @@ class CDCE2EIT {
             assertDataMatched(sourceDataSource, targetDataSource, new 
QualifiedTable(null, "t_address"));
             assertDataMatched(sourceDataSource, targetDataSource, new 
QualifiedTable(null, "t_single"));
             cdcClient.close();
-            Awaitility.await().atMost(10L, 
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+            Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(500L, 
TimeUnit.MILLISECONDS)
                     .until(() -> 
distSQLFacade.listJobs().stream().noneMatch(each -> 
Boolean.parseBoolean(each.get("active").toString())));
             distSQLFacade.drop(jobId);
             assertTrue(distSQLFacade.listJobs().isEmpty());
@@ -151,7 +151,7 @@ class CDCE2EIT {
     
     private void createOrderTableRule(final PipelineContainerComposer 
containerComposer) throws SQLException {
         containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 0);
-        Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
+        Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(2L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
     }
     
     private void initSchemaAndTable(final PipelineContainerComposer 
containerComposer, final Connection connection, final int seconds) throws 
SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
index c6077ece811..cc91b9028e5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -64,7 +64,7 @@ public abstract class AbstractMigrationE2EIT {
                 .replace("${ds3}", 
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3, 
true))
                 .replace("${ds4}", 
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, 
true));
         containerComposer.proxyExecuteWithLog(addTargetResource, 0);
-        Awaitility.await().ignoreExceptions().atMost(60L, 
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> 3 == 
containerComposer.showStorageUnitsName().size());
+        Awaitility.waitAtMost(60L, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L, 
TimeUnit.SECONDS).until(() -> 3 == 
containerComposer.showStorageUnitsName().size());
     }
     
     protected void createSourceSchema(final PipelineContainerComposer 
containerComposer, final String schemaName) throws SQLException {
@@ -88,7 +88,7 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderTableRule(final PipelineContainerComposer 
containerComposer) throws SQLException {
         
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
 0);
-        Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
+        Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order").isEmpty());
     }
     
     protected void createTargetOrderTableEncryptRule(final 
PipelineContainerComposer containerComposer) throws SQLException {
@@ -97,7 +97,7 @@ public abstract class AbstractMigrationE2EIT {
     
     protected void createTargetOrderItemTableRule(final 
PipelineContainerComposer containerComposer) throws SQLException {
         
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(),
 0);
-        Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order_item").isEmpty());
+        Awaitility.waitAtMost(4L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW 
SHARDING TABLE RULE t_order_item").isEmpty());
     }
     
     protected void startMigration(final PipelineContainerComposer 
containerComposer, final String sourceTableName, final String targetTableName) 
throws SQLException {
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 8ea80194942..7aa954e155a 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -82,7 +82,7 @@ class PostgreSQLMigrationGeneralE2EIT extends 
AbstractMigrationE2EIT {
             int replicationSlotsCount = 
getReplicationSlotsCount(containerComposer);
             log.info("init data end: {}, replication slots count: {}", 
LocalDateTime.now(), replicationSlotsCount);
             startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME, 
"t_order");
-            Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+            Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." + 
SOURCE_TABLE_NAME);
             distSQLFacade.waitJobPreparingStageFinished(jobId);
             String qualifiedTableName = String.join(".", 
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index 94e9ecd4341..07577cc7bc5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -71,13 +71,13 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
             String jdbcUrl = Type.DOCKER == type ? 
postgresqlContainer.getJdbcUrl() : "jdbc:postgresql://localhost:5432/postgres";
             initSourceTable(jdbcUrl);
             registerMigrationSourceStorageUnit(containerComposer);
-            
containerComposer.registerStorageUnit(PipelineContainerComposer.DS_0);
+            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
+            distSQLFacade.registerStorageUnit(PipelineContainerComposer.DS_0);
             containerComposer.proxyExecuteWithLog("CREATE SHARDING TABLE RULE 
t_order(STORAGE_UNITS(pipeline_e2e_0),SHARDING_COLUMN=order_id,TYPE(NAME='hash_mod',PROPERTIES('sharding-count'='2')),"
                     + "KEY_GENERATE_STRATEGY(COLUMN=order_id, 
TYPE(NAME='snowflake')))", 2);
             initTargetTable(containerComposer);
             containerComposer.proxyExecuteWithLog("MIGRATE TABLE 
source_ds.t_order INTO t_order", 2);
-            PipelineE2EDistSQLFacade distSQLFacade = new 
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
-            Awaitility.await().ignoreExceptions().atMost(10L, 
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> 
!distSQLFacade.listJobIds().isEmpty());
+            Awaitility.waitAtMost(10L, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1L, 
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
             distSQLFacade.waitJobIncrementalStageStarted(jobId);
             try (Connection connection = DriverManager.getConnection(jdbcUrl, 
"postgres", "postgres")) {
@@ -150,7 +150,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends 
AbstractMigrationE2EIT {
     
     private static boolean waitForTableExistence(final Connection connection, 
final String tableName) {
         try {
-            Awaitility.await().ignoreExceptions().atMost(60L, 
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> 
tableExists(connection, tableName));
+            Awaitility.waitAtMost(60L, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L, 
TimeUnit.SECONDS).until(() -> tableExists(connection, tableName));
             return true;
         } catch (final ConditionTimeoutException ex) {
             return false;
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 8b28160f562..404d0a529b3 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -22,6 +22,9 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
 import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import 
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
+import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
+import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
 import 
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
 import org.awaitility.Awaitility;
 
@@ -58,6 +61,23 @@ public final class PipelineE2EDistSQLFacade {
         jobTypeName = jobType.getType();
     }
     
+    /**
+     * Register storage unit.
+     *
+     * @param storageUnitName storage unit name
+     * @throws SQLException SQL exception
+     */
+    public void registerStorageUnit(final String storageUnitName) throws 
SQLException {
+        String username = 
ProxyDatabaseTypeUtils.isOracleBranch(containerComposer.getDatabaseType()) ? 
storageUnitName : containerComposer.getUsername();
+        String registerStorageUnitSQL = "REGISTER STORAGE UNIT ${ds} ( 
URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}", 
storageUnitName)
+                .replace("${user}", username)
+                .replace("${password}", containerComposer.getPassword())
+                .replace("${url}", 
containerComposer.getActualJdbcUrlTemplate(storageUnitName, Type.DOCKER == 
E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
+        containerComposer.proxyExecuteWithLog(registerStorageUnitSQL, 0);
+        int timeout = containerComposer.getDatabaseType() instanceof 
OpenGaussDatabaseType ? 60 : 10;
+        Awaitility.waitAtMost(timeout, 
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L, 
TimeUnit.SECONDS).until(() -> 
containerComposer.showStorageUnitsName().contains(storageUnitName));
+    }
+    
     /**
      * Load all single tables.
      *
@@ -142,7 +162,8 @@ public final class PipelineE2EDistSQLFacade {
      * @throws SQLException SQL exception
      */
     public void rollback(final String jobId) throws SQLException {
-        containerComposer.proxyExecuteWithLog(String.format("ROLLBACK %s %s", 
jobTypeName, jobId), 2);
+        String sql = String.format("ROLLBACK %s %s", jobTypeName, jobId);
+        containerComposer.proxyExecuteWithLog(sql, 2);
     }
     
     /**

Reply via email to