This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 75ccb7256 fix(test): Prevent shuffle rpc client leak in all 
integration test (#2403)
75ccb7256 is described below

commit 75ccb7256ccfbc6f062605a0188b99894433eb15
Author: summaryzb <[email protected]>
AuthorDate: Fri Mar 14 15:24:08 2025 +0800

    fix(test): Prevent shuffle rpc client leak in all integration test (#2403)
    
    ### What changes were proposed in this pull request?
    Close shuffle client per test
    
    ### Why are the changes needed?
    The ShuffleClient is never closed in any of the unit tests.
    The action below failed mainly because of this shuffle client leak, which 
in turn slowed down channel processing.
    
    https://github.com/apache/uniffle/actions/runs/13764353793/job/38487344830
    ```
    io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: CallOptions deadline 
exceeded after 0.199753390s. Name resolution delay 0.000111669 seconds. 
[closed=[], open=[[buffered_nanos=200741728, waiting_for_connection]]]
            at 
io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:268)
            at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:249)
            at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:167)
            at  
org.apache.uniffle.proto.ShuffleServerGrpc$ShuffleServerBlockingStub.registerShuffle(ShuffleServerGrpc.java:850)
            at 
org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient.doRegisterShuffle(ShuffleServerGrpcClient.java:242)
            at 
org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient.registerShuffle(ShuffleServerGrpcClient.java:498)
            at 
org.apache.uniffle.client.impl.ShuffleWriteClientImpl.registerShuffle(ShuffleWriteClientImpl.java:604)
            at 
org.apache.uniffle.client.api.ShuffleWriteClient.registerShuffle(ShuffleWriteClient.java:67)
            at 
org.apache.uniffle.test.QuorumTest.registerShuffleServer(QuorumTest.java:376)
    ```
    
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit tests (UT).
---
 .../org/apache/uniffle/test/IntegrationTestBase.java  |  2 ++
 .../test/java/org/apache/uniffle/test/QuorumTest.java | 19 +++----------------
 2 files changed, 5 insertions(+), 16 deletions(-)

diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 2e89fa942..d3a9b8ffa 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -34,6 +34,7 @@ import com.google.common.collect.Sets;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
 import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.port.PortRegistry;
 import org.apache.uniffle.common.rpc.ServerType;
@@ -203,6 +204,7 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     jettyPorts.clear();
     ShuffleServerMetrics.clear();
     CoordinatorMetrics.clear();
+    ShuffleServerClientFactory.getInstance().cleanupCache();
   }
 
   protected static CoordinatorConf getCoordinatorConf() {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index b321c4200..5de2693d2 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -135,15 +135,15 @@ public class QuorumTest extends ShuffleReadWriteBase {
     ((ShuffleServerGrpcClient)
             ShuffleServerClientFactory.getInstance()
                 .getShuffleServerClient("GRPC", shuffleServerInfo0))
-        .adjustTimeout(200);
+        .adjustTimeout(300);
     ((ShuffleServerGrpcClient)
             ShuffleServerClientFactory.getInstance()
                 .getShuffleServerClient("GRPC", shuffleServerInfo1))
-        .adjustTimeout(200);
+        .adjustTimeout(300);
     ((ShuffleServerGrpcClient)
             ShuffleServerClientFactory.getInstance()
                 .getShuffleServerClient("GRPC", shuffleServerInfo2))
-        .adjustTimeout(200);
+        .adjustTimeout(300);
 
     Thread.sleep(2000);
   }
@@ -154,19 +154,6 @@ public class QuorumTest extends ShuffleReadWriteBase {
       shuffleWriteClientImpl.close();
     }
     shutdownServers();
-    // we need recovery `rpcTime`, or some unit tests may fail
-    ((ShuffleServerGrpcClient)
-            ShuffleServerClientFactory.getInstance()
-                .getShuffleServerClient("GRPC", shuffleServerInfo0))
-        .adjustTimeout(60000);
-    ((ShuffleServerGrpcClient)
-            ShuffleServerClientFactory.getInstance()
-                .getShuffleServerClient("GRPC", shuffleServerInfo1))
-        .adjustTimeout(60000);
-    ((ShuffleServerGrpcClient)
-            ShuffleServerClientFactory.getInstance()
-                .getShuffleServerClient("GRPC", shuffleServerInfo2))
-        .adjustTimeout(60000);
   }
 
   @Test

Reply via email to