This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new e6d9d73f4da Extract PipelineE2EDistSQLFacade (#37753)
e6d9d73f4da is described below
commit e6d9d73f4da72ac0a07b878fa8697c21459d87ee
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Fri Jan 16 15:59:27 2026 +0800
Extract PipelineE2EDistSQLFacade (#37753)
---
.../pipeline/cases/PipelineContainerComposer.java | 35 +++--
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 26 ++--
.../cases/migration/AbstractMigrationE2EIT.java | 34 +----
.../general/MySQLMigrationGeneralE2EIT.java | 21 +--
.../general/MySQLTimeTypesMigrationE2EIT.java | 6 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 31 +++--
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 12 +-
.../migration/general/RollbackMigrationE2EIT.java | 8 +-
.../migration/general/RulesMigrationE2EIT.java | 8 +-
.../primarykey/IndexesMigrationE2EIT.java | 12 +-
.../primarykey/MariaDBMigrationE2EIT.java | 12 +-
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 12 +-
.../pipeline/command/MigrationDistSQLCommand.java | 4 -
.../pipeline/util/PipelineE2EDistSQLFacade.java | 145 +++++++++++++++++++++
.../resources/env/common/migration-command.xml | 16 ---
15 files changed, 248 insertions(+), 134 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 440ca60a805..1ebcf787b41 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -118,6 +118,8 @@ public final class PipelineContainerComposer implements
AutoCloseable {
private final DataSource proxyDataSource;
+ private final PipelineJobType<?> jobType;
+
private Thread increaseTaskThread;
public PipelineContainerComposer(final PipelineTestParameter testParam,
final PipelineJobType<?> jobType) {
@@ -138,6 +140,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
sourceDataSource =
StorageContainerUtils.generateDataSource(getActualJdbcUrlTemplate(DS_0, false),
username, password, 2);
proxyDataSource = StorageContainerUtils.generateDataSource(
appendExtraParameter(containerComposer.getProxyJdbcUrl(PROXY_DATABASE)),
ProxyContainerConstants.USER, ProxyContainerConstants.PASSWORD, 2);
+ this.jobType = jobType;
init(jobType);
}
@@ -205,7 +208,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
} catch (final SQLException ex) {
log.warn("Drop proxy database failed, error={}", ex.getMessage());
}
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() -> true);
+ sleepSeconds(2);
}
private void createProxyDatabase(final Connection connection) throws
SQLException {
@@ -213,7 +216,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
log.info("Create proxy database {}", PROXY_DATABASE);
try (Statement statement = connection.createStatement()) {
statement.execute(sql);
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
+ sleepSeconds(2);
}
}
@@ -227,6 +230,18 @@ public final class PipelineContainerComposer implements
AutoCloseable {
}
}
+ /**
+ * Sleep seconds.
+ *
+ * @param seconds seconds
+ */
+ public void sleepSeconds(final int seconds) {
+ if (seconds <= 0) {
+ return;
+ }
+ Awaitility.await().pollDelay(seconds, TimeUnit.SECONDS).until(() ->
true);
+ }
+
/**
* Append extra parameter.
*
@@ -316,19 +331,17 @@ public final class PipelineContainerComposer implements
AutoCloseable {
* Create schema.
*
* @param connection connection
- * @param sleepSeconds sleep seconds
+ * @param seconds sleep seconds
* @throws SQLException SQL exception
*/
- public void createSchema(final Connection connection, final int
sleepSeconds) throws SQLException {
+ public void createSchema(final Connection connection, final int seconds)
throws SQLException {
if (!new
DatabaseTypeRegistry(databaseType).getDialectDatabaseMetaData().getSchemaOption().isSchemaAvailable())
{
return;
}
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("CREATE SCHEMA %s", SCHEMA_NAME));
}
- if (sleepSeconds > 0) {
- Awaitility.await().pollDelay(sleepSeconds,
TimeUnit.SECONDS).until(() -> true);
- }
+ sleepSeconds(seconds);
}
/**
@@ -432,7 +445,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
List<Map<String, Object>> jobStatus = queryForListWithLog(distSQL);
Set<String> statusSet = jobStatus.stream().map(each ->
String.valueOf(each.get("status"))).collect(Collectors.toSet());
if (statusSet.contains(JobStatus.PREPARING.name()) ||
statusSet.contains(JobStatus.RUNNING.name())) {
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
+ sleepSeconds(2);
continue;
}
break;
@@ -455,7 +468,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
if (statusSet.stream().allMatch(each ->
each.equals(jobStatus.name()))) {
return;
}
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
+ sleepSeconds(2);
}
throw new IllegalStateException("Job status not reached: " +
jobStatus);
}
@@ -542,7 +555,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));
}
if (Collections.min(incrementalIdleSecondsList) <= 5) {
- Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() ->
true);
+ sleepSeconds(3);
continue;
}
if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
@@ -583,7 +596,7 @@ public final class PipelineContainerComposer implements
AutoCloseable {
if (recordExist) {
break;
}
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
+ sleepSeconds(2);
}
assertTrue(recordExist, "Order record does not exist");
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index a6b39b1ee03..d5d6c94a85e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -52,6 +52,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -105,7 +106,8 @@ class CDCE2EIT {
containerComposer.registerStorageUnit(each);
}
createOrderTableRule(containerComposer);
- createBroadcastRule(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ distSQLFacade.createBroadcastRule("t_address");
try (Connection connection =
containerComposer.getProxyDataSource().getConnection()) {
initSchemaAndTable(containerComposer, connection, 3);
}
@@ -123,8 +125,8 @@ class CDCE2EIT {
}
PipelineDataSource targetDataSource =
createStandardDataSource(containerComposer, PipelineContainerComposer.DS_4);
final CDCClient cdcClient =
buildCDCClientAndStart(targetDataSource, containerComposer);
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
STREAMING LIST").isEmpty());
- String jobId = containerComposer.queryForListWithLog("SHOW
STREAMING LIST").get(0).get("id").toString();
+ Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitIncrementTaskFinished(String.format("SHOW
STREAMING STATUS '%s'", jobId));
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
String tableName =
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ?
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) :
SOURCE_TABLE_NAME;
@@ -142,10 +144,10 @@ class CDCE2EIT {
assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_address"));
assertDataMatched(sourceDataSource, targetDataSource, new
QualifiedTable(null, "t_single"));
cdcClient.close();
- Awaitility.await().atMost(10L,
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS).until(() ->
containerComposer.queryForListWithLog("SHOW STREAMING LIST")
- .stream().noneMatch(each ->
Boolean.parseBoolean(each.get("active").toString())));
+ Awaitility.await().atMost(10L,
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
+ .until(() ->
distSQLFacade.listJobs().stream().noneMatch(each ->
Boolean.parseBoolean(each.get("active").toString())));
containerComposer.proxyExecuteWithLog(String.format("DROP
STREAMING '%s'", jobId), 0);
- assertTrue(containerComposer.queryForListWithLog("SHOW STREAMING
LIST").isEmpty());
+ assertTrue(distSQLFacade.listJobs().isEmpty());
}
}
@@ -154,20 +156,14 @@ class CDCE2EIT {
Awaitility.await().atMost(20L, TimeUnit.SECONDS).pollInterval(2L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
}
- private void createBroadcastRule(final PipelineContainerComposer
containerComposer) throws SQLException {
- containerComposer.proxyExecuteWithLog("CREATE BROADCAST TABLE RULE
t_address", 2);
- }
-
- private void initSchemaAndTable(final PipelineContainerComposer
containerComposer, final Connection connection, final int sleepSeconds) throws
SQLException {
- containerComposer.createSchema(connection, sleepSeconds);
+ private void initSchemaAndTable(final PipelineContainerComposer
containerComposer, final Connection connection, final int seconds) throws
SQLException {
+ containerComposer.createSchema(connection, seconds);
String sql =
containerComposer.getExtraSQLCommand().getCreateTableOrder(SOURCE_TABLE_NAME);
log.info("Create table sql: {}", sql);
connection.createStatement().execute(sql);
connection.createStatement().execute("CREATE TABLE t_address(id
integer primary key, address_name varchar(255))");
connection.createStatement().execute("CREATE TABLE t_single(id integer
primary key)");
- if (sleepSeconds > 0) {
- Awaitility.await().pollDelay(sleepSeconds,
TimeUnit.SECONDS).until(() -> true);
- }
+ containerComposer.sleepSeconds(seconds);
}
private PipelineDataSource createStandardDataSource(final
PipelineContainerComposer containerComposer, final String storageUnitName) {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
index 537883f5d52..9d62d2ff4fd 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/AbstractMigrationE2EIT.java
@@ -98,10 +98,6 @@ public abstract class AbstractMigrationE2EIT {
}
}
- protected void loadAllSingleTables(final PipelineContainerComposer
containerComposer) throws SQLException {
- containerComposer.proxyExecuteWithLog("LOAD SINGLE TABLE *.*", 5);
- }
-
protected void createTargetOrderTableRule(final PipelineContainerComposer
containerComposer) throws SQLException {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getCreateTargetOrderTableRule(),
0);
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !containerComposer.queryForListWithLog("SHOW
SHARDING TABLE RULE t_order").isEmpty());
@@ -124,32 +120,6 @@ public abstract class AbstractMigrationE2EIT {
containerComposer.proxyExecuteWithLog(migrationDistSQL.getMigrationSingleTableWithSchema(sourceTableName,
targetTableName), 5);
}
- protected void addMigrationProcessConfig(final PipelineContainerComposer
containerComposer) throws SQLException {
-
containerComposer.proxyExecuteWithLog(migrationDistSQL.getAlterMigrationRule(),
0);
- }
-
- protected void stopMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
- containerComposer.proxyExecuteWithLog(String.format("STOP MIGRATION
'%s'", jobId), 1);
- }
-
- protected void startMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
- containerComposer.proxyExecuteWithLog(String.format("START MIGRATION
'%s'", jobId), 4);
- }
-
- protected void commitMigrationByJobId(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
- containerComposer.proxyExecuteWithLog(String.format("COMMIT MIGRATION
'%s'", jobId), 1);
- }
-
- protected List<String> listJobId(final PipelineContainerComposer
containerComposer) {
- List<Map<String, Object>> jobList =
containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
- return jobList.stream().map(a ->
a.get("id").toString()).collect(Collectors.toList());
- }
-
- protected String getJobIdByTableName(final PipelineContainerComposer
containerComposer, final String tableName) {
- List<Map<String, Object>> jobList =
containerComposer.queryForListWithLog("SHOW MIGRATION LIST");
- return jobList.stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst().orElseThrow(() -> new
RuntimeException("not find " + tableName + " table")).get("id").toString();
- }
-
protected void assertCheckMigrationSuccess(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType) throws
SQLException {
assertCheckMigrationSuccess(containerComposer, jobId, algorithmType,
new Properties());
}
@@ -161,7 +131,7 @@ public abstract class AbstractMigrationE2EIT {
for (int i = 0; i < 30; i++) {
resultList =
containerComposer.queryForListWithLog(String.format("SHOW MIGRATION CHECK
STATUS '%s'", jobId));
if (resultList.isEmpty()) {
- Awaitility.await().pollDelay(3L, TimeUnit.SECONDS).until(() ->
true);
+ containerComposer.sleepSeconds(3);
continue;
}
List<String> checkEndTimeList = resultList.stream().map(map ->
map.get("check_end_time").toString()).filter(each ->
!Strings.isNullOrEmpty(each)).collect(Collectors.toList());
@@ -169,7 +139,7 @@ public abstract class AbstractMigrationE2EIT {
if (checkEndTimeList.size() == resultList.size() && 1 ==
finishedPercentages.size() && finishedPercentages.contains("100")) {
break;
} else {
- Awaitility.await().pollDelay(1L, TimeUnit.SECONDS).until(() ->
true);
+ containerComposer.sleepSeconds(1);
}
}
log.info("check job results: {}", resultList);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 8c7bad4323b..8650306176c 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -32,7 +32,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
-import org.awaitility.Awaitility;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -63,7 +63,8 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMigrationSuccess(final PipelineTestParameter testParam) throws
SQLException, InterruptedException {
try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
- addMigrationProcessConfig(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ distSQLFacade.alterPipelineRule();
containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
containerComposer.createSourceOrderItemTable();
addMigrationSourceResource(containerComposer);
@@ -78,27 +79,27 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
log.info("init data end: {}", LocalDateTime.now());
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
startMigration(containerComposer, "t_order_item", "t_order_item");
- String orderJobId = getJobIdByTableName(containerComposer, "ds_0."
+ SOURCE_TABLE_NAME);
+ String orderJobId = distSQLFacade.getJobIdByTableName("ds_0." +
SOURCE_TABLE_NAME);
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", orderJobId));
containerComposer.startIncrementTask(
new
E2EIncrementalTask(containerComposer.getSourceDataSource(), SOURCE_TABLE_NAME,
new SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 30));
TimeUnit.SECONDS.timedJoin(containerComposer.getIncreaseTaskThread(), 30L);
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", SOURCE_TABLE_NAME));
containerComposer.sourceExecuteWithLog("INSERT INTO t_order_item
(item_id, order_id, user_id, status) VALUES (10000, 10000, 1, 'OK')");
- stopMigrationByJobId(containerComposer, orderJobId);
- startMigrationByJobId(containerComposer, orderJobId);
+ distSQLFacade.pauseJob(orderJobId);
+ distSQLFacade.resumeJob(orderJobId);
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", 10000);
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order_item", 10000);
assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "SMALL")));
- String orderItemJobId = getJobIdByTableName(containerComposer,
"ds_0.t_order_item");
+ String orderItemJobId =
distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "LARGE")));
- Awaitility.await().pollDelay(2L, TimeUnit.SECONDS).until(() ->
true);
+ containerComposer.sleepSeconds(2);
assertMigrationSuccessById(containerComposer, orderItemJobId,
"CRC32_MATCH", new Properties());
- for (String each : listJobId(containerComposer)) {
- commitMigrationByJobId(containerComposer, each);
+ for (String each : distSQLFacade.listJobIds()) {
+ distSQLFacade.commit(each);
}
- assertTrue(listJobId(containerComposer).isEmpty());
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT, "");
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index df9c720d60a..ed5812ff1cd 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -24,6 +24,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -52,11 +53,12 @@ class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
startMigration(containerComposer, "time_e2e", "time_e2e");
- String jobId = listJobId(containerComposer).get(0);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
insertOneRecordWithZeroValue(containerComposer, 2);
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- loadAllSingleTables(containerComposer);
+ distSQLFacade.loadAllSingleTables();
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index d816381c8f5..0fbfdf2d732 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -32,6 +32,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.DataSourceExecuteUtils;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -64,7 +65,8 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
@ArgumentsSource(PipelineE2ETestCaseArgumentsProvider.class)
void assertMigrationSuccess(final PipelineTestParameter testParam) throws
SQLException, InterruptedException {
try (PipelineContainerComposer containerComposer = new
PipelineContainerComposer(testParam, new MigrationJobType())) {
- addMigrationProcessConfig(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ distSQLFacade.alterPipelineRule();
createSourceSchema(containerComposer,
PipelineContainerComposer.SCHEMA_NAME);
containerComposer.createSourceOrderTable(SOURCE_TABLE_NAME);
containerComposer.createSourceOrderItemTable();
@@ -81,8 +83,8 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
int replicationSlotsCount =
getReplicationSlotsCount(containerComposer);
log.info("init data end: {}, replication slots count: {}",
LocalDateTime.now(), replicationSlotsCount);
startMigrationWithSchema(containerComposer, SOURCE_TABLE_NAME,
"t_order");
- Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !listJobId(containerComposer).isEmpty());
- String jobId = getJobIdByTableName(containerComposer, "ds_0.test."
+ SOURCE_TABLE_NAME);
+ Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
+ String jobId = distSQLFacade.getJobIdByTableName("ds_0.test." +
SOURCE_TABLE_NAME);
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
String qualifiedTableName = String.join(".",
PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME);
containerComposer.startIncrementTask(new
E2EIncrementalTask(containerComposer.getSourceDataSource(), qualifiedTableName,
new SnowflakeKeyGenerateAlgorithm(),
@@ -91,29 +93,30 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
containerComposer.sourceExecuteWithLog(String.format("INSERT INTO
%s (order_id, user_id, status) VALUES (10000, 1, 'OK')", qualifiedTableName));
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
qualifiedTableName, 10000);
- checkOrderMigration(containerComposer, jobId);
+ checkOrderMigration(distSQLFacade, jobId);
startMigrationWithSchema(containerComposer, "t_order_item",
"t_order_item");
- checkOrderItemMigration(containerComposer);
- for (String each : listJobId(containerComposer)) {
- commitMigrationByJobId(containerComposer, each);
+ checkOrderItemMigration(distSQLFacade);
+ for (String each : distSQLFacade.listJobIds()) {
+ distSQLFacade.commit(each);
}
- List<String> lastJobIds = listJobId(containerComposer);
- assertTrue(lastJobIds.isEmpty());
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
containerComposer.assertGreaterThanOrderTableInitRows(jdbcDataSource,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1,
PipelineContainerComposer.SCHEMA_NAME);
assertThat("Replication slots count doesn't match, it might be not
cleaned, run `SELECT * FROM pg_replication_slots;` in PostgreSQL to verify",
getReplicationSlotsCount(containerComposer),
is(replicationSlotsCount));
}
}
- private void checkOrderMigration(final PipelineContainerComposer
containerComposer, final String jobId) throws SQLException {
+ private void checkOrderMigration(final PipelineE2EDistSQLFacade
distSQLFacade, final String jobId) throws SQLException {
+ PipelineContainerComposer containerComposer =
distSQLFacade.getContainerComposer();
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- stopMigrationByJobId(containerComposer, jobId);
- startMigrationByJobId(containerComposer, jobId);
+ distSQLFacade.pauseJob(jobId);
+ distSQLFacade.resumeJob(jobId);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
- private void checkOrderItemMigration(final PipelineContainerComposer
containerComposer) throws SQLException {
- String jobId = getJobIdByTableName(containerComposer,
"ds_0.test.t_order_item");
+ private void checkOrderItemMigration(final PipelineE2EDistSQLFacade
distSQLFacade) throws SQLException {
+ String jobId =
distSQLFacade.getJobIdByTableName("ds_0.test.t_order_item");
+ PipelineContainerComposer containerComposer =
distSQLFacade.getContainerComposer();
containerComposer.waitJobStatusReached(String.format("SHOW MIGRATION
STATUS '%s'", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index e0c3f9967d7..a41d0137748 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -31,6 +31,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.condition.EnabledIf;
@@ -48,7 +49,6 @@ import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -77,8 +77,9 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
+ "KEY_GENERATE_STRATEGY(COLUMN=order_id,
TYPE(NAME='snowflake')))", 2);
initTargetTable(containerComposer);
containerComposer.proxyExecuteWithLog("MIGRATE TABLE
source_ds.t_order INTO t_order", 2);
- Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() ->
!listJobId(containerComposer).isEmpty());
- String jobId = listJobId(containerComposer).get(0);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ Awaitility.await().ignoreExceptions().atMost(10L,
TimeUnit.SECONDS).pollInterval(1L, TimeUnit.SECONDS).until(() ->
!distSQLFacade.listJobIds().isEmpty());
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobStatusReached(String.format("SHOW
MIGRATION STATUS %s", jobId), JobStatus.EXECUTE_INCREMENTAL_TASK, 15);
try (Connection connection = DriverManager.getConnection(jdbcUrl,
"postgres", "postgres")) {
connection.createStatement().execute(String.format("INSERT
INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1,
"incremental"));
@@ -86,9 +87,8 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
}
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
- commitMigrationByJobId(containerComposer, jobId);
- List<String> lastJobIds = listJobId(containerComposer);
- assertTrue(lastJobIds.isEmpty());
+ distSQLFacade.commit(jobId);
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
} finally {
if (null != postgresqlContainer) {
postgresqlContainer.close();
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
index 9449d394237..ec1c742467d 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RollbackMigrationE2EIT.java
@@ -26,6 +26,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -51,9 +52,10 @@ class RollbackMigrationE2EIT extends AbstractMigrationE2EIT {
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
startMigration(containerComposer, "t_order", "t_order");
- String jobId = listJobId(containerComposer).get(0);
- containerComposer.proxyExecuteWithLog(String.format("ROLLBACK
MIGRATION %s", jobId), 2);
- assertTrue(listJobId(containerComposer).isEmpty());
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ String jobId = distSQLFacade.listJobIds().get(0);
+ distSQLFacade.rollback(jobId);
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 591d7f5323f..4486edb8899 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -27,6 +27,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -85,12 +86,13 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
addRuleFn.call();
}
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
- String jobId = listJobId(containerComposer).get(0);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
- loadAllSingleTables(containerComposer);
+ distSQLFacade.loadAllSingleTables();
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
- commitMigrationByJobId(containerComposer, jobId);
+ distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(containerComposer.getProxyDataSource(),
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT));
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index f53d2633a68..ef8d02a8d7e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -35,6 +35,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.AutoIncrementKeyGenerateAlgorithm;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -44,7 +45,6 @@ import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
-import java.util.List;
import java.util.function.Consumer;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -235,12 +235,13 @@ class IndexesMigrationE2EIT extends
AbstractMigrationE2EIT {
try (Connection connection =
containerComposer.getSourceDataSource().getConnection()) {
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- addMigrationProcessConfig(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ distSQLFacade.alterPipelineRule();
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
containerComposer.proxyExecuteWithLog(String.format(ORDER_TABLE_SHARDING_RULE_FORMAT,
shardingColumn), 2);
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
- String jobId = listJobId(containerComposer).get(0);
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
incrementalTaskFn.accept(jdbcDataSource);
@@ -248,10 +249,9 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT
{
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(containerComposer, jobId,
consistencyCheckAlgorithmType);
}
- commitMigrationByJobId(containerComposer, jobId);
+ distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource,
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
- List<String> lastJobIds = listJobId(containerComposer);
- assertTrue(lastJobIds.isEmpty());
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
}
private static boolean isEnabled(final ExtensionContext context) {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index 41e81f4b0ab..f3cf694686e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
@@ -37,7 +38,6 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
@@ -63,22 +63,22 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
KeyGenerateAlgorithm generateAlgorithm = new
UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
generateAlgorithm, SOURCE_TABLE_NAME,
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- addMigrationProcessConfig(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ distSQLFacade.alterPipelineRule();
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
createTargetOrderTableRule(containerComposer);
startMigration(containerComposer, SOURCE_TABLE_NAME,
TARGET_TABLE_NAME);
- String jobId = listJobId(containerComposer).get(0);
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
containerComposer.sourceExecuteWithLog("INSERT INTO t_order
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", "a1");
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(containerComposer, jobId,
"CRC32_MATCH");
- commitMigrationByJobId(containerComposer, jobId);
+ distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource,
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
- List<String> lastJobIds = listJobId(containerComposer);
- assertTrue(lastJobIds.isEmpty());
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index d48b91ddfe5..311cf6fa05e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -28,6 +28,7 @@ import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.Pip
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ESettings.PipelineE2EDatabaseSettings;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineE2ETestCaseArgumentsProvider;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.framework.param.PipelineTestParameter;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.util.PipelineE2EDistSQLFacade;
import org.junit.jupiter.api.condition.EnabledIf;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
@@ -35,7 +36,6 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.List;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -57,19 +57,19 @@ class TextPrimaryKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
UUIDKeyGenerateAlgorithm keyGenerateAlgorithm = new
UUIDKeyGenerateAlgorithm();
PipelineCaseHelper.batchInsertOrderRecordsWithGeneralColumns(connection,
keyGenerateAlgorithm, getSourceTableName(containerComposer),
PipelineContainerComposer.TABLE_INIT_ROW_COUNT);
}
- addMigrationProcessConfig(containerComposer);
+ PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer);
+ distSQLFacade.alterPipelineRule();
addMigrationSourceResource(containerComposer);
addMigrationTargetResource(containerComposer);
createTargetOrderTableRule(containerComposer);
startMigration(containerComposer,
getSourceTableName(containerComposer), TARGET_TABLE_NAME);
- String jobId = listJobId(containerComposer).get(0);
+ String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.sourceExecuteWithLog(
String.format("INSERT INTO %s (order_id,user_id,status)
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1,
"afterStop"));
containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
- commitMigrationByJobId(containerComposer, jobId);
- List<String> lastJobIds = listJobId(containerComposer);
- assertTrue(lastJobIds.isEmpty());
+ distSQLFacade.commit(jobId);
+ assertTrue(distSQLFacade.listJobIds().isEmpty());
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
index 8687b4c65e5..b8d094e5c79 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/command/MigrationDistSQLCommand.java
@@ -30,10 +30,6 @@ import javax.xml.bind.annotation.XmlRootElement;
@Setter
public final class MigrationDistSQLCommand {
- @XmlElement(name = "alter-migration-rule")
- @Getter
- private String alterMigrationRule;
-
@XmlElement(name = "create-target-order-table-encrypt-rule")
@Getter
private String createTargetOrderTableEncryptRule;
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
new file mode 100644
index 00000000000..d1b1b882342
--- /dev/null
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
+import org.awaitility.Awaitility;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+@RequiredArgsConstructor
+@Getter
+public final class PipelineE2EDistSQLFacade {
+
+ private static final String PIPELINE_RULE_SQL_TEMPLATE = "ALTER %s RULE(\n"
+ + "READ(WORKER_THREAD=20, BATCH_SIZE=1000, SHARDING_SIZE=100000,
RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))),\n"
+ + "WRITE(WORKER_THREAD=20, BATCH_SIZE=1000, RATE_LIMITER
(TYPE(NAME='TPS',PROPERTIES('tps'='2000')))),\n"
+ + "STREAM_CHANNEL(TYPE(NAME='MEMORY',
PROPERTIES('block-queue-size'=1000))))";
+
+ private final String jobTypeName;
+
+ private final PipelineContainerComposer containerComposer;
+
+ public PipelineE2EDistSQLFacade(final PipelineContainerComposer
containerComposer) {
+ this(containerComposer.getJobType().getType(), containerComposer);
+ }
+
+ /**
+ * Load all single tables.
+ *
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void loadAllSingleTables() throws SQLException {
+ containerComposer.proxyExecuteWithLog("LOAD SINGLE TABLE *.*", 5);
+ }
+
+ /**
+ * Create broadcast rule.
+ *
+ * @param tableName table name
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void createBroadcastRule(final String tableName) throws
SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("CREATE BROADCAST
TABLE RULE %s", tableName), 2);
+ }
+
+ /**
+ * Alter pipeline rule.
+ *
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void alterPipelineRule() throws SQLException {
+
containerComposer.proxyExecuteWithLog(String.format(PIPELINE_RULE_SQL_TEMPLATE,
jobTypeName), 2);
+ }
+
+ /**
+ * List job ids.
+ *
+ * @return job ids
+ */
+ public List<String> listJobIds() {
+ return listJobs().stream().map(a ->
a.get("id").toString()).collect(Collectors.toList());
+ }
+
+ /**
+ * List jobs.
+ *
+ * @return jobs
+ */
+ public List<Map<String, Object>> listJobs() {
+ return containerComposer.queryForListWithLog(String.format("SHOW %s
LIST", jobTypeName));
+ }
+
+ /**
+ * Get job id by table name.
+ *
+ * @param tableName table name
+ * @return job id
+ */
+ public String getJobIdByTableName(final String tableName) {
+ return listJobs().stream().filter(a ->
a.get("tables").toString().equals(tableName)).findFirst()
+ .orElseThrow(() -> new RuntimeException("Could not find job by
table name: `" + tableName + "` table`")).get("id").toString();
+ }
+
+ /**
+ * Pause job.
+ *
+ * @param jobId job id
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void pauseJob(final String jobId) throws SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("STOP %s %s",
jobTypeName, jobId), 1);
+ }
+
+ /**
+ * Resume job.
+ *
+ * @param jobId job id
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void resumeJob(final String jobId) throws SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("START %s %s",
jobTypeName, jobId), 5);
+ }
+
+ /**
+ * Rollback job.
+ *
+ * @param jobId job id
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void rollback(final String jobId) throws SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("ROLLBACK %s %s",
jobTypeName, jobId), 2);
+ }
+
+ /**
+ * Commit job.
+ *
+ * @param jobId job id
+ * @throws SQLException if there's DistSQL execution failure
+ */
+ public void commit(final String jobId) throws SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("COMMIT %s %s",
jobTypeName, jobId), 2);
+ Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() ->
!listJobIds().contains(jobId));
+ }
+}
diff --git
a/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
b/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
index 1659848eba4..ee965154163 100644
---
a/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
+++
b/test/e2e/operation/pipeline/src/test/resources/env/common/migration-command.xml
@@ -16,22 +16,6 @@
-->
<command>
- <alter-migration-rule>
- ALTER MIGRATION RULE (
- READ(
- WORKER_THREAD=20,
- BATCH_SIZE=1000,
- SHARDING_SIZE=10000000,
- RATE_LIMITER (TYPE(NAME='QPS',PROPERTIES('qps'='500')))
- ),
- WRITE(
- WORKER_THREAD=20,
- BATCH_SIZE=1000,
- RATE_LIMITER (TYPE(NAME='TPS',PROPERTIES('tps'='2000')))
- ),
- STREAM_CHANNEL
(TYPE(NAME='MEMORY',PROPERTIES('block-queue-size'='2000')))
- );
- </alter-migration-rule>
<register-migration-source-storage-unit-template>
REGISTER MIGRATION SOURCE STORAGE UNIT ds_0 (