This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new f76cdfa2d9f Add IntPkLargeOrderDAO for pipeline E2E (#37869)
f76cdfa2d9f is described below
commit f76cdfa2d9f504f5b958d1aa975f1336abbc7a99
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 28 13:17:33 2026 +0800
Add IntPkLargeOrderDAO for pipeline E2E (#37869)
* Add IntPkLargeOrderDAO
* Refactor CDCE2EIT.initSchemaAndTable
* Refactor IntPkLargeOrderDAO to use qualifiedTableName
* Improve IntPkLargeOrderDAO.insert: add buildPreparedSimpleInsertSQL
* Update E2E to use IntPkLargeOrderDAO
* Update code style
* Recover e2e-operation.yml on: pull_request
---
.github/workflows/e2e-operation.yml | 2 +
.../pipeline/cases/PipelineContainerComposer.java | 10 ---
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 62 ++++++++-------
.../general/MySQLMigrationGeneralE2EIT.java | 14 ++--
.../general/PostgreSQLMigrationGeneralE2EIT.java | 16 ++--
.../migration/general/RollbackMigrationE2EIT.java | 2 +-
.../IntPkLargeOrderDAO.java} | 41 +++++-----
.../sqlbuilder/IntPkLargeOrderSQLBuilder.java | 49 ++++++++++++
.../sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java | 78 +++++++++++++++++++
.../OpenGaussIntPkLargeOrderSQLBuilder.java | 88 ++++++++++++++++++++++
.../PostgreSQLIntPkLargeOrderSQLBuilder.java | 63 ++++++++++++++++
.../dao/order/small/StringPkSmallOrderDAO.java | 9 +--
...rder.large.sqlbuilder.IntPkLargeOrderSQLBuilder | 20 +++++
13 files changed, 368 insertions(+), 86 deletions(-)
diff --git a/.github/workflows/e2e-operation.yml b/.github/workflows/e2e-operation.yml
index ad2e6bc2398..127aebcc2bf 100644
--- a/.github/workflows/e2e-operation.yml
+++ b/.github/workflows/e2e-operation.yml
@@ -18,6 +18,8 @@
name: E2E - Operation
on:
+ pull_request:
+ branches: [ master ]
workflow_dispatch:
concurrency:
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 1a26cb74d13..530286b530e 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -326,16 +326,6 @@ public final class PipelineContainerComposer implements
AutoCloseable {
sleepSeconds(seconds);
}
- /**
- * Create source order table.
- *
- * @param sourceTableName source table name
- * @throws SQLException SQL exception
- */
- public void createSourceOrderTable(final String sourceTableName) throws
SQLException {
-
sourceExecuteWithLog(extraSQLCommand.getCreateTableOrder(sourceTableName));
- }
-
/**
* Create source table index list.
*
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index ff1f744a452..c183f6807fe 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -44,13 +44,12 @@ import
org.apache.shardingsphere.infra.spi.type.typed.TypedSPILoader;
import
org.apache.shardingsphere.test.e2e.env.container.constants.ProxyContainerConstants;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.IntPkLargeOrderDAO;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.awaitility.Awaitility;
@@ -59,9 +58,10 @@ import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
+import javax.sql.DataSource;
import java.sql.Connection;
-import java.sql.DriverManager;
import java.sql.SQLException;
+import java.sql.Statement;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Collections;
@@ -106,42 +106,38 @@ class CDCE2EIT {
}
createOrderTableRule(containerComposer);
distSQLFacade.createBroadcastRule("t_address");
- try (Connection connection =
containerComposer.getProxyDataSource().getConnection()) {
- initSchemaAndTable(containerComposer, connection, 3);
- }
- PipelineDataSource sourceDataSource = new
PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(),
containerComposer.getDatabaseType());
- List<Object[]> orderInsertData =
PipelineCaseHelper.generateOrderInsertData(
- containerComposer.getDatabaseType(), new
AutoIncrementKeyGenerateAlgorithm(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
+ QualifiedTable qualifiedOrderTable =
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
+ ? new
QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
+ : new QualifiedTable(null, SOURCE_TABLE_NAME);
+ initSchemaAndTable(containerComposer,
containerComposer.getProxyDataSource(), qualifiedOrderTable, 3);
+ PipelineDataSource jdbcDataSource = new
PipelineDataSource(containerComposer.generateShardingSphereDataSourceFromProxy(),
containerComposer.getDatabaseType());
log.info("init data begin: {}", LocalDateTime.now());
- DataSourceExecuteUtils.execute(sourceDataSource,
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
orderInsertData);
- DataSourceExecuteUtils.execute(sourceDataSource, "INSERT INTO
t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"},
new Object[]{2, "b"}));
- DataSourceExecuteUtils.execute(sourceDataSource, "INSERT INTO
t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new
Object[]{3}));
+ IntPkLargeOrderDAO orderDAO = new
IntPkLargeOrderDAO(jdbcDataSource, containerComposer.getDatabaseType(),
qualifiedOrderTable);
+
orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
+ DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO
t_address(id, address_name) VALUES (?,?)", Arrays.asList(new Object[]{1, "a"},
new Object[]{2, "b"}));
+ DataSourceExecuteUtils.execute(jdbcDataSource, "INSERT INTO
t_single(id) VALUES (?)", Arrays.asList(new Object[]{1}, new Object[]{2}, new
Object[]{3}));
log.info("init data end: {}", LocalDateTime.now());
- try (
- Connection connection =
DriverManager.getConnection(containerComposer.getActualJdbcUrlTemplate(PipelineContainerComposer.DS_4,
false),
- containerComposer.getUsername(),
containerComposer.getPassword())) {
- initSchemaAndTable(containerComposer, connection, 0);
- }
PipelineDataSource targetDataSource =
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
+ initSchemaAndTable(containerComposer, targetDataSource,
qualifiedOrderTable, 0);
final CDCClient cdcClient =
buildCDCClientAndStart(targetDataSource, containerComposer);
Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
distSQLFacade.waitJobIncrementalStageFinished(jobId);
- DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
- String tableName =
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ?
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) :
SOURCE_TABLE_NAME;
- new E2EIncrementalTask(sourceDataSource, tableName, new
SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
+ String orderTableName = qualifiedOrderTable.format();
+ new E2EIncrementalTask(jdbcDataSource, orderTableName, new
SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
distSQLFacade.waitJobIncrementalStageFinished(jobId);
for (int i = 1; i <= 4; i++) {
int orderId = 10000 + i;
- containerComposer.proxyExecuteWithLog(String.format("INSERT
INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId,
i), 0);
- containerComposer.assertRecordExists(targetDataSource,
tableName, orderId);
+ orderDAO.insert(orderId, i, "OK");
+ containerComposer.assertRecordExists(targetDataSource,
orderTableName, orderId);
}
QualifiedTable orderQualifiedTable =
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable()
? new
QualifiedTable(PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME)
: new QualifiedTable(null, SOURCE_TABLE_NAME);
- assertDataMatched(sourceDataSource, targetDataSource,
orderQualifiedTable);
- assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_address"));
- assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_single"));
+ assertDataMatched(jdbcDataSource, targetDataSource,
orderQualifiedTable);
+ assertDataMatched(jdbcDataSource, targetDataSource, new
QualifiedTable(null, "t_address"));
+ assertDataMatched(jdbcDataSource, targetDataSource, new
QualifiedTable(null, "t_single"));
cdcClient.close();
Awaitility.waitAtMost(10L, TimeUnit.SECONDS).pollInterval(500L,
TimeUnit.MILLISECONDS)
.until(() ->
distSQLFacade.listJobs().stream().noneMatch(each ->
Boolean.parseBoolean(each.get("active").toString())));
@@ -155,13 +151,15 @@ class CDCE2EIT {
Awaitility.waitAtMost(20L, TimeUnit.SECONDS).pollInterval(2L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
}
- private void initSchemaAndTable(final PipelineContainerComposer
containerComposer, final Connection connection, final int seconds) throws
SQLException {
- containerComposer.createSchema(connection, seconds);
- String sql =
containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
- log.info("Create table sql: {}", sql);
- connection.createStatement().execute(sql);
- connection.createStatement().execute("CREATE TABLE t_address(id
integer primary key, address_name varchar(255))");
- connection.createStatement().execute("CREATE TABLE t_single(id integer
primary key)");
+ private void initSchemaAndTable(final PipelineContainerComposer
containerComposer, final DataSource dataSource, final QualifiedTable
qualifiedOrderTable, final int seconds) throws SQLException {
+ try (
+ Connection connection = dataSource.getConnection();
+ Statement statement = connection.createStatement()) {
+ containerComposer.createSchema(connection, seconds);
+ new IntPkLargeOrderDAO(dataSource,
containerComposer.getDatabaseType(), qualifiedOrderTable).createTable();
+ statement.execute("CREATE TABLE t_address(id integer primary key,
address_name varchar(255))");
+ statement.execute("CREATE TABLE t_single(id integer primary key)");
+ }
containerComposer.sleepSeconds(seconds);
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index cb33b7dff9c..87406f55170 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -21,18 +21,17 @@ import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.AbstractMigrationE2EIT;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.IntPkLargeOrderDAO;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.IntPkOrderItemDAO;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -66,7 +65,8 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
- containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+ IntPkLargeOrderDAO orderDAO = new
IntPkLargeOrderDAO(containerComposer.getSourceDataSource(),
containerComposer.getDatabaseType(), new QualifiedTable(null,
SOURCE_TABLE_NAME));
+ orderDAO.createTable();
IntPkOrderItemDAO orderItemDAO = new
IntPkOrderItemDAO(containerComposer.getSourceDataSource(),
containerComposer.getDatabaseType());
orderItemDAO.createTable();
addMigrationSourceResource(containerComposer);
@@ -75,9 +75,7 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
createTargetOrderTableEncryptRule(containerComposer);
createTargetOrderItemTableRule(containerComposer);
log.info("init data begin: {}", LocalDateTime.now());
- List<Object[]> orderInsertData =
PipelineCaseHelper.generateOrderInsertData(
- containerComposer.getDatabaseType(), new
AutoIncrementKeyGenerateAlgorithm(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
orderInsertData);
+
orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
orderItemDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
log.info("init data end: {}", LocalDateTime.now());
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
@@ -87,7 +85,7 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.startIncrementTask(
new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
- containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
+ orderDAO.insert(10000L, 1, "OK");
orderItemDAO.insert(10000L, 10000L, 1, "OK");
distSQLFacade.pauseJob(orderJobId);
distSQLFacade.resumeJob(orderJobId);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index 49a688471d2..27836a574d5 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -20,18 +20,17 @@ package
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.ge
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.infra.algorithm.keygen.snowflake.SnowflakeKeyGenerateAlgorithm;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.AbstractMigrationE2EIT;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.task.E2EIncrementalTask;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.IntPkLargeOrderDAO;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.orderitem.IntPkOrderItemDAO;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ECondition;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.condition.EnabledIf;
@@ -45,7 +44,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDateTime;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
@@ -68,7 +66,9 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
distSQLFacade.alterPipelineRule();
createSourceSchema(containerComposer,
PipelineContainerComposer.SCHEMA_NAME);
- containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
+ IntPkLargeOrderDAO orderDAO = new
IntPkLargeOrderDAO(containerComposer.getSourceDataSource(),
containerComposer.getDatabaseType(),
+ new QualifiedTable(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME));
+ orderDAO.createTable();
IntPkOrderItemDAO orderItemDAO = new
IntPkOrderItemDAO(containerComposer.getSourceDataSource(),
containerComposer.getDatabaseType());
orderItemDAO.createTable();
containerComposer.createSourceTableIndexList(PipelineContainerComposer.SCHEMA_NAME,
SOURCE_TABLE_NAME);
@@ -78,9 +78,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
createTargetOrderTableRule(containerComposer);
createTargetOrderItemTableRule(containerComposer);
log.info("init data begin: {}", LocalDateTime.now());
- List<Object[]> orderInsertData =
PipelineCaseHelper.generateOrderInsertData(
- containerComposer.getDatabaseType(), new
AutoIncrementKeyGenerateAlgorithm(),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
-
DataSourceExecuteUtils.execute(containerComposer.getSourceDataSource(),
containerComposer.getExtraSQLCommand().getFullInsertOrder(SOURCE_TABLE_NAME),
orderInsertData);
+
orderDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
orderItemDAO.batchInsert(PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
int replicationSlotsCount =
getReplicationSlotsCount(containerComposer);
log.info("init data end: {}, replication slots count: {}",
LocalDateTime.now(), replicationSlotsCount);
@@ -92,7 +90,7 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.startIncrementTask(new
E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName,
new SnowflakeKeyGenerateAlgorithm(),
containerComposer.getDatabaseType(), 20));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
- containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
+ orderDAO.insert(10000L, 1, "OK");
// TODO Insert new record in t_order_item
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertRecordExists(jdbcDataSource,
qualifiedTableName, 10000);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
index d173d3547c2..f903f37db20 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
@@ -42,7 +42,7 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
@ParameterizedTest(name = "{0}")
@EnabledIf("isEnabled")
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
- void assertIllegalTimeTypesValueMigrationSuccess(final
PipelineTestParameter testParam) throws Exception {
+ void assertRollbackSuccess(final PipelineTestParameter testParam) throws
Exception {
try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam)) {
String sql = "CREATE TABLE t_order (order_id BIGINT PRIMARY KEY,
user_id INT, status VARCHAR(50))";
containerComposer.sourceExecuteWithLog(sql);
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/IntPkLargeOrderDAO.java
similarity index 58%
copy from test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
copy to test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/IntPkLargeOrderDAO.java
index 0e355f0f927..63e03975967 100644
--- a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/IntPkLargeOrderDAO.java
@@ -15,14 +15,15 @@
* limitations under the License.
*/
-package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.small;
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large;
import lombok.extern.slf4j.Slf4j;
import
org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPILoader;
import org.apache.shardingsphere.database.connector.core.type.DatabaseType;
-import
org.apache.shardingsphere.infra.algorithm.keygen.uuid.UUIDKeyGenerateAlgorithm;
-import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.small.sqlbuilder.StringPkSmallOrderSQLBuilder;
+import org.apache.shardingsphere.infra.metadata.database.schema.QualifiedTable;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.helper.PipelineCaseHelper;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
import javax.sql.DataSource;
@@ -30,24 +31,24 @@ import java.sql.SQLException;
import java.util.List;
/**
- * String PK small order DAO. Small table means the table has few columns.
+ * Int PK large order DAO. Large table means the table has many columns.
*/
@Slf4j
-public final class StringPkSmallOrderDAO {
+public final class IntPkLargeOrderDAO {
private final DataSource dataSource;
private final DatabaseType databaseType;
- private final StringPkSmallOrderSQLBuilder sqlBuilder;
+ private final IntPkLargeOrderSQLBuilder sqlBuilder;
- private final String tableName;
+ private final String qualifiedTableName;
- public StringPkSmallOrderDAO(final DataSource dataSource, final
DatabaseType databaseType, final String tableName) {
+ public IntPkLargeOrderDAO(final DataSource dataSource, final DatabaseType
databaseType, final QualifiedTable qualifiedTable) {
this.dataSource = dataSource;
this.databaseType = databaseType;
- this.sqlBuilder =
DatabaseTypedSPILoader.getService(StringPkSmallOrderSQLBuilder.class,
databaseType);
- this.tableName = tableName;
+ this.sqlBuilder =
DatabaseTypedSPILoader.getService(IntPkLargeOrderSQLBuilder.class,
databaseType);
+ this.qualifiedTableName = qualifiedTable.format();
}
/**
@@ -56,21 +57,21 @@ public final class StringPkSmallOrderDAO {
* @throws SQLException SQL exception
*/
public void createTable() throws SQLException {
- String sql = sqlBuilder.buildCreateTableSQL(tableName);
- log.info("Create string pk small order table SQL: {}", sql);
+ String sql = sqlBuilder.buildCreateTableSQL(qualifiedTableName);
+ log.info("Create int pk large order table SQL: {}", sql);
DataSourceExecuteUtils.execute(dataSource, sql);
}
/**
* Batch insert orders.
*
- * @param insertRows insert rows
+ * @param recordCount record count
* @throws SQLException SQL exception
*/
- public void batchInsert(final int insertRows) throws SQLException {
- List<Object[]> paramsList =
PipelineCaseHelper.generateSmallOrderInsertData(new UUIDKeyGenerateAlgorithm(),
insertRows);
- String sql = sqlBuilder.buildPreparedInsertSQL(tableName);
- log.info("Batch insert string pk small order SQL: {}, params list
size: {}", sql, paramsList.size());
+ public void batchInsert(final int recordCount) throws SQLException {
+ List<Object[]> paramsList =
PipelineCaseHelper.generateOrderInsertData(databaseType, new
AutoIncrementKeyGenerateAlgorithm(), recordCount);
+ String sql = sqlBuilder.buildPreparedInsertSQL(qualifiedTableName);
+ log.info("Batch insert int pk large order SQL: {}, params list size:
{}", sql, paramsList.size());
DataSourceExecuteUtils.execute(dataSource, sql, paramsList);
}
@@ -82,10 +83,10 @@ public final class StringPkSmallOrderDAO {
* @param status status
* @throws SQLException SQL exception
*/
- public void insert(final String orderId, final int userId, final String
status) throws SQLException {
- String sql = sqlBuilder.buildPreparedInsertSQL(tableName);
+ public void insert(final long orderId, final int userId, final String
status) throws SQLException {
+ String sql =
sqlBuilder.buildPreparedSimpleInsertSQL(qualifiedTableName);
Object[] params = new Object[]{orderId, userId, status};
- log.info("Insert string pk small order SQL: {}, params: {}", sql,
params);
+ log.info("Insert int pk large order simple SQL: {}, params: {}", sql,
params);
DataSourceExecuteUtils.execute(dataSource, sql, params);
}
}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/IntPkLargeOrderSQLBuilder.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/IntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..79f3dc2549f
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/IntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+import org.apache.shardingsphere.database.connector.core.spi.DatabaseTypedSPI;
+
+public interface IntPkLargeOrderSQLBuilder extends DatabaseTypedSPI {
+
+ /**
+ * Build create table SQL.
+ *
+ * @param qualifiedTableName qualified table name
+ * @return create table SQL
+ */
+ String buildCreateTableSQL(String qualifiedTableName);
+
+ /**
+ * Build prepared insert SQL.
+ *
+ * @param qualifiedTableName qualified table name
+ * @return prepared insert SQL
+ */
+ String buildPreparedInsertSQL(String qualifiedTableName);
+
+ /**
+ * Build prepared simple insert SQL.
+ *
+ * @param qualifiedTableName qualified table name
+ * @return prepared simple insert SQL
+ */
+ default String buildPreparedSimpleInsertSQL(final String qualifiedTableName) {
+ return "INSERT INTO " + qualifiedTableName + " (order_id, user_id, status) VALUES (?, ?, ?)";
+ }
+}
diff --git a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..3f004070ba6
--- /dev/null
+++ b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/MySQLIntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+/**
+ * MySQL implementation of {@link IntPkLargeOrderSQLBuilder}.
+ *
+ * <p>The table it defines has 29 columns: the three order columns plus one column per
+ * MySQL data type under test (signed and unsigned integers, float/double/decimal, temporal
+ * types, bit/binary/blob types, char/text types, enum, set and json).</p>
+ */
+public final class MySQLIntPkLargeOrderSQLBuilder implements
IntPkLargeOrderSQLBuilder {
+
+ /**
+ * Build the MySQL create table SQL, with {@code order_id} as primary key and secondary
+ * indexes on {@code user_id} and on ({@code t_mediumint}, {@code t_unsigned_mediumint}).
+ *
+ * <p>NOTE(review): the table name is substituted inside a single pair of backticks, so a
+ * dotted {@code schema.table} value would be quoted as one identifier, whereas
+ * {@code buildPreparedInsertSQL} substitutes the name unquoted. Confirm that callers only pass
+ * an unqualified table name for MySQL.</p>
+ *
+ * <p>NOTE(review): the index name {@code idx_mulit} looks like a typo of {@code idx_multi};
+ * it is part of the emitted SQL and is therefore left unchanged here.</p>
+ *
+ * @param qualifiedTableName table name placed between backticks in the statement
+ * @return create table SQL
+ */
+ @Override
+ public String buildCreateTableSQL(final String qualifiedTableName) {
+ return String.format("""
+ CREATE TABLE `%s` (
+ `order_id` bigint NOT NULL,
+ `user_id` int NOT NULL,
+ `status` varchar ( 255 ) NULL,
+ `t_mediumint` mediumint NULL,
+ `t_smallint` smallint NULL,
+ `t_tinyint` tinyint ( 3 ) NULL,
+ `t_unsigned_int` int UNSIGNED NULL,
+ `t_unsigned_mediumint` mediumint UNSIGNED NULL,
+ `t_unsigned_smallint` smallint UNSIGNED NULL,
+ `t_unsigned_tinyint` tinyint UNSIGNED NULL,
+ `t_float` float NULL,
+ `t_double` double NULL,
+ `t_decimal` decimal ( 10, 2 ) NULL,
+ `t_timestamp` timestamp(3) NULL,
+ `t_datetime` datetime(6) NULL,
+ `t_date` date NULL,
+ `t_time` time(1) NULL,
+ `t_year` year NULL,
+ `t_bit` bit(32) NULL,
+ `t_binary` binary(128) NULL,
+ `t_varbinary` varbinary(255) NULL,
+ `t_blob` blob NULL,
+ `t_mediumblob` mediumblob NULL,
+ `t_char` char ( 128 ) NULL,
+ `t_text` text NULL,
+ `t_mediumtext` mediumtext NULL,
+ `t_enum` enum ('1', '2', '3') NULL,
+ `t_set` set ('1', '2', '3') NULL,
+ `t_json` json NULL COMMENT 'json test',
+ PRIMARY KEY ( `order_id` ),
+ KEY `idx_user_id` (`user_id`),
+ KEY `idx_mulit` (`t_mediumint`,`t_unsigned_mediumint`)
+ ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
COLLATE=utf8mb4_general_ci
+ """, qualifiedTableName);
+ }
+
+ /**
+ * Build the prepared insert SQL listing every column declared by
+ * {@code buildCreateTableSQL}, in declaration order, with positional placeholders.
+ *
+ * @param qualifiedTableName table name substituted unquoted after {@code INSERT INTO}
+ * @return prepared insert SQL
+ */
+ @Override
+ public String buildPreparedInsertSQL(final String qualifiedTableName) {
+ return String.format("""
+ INSERT INTO %s
+ (order_id, user_id, status, t_mediumint, t_smallint,
t_tinyint, t_unsigned_int, t_unsigned_mediumint,
+ t_unsigned_smallint, t_unsigned_tinyint, t_float, t_double,
t_decimal, t_timestamp, t_datetime, t_date, t_time, t_year,
+ t_bit, t_binary, t_varbinary, t_blob, t_mediumblob, t_char,
t_text, t_mediumtext, t_enum, t_set, t_json)
+ VALUES
+ (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """, qualifiedTableName);
+ }
+
+ /**
+ * Get the database type this builder is written for.
+ *
+ * @return the literal {@code MySQL}
+ */
+ @Override
+ public String getDatabaseType() {
+ return "MySQL";
+ }
+}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/OpenGaussIntPkLargeOrderSQLBuilder.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/OpenGaussIntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..dd73e553668
--- /dev/null
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/OpenGaussIntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+/**
+ * openGauss implementation of {@link IntPkLargeOrderSQLBuilder}.
+ *
+ * <p>The table it defines has 42 columns: the three order columns plus one {@code c_*}
+ * column per data type under test, including range, geometric, network address,
+ * {@code hll} and {@code money} types.</p>
+ */
+public final class OpenGaussIntPkLargeOrderSQLBuilder implements
IntPkLargeOrderSQLBuilder {
+
+ /**
+ * Build the openGauss create table SQL with {@code order_id} as primary key.
+ *
+ * <p>The table name is substituted unquoted. No column carries an explicit
+ * {@code NOT NULL} in this definition; only the primary key clause constrains
+ * {@code order_id}.</p>
+ *
+ * @param qualifiedTableName table name substituted after {@code create table}
+ * @return create table SQL
+ */
+ @Override
+ public String buildCreateTableSQL(final String qualifiedTableName) {
+ return String.format("""
+ create table %s (
+ order_id bigint,
+ user_id integer,
+ status character varying(50),
+ c_int integer,
+ c_smallint smallint,
+ c_float real,
+ c_double double precision,
+ c_numeric numeric(10,2),
+ c_boolean boolean,
+ c_char character(32),
+ c_text text,
+ c_bytea bytea,
+ c_raw bytea,
+ c_date date,
+ c_time time without time zone,
+ c_smalldatetime smalldatetime,
+ c_timestamp timestamp without time zone,
+ c_timestamptz timestamp with time zone,
+ c_interval interval,
+ c_array integer[],
+ c_json json,
+ c_jsonb jsonb,
+ c_uuid uuid,
+ c_hash32 hash32,
+ c_tsvector tsvector,
+ c_tsquery tsquery,
+ c_bit bit(4),
+ c_int4range int4range,
+ c_daterange daterange,
+ c_tsrange tsrange,
+ c_reltime reltime,
+ c_abstime abstime,
+ c_point point,
+ c_lseg lseg,
+ c_box box,
+ c_circle circle,
+ c_bitvarying bit varying(32),
+ c_cidr cidr,
+ c_inet inet,
+ c_macaddr macaddr,
+ c_hll hll(14,10,12,0),
+ c_money money,
+ PRIMARY KEY ( order_id )
+ )
+ """, qualifiedTableName);
+ }
+
+ /**
+ * Build the prepared insert SQL listing every column declared by
+ * {@code buildCreateTableSQL}, in declaration order, with positional placeholders.
+ *
+ * @param qualifiedTableName table name substituted after {@code INSERT INTO}
+ * @return prepared insert SQL
+ */
+ @Override
+ public String buildPreparedInsertSQL(final String qualifiedTableName) {
+ return String.format("""
+ INSERT INTO %s (
+ order_id, user_id, status, c_int, c_smallint, c_float,
c_double, c_numeric, c_boolean, c_char, c_text, c_bytea, c_raw, c_date, c_time,
+ c_smalldatetime, c_timestamp, c_timestamptz, c_interval,
c_array, c_json, c_jsonb, c_uuid, c_hash32, c_tsvector, c_tsquery, c_bit,
+ c_int4range, c_daterange, c_tsrange, c_reltime, c_abstime,
c_point, c_lseg, c_box, c_circle, c_bitvarying, c_cidr, c_inet, c_macaddr,
c_hll, c_money
+ ) VALUES ( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """, qualifiedTableName);
+ }
+
+ /**
+ * Get the database type this builder is written for.
+ *
+ * @return the literal {@code openGauss}
+ */
+ @Override
+ public String getDatabaseType() {
+ return "openGauss";
+ }
+}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/PostgreSQLIntPkLargeOrderSQLBuilder.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/PostgreSQLIntPkLargeOrderSQLBuilder.java
new file mode 100644
index 00000000000..0a0546e3c10
--- /dev/null
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/large/sqlbuilder/PostgreSQLIntPkLargeOrderSQLBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package
org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder;
+
+/**
+ * PostgreSQL implementation of {@link IntPkLargeOrderSQLBuilder}.
+ *
+ * <p>The table it defines has 18 columns: the three order columns plus one {@code t_*}
+ * column per data type under test (numeric, boolean, bytea, character, json/jsonb and
+ * date/time types).</p>
+ */
+public final class PostgreSQLIntPkLargeOrderSQLBuilder implements
IntPkLargeOrderSQLBuilder {
+
+ /**
+ * Build the PostgreSQL create table SQL with {@code order_id} as primary key.
+ *
+ * <p>The table name is substituted unquoted; {@code order_id} and {@code user_id} are
+ * declared {@code NOT NULL}, every other column is nullable.</p>
+ *
+ * @param qualifiedTableName table name substituted after {@code CREATE TABLE}
+ * @return create table SQL
+ */
+ @Override
+ public String buildCreateTableSQL(final String qualifiedTableName) {
+ return String.format("""
+ CREATE TABLE %s (
+ order_id int8 NOT NULL,
+ user_id int4 NOT NULL,
+ status varchar ( 50 ) NULL,
+ t_int2 int2 NULL,
+ t_numeric numeric(10,2) NULL,
+ t_bool boolean NULL,
+ t_bytea bytea NULL,
+ t_char char(10) NULL,
+ t_varchar varchar(128) NULL,
+ t_float float4 NULL,
+ t_double float8 NULL,
+ t_json json NULL,
+ t_jsonb jsonb NULL,
+ t_text TEXT NULL,
+ t_date date NULL,
+ t_time TIME NULL,
+ t_timestamp timestamp NULL,
+ t_timestamptz timestamptz NULL,
+ PRIMARY KEY ( order_id )
+ )
+ """, qualifiedTableName);
+ }
+
+ /**
+ * Build the prepared insert SQL listing every column declared by
+ * {@code buildCreateTableSQL}, in declaration order, with positional placeholders.
+ *
+ * @param qualifiedTableName table name substituted after {@code INSERT INTO}
+ * @return prepared insert SQL
+ */
+ @Override
+ public String buildPreparedInsertSQL(final String qualifiedTableName) {
+ return String.format("""
+ INSERT INTO %s
+ (order_id, user_id, status, t_int2, t_numeric, t_bool,
t_bytea, t_char, t_varchar,
+ t_float, t_double, t_json, t_jsonb, t_text, t_date, t_time,
t_timestamp, t_timestamptz)
+ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+ """, qualifiedTableName);
+ }
+
+ /**
+ * Get the database type this builder is written for.
+ *
+ * @return the literal {@code PostgreSQL}
+ */
+ @Override
+ public String getDatabaseType() {
+ return "PostgreSQL";
+ }
+}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
index 0e355f0f927..281c0780b2c 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/dao/order/small/StringPkSmallOrderDAO.java
@@ -37,15 +37,12 @@ public final class StringPkSmallOrderDAO {
private final DataSource dataSource;
- private final DatabaseType databaseType;
-
private final StringPkSmallOrderSQLBuilder sqlBuilder;
private final String tableName;
public StringPkSmallOrderDAO(final DataSource dataSource, final
DatabaseType databaseType, final String tableName) {
this.dataSource = dataSource;
- this.databaseType = databaseType;
this.sqlBuilder =
DatabaseTypedSPILoader.getService(StringPkSmallOrderSQLBuilder.class,
databaseType);
this.tableName = tableName;
}
@@ -64,11 +61,11 @@ public final class StringPkSmallOrderDAO {
/**
* Batch insert orders.
*
- * @param insertRows insert rows
+ * @param recordCount record count
* @throws SQLException SQL exception
*/
- public void batchInsert(final int insertRows) throws SQLException {
- List<Object[]> paramsList =
PipelineCaseHelper.generateSmallOrderInsertData(new UUIDKeyGenerateAlgorithm(),
insertRows);
+ public void batchInsert(final int recordCount) throws SQLException {
+ List<Object[]> paramsList =
PipelineCaseHelper.generateSmallOrderInsertData(new UUIDKeyGenerateAlgorithm(),
recordCount);
String sql = sqlBuilder.buildPreparedInsertSQL(tableName);
log.info("Batch insert string pk small order SQL: {}, params list
size: {}", sql, paramsList.size());
DataSourceExecuteUtils.execute(dataSource, sql, paramsList);
diff --git
a/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder
b/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder
new file mode 100644
index 00000000000..cf9689e0fb8
--- /dev/null
+++
b/test/e2e/operation/pipeline/src/test/resources/META-INF/services/org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.IntPkLargeOrderSQLBuilder
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.MySQLIntPkLargeOrderSQLBuilder
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.PostgreSQLIntPkLargeOrderSQLBuilder
+org.apache.shardingsphere.test.e2e.operation.pipeline.dao.order.large.sqlbuilder.OpenGaussIntPkLargeOrderSQLBuilder