This is an automated email from the ASF dual-hosted git repository.
zhonghongsheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/shardingsphere.git
The following commit(s) were added to refs/heads/master by this push:
new 8bac82dc424 Add queryCheckJobStatus and dropCheck in
PipelineE2EDistSQLFacade (#37794)
8bac82dc424 is described below
commit 8bac82dc4243d72bd42de0c01753b7ee1e1fb6cd
Author: Hongsheng Zhong <[email protected]>
AuthorDate: Wed Jan 21 11:02:00 2026 +0800
Add queryCheckJobStatus and dropCheck in PipelineE2EDistSQLFacade (#37794)
* Add PipelineE2EDistSQLFacade.queryCheckJobStatus
* Add PipelineE2EDistSQLFacade.dropCheck
---
.../primarykey/TextPrimaryKeyMigrationE2EIT.java | 11 ++++++++++-
.../pipeline/util/PipelineE2EDistSQLFacade.java | 23 +++++++++++++++++++++-
2 files changed, 32 insertions(+), 2 deletions(-)
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
index 206a7652819..7d5e32e01a5 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/cases/migration/primarykey/TextPrimaryKeyMigrationE2EIT.java
@@ -17,6 +17,7 @@
package
org.apache.shardingsphere.test.e2e.operation.pipeline.cases.migration.primarykey;
+import com.google.common.collect.ImmutableMap;
import
org.apache.shardingsphere.data.pipeline.scenario.migration.MigrationJobType;
import
org.apache.shardingsphere.database.connector.mysql.type.MySQLDatabaseType;
import
org.apache.shardingsphere.infra.algorithm.keygen.uuid.UUIDKeyGenerateAlgorithm;
@@ -37,6 +38,7 @@ import org.junit.jupiter.params.provider.ArgumentsSource;
import java.sql.Connection;
import java.sql.SQLException;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@PipelineE2ESettings(fetchSingle = true, database = {
@@ -67,7 +69,14 @@ class TextPrimaryKeyMigrationE2EIT extends
AbstractMigrationE2EIT {
containerComposer.sourceExecuteWithLog(
String.format("INSERT INTO %s (order_id,user_id,status)
VALUES (%s, %s, '%s')", getSourceTableName(containerComposer), "1000000000", 1,
"afterStop"));
distSQLFacade.waitJobIncrementalStageFinished(jobId);
- distSQLFacade.startCheckAndVerify(jobId, "DATA_MATCH");
+ distSQLFacade.startCheck(jobId, "DATA_MATCH",
ImmutableMap.of("chunk-size", "300", "streaming-range-type", "SMALL"));
+ distSQLFacade.verifyCheck(jobId);
+ distSQLFacade.startCheck(jobId, "DATA_MATCH",
ImmutableMap.of("chunk-size", "300", "streaming-range-type", "LARGE"));
+ distSQLFacade.verifyCheck(jobId);
+ distSQLFacade.dropCheck(jobId);
+ distSQLFacade.dropCheck(jobId);
+ assertThrows(RuntimeException.class, () ->
distSQLFacade.queryCheckJobStatus(jobId));
+ assertThrows(SQLException.class, () ->
distSQLFacade.dropCheck(jobId));
distSQLFacade.commit(jobId);
assertTrue(distSQLFacade.listJobIds().isEmpty());
}
diff --git
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
index e5acd6631ff..8b28160f562 100644
---
a/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
+++
b/test/e2e/operation/pipeline/src/test/java/org/apache/shardingsphere/test/e2e/operation/pipeline/util/PipelineE2EDistSQLFacade.java
@@ -283,7 +283,7 @@ public final class PipelineE2EDistSQLFacade {
public void verifyCheck(final String jobId) {
List<Map<String, Object>> checkStatusRecords = Collections.emptyList();
for (int i = 0; i < 10; i++) {
- checkStatusRecords =
containerComposer.queryForListWithLog(buildShowCheckJobStatusDistSQL(jobId));
+ checkStatusRecords = queryCheckJobStatus(jobId);
if (checkStatusRecords.isEmpty()) {
containerComposer.sleepSeconds(3);
continue;
@@ -303,7 +303,28 @@ public final class PipelineE2EDistSQLFacade {
}
}
+ /**
+ * Query check job status.
+ *
+ * @param jobId job id
+ * @return check job status
+ * @throws RuntimeException if there is underlying SQL exception, e.g. job
id not found
+ */
+ public List<Map<String, Object>> queryCheckJobStatus(final String jobId) {
+ return
containerComposer.queryForListWithLog(buildShowCheckJobStatusDistSQL(jobId));
+ }
+
private String buildShowCheckJobStatusDistSQL(final String jobId) {
return String.format("SHOW %s CHECK STATUS %s", jobTypeName, jobId);
}
+
+ /**
+ * Drop check.
+ *
+ * @param jobId job id
+ * @throws SQLException SQL exception
+ */
+ public void dropCheck(final String jobId) throws SQLException {
+ containerComposer.proxyExecuteWithLog(String.format("DROP %s CHECK
%s", jobTypeName, jobId), 2);
+ }
}