This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d8aa9e66 [flink] Fix unstable RemoteLookupJoinITCase.testServiceFileCleaned test method. (#3653)
9d8aa9e66 is described below

commit 9d8aa9e665222aeda8264783c5007b03f83247f3
Author: Kerwin <[email protected]>
AuthorDate: Tue Jul 2 15:26:24 2024 +0800

    [flink] Fix unstable RemoteLookupJoinITCase.testServiceFileCleaned test method. (#3653)
---
 .../main/java/org/apache/paimon/flink/query/RemoteTableQuery.java  | 7 +++++++
 .../test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java  | 6 ++----
 .../main/java/org/apache/paimon/service/client/KvQueryClient.java  | 6 +++++-
 3 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
index fc07e58f9..62edd6bae 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.flink.query;
 
+import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
@@ -35,6 +36,7 @@ import org.apache.paimon.utils.TypeUtils;
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
 import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
@@ -109,4 +111,9 @@ public class RemoteTableQuery implements TableQuery {
     public void close() throws IOException {
         client.shutdown();
     }
+
+    @VisibleForTesting
+    public CompletableFuture<Void> cancel() {
+        return client.shutdownFuture();
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
index 13c674662..336c58217 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
@@ -39,7 +39,6 @@ import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.io.Closeable;
@@ -95,7 +94,7 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase {
         assertThat(query.lookup(row(), 0, row(5))).isNull();
 
         service.close();
-        query.close();
+        query.cancel().get();
     }
 
     @Test
@@ -135,7 +134,6 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase {
         proxy.close();
     }
 
-    @Disabled // TODO Fix unstable
     @Test
     public void testServiceFileCleaned() throws Exception {
         sql(
@@ -153,7 +151,7 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase {
                 .isEqualTo(11);
 
         client.cancel().get();
-        query.close();
+        query.cancel().get();
         ServiceManager serviceManager = paimonTable("DIM").store().newServiceManager();
         assertThat(serviceManager.service(PRIMARY_KEY_LOOKUP).isPresent()).isFalse();
     }
diff --git a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
index 60c1ac0f8..a1b950c27 100644
--- a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
+++ b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/client/KvQueryClient.java
@@ -110,10 +110,14 @@ public class KvQueryClient {
 
     public void shutdown() {
         try {
-            networkClient.shutdown().get(10L, TimeUnit.SECONDS);
+            shutdownFuture().get(60L, TimeUnit.SECONDS);
             LOG.info("{} was shutdown successfully.", networkClient.getClientName());
         } catch (Exception e) {
             LOG.warn(String.format("%s shutdown failed.", networkClient.getClientName()), e);
         }
     }
+
+    public CompletableFuture<Void> shutdownFuture() {
+        return networkClient.shutdown();
+    }
 }

Reply via email to