This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9007bed27 [#1651] fix(netty): Set Netty as the default client type 
(#2422)
9007bed27 is described below

commit 9007bed27680245dc1ad8b0c6eff5bfded88acbd
Author: Andras Salamon <[email protected]>
AuthorDate: Thu Apr 10 08:18:50 2025 +0200

    [#1651] fix(netty): Set Netty as the default client type (#2422)
    
    ### What changes were proposed in this pull request?
    Change the default client type from GRPC to GRPC_NETTY.
    
    ### Why are the changes needed?
    Commit cad584a2fb7f3536ebcd32d70537c7e943e2d9c4 changed the server type to
    GRPC_NETTY, so we need to change the client type as well.
    
    Discussed in #2420
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UTs.
---
 .../uniffle/client/util/RssClientConfig.java       |  2 +-
 .../org/apache/uniffle/test/AccessClusterTest.java |  5 +-
 .../uniffle/test/AssignmentWithTagsTest.java       | 15 +++---
 .../uniffle/test/CoordinatorAssignmentTest.java    |  2 +-
 .../test/CoordinatorReconfigureNodeMaxTest.java    |  2 +-
 .../java/org/apache/uniffle/test/ServletTest.java  | 42 +++++++++-------
 .../ShuffleServerWithLocalOfExceptionTest.java     |  5 +-
 .../uniffle/test/ShuffleWithRssClientTest.java     | 56 ++++++++++++++--------
 .../org/apache/uniffle/test/AutoAccessTest.java    |  3 +-
 .../apache/uniffle/test/RssShuffleManagerTest.java |  8 ++--
 ...QLWithDelegationShuffleManagerFallbackTest.java |  3 +-
 .../org/apache/uniffle/test/GetReaderTest.java     |  2 +-
 .../org/apache/uniffle/test/GetReaderTest.java     |  2 +-
 13 files changed, 87 insertions(+), 60 deletions(-)

diff --git 
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java 
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index 1e2089bee..a2e60caf7 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -20,7 +20,7 @@ package org.apache.uniffle.client.util;
 public class RssClientConfig {
 
   public static final String RSS_CLIENT_TYPE = "rss.client.type";
-  public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = "GRPC";
+  public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = "GRPC_NETTY";
   public static final String RSS_CLIENT_RETRY_MAX = "rss.client.retry.max";
   public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = 50;
   public static final String RSS_CLIENT_RETRY_INTERVAL_MAX = 
"rss.client.retry.interval.max";
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 1a962bcb9..5ca58acae 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -151,7 +151,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
             + 
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
     storeCoordinatorConf(coordinatorConf);
 
-    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir, 
ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir, 
ServerType.GRPC_NETTY));
     startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
     String accessId = "111111";
@@ -170,7 +170,8 @@ public class AccessClusterTest extends CoordinatorTestBase {
     response = coordinatorClient.accessCluster(request);
     assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
     assertTrue(response.getMessage().startsWith("Denied by 
AccessClusterLoadChecker"));
-    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, 
tempDir, ServerType.GRPC);
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(0, tempDir, ServerType.GRPC_NETTY);
     shuffleServerConf.setString("rss.coordinator.quorum", getQuorum());
     ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
     shuffleServer.start();
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index 6b84a28b6..a98552dee 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -35,6 +35,7 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
 import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
 import org.apache.uniffle.common.ClientType;
 import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -59,7 +60,7 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
 
   private static void prepareShuffleServerConf(int subDirIndex, Set<String> 
tags, File tmpDir) {
     ShuffleServerConf shuffleServerConf =
-        shuffleServerConfWithoutPort(subDirIndex, tmpDir, ServerType.GRPC);
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, 
ServerType.GRPC_NETTY);
     shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
     shuffleServerConf.setString("rss.storage.type", 
StorageType.LOCALFILE.name());
     shuffleServerConf.setString("rss.server.tags", StringUtils.join(tags, 
","));
@@ -85,16 +86,16 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
 
     startServersWithRandomPorts();
     List<Integer> collect =
-        
grpcShuffleServers.stream().map(ShuffleServer::getGrpcPort).collect(Collectors.toList());
+        
nettyShuffleServers.stream().map(ShuffleServer::getNettyPort).collect(Collectors.toList());
     tagOfShufflePorts.put(Constants.SHUFFLE_SERVER_VERSION, collect);
     tagOfShufflePorts.put(
         tag1,
         Arrays.asList(
-            grpcShuffleServers.get(2).getGrpcPort(), 
grpcShuffleServers.get(3).getGrpcPort()));
+            nettyShuffleServers.get(2).getNettyPort(), 
nettyShuffleServers.get(3).getNettyPort()));
     tagOfShufflePorts.put(
         tag2,
         Arrays.asList(
-            grpcShuffleServers.get(4).getGrpcPort(), 
grpcShuffleServers.get(5).getGrpcPort()));
+            nettyShuffleServers.get(4).getNettyPort(), 
nettyShuffleServers.get(5).getNettyPort()));
 
     // Wait all shuffle servers registering to coordinator
     long startTimeMS = System.currentTimeMillis();
@@ -137,7 +138,7 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
     List<Integer> assignedServerPorts =
         assignmentsInfo.getPartitionToServers().values().stream()
             .flatMap(x -> x.stream())
-            .map(x -> x.getGrpcPort())
+            .map(ShuffleServerInfo::getNettyPort)
             .collect(Collectors.toList());
     assertEquals(1, assignedServerPorts.size());
     assertTrue(
@@ -161,7 +162,7 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
     assignedServerPorts =
         assignmentsInfo.getPartitionToServers().values().stream()
             .flatMap(x -> x.stream())
-            .map(x -> x.getGrpcPort())
+            .map(ShuffleServerInfo::getNettyPort)
             .collect(Collectors.toList());
     assertEquals(1, assignedServerPorts.size());
     
assertTrue(tagOfShufflePorts.get(tag1).contains(assignedServerPorts.get(0)));
@@ -173,7 +174,7 @@ public class AssignmentWithTagsTest extends 
CoordinatorTestBase {
     assignedServerPorts =
         assignmentsInfo.getPartitionToServers().values().stream()
             .flatMap(x -> x.stream())
-            .map(x -> x.getGrpcPort())
+            .map(ShuffleServerInfo::getNettyPort)
             .collect(Collectors.toList());
     assertEquals(1, assignedServerPorts.size());
     
assertTrue(tagOfShufflePorts.get(tag1).contains(assignedServerPorts.get(0)));
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index 136cc22e8..2eed0b10c 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -69,7 +69,7 @@ public class CoordinatorAssignmentTest extends 
CoordinatorTestBase {
 
     for (int i = 0; i < SERVER_NUM; i++) {
       ShuffleServerConf shuffleServerConf =
-          shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC);
+          shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC_NETTY);
       shuffleServerConf.setString(
           ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE_HDFS.name());
       shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
index 57004b755..e4754b470 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
@@ -70,7 +70,7 @@ public class CoordinatorReconfigureNodeMaxTest extends 
CoordinatorTestBase {
 
     for (int i = 0; i < SERVER_NUM; i++) {
       ShuffleServerConf shuffleServerConf =
-          shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC);
+          shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC_NETTY);
       shuffleServerConf.setString(
           ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE_HDFS.name());
       shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index 2ba14bf14..030884302 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -39,7 +39,7 @@ import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
 import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
 import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.ServerStatus;
@@ -82,7 +82,7 @@ public class ServletTest extends IntegrationTestBase {
 
   private static void prepareShuffleServerConf(int subDirIndex, File tmpDir) 
throws Exception {
     ShuffleServerConf shuffleServerConf =
-        shuffleServerConfWithoutPort(subDirIndex, tmpDir, ServerType.GRPC);
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, 
ServerType.GRPC_NETTY);
     shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN, 
false);
     storeShuffleServerConf(shuffleServerConf);
   }
@@ -110,7 +110,7 @@ public class ServletTest extends IntegrationTestBase {
 
   @Test
   public void testGetSingleNode() throws Exception {
-    ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+    ShuffleServer shuffleServer = nettyShuffleServers.get(0);
     String content =
         TestUtils.httpGet(
             String.format(SINGLE_NODE_URL, coordinatorHttpPort, 
shuffleServer.getId()));
@@ -119,8 +119,8 @@ public class ServletTest extends IntegrationTestBase {
     HashMap<String, Object> server = response.getData();
     assertEquals(0, response.getCode());
     assertEquals(
-        grpcShuffleServers.get(0).getGrpcPort(),
-        Integer.parseInt(server.get("grpcPort").toString()));
+        nettyShuffleServers.get(0).getNettyPort(),
+        Integer.parseInt(server.get("nettyPort").toString()));
     assertEquals(ServerStatus.ACTIVE.toString(), server.get("status"));
   }
 
@@ -134,10 +134,10 @@ public class ServletTest extends IntegrationTestBase {
     assertEquals(0, response.getCode());
     assertEquals(4, serverList.size());
     Set<Integer> portSet =
-        grpcShuffleServers.stream().map(server -> 
server.getGrpcPort()).collect(Collectors.toSet());
+        
nettyShuffleServers.stream().map(ShuffleServer::getNettyPort).collect(Collectors.toSet());
     for (int i = 0; i < serverList.size(); i++) {
       assertEquals(ServerStatus.ACTIVE.toString(), 
serverList.get(i).get("status"));
-      
assertTrue(portSet.contains(Integer.parseInt(serverList.get(i).get("grpcPort").toString())));
+      
assertTrue(portSet.contains(Integer.parseInt(serverList.get(i).get("nettyPort").toString())));
     }
   }
 
@@ -145,8 +145,8 @@ public class ServletTest extends IntegrationTestBase {
   public void testLostNodesServlet() throws IOException {
     try (SimpleClusterManager clusterManager =
         (SimpleClusterManager) coordinatorServer.getClusterManager()) {
-      ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
-      ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
+      ShuffleServer shuffleServer3 = nettyShuffleServers.get(2);
+      ShuffleServer shuffleServer4 = nettyShuffleServers.get(3);
       Map<String, ServerNode> servers = clusterManager.getServers();
       
servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() - 
40000);
       
servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() - 
40000);
@@ -168,7 +168,7 @@ public class ServletTest extends IntegrationTestBase {
 
   @Test
   public void testDecommissionedNodeServlet() {
-    ShuffleServer shuffleServer = grpcShuffleServers.get(1);
+    ShuffleServer shuffleServer = nettyShuffleServers.get(1);
     shuffleServer.decommission();
     Awaitility.await()
         .atMost(30, TimeUnit.SECONDS)
@@ -191,8 +191,8 @@ public class ServletTest extends IntegrationTestBase {
 
   @Test
   public void testUnhealthyNodesServlet() {
-    ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
-    ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
+    ShuffleServer shuffleServer3 = nettyShuffleServers.get(2);
+    ShuffleServer shuffleServer4 = nettyShuffleServers.get(3);
     shuffleServer3.markUnhealthy();
     shuffleServer4.markUnhealthy();
     List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(), 
shuffleServer4.getId());
@@ -221,7 +221,7 @@ public class ServletTest extends IntegrationTestBase {
 
   @Test
   public void testDecommissionServlet() throws Exception {
-    ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+    ShuffleServer shuffleServer = nettyShuffleServers.get(0);
     assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
     DecommissionRequest decommissionRequest = new DecommissionRequest();
     decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
@@ -244,8 +244,11 @@ public class ServletTest extends IntegrationTestBase {
     assertEquals(0, response.getCode());
 
     // Register shuffle, avoid server exiting immediately.
-    ShuffleServerGrpcClient shuffleServerClient =
-        new ShuffleServerGrpcClient(LOCALHOST, 
grpcShuffleServers.get(0).getGrpcPort());
+    ShuffleServerGrpcNettyClient shuffleServerClient =
+        new ShuffleServerGrpcNettyClient(
+            LOCALHOST,
+            nettyShuffleServers.get(0).getGrpcPort(),
+            nettyShuffleServers.get(0).getNettyPort());
     shuffleServerClient.registerShuffle(
         new RssRegisterShuffleRequest(
             "testDecommissionServlet_appId", 0, Lists.newArrayList(new 
PartitionRange(0, 1)), ""));
@@ -282,7 +285,7 @@ public class ServletTest extends IntegrationTestBase {
 
   @Test
   public void testDecommissionSingleNode() throws Exception {
-    ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+    ShuffleServer shuffleServer = nettyShuffleServers.get(0);
     assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
     String content =
         TestUtils.httpPost(
@@ -303,8 +306,11 @@ public class ServletTest extends IntegrationTestBase {
     assertEquals(0, response.getCode());
 
     // Register shuffle, avoid server exiting immediately.
-    ShuffleServerGrpcClient shuffleServerClient =
-        new ShuffleServerGrpcClient(LOCALHOST, 
grpcShuffleServers.get(0).getGrpcPort());
+    ShuffleServerGrpcNettyClient shuffleServerClient =
+        new ShuffleServerGrpcNettyClient(
+            LOCALHOST,
+            nettyShuffleServers.get(0).getGrpcPort(),
+            nettyShuffleServers.get(0).getNettyPort());
     shuffleServerClient.registerShuffle(
         new RssRegisterShuffleRequest(
             "testDecommissionServlet_appId", 0, Lists.newArrayList(new 
PartitionRange(0, 1)), ""));
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
index 48cb91886..6c91abf0a 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -47,7 +47,8 @@ public class ShuffleServerWithLocalOfExceptionTest extends 
ShuffleReadWriteBase
     CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     storeCoordinatorConf(coordinatorConf);
 
-    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, 
tmpDir, ServerType.GRPC);
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC_NETTY);
     shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat", 
"5000");
     storeShuffleServerConf(shuffleServerConf);
 
@@ -78,7 +79,7 @@ public class ShuffleServerWithLocalOfExceptionTest extends 
ShuffleReadWriteBase
             150,
             shuffleServerClient,
             Roaring64NavigableMap.bitmapOf());
-    grpcShuffleServers.get(0).stopServer();
+    nettyShuffleServers.get(0).stopServer();
     try {
       memoryClientReadHandler.readShuffleData();
       fail("Should throw connection exception directly.");
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 3d8b6e5c2..bb63370b0 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -74,22 +74,30 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
         .forEach(
             subDirIndex -> {
               ShuffleServerConf shuffleServerConf =
-                  shuffleServerConfWithoutPort(subDirIndex, tmpDir, 
ServerType.GRPC);
+                  shuffleServerConfWithoutPort(subDirIndex, tmpDir, 
ServerType.GRPC_NETTY);
               
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
               shuffleServerConf.setString("rss.storage.type", 
StorageType.LOCALFILE.name());
               storeShuffleServerConf(shuffleServerConf);
             });
 
     startServersWithRandomPorts();
-    shuffleServerInfo1 = new ShuffleServerInfo(LOCALHOST, 
grpcShuffleServers.get(0).getGrpcPort());
-    shuffleServerInfo2 = new ShuffleServerInfo(LOCALHOST, 
grpcShuffleServers.get(1).getGrpcPort());
+    shuffleServerInfo1 =
+        new ShuffleServerInfo(
+            LOCALHOST,
+            nettyShuffleServers.get(0).getGrpcPort(),
+            nettyShuffleServers.get(0).getNettyPort());
+    shuffleServerInfo2 =
+        new ShuffleServerInfo(
+            LOCALHOST,
+            nettyShuffleServers.get(1).getGrpcPort(),
+            nettyShuffleServers.get(1).getNettyPort());
   }
 
   @BeforeEach
   public void createClient() {
     shuffleWriteClientImpl =
         ShuffleClientFactory.newWriteBuilder()
-            .clientType(ClientType.GRPC.name())
+            .clientType(ClientType.GRPC_NETTY.name())
             .retryMax(3)
             .retryIntervalMax(1000)
             .heartBeatThreadNum(1)
@@ -128,7 +136,7 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
     ShuffleServerInfo fakeShuffleServerInfo =
         new ShuffleServerInfo(
             "127.0.0.1-20001",
-            grpcShuffleServers.get(0).getIp(),
+            nettyShuffleServers.get(0).getIp(),
             generateNonExistingPorts(1).get(0));
     List<ShuffleBlockInfo> blocks =
         createShuffleBlockList(
@@ -168,7 +176,11 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, 
testAppId, 0, 0, 2);
     Roaring64NavigableMap report =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo1, 
fakeShuffleServerInfo), testAppId, 0, 0);
+            "GRPC_NETTY",
+            Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo),
+            testAppId,
+            0,
+            0);
     assertEquals(blockIdBitmap, report);
   }
 
@@ -208,12 +220,12 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
     shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds, 
testAppId, 1, 0, 1);
     Roaring64NavigableMap bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 
0);
     assertTrue(bitmap.isEmpty());
 
     bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 
partitionIdx);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 
partitionIdx);
     assertEquals(5, bitmap.getLongCardinality());
     for (Long b : partitionToBlocks.get(1)) {
       assertTrue(bitmap.contains(b));
@@ -267,12 +279,12 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
 
     Roaring64NavigableMap bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 
0);
     assertTrue(bitmap.isEmpty());
 
     bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 1);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 
1);
     assertEquals(5, bitmap.getLongCardinality());
     for (Long b : partitionToBlocks1.get(1)) {
       assertTrue(bitmap.contains(b));
@@ -280,22 +292,22 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
 
     bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 2);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 
2);
     assertTrue(bitmap.isEmpty());
 
     bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 0);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 
0);
     assertTrue(bitmap.isEmpty());
 
     bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 1);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 
1);
     assertTrue(bitmap.isEmpty());
 
     bitmap =
         shuffleWriteClientImpl.getShuffleResult(
-            "GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 2);
+            "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 
2);
     assertEquals(7, bitmap.getLongCardinality());
     for (Long b : partitionToBlocks2.get(2)) {
       assertTrue(bitmap.contains(b));
@@ -345,7 +357,7 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
 
     ShuffleReadClientImpl readClient =
         ShuffleClientFactory.newReadBuilder()
-            .clientType(ClientType.GRPC)
+            .clientType(ClientType.GRPC_NETTY)
             .storageType(StorageType.LOCALFILE.name())
             .appId(testAppId)
             .shuffleId(0)
@@ -369,7 +381,7 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
     assertTrue(commitResult);
     readClient =
         ShuffleClientFactory.newReadBuilder()
-            .clientType(ClientType.GRPC)
+            .clientType(ClientType.GRPC_NETTY)
             .storageType(StorageType.LOCALFILE.name())
             .appId(testAppId)
             .shuffleId(0)
@@ -392,7 +404,10 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
         shuffleWriteClientImpl.sendCommit(
             Sets.newHashSet(
                 new ShuffleServerInfo(
-                    "127.0.0.1-20001", "fakeIp", 
grpcShuffleServers.get(0).getGrpcPort())),
+                    "127.0.0.1-20001",
+                    "fakeIp",
+                    nettyShuffleServers.get(0).getGrpcPort(),
+                    nettyShuffleServers.get(0).getNettyPort())),
             testAppId,
             0,
             2);
@@ -429,12 +444,13 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
 
   @Test
   public void testRetryAssgin() throws Throwable {
-    int maxTryTime = grpcShuffleServers.size();
+    int maxTryTime = nettyShuffleServers.size();
     AtomicInteger tryTime = new AtomicInteger();
     String appId = "app-1";
     RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
     ShuffleAssignmentsInfo response = null;
-    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, 
null, ServerType.GRPC);
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(0, null, ServerType.GRPC_NETTY);
     int heartbeatInterval = 
shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
     Thread.sleep(heartbeatInterval * 2);
     shuffleWriteClientImpl.registerCoordinators(getQuorum());
@@ -454,7 +470,7 @@ public class ShuffleWithRssClientTest extends 
ShuffleReadWriteBase {
                   .forEach(
                       entry -> {
                         if (currentTryTime < maxTryTime) {
-                          grpcShuffleServers.forEach(
+                          nettyShuffleServers.forEach(
                               (ss) -> {
                                 if (ss.getId().equals(entry.getKey().getId())) 
{
                                   try {
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
index b04f3efd7..2919bd0c3 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
@@ -89,7 +89,8 @@ public class AutoAccessTest extends IntegrationTestBase {
             + 
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
     storeCoordinatorConf(coordinatorConf);
 
-    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, 
tmpDir, ServerType.GRPC);
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC_NETTY);
     storeShuffleServerConf(shuffleServerConf);
     startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 8ba28e15f..72c47fe29 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -93,7 +93,7 @@ public class RssShuffleManagerTest extends 
SparkIntegrationTestBase {
     CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     addDynamicConf(coordinatorConf, dynamicConf);
     storeCoordinatorConf(coordinatorConf);
-    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir, 
ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir, 
ServerType.GRPC_NETTY));
     startServersWithRandomPorts();
     return dynamicConf;
   }
@@ -146,7 +146,7 @@ public class RssShuffleManagerTest extends 
SparkIntegrationTestBase {
     Map<String, String> dynamicConf = startServers(dynamicConfLayout);
 
     SparkConf conf = createSparkConf();
-    updateSparkConfWithRssGrpc(conf);
+    updateSparkConfWithRssNetty(conf);
     // enable stage recompute
     conf.set("spark." + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
     // enable dynamic client conf
@@ -204,7 +204,7 @@ public class RssShuffleManagerTest extends 
SparkIntegrationTestBase {
       shuffleManager.configureBlockIdLayout(conf, rssConf);
       ShuffleWriteClient shuffleWriteClient =
           ShuffleClientFactory.newWriteBuilder()
-              .clientType(ClientType.GRPC.name())
+              .clientType(ClientType.GRPC_NETTY.name())
               .retryMax(3)
               .retryIntervalMax(2000)
               .heartBeatThreadNum(4)
@@ -228,7 +228,7 @@ public class RssShuffleManagerTest extends 
SparkIntegrationTestBase {
       for (int partitionId : new int[] {0, 1}) {
         Roaring64NavigableMap blockIdLongs =
             shuffleWriteClient.getShuffleResult(
-                ClientType.GRPC.name(), servers, shuffleManager.getAppId(), 0, 
partitionId);
+                ClientType.GRPC_NETTY.name(), servers, 
shuffleManager.getAppId(), 0, partitionId);
         List<BlockId> blockIds =
             blockIdLongs.stream()
                 .sorted()
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
index e1431d5eb..945c73c44 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
@@ -57,7 +57,8 @@ public class SparkSQLWithDelegationShuffleManagerFallbackTest 
extends SparkSQLTe
     addDynamicConf(coordinatorConf, dynamicConf);
     storeCoordinatorConf(coordinatorConf);
 
-    ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0, 
tmpDir, ServerType.GRPC);
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC_NETTY);
     shuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 1000L);
     
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
4000L);
     shuffleServerConf.setString(
diff --git 
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
 
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index c112f2cda..8ebdc1862 100644
--- 
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ 
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -89,7 +89,7 @@ public class GetReaderTest extends IntegrationTestBase {
     
coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times",
 1);
     storeCoordinatorConf(coordinatorConf);
 
-    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, 
ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir, 
ServerType.GRPC_NETTY));
     startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
 
diff --git 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index bb8caba2d..92db1a623 100644
--- 
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++ 
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -100,7 +100,7 @@ public class GetReaderTest extends IntegrationTestBase {
     
coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times",
 1);
     storeCoordinatorConf(coordinatorConf);
 
-    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, 
ServerType.GRPC));
+    storeShuffleServerConf(shuffleServerConfWithoutPort(0, null, 
ServerType.GRPC_NETTY));
     startServersWithRandomPorts();
     Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
     sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());

Reply via email to