This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new a113a961287 Refactor CDCE2EIT (#37871)
a113a961287 is described below
commit a113a9612876cc64780681d98619e8834afd1eb5
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 28 16:03:48 2026 +0800
Refactor CDCE2EIT (#37871)
* Clean CDCE2EIT
* Extract PipelineContainerComposer.createQualifiedTable
* Extract DataSourceTestUtils.createStandardDataSource
---
 .../pipeline/cases/PipelineContainerComposer.java  | 12 ++++++
 .../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 37 +++++------------
 .../pipeline/util/DataSourceTestUtils.java         | 46 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 27 deletions(-)
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 530286b530e..90c3b8f5b02 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -35,6 +35,7 @@ import org.apache.shardingsphere.database.connector.mysql.type.MySQLDatabaseType
 import org.apache.shardingsphere.database.connector.opengauss.type.OpenGaussDatabaseType;
 import org.apache.shardingsphere.database.connector.postgresql.type.PostgreSQLDatabaseType;
 import org.apache.shardingsphere.infra.exception.ShardingSpherePreconditions;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.ShardingSphereServiceLoader;
 import org.apache.shardingsphere.infra.util.props.PropertiesBuilder;
 import org.apache.shardingsphere.infra.util.props.PropertiesBuilder.Property;
@@ -326,6 +327,17 @@ public final class PipelineContainerComposer implements AutoCloseable {
         sleepSeconds(seconds);
     }
     
+    /**
+     * Create qualified table.
+     *
+     * @param tableName table name
+     * @return qualified table
+     */
+    public QualifiedTable createQualifiedTable(final String tableName) {
+        String schemaName = new DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable() ? SCHEMA_NAME : null;
+        return new QualifiedTable(schemaName, tableName);
+    }
+    
     /**
      * Create source table index list.
      *
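
For orientation, a minimal usage sketch of the new helper, not part of the commit; it assumes a PipelineContainerComposer instance wired up by the pipeline E2E framework, and "t_order" is only an example table name:

    // Hypothetical call site for the extracted helper.
    QualifiedTable orderTable = containerComposer.createQualifiedTable("t_order");
    // On schema-aware dialects (PostgreSQL, openGauss) the schema part is SCHEMA_NAME,
    // so format() renders "schema.table"; on MySQL the schema is null and format()
    // renders the bare table name.
    String orderTableName = orderTable.format();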
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index c183f6807fe..36c9c191965 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -18,7 +18,6 @@
 package org.apache.shardingsphere.test.e2e.operation.pipeline.cases.cdc;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
 import org.apache.shardingsphere.data.pipeline.cdc.CDCJobType;
 import org.apache.shardingsphere.data.pipeline.cdc.client.CDCClient;
 import org.apache.shardingsphere.data.pipeline.cdc.client.config.CDCClientConfiguration;
@@ -36,8 +35,6 @@ import org.apache.shardingsphere.data.pipeline.core.ingest.position.type.pk.Uniq
 import org.apache.shardingsphere.data.pipeline.core.metadata.loader.StandardPipelineTableMetaDataLoader;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData;
 import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData;
-import org.apache.shardingsphere.database.connector.core.metadata.database.metadata.DialectDatabaseMetaData;
-import org.apache.shardingsphere.database.connector.core.type.DatabaseTypeRegistry;
 import org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
 import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
 import org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
@@ -51,6 +48,7 @@ import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+import org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceTestUtils;
 import org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.condition.EnabledIf;
@@ -65,9 +63,7 @@ import java.sql.Statement;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 
@@ -106,25 +102,23 @@ class CDCE2EIT {
             }
             createOrderTableRule(containerComposer);
             distSQLFacade.createBroadcastRule("t_address");
-            DialectDatabaseMetaData dialectDatabaseMetaData = new DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
-            QualifiedTable qualifiedOrderTable = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
-                    ? new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
-                    : new QualifiedTable(null, SOURCE_TABLE_NAME);
-            initSchemaAndTable(containerComposer, containerComposer.getProxyDataSource(), qualifiedOrderTable, 3);
+            QualifiedTable orderQualifiedTable = containerComposer.createQualifiedTable(SOURCE_TABLE_NAME);
+            initSchemaAndTable(containerComposer, containerComposer.getProxyDataSource(), orderQualifiedTable, 3);
             PipelineDataSource jdbcDataSource = new PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(), containerComposer.getDatabaseType());
             log.info("init data begin: {}", LocalDateTime.now());
-            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(jdbcDataSource, containerComposer.getDatabaseType(), qualifiedOrderTable);
+            IntPkLargeOrderDAO orderDAO = new IntPkLargeOrderDAO(jdbcDataSource, containerComposer.getDatabaseType(), orderQualifiedTable);
             orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
             DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"}, new Object[]{2, "b"}));
             DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new Object[]{3}));
             log.info("init data end: {}", LocalDateTime.now());
-            PipelineDataSource targetDataSource = createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
-            initSchemaAndTable(containerComposer, targetDataSource, qualifiedOrderTable, 0);
+            PipelineDataSource targetDataSource = DataSourceTestUtils.createStandardDataSource(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
+                    containerComposer.getUsername(), containerComposer.getPassword());
+            initSchemaAndTable(containerComposer, targetDataSource, orderQualifiedTable, 0);
             final CDCClient cdcClient = buildCDCClientAndStart(targetDataSource, containerComposer);
             Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
             String jobId = distSQLFacade.listJobIds().get(0);
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
-            String orderTableName = qualifiedOrderTable.format();
+            String orderTableName = orderQualifiedTable.format();
             new E2EIncrementalTask(jdbcDataSource, orderTableName, new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
             distSQLFacade.waitJobIncrementalStageFinished(jobId);
             for (int i = 1; i <= 4; i++) {
@@ -132,9 +126,6 @@ class CDCE2EIT {
                 orderDAO.insert(orderId, i, "OK");
                 containerComposer.assertRecordExists(targetDataSource, orderTableName, orderId);
             }
-            QualifiedTable orderQualifiedTable = dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
-                    ? new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
-                    : new QualifiedTable(null, SOURCE_TABLE_NAME);
             assertDataMatched(jdbcDataSource, targetDataSource, orderQualifiedTable);
             assertDataMatched(jdbcDataSource, targetDataSource, new QualifiedTable(null, "t_address"));
             assertDataMatched(jdbcDataSource, targetDataSource, new QualifiedTable(null, "t_single"));
@@ -151,26 +142,18 @@
         Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(2L, TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW SHARDING TABLE RULE t_order").isEmpty());
     }
     
-    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final DataSource dataSource, final QualifiedTable qualifiedOrderTable, final int seconds) throws SQLException {
+    private void initSchemaAndTable(final PipelineContainerComposer containerComposer, final DataSource dataSource, final QualifiedTable orderQualifiedTable, final int seconds) throws SQLException {
         try (
                 Connection connection = dataSource.getConnection();
                 Statement statement = connection.createStatement()) {
             containerComposer.createSchema(connection, seconds);
-            new IntPkLargeOrderDAO(dataSource, containerComposer.getDatabaseType(), qualifiedOrderTable).createTable();
+            new IntPkLargeOrderDAO(dataSource, containerComposer.getDatabaseType(), orderQualifiedTable).createTable();
             statement.execute("CREATE TABLE t_address(id integer primary key, address_name varchar(255))");
             statement.execute("CREATE TABLE t_single(id integer primary key)");
         }
         containerComposer.sleepSeconds(seconds);
     }
     
-    private PipelineDataSource createStandardDataSource(final PipelineContainerComposer containerComposer, final String storageUnitName) {
-        Map<String, Object> poolProps = new HashMap<>(3, 1F);
-        poolProps.put("url", containerComposer.getActualJdbcUrlTemplate(storageUnitName, false));
-        poolProps.put("username", containerComposer.getUsername());
-        poolProps.put("password", containerComposer.getPassword());
-        return new PipelineDataSource(new StandardPipelineDataSourceConfiguration(poolProps));
-    }
-    
     private CDCClient buildCDCClientAndStart(final PipelineDataSource dataSource, final PipelineContainerComposer containerComposer) {
         DataSourceRecordConsumer recordConsumer = new DataSourceRecordConsumer(dataSource, containerComposer.getDatabaseType());
         CDCClient result = new CDCClient(new CDCClientConfiguration("localhost", containerComposer.getContainerComposer().getProxyCDCPort(), 10000));
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceTestUtils.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceTestUtils.java
new file mode 100644
index 00000000000..887bbecf2c7
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/DataSourceTestUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+import org.apache.shardingsphere.data.pipeline.api.type.StandardPipelineDataSourceConfiguration;
+import org.apache.shardingsphere.data.pipeline.core.datasource.PipelineDataSource;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@NoArgsConstructor(access = AccessLevel.NONE)
+public final class DataSourceTestUtils {
+
+    /**
+     * Create standard data source.
+     *
+     * @param url URL
+     * @param username username
+     * @param password password
+     * @return standard data source
+     */
+    public static PipelineDataSource createStandardDataSource(final String url, final String username, final String password) {
+        Map<String, Object> poolProps = new HashMap<>(3, 1F);
+        poolProps.put("url", url);
+        poolProps.put("username", username);
+        poolProps.put("password", password);
+        return new PipelineDataSource(new StandardPipelineDataSourceConfiguration(poolProps));
+    }
+}
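
For reference, a minimal sketch of how the refactored CDCE2EIT obtains its target data source through the extracted utility, not part of the commit; the wrapper method openTargetDataSource is hypothetical, the connectivity probe is illustrative only, and a running E2E container composer is assumed:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    private PipelineDataSource openTargetDataSource(final PipelineContainerComposer containerComposer) throws SQLException {
        // Mirrors the call site in CDCE2EIT above: plain JDBC URL, username and password.
        PipelineDataSource result = DataSourceTestUtils.createStandardDataSource(
                containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4, false),
                containerComposer.getUsername(), containerComposer.getPassword());
        // PipelineDataSource is used here as a plain javax.sql.DataSource, as in initSchemaAndTable.
        try (
                Connection connection = result.getConnection();
                Statement statement = connection.createStatement()) {
            statement.execute("SELECT 1");
        }
        return result;
    }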