This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new de5eeded580 Extract drop and waitIncrementTaskFinished to
PipelineE2EDistSQLFacade (#37773)
de5eeded580 is described below
commit de5eeded580b502f356c5b55e0660a48e6030c53
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Mon Jan 19 19:38:37 2026 +0800
Extract drop and waitIncrementTaskFinished to PipelineE2EDistSQLFacade
(#37773)
* Improve PipelineE2EDistSQLFacade.commit (wait up to 10 seconds, instead of 3, for the committed job to disappear from the job list)
* Add PipelineE2EDistSQLFacade.drop
* Move PipelineContainerComposer.waitIncrementTaskFinished to
PipelineE2EDistSQLFacade
* Simplify PipelineE2EDistSQLFacade.waitIncrementTaskFinished parameter (take the job id instead of a prebuilt DistSQL statement)
* Improve PipelineE2EDistSQLFacade.waitIncrementTaskFinished
---
.../pipeline/cases/PipelineContainerComposer.java | 33 ------------
.../e2e/operation/pipeline/cases/cdc/CDCE2EIT.java | 6 +--
.../general/MySQLMigrationGeneralE2EIT.java | 12 ++---
.../general/MySQLTimeTypesMigrationE2EIT.java | 2 +-
.../general/PostgreSQLMigrationGeneralE2EIT.java | 5 +-
.../general/PostgreSQLToMySQLMigrationE2EIT.java | 2 +-
.../migration/general/RulesMigrationE2EIT.java | 2 +-
.../primarykey/IndexesMigrationE2EIT.java | 2 +-
.../primarykey/MariaDBMigrationE2EIT.java | 2 +-
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 2 +-
.../pipeline/util/PipelineE2EDistSQLFacade.java | 60 +++++++++++++++++++++-
11 files changed, 75 insertions(+), 53 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
index 975902d9144..0c81d57effa 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/PipelineContainerComposer.java
@@ -67,11 +67,8 @@ import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -537,36 +534,6 @@ public final class PipelineContainerComposer implements
AutoCloseable {
increaseTaskThread.start();
}
- /**
- * Wait increment task finished.
- *
- * @param distSQL dist SQL
- * @return result
- */
- // TODO use DAO to query via DistSQL
- public List<Map<String, Object>> waitIncrementTaskFinished(final String
distSQL) {
- for (int i = 0; i < 10; i++) {
- List<Map<String, Object>> listJobStatus =
queryForListWithLog(distSQL);
- log.info("show status result: {}", listJobStatus);
- Set<String> actualStatus = new HashSet<>(listJobStatus.size(), 1F);
- Collection<Integer> incrementalIdleSecondsList = new
LinkedList<>();
- for (Map<String, Object> each : listJobStatus) {
- assertTrue(Strings.isNullOrEmpty((String)
each.get("error_message")), "error_message: `" + each.get("error_message") +
"`");
- actualStatus.add(each.get("status").toString());
- String incrementalIdleSeconds = (String)
each.get("incremental_idle_seconds");
-
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));
- }
- if (Collections.min(incrementalIdleSecondsList) <= 5) {
- sleepSeconds(3);
- continue;
- }
- if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
- return listJobStatus;
- }
- }
- return Collections.emptyList();
- }
-
/**
* Assert order record exists in proxy.
*
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
index 99d27883a9f..98e9c66dfbc 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/cdc/CDCE2EIT.java
@@ -125,11 +125,11 @@ class CDCE2EIT {
final CDCClient cdcClient =
buildCDCClientAndStart(targetDataSource, containerComposer);
Awaitility.await().atMost(10L, TimeUnit.SECONDS).pollInterval(1L,
TimeUnit.SECONDS).until(() -> !distSQLFacade.listJobIds().isEmpty());
String jobId = distSQLFacade.listJobIds().get(0);
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
STREAMING STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
DialectDatabaseMetaData dialectDatabaseMetaData = new
DatabaseTypeRegistry(containerComposer.getDatabaseType()).getDialectDatabaseMetaData();
String tableName =
dialectDatabaseMetaData.getSchemaOption().isSchemaAvailable() ?
String.join(".", PipelineContainerComposer.SCHEMA_NAME, SOURCE_TABLE_NAME) :
SOURCE_TABLE_NAME;
new E2EIncrementalTask(sourceDataSource, tableName, new
SnowflakeKeyGenerateAlgorithm(), containerComposer.getDatabaseType(), 20).run();
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
STREAMING STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
for (int i = 1; i <= 4; i++) {
int orderId = 10000 + i;
containerComposer.proxyExecuteWithLog(String.format("INSERT
INTO %s (order_id, user_id, status) VALUES (%d, %d, 'OK')", tableName, orderId,
i), 0);
@@ -144,7 +144,7 @@ class CDCE2EIT {
cdcClient.close();
Awaitility.await().atMost(10L,
TimeUnit.SECONDS).pollInterval(500L, TimeUnit.MILLISECONDS)
.until(() ->
distSQLFacade.listJobs().stream().noneMatch(each ->
Boolean.parseBoolean(each.get("active").toString())));
- containerComposer.proxyExecuteWithLog(String.format("DROP
STREAMING '%s'", jobId), 0);
+ distSQLFacade.drop(jobId);
assertTrue(distSQLFacade.listJobs().isEmpty());
}
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
index 1f3a9cf2429..51609b79c2e 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLMigrationGeneralE2EIT.java
@@ -91,11 +91,11 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", 10000);
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order_item", 10000);
- assertMigrationSuccessById(containerComposer, orderJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "SMALL")));
+ assertMigrationSuccessById(distSQLFacade, orderJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "SMALL")));
String orderItemJobId =
distSQLFacade.getJobIdByTableName("ds_0.t_order_item");
- assertMigrationSuccessById(containerComposer, orderItemJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "LARGE")));
+ assertMigrationSuccessById(distSQLFacade, orderItemJobId,
"DATA_MATCH", convertToProperties(ImmutableMap.of("chunk-size", "300",
"streaming-range-type", "LARGE")));
containerComposer.sleepSeconds(2);
- assertMigrationSuccessById(containerComposer, orderItemJobId,
"CRC32_MATCH", new Properties());
+ assertMigrationSuccessById(distSQLFacade, orderItemJobId,
"CRC32_MATCH", new Properties());
for (String each : distSQLFacade.listJobIds()) {
distSQLFacade.commit(each);
}
@@ -104,13 +104,13 @@ class MySQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
}
- private void assertMigrationSuccessById(final PipelineContainerComposer
containerComposer, final String jobId, final String algorithmType, final
Properties algorithmProps) throws SQLException {
- List<Map<String, Object>> jobStatus =
containerComposer.waitIncrementTaskFinished(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
+ private void assertMigrationSuccessById(final PipelineE2EDistSQLFacade
distSQLFacade, final String jobId, final String algorithmType, final Properties
algorithmProps) throws SQLException {
+ List<Map<String, Object>> jobStatus =
distSQLFacade.waitIncrementTaskFinished(jobId);
for (Map<String, Object> each : jobStatus) {
assertTrue(Integer.parseInt(each.get("processed_records_count").toString()) >
0);
assertThat(Integer.parseInt(each.get("inventory_finished_percentage").toString()),
is(100));
}
- assertCheckMigrationSuccess(containerComposer, jobId, algorithmType,
algorithmProps);
+ assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(),
jobId, algorithmType, algorithmProps);
}
private static boolean isEnabled(final ExtensionContext context) {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
index 96da7255b1e..2e5be76e0cf 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/MySQLTimeTypesMigrationE2EIT.java
@@ -57,7 +57,7 @@ class MySQLTimeTypesMigrationE2EIT extends
AbstractMigrationE2EIT {
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
insertOneRecordWithZeroValue(containerComposer, 2);
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
distSQLFacade.loadAllSingleTables();
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
index b61b4adbc29..f841cde414b 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLMigrationGeneralE2EIT.java
@@ -107,11 +107,10 @@ class PostgreSQLMigrationGeneralE2EIT extends
AbstractMigrationE2EIT {
}
private void checkOrderMigration(final PipelineE2EDistSQLFacade
distSQLFacade, final String jobId) throws SQLException {
- PipelineContainerComposer containerComposer =
distSQLFacade.getContainerComposer();
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
distSQLFacade.pauseJob(jobId);
distSQLFacade.resumeJob(jobId);
- assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
+ assertCheckMigrationSuccess(distSQLFacade.getContainerComposer(),
jobId, "DATA_MATCH");
}
private void checkOrderItemMigration(final PipelineE2EDistSQLFacade
distSQLFacade) throws SQLException {
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
index fdf78950802..775c1fb9134 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/PostgreSQLToMySQLMigrationE2EIT.java
@@ -85,7 +85,7 @@ class PostgreSQLToMySQLMigrationE2EIT extends
AbstractMigrationE2EIT {
connection.createStatement().execute(String.format("INSERT
INTO t_order (order_id,user_id,status) VALUES (%s, %s, '%s')", "1000000000", 1,
"incremental"));
connection.createStatement().execute(String.format("UPDATE
t_order SET status='%s' WHERE order_id IN (1,2)",
RandomStringUtils.randomAlphanumeric(10)));
}
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
distSQLFacade.commit(jobId);
assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
index 952c7493ca6..9599287c1ef 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/general/RulesMigrationE2EIT.java
@@ -89,7 +89,7 @@ class RulesMigrationE2EIT extends AbstractMigrationE2EIT {
PipelineE2EDistSQLFacade distSQLFacade = new
PipelineE2EDistSQLFacade(containerComposer, new MigrationJobType());
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
distSQLFacade.loadAllSingleTables();
assertCheckMigrationSuccess(containerComposer, jobId, "DATA_MATCH");
distSQLFacade.commit(jobId);
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
index 28934494e74..7acd479caf6 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/IndexesMigrationE2EIT.java
@@ -245,7 +245,7 @@ class IndexesMigrationE2EIT extends AbstractMigrationE2EIT {
containerComposer.waitJobPrepareSuccess(String.format("SHOW MIGRATION
STATUS '%s'", jobId));
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
incrementalTaskFn.accept(jdbcDataSource);
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
if (null != consistencyCheckAlgorithmType) {
assertCheckMigrationSuccess(containerComposer, jobId,
consistencyCheckAlgorithmType);
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
index df23e276776..ab566c1ef5b 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/MariaDBMigrationE2EIT.java
@@ -74,7 +74,7 @@ class MariaDBMigrationE2EIT extends AbstractMigrationE2EIT {
containerComposer.sourceExecuteWithLog("INSERT INTO t_order
(order_id, user_id, status) VALUES ('a1', 1, 'OK')");
DataSource jdbcDataSource =
containerComposer.generateShardingSphereDataSourceFromProxy();
containerComposer.assertOrderRecordExist(jdbcDataSource,
"t_order", "a1");
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
assertCheckMigrationSuccess(containerComposer, jobId,
"CRC32_MATCH");
distSQLFacade.commit(jobId);
assertThat(containerComposer.getTargetTableRecordsCount(jdbcDataSource,
SOURCE_TABLE_NAME), is(PipelineContainerComposer.TABLE_INIT_ROW_COUNT + 1));
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index f29f3b64a31..c3d1225d810 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -66,7 +66,7 @@ class TextPrimaryKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
String jobId = distSQLFacade.listJobIds().get(0);
containerComposer.sourceExecuteWithLog(
String.format("INSERT INTO %s (order_id,user_id,status)
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1,
"afterStop"));
- containerComposer.waitIncrementTaskFinished(String.format("SHOW
MIGRATION STATUS '%s'", jobId));
+ distSQLFacade.waitIncrementTaskFinished(jobId);
assertCheckMigrationSuccess(containerComposer, jobId,
"DATA_MATCH");
distSQLFacade.commit(jobId);
assertTrue(distSQLFacade.listJobIds().isEmpty());
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index 2b7c1b5c693..cd19137fff9 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -17,18 +17,29 @@
package org.apache.shardingsphere.test.e2e.operation.pipeline.util;
+import com.google.common.base.Strings;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.shardingsphere.data.pipeline.core.job.JobStatus;
import org.apache.shardingsphere.data.pipeline.core.job.type.PipelineJobType;
import
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.PipelineContainerComposer;
import org.awaitility.Awaitility;
import java.sql.SQLException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
@Getter
+@Slf4j
public final class PipelineE2EDistSQLFacade {
private static final String PIPELINE_RULE_SQL_TEMPLATE = "ALTER %s RULE(\n"
@@ -139,7 +150,52 @@ public final class PipelineE2EDistSQLFacade {
* @throws SQLException SQL exception
*/
public void commit(final String jobId) throws SQLException {
- containerComposer.proxyExecuteWithLog(String.format("COMMIT %s %s",
jobTypeName, jobId), 2);
- Awaitility.await().atMost(3, TimeUnit.SECONDS).until(() ->
!listJobIds().contains(jobId));
+ String sql = String.format("COMMIT %s %s", jobTypeName, jobId);
+ containerComposer.proxyExecuteWithLog(sql, 0);
+ Awaitility.waitAtMost(10, TimeUnit.SECONDS).until(() ->
!listJobIds().contains(jobId));
+ }
+
+ /**
+ * Drop job.
+ *
+ * @param jobId job id
+ * @throws SQLException SQL exception
+ */
+ public void drop(final String jobId) throws SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("DROP %s %s",
jobTypeName, jobId), 0);
+ }
+
+ /**
+ * Wait increment task finished.
+ *
+ * @param jobId job id
+ * @return result
+ */
+ public List<Map<String, Object>> waitIncrementTaskFinished(final String
jobId) {
+ String distSQL = buildShowJobStatusDistSQL(jobId);
+ for (int i = 0; i < 10; i++) {
+ List<Map<String, Object>> jobStatusRecords =
containerComposer.queryForListWithLog(distSQL);
+ log.info("Show status result: {}", jobStatusRecords);
+ Set<String> actualStatus = new HashSet<>(jobStatusRecords.size(),
1F);
+ Collection<Integer> incrementalIdleSecondsList = new
LinkedList<>();
+ for (Map<String, Object> each : jobStatusRecords) {
+ assertTrue(Strings.isNullOrEmpty((String)
each.get("error_message")), "error_message: `" + each.get("error_message") +
"`");
+ actualStatus.add(each.get("status").toString());
+ String incrementalIdleSeconds = (String)
each.get("incremental_idle_seconds");
+
incrementalIdleSecondsList.add(Strings.isNullOrEmpty(incrementalIdleSeconds) ?
0 : Integer.parseInt(incrementalIdleSeconds));
+ }
+ if (Collections.min(incrementalIdleSecondsList) <= 5) {
+ containerComposer.sleepSeconds(3);
+ continue;
+ }
+ if (actualStatus.size() == 1 &&
actualStatus.contains(JobStatus.EXECUTE_INCREMENTAL_TASK.name())) {
+ return jobStatusRecords;
+ }
+ }
+ return Collections.emptyList();
+ }
+
+ private String buildShowJobStatusDistSQL(final String jobId) {
+ return String.format("SHOW %s STATUS %s", jobTypeName, jobId);
}
}