This is an automated email from the ASF dual-hosted git repository.

philo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new a6060bb322 [GLUTEN-10091][FLINK] Fix premature termination in NexmarkTest (#10092)
a6060bb322 is described below

commit a6060bb322834308a30e7c0d1c3661ba72d60643
Author: yuanhang ma <[email protected]>
AuthorDate: Mon Jul 21 17:56:36 2025 +0800

    [GLUTEN-10091][FLINK] Fix premature termination in NexmarkTest (#10092)
---
 .../table/runtime/stream/custom/NexmarkTest.java     | 20 ++++++++++++++++----
 1 file changed, 16 insertions(+), 4 deletions(-)

diff --git a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
index ca91b764f8..90fdf055da 100644
--- a/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
+++ b/gluten-flink/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/NexmarkTest.java
@@ -39,9 +39,14 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class NexmarkTest {
 
@@ -76,7 +81,7 @@ public class NexmarkTest {
   }
 
   @Test
-  void testAllNexmarkQueries() {
+  void testAllNexmarkQueries() throws ExecutionException, InterruptedException, TimeoutException {
     List<String> queryFiles = getQueries();
     assertThat(queryFiles).isNotEmpty();
 
@@ -111,7 +116,8 @@ public class NexmarkTest {
     return result;
   }
 
-  private void executeQuery(StreamTableEnvironment tEnv, String queryFileName) {
+  private void executeQuery(StreamTableEnvironment tEnv, String queryFileName)
+      throws ExecutionException, InterruptedException, TimeoutException {
     String queryContent = readSqlFromFile(NEXMARK_RESOURCE_DIR + "/" + queryFileName);
 
     String[] sqlStatements = queryContent.split(";");
@@ -120,16 +126,22 @@ public class NexmarkTest {
     String createResultTable = sqlStatements[0].trim();
     if (!createResultTable.isEmpty()) {
       TableResult createResult = tEnv.executeSql(createResultTable);
-      assertThat(createResult.getJobClient().isPresent()).isFalse();
+      assertFalse(createResult.getJobClient().isPresent());
     }
 
     String insertQuery = sqlStatements[1].trim();
     if (!insertQuery.isEmpty()) {
       TableResult insertResult = tEnv.executeSql(insertQuery);
-      assertThat(insertResult.getJobClient().isPresent()).isTrue();
+      waitForJobCompletion(insertResult, 30000);
     }
   }
 
+  private void waitForJobCompletion(TableResult result, long timeoutMs)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    assertTrue(result.getJobClient().isPresent());
+    result.getJobClient().get().getJobExecutionResult().get(timeoutMs, TimeUnit.MILLISECONDS);
+  }
+
   private List<String> getQueries() {
     URL resourceUrl = getClass().getClassLoader().getResource(NEXMARK_RESOURCE_DIR);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to