This is an automated email from the ASF dual-hosted git repository.
philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new a6060bb322 [GLUTEN-10091][FLINK] Fix premature termination in NexmarkTest (#10092)
a6060bb322 is described below
commit a6060bb322834308a30e7c0d1c3661ba72d60643
Author: yuanhang ma <[email protected]>
AuthorDate: Mon Jul 21 17:56:36 2025 +0800
[GLUTEN-10091][FLINK] Fix premature termination in NexmarkTest (#10092)
---
.../table/runtime/stream/custom/NexmarkTest.java | 20 ++++++++++++++++----
1 file changed, 16 insertions(+), 4 deletions(-)
diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index ca91b764f8..90fdf055da 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -39,9 +39,14 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
public class NexmarkTest {
@@ -76,7 +81,7 @@ public class NexmarkTest {
}
@Test
- void testAllNexmarkQueries() {
+ void testAllNexmarkQueries() throws ExecutionException,
InterruptedException, TimeoutException {
List<String> queryFiles = getQueries();
assertThat(queryFiles).isNotEmpty();
@@ -111,7 +116,8 @@ public class NexmarkTest {
return result;
}
- private void executeQuery(StreamTableEnvironment tEnv, String queryFileName)
{
+ private void executeQuery(StreamTableEnvironment tEnv, String queryFileName)
+ throws ExecutionException, InterruptedException, TimeoutException {
String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" +
queryFileName);
String[] sqlStatements = queryContent.split(";");
@@ -120,16 +126,22 @@ public class NexmarkTest {
String createResultTable = sqlStatements[0].trim();
if (!createResultTable.isEmpty()) {
TableResult createResult = tEnv.executeSql(createResultTable);
- assertThat(createResult.getJobClient().isPresent()).isFalse();
+ assertFalse(createResult.getJobClient().isPresent());
}
String insertQuery = sqlStatements[1].trim();
if (!insertQuery.isEmpty()) {
TableResult insertResult = tEnv.executeSql(insertQuery);
- assertThat(insertResult.getJobClient().isPresent()).isTrue();
+ waitForJobCompletion(insertResult, 30000);
}
}
+ private void waitForJobCompletion(TableResult result, long timeoutMs)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ assertTrue(result.getJobClient().isPresent());
+ result.getJobClient().get().getJobExecutionResult().get(timeoutMs,
TimeUnit.MILLISECONDS);
+ }
+
private List<String> getQueries() {
URL resourceUrl =
getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]