This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 940b38e3d87 Extract registerStorageUnit to PipelineE2EDistSQLFacade (#37796)
940b38e3d87 is described below
commit 940b38e3d879dd520592da76cee12705ecb6b42d
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 21 15:42:18 2026 +0800
Extract registerStorageUnit to PipelineE2EDistSQLFacade (#37796)
* Use Awaitility.waitAtMost
* Extract registerStorageUnit to PipelineE2EDistSQLFacade
---
.../pipeline/cases/PipelineContainerComposer.java | 20 ++-----------------
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 8 ++++----
.../cases/migration/AbstractMigrationE2EIT.java | 6 +++---
.../general/PostgreSQLMigrationGeneralE2EIT.java | 2 +-
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 8 ++++----
.../pipeline/util/PipelineE2EDistSQLFacade.java | 23 +++++++++++++++++++++-
6 files changed, 36 insertions(+), 31 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index a9349dad669..634f98fd71e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -87,6 +87,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
public static final String SCHEMA_NAME = "test";
+ // TODO Refactor: storage unit name and actual data source name are
different
public static final String DS_0 = "pipeline_e2e_0";
public static final String DS_1 = "pipeline_e2e_1";
@@ -257,23 +258,6 @@ public final class PipelineContainerComposer implements
AutoCloseable {
return jdbcUrl;
}
- /**
- * Register storage unit.
- *
- * @param storageUnitName storage unit name
- * @throws SQLException SQL exception
- */
- public void registerStorageUnit(final String storageUnitName) throws
SQLException {
- String username = ProxyDatabaseTypeUtils.isOracleBranch(databaseType)
? storageUnitName : getUsername();
- String registerStorageUnitTemplate = "REGISTER STORAGE UNIT ${ds} (
URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}",
storageUnitName)
- .replace("${user}", username)
- .replace("${password}", getPassword())
- .replace("${url}", getActualJdbcUrlTemplate(storageUnitName,
Type.DOCKER == E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
- proxyExecuteWithLog(registerStorageUnitTemplate, 0);
- int timeout = databaseType instanceof OpenGaussDatabaseType ? 60 : 10;
- Awaitility.await().ignoreExceptions().atMost(timeout,
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() ->
showStorageUnitsName().contains(storageUnitName));
- }
-
/**
* Show storage units names.
*
@@ -564,7 +548,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
*/
// TODO proxy support for some fields still needs to be optimized, such as
binary of MySQL, after these problems are optimized, Proxy dataSource can be
used.
public DataSource generateShardingSphereDataSourceFromProxy() {
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
+ Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> null != getYamlRootConfig().getRules());
YamlRootConfiguration rootConfig = getYamlRootConfig();
ShardingSpherePreconditions.checkNotNull(rootConfig.getDataSources(),
() -> new IllegalStateException("dataSources is null"));
ShardingSpherePreconditions.checkNotNull(rootConfig.getRules(), () ->
new IllegalStateException("rules is null"));
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index c10acdde3bc..e8840ad589a 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -102,7 +102,7 @@ class CDCE2EIT {
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new CDCJobType());
distSQLFacade.alterPipelineRule();
for (String each : Arrays.asList(PipelineContainerComposer.DS_0,
PipelineContainerComposer.DS_1)) {
- containerComposer.registerStorageUnit(each);
+ distSQLFacade.registerStorageUnit(each);
}
createOrderTableRule(containerComposer);
distSQLFacade.createBroadcastRule("t_address");
@@ -123,7 +123,7 @@ class CDCE2EIT {
}
PipelineDataSource targetDataSource =
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
final CDCClient cdcClient =
buildCDCClientAndStart(targetDataSource, containerComposer);
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+ Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
distSQLFacade.waitJobIncrementalStageFinished(jobId);
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
@@ -142,7 +142,7 @@ class CDCE2EIT {
assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_address"));
assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_single"));
cdcClient.close();
- Awaitility.await().atMost(10L,
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+ Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(500L,
TimeUnit.MILLISECONDS)
.until(() ->
distSQLFacade.listJobs().stream().noneMatch(each ->
Boolean.parseBoolean(each.get("active").toString())));
distSQLFacade.drop(jobId);
assertTrue(distSQLFacade.listJobs().isEmpty());
@@ -151,7 +151,7 @@ class CDCE2EIT {
private void createOrderTableRule(final PipelineContainerComposer
containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(CREATE_SHARDING_RULE_SQL, 0);
- Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
+ Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(2L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
}
private void initSchemaAndTable(final PipelineContainerComposer
containerComposer, final Connection connection, final int seconds) throws
SQLException {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
index c6077ece811..cc91b9028e5 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -64,7 +64,7 @@ public abstract class AbstractMigrationE2EIT {
.replace("${ds3}",
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_3,
true))
.replace("${ds4}",
containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
true));
containerComposer.proxyExecuteWithLog(addTargetResource, 0);
- Awaitility.await().ignoreExceptions().atMost(60L,
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() -> 3 ==
containerComposer.showStorageUnitsName().size());
+ Awaitility.waitAtMost(60L,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L,
TimeUnit.SECONDS).until(() -> 3 ==
containerComposer.showStorageUnitsName().size());
}
protected void createSourceSchema(final PipelineContainerComposer
containerComposer, final String schemaName) throws SQLException {
@@ -88,7 +88,7 @@ public abstract class AbstractMigrationE2EIT {
protected void createTargetOrderTableRule(final PipelineContainerComposer
containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
0);
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
+ Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
}
protected void createTargetOrderTableEncryptRule(final
PipelineContainerComposer containerComposer) throws SQLException {
@@ -97,7 +97,7 @@ public abstract class AbstractMigrationE2EIT {
protected void createTargetOrderItemTableRule(final
PipelineContainerComposer containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderItemTableRule(),
0);
- Awaitility.await().atMost(4L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order_item").isEmpty());
+ Awaitility.waitAtMost(4L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order_item").isEmpty());
}
protected void startMigration(final PipelineContainerComposer
containerComposer, final String sourceTableName, final String targetTableName)
throws SQLException {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 8ea80194942..7aa954e155a 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -82,7 +82,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
int replicationSlotsCount =
getReplicationSlotsCount(containerComposer);
log.info("init data end: {}, replication slots count: {}",
LocalDateTime.now(), replicationSlotsCount);
startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+ Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." +
SOURCE_TABLE_NAME);
distSQLFacade.waitJobPreparingStageFinished(jobId);
String qualifiedTableName = String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index 94e9ecd4341..07577cc7bc5 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -71,13 +71,13 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
String jdbcUrl = Type.DOCKER == type ?
postgresqlContainer.getJdbcUrl() : "jdbc:postgresql://localhost:5432/postgres";
initSourceTable(jdbcUrl);
registerMigrationSourceStorageUnit(containerComposer);
-
containerComposer.registerStorageUnit(PipelineContainerComposer.DS_0);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
+ distSQLFacade.registerStorageUnit(PipelineContainerComposer.DS_0);
containerComposer.proxyExecuteWithLog("CREATE SHARDING TABLE RULE
t_order(STORAGE_UNITS(pipeline_e2e_0),SHARDING_COLUMN=order_id,TYPE(NAME='hash_mod',PROPERTIES('sharding-count'='2')),"
+ "KEY_GENERATE_STRATEGY(COLUMN=order_id,
TYPE(NAME='snowflake')))", 2);
initTargetTable(containerComposer);
containerComposer.proxyExecuteWithLog("MIGRATE TABLE
source_ds.t_order INTO t_order", 2);
- PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
- Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() ->
!distSQLFacade.listJobIds().isEmpty());
+ Awaitility.waitAtMost(10L,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
distSQLFacade.waitJobIncrementalStageStarted(jobId);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
"postgres", "postgres")) {
@@ -150,7 +150,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
private static boolean waitForTableExistence(final Connection connection,
final String tableName) {
try {
- Awaitility.await().ignoreExceptions().atMost(60L,
TimeUnit.SECONDS).pollInterval(3L, TimeUnit.SECONDS).until(() ->
tableExists(connection, tableName));
+ Awaitility.waitAtMost(60L,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L,
TimeUnit.SECONDS).until(() -> tableExists(connection, tableName));
return true;
} catch (final ConditionTimeoutException ex) {
return false;
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 8b28160f562..404d0a529b3 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -22,6 +22,9 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
+import
org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
+import org.apache.shardingsphere.test.e2e.env.runtime.E2ETestEnvironment;
+import org.apache.shardingsphere.test.e2e.env.runtime.type.RunEnvironment.Type;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import org.awaitility.Awaitility;
@@ -58,6 +61,23 @@ public final class PipelineE2EDistSQLFacade {
jobTypeName = jobType.getType();
}
+ /**
+ * Register storage unit.
+ *
+ * @param storageUnitName storage unit name
+ * @throws SQLException SQL exception
+ */
+ public void registerStorageUnit(final String storageUnitName) throws
SQLException {
+ String username =
ProxyDatabaseTypeUtils.isOracleBranch(containerComposer.getDatabaseType()) ?
storageUnitName : containerComposer.getUsername();
+ String registerStorageUnitSQL = "REGISTER STORAGE UNIT ${ds} (
URL='${url}', USER='${user}', PASSWORD='${password}')".replace("${ds}",
storageUnitName)
+ .replace("${user}", username)
+ .replace("${password}", containerComposer.getPassword())
+ .replace("${url}",
containerComposer.getActualJdbcUrlTemplate(storageUnitName, Type.DOCKER ==
E2ETestEnvironment.getInstance().getRunEnvironment().getType()));
+ containerComposer.proxyExecuteWithLog(registerStorageUnitSQL, 0);
+ int timeout = containerComposer.getDatabaseType() instanceof
OpenGaussDatabaseType ? 60 : 10;
+ Awaitility.waitAtMost(timeout,
TimeUnit.SECONDS).ignoreExceptions().pollInterval(3L,
TimeUnit.SECONDS).until(() ->
containerComposer.showStorageUnitsName().contains(storageUnitName));
+ }
+
/**
* Load all single tables.
*
@@ -142,7 +162,8 @@ public final class PipelineE2EDistSQLFacade {
* @throws SQLException SQL exception
*/
public void rollback(final String jobId) throws SQLException {
- containerComposer.proxyExecuteWithLog(String.format("ROLLBACK %s %s",
jobTypeName, jobId), 2);
+ String sql = String.format("ROLLBACK %s %s", jobTypeName, jobId);
+ containerComposer.proxyExecuteWithLog(sql, 2);
}
/**