This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 749535769a [tests] Fix @Timeout annotation not working in PrimaryKeyFileStoreTableITCase (#4634)
749535769a is described below

commit 749535769aa730cbc6f474c59f47c0877f35cbc0
Author: tsreaper <[email protected]>
AuthorDate: Wed Dec 4 15:44:47 2024 +0800

    [tests] Fix @Timeout annotation not working in PrimaryKeyFileStoreTableITCase (#4634)
---
 .../flink/PrimaryKeyFileStoreTableITCase.java      | 76 +++++++++++++++-------
 1 file changed, 52 insertions(+), 24 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
index 3fa95edb86..4ee539c4fd 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PrimaryKeyFileStoreTableITCase.java
@@ -59,6 +59,8 @@ import static org.assertj.core.api.Assertions.assertThatCode;
 /** Tests for changelog table with primary keys. */
 public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
+    private static final int TIMEOUT = 180;
+
     // ------------------------------------------------------------------------
     //  Test Utilities
     // ------------------------------------------------------------------------
@@ -95,12 +97,38 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                 catalogName, warehouse, defaultPropertyString);
     }
 
+    private CloseableIterator<Row> collect(TableResult result) {
+        return collect(result, TIMEOUT);
+    }
+
+    private CloseableIterator<Row> collect(TableResult result, int timeout) {
+        JobClient client = result.getJobClient().get();
+        Thread timeoutThread =
+                new Thread(
+                        () -> {
+                            for (int i = 0; i < timeout; i++) {
+                                try {
+                                    Thread.sleep(1000);
+                                    if (client.getJobStatus().get().isGloballyTerminalState()) {
+                                        return;
+                                    }
+                                } catch (Exception e) {
+                                    client.cancel();
+                                    throw new RuntimeException(e);
+                                }
+                            }
+                            client.cancel();
+                        });
+        timeoutThread.start();
+        return result.collect();
+    }
+
     // ------------------------------------------------------------------------
     //  Constructed Tests
     // ------------------------------------------------------------------------
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testFullCompactionTriggerInterval() throws Exception {
         innerTestChangelogProducing(
                 Arrays.asList(
@@ -109,7 +137,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testFullCompactionWithLongCheckpointInterval() throws Exception {
         // create table
         TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().parallelism(1).build();
@@ -135,7 +163,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                         .build();
         sEnv.executeSql(createCatalogSql("testCatalog", path));
         sEnv.executeSql("USE CATALOG testCatalog");
-        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+        CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));
 
         // run compact job
         StreamExecutionEnvironment env =
@@ -168,7 +196,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testLookupChangelog() throws Exception {
         innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
     }
@@ -190,7 +218,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                         + "'bucket' = '2'"
                         + ")");
 
-        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T2").collect();
+        CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T2"));
 
         // insert data
         sEnv.executeSql("INSERT INTO T2 VALUES (1, 'A')").await();
@@ -213,7 +241,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         sEnv.executeSql("ALTER TABLE T2 SET ('changelog-producer'='full-compaction')");
 
         CloseableIterator<Row> branchIt =
-                sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */").collect();
+                collect(sEnv.executeSql("select * from T2 /*+ OPTIONS('branch' = 'branch1') */"));
         // insert data to branch
         sEnv.executeSql(
                         "INSERT INTO T2/*+ OPTIONS('branch' = 'branch1') */ VALUES (10, 'v10'),(11, 'v11'),(12, 'v12')")
@@ -261,7 +289,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
         sEnv.executeSql(
                 "INSERT INTO T SELECT SUM(i) AS k, g AS v FROM `default_catalog`.`default_database`.`S` GROUP BY g");
-        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect();
+        CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"));
 
         // write initial data
         sEnv.executeSql(
@@ -329,7 +357,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         result1.await();
         result2.await();
 
-        try (CloseableIterator<Row> it = tEnv.executeSql("SELECT * FROM t").collect()) {
+        try (CloseableIterator<Row> it = collect(tEnv.executeSql("SELECT * FROM t"))) {
             for (int i = 0; i < 3; i++) {
                 assertThat(it).hasNext();
                 Row row = it.next();
@@ -338,7 +366,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         }
     }
 
-    @Timeout(60)
+    @Timeout(TIMEOUT)
     @ParameterizedTest()
     @ValueSource(booleans = {false, true})
     public void testRecreateTableWithException(boolean isReloadData) throws Exception {
@@ -361,7 +389,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                         .build();
         sEnv.executeSql(createCatalogSql("testCatalog", path + "/warehouse"));
         sEnv.executeSql("USE CATALOG testCatalog");
-        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+        CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));
 
         // first write
         List<String> values = new ArrayList<>();
@@ -414,7 +442,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(120)
+    @Timeout(TIMEOUT)
     public void testChangelogCompactInBatchWrite() throws Exception {
         TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         String catalogDdl =
@@ -504,7 +532,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(120)
+    @Timeout(TIMEOUT)
     public void testChangelogCompactInStreamWrite() throws Exception {
         TableEnvironment sEnv =
                 tableEnvironmentBuilder()
@@ -533,7 +561,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
                         + "', 'source.monitor-interval' = '500ms' )");
 
         sEnv.executeSql("INSERT INTO t SELECT * FROM `default_catalog`.`default_database`.`s`");
-        CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM t").collect();
+        CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM t"));
 
         // write initial data
         List<String> values = new ArrayList<>();
@@ -589,7 +617,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
     private void assertStreamingResult(TableResult result, List<Row> expected) throws Exception {
         List<Row> actual = new ArrayList<>();
-        try (CloseableIterator<Row> it = result.collect()) {
+        try (CloseableIterator<Row> it = collect(result)) {
             while (actual.size() < expected.size() && it.hasNext()) {
                 actual.add(it.next());
             }
@@ -611,14 +639,14 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     // ------------------------------------------------------------------------
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testNoChangelogProducerBatchRandom() throws Exception {
         TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         testNoChangelogProducerRandom(bEnv, 1, false);
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testNoChangelogProducerStreamingRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         TableEnvironment sEnv =
@@ -631,14 +659,14 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testFullCompactionChangelogProducerBatchRandom() throws Exception {
         TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         testFullCompactionChangelogProducerRandom(bEnv, 1, false);
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testFullCompactionChangelogProducerStreamingRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         TableEnvironment sEnv =
@@ -651,7 +679,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testStandAloneFullCompactJobRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         TableEnvironment sEnv =
@@ -664,14 +692,14 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testLookupChangelogProducerBatchRandom() throws Exception {
         TableEnvironment bEnv = tableEnvironmentBuilder().batchMode().build();
         testLookupChangelogProducerRandom(bEnv, 1, false);
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testLookupChangelogProducerStreamingRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         TableEnvironment sEnv =
@@ -684,7 +712,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
     }
 
     @Test
-    @Timeout(180)
+    @Timeout(TIMEOUT)
     public void testStandAloneLookupJobRandom() throws Exception {
         ThreadLocalRandom random = ThreadLocalRandom.current();
         TableEnvironment sEnv =
@@ -868,7 +896,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
 
         ResultChecker checker = new ResultChecker();
         int endCnt = 0;
-        try (CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM T").collect()) {
+        try (CloseableIterator<Row> it = collect(sEnv.executeSql("SELECT * FROM T"))) {
             while (it.hasNext()) {
                 Row row = it.next();
                 checker.addChangelog(row);
@@ -986,7 +1014,7 @@ public class PrimaryKeyFileStoreTableITCase extends AbstractTestBase {
         bEnv.executeSql("USE CATALOG testCatalog");
 
         ResultChecker checker = new ResultChecker();
-        try (CloseableIterator<Row> it = bEnv.executeSql("SELECT * FROM T").collect()) {
+        try (CloseableIterator<Row> it = collect(bEnv.executeSql("SELECT * FROM T"))) {
             while (it.hasNext()) {
                 checker.addChangelog(it.next());
             }

Reply via email to