This is an automated email from the ASF dual-hosted git repository.
rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9007bed27 [#1651] fix(netty): Set Netty as the default client type
(#2422)
9007bed27 is described below
commit 9007bed27680245dc1ad8b0c6eff5bfded88acbd
Author: Andras Salamon <[email protected]>
AuthorDate: Thu Apr 10 08:18:50 2025 +0200
[#1651] fix(netty): Set Netty as the default client type (#2422)
### What changes were proposed in this pull request?
Changing the default client type from GRPC to GRPC_NETTY
### Why are the changes needed?
cad584a2fb7f3536ebcd32d70537c7e943e2d9c4 changed the server type to
GRPC_NETTY we need to change the client type as well.
Discussed in #2420
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UTs.
---
.../uniffle/client/util/RssClientConfig.java | 2 +-
.../org/apache/uniffle/test/AccessClusterTest.java | 5 +-
.../uniffle/test/AssignmentWithTagsTest.java | 15 +++---
.../uniffle/test/CoordinatorAssignmentTest.java | 2 +-
.../test/CoordinatorReconfigureNodeMaxTest.java | 2 +-
.../java/org/apache/uniffle/test/ServletTest.java | 42 +++++++++-------
.../ShuffleServerWithLocalOfExceptionTest.java | 5 +-
.../uniffle/test/ShuffleWithRssClientTest.java | 56 ++++++++++++++--------
.../org/apache/uniffle/test/AutoAccessTest.java | 3 +-
.../apache/uniffle/test/RssShuffleManagerTest.java | 8 ++--
...QLWithDelegationShuffleManagerFallbackTest.java | 3 +-
.../org/apache/uniffle/test/GetReaderTest.java | 2 +-
.../org/apache/uniffle/test/GetReaderTest.java | 2 +-
13 files changed, 87 insertions(+), 60 deletions(-)
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index 1e2089bee..a2e60caf7 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -20,7 +20,7 @@ package org.apache.uniffle.client.util;
public class RssClientConfig {
public static final String RSS_CLIENT_TYPE = "rss.client.type";
- public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = "GRPC";
+ public static final String RSS_CLIENT_TYPE_DEFAULT_VALUE = "GRPC_NETTY";
public static final String RSS_CLIENT_RETRY_MAX = "rss.client.retry.max";
public static final int RSS_CLIENT_RETRY_MAX_DEFAULT_VALUE = 50;
public static final String RSS_CLIENT_RETRY_INTERVAL_MAX =
"rss.client.retry.interval.max";
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 1a962bcb9..5ca58acae 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -151,7 +151,7 @@ public class AccessClusterTest extends CoordinatorTestBase {
+
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
storeCoordinatorConf(coordinatorConf);
- storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir,
ServerType.GRPC_NETTY));
startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
String accessId = "111111";
@@ -170,7 +170,8 @@ public class AccessClusterTest extends CoordinatorTestBase {
response = coordinatorClient.accessCluster(request);
assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by
AccessClusterLoadChecker"));
- ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tempDir, ServerType.GRPC);
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(0, tempDir, ServerType.GRPC_NETTY);
shuffleServerConf.setString("rss.coordinator.quorum", getQuorum());
ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
shuffleServer.start();
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
index 6b84a28b6..a98552dee 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AssignmentWithTagsTest.java
@@ -35,6 +35,7 @@ import org.apache.uniffle.client.factory.ShuffleClientFactory;
import org.apache.uniffle.client.impl.ShuffleWriteClientImpl;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.ShuffleAssignmentsInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -59,7 +60,7 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
private static void prepareShuffleServerConf(int subDirIndex, Set<String>
tags, File tmpDir) {
ShuffleServerConf shuffleServerConf =
- shuffleServerConfWithoutPort(subDirIndex, tmpDir, ServerType.GRPC);
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir,
ServerType.GRPC_NETTY);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
shuffleServerConf.setString("rss.server.tags", StringUtils.join(tags,
","));
@@ -85,16 +86,16 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
startServersWithRandomPorts();
List<Integer> collect =
-
grpcShuffleServers.stream().map(ShuffleServer::getGrpcPort).collect(Collectors.toList());
+
nettyShuffleServers.stream().map(ShuffleServer::getNettyPort).collect(Collectors.toList());
tagOfShufflePorts.put(Constants.SHUFFLE_SERVER_VERSION, collect);
tagOfShufflePorts.put(
tag1,
Arrays.asList(
- grpcShuffleServers.get(2).getGrpcPort(),
grpcShuffleServers.get(3).getGrpcPort()));
+ nettyShuffleServers.get(2).getNettyPort(),
nettyShuffleServers.get(3).getNettyPort()));
tagOfShufflePorts.put(
tag2,
Arrays.asList(
- grpcShuffleServers.get(4).getGrpcPort(),
grpcShuffleServers.get(5).getGrpcPort()));
+ nettyShuffleServers.get(4).getNettyPort(),
nettyShuffleServers.get(5).getNettyPort()));
// Wait all shuffle servers registering to coordinator
long startTimeMS = System.currentTimeMillis();
@@ -137,7 +138,7 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
List<Integer> assignedServerPorts =
assignmentsInfo.getPartitionToServers().values().stream()
.flatMap(x -> x.stream())
- .map(x -> x.getGrpcPort())
+ .map(ShuffleServerInfo::getNettyPort)
.collect(Collectors.toList());
assertEquals(1, assignedServerPorts.size());
assertTrue(
@@ -161,7 +162,7 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
assignedServerPorts =
assignmentsInfo.getPartitionToServers().values().stream()
.flatMap(x -> x.stream())
- .map(x -> x.getGrpcPort())
+ .map(ShuffleServerInfo::getNettyPort)
.collect(Collectors.toList());
assertEquals(1, assignedServerPorts.size());
assertTrue(tagOfShufflePorts.get(tag1).contains(assignedServerPorts.get(0)));
@@ -173,7 +174,7 @@ public class AssignmentWithTagsTest extends
CoordinatorTestBase {
assignedServerPorts =
assignmentsInfo.getPartitionToServers().values().stream()
.flatMap(x -> x.stream())
- .map(x -> x.getGrpcPort())
+ .map(ShuffleServerInfo::getNettyPort)
.collect(Collectors.toList());
assertEquals(1, assignedServerPorts.size());
assertTrue(tagOfShufflePorts.get(tag1).contains(assignedServerPorts.get(0)));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
index 136cc22e8..2eed0b10c 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAssignmentTest.java
@@ -69,7 +69,7 @@ public class CoordinatorAssignmentTest extends
CoordinatorTestBase {
for (int i = 0; i < SERVER_NUM; i++) {
ShuffleServerConf shuffleServerConf =
- shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC);
+ shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC_NETTY);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
index 57004b755..e4754b470 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorReconfigureNodeMaxTest.java
@@ -70,7 +70,7 @@ public class CoordinatorReconfigureNodeMaxTest extends
CoordinatorTestBase {
for (int i = 0; i < SERVER_NUM; i++) {
ShuffleServerConf shuffleServerConf =
- shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC);
+ shuffleServerConfWithoutPort(i, tmpDir, ServerType.GRPC_NETTY);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.set(RssBaseConf.RPC_METRICS_ENABLED, true);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index 2ba14bf14..030884302 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -39,7 +39,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcClient;
+import org.apache.uniffle.client.impl.grpc.ShuffleServerGrpcNettyClient;
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ServerStatus;
@@ -82,7 +82,7 @@ public class ServletTest extends IntegrationTestBase {
private static void prepareShuffleServerConf(int subDirIndex, File tmpDir)
throws Exception {
ShuffleServerConf shuffleServerConf =
- shuffleServerConfWithoutPort(subDirIndex, tmpDir, ServerType.GRPC);
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir,
ServerType.GRPC_NETTY);
shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN,
false);
storeShuffleServerConf(shuffleServerConf);
}
@@ -110,7 +110,7 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testGetSingleNode() throws Exception {
- ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+ ShuffleServer shuffleServer = nettyShuffleServers.get(0);
String content =
TestUtils.httpGet(
String.format(SINGLE_NODE_URL, coordinatorHttpPort,
shuffleServer.getId()));
@@ -119,8 +119,8 @@ public class ServletTest extends IntegrationTestBase {
HashMap<String, Object> server = response.getData();
assertEquals(0, response.getCode());
assertEquals(
- grpcShuffleServers.get(0).getGrpcPort(),
- Integer.parseInt(server.get("grpcPort").toString()));
+ nettyShuffleServers.get(0).getNettyPort(),
+ Integer.parseInt(server.get("nettyPort").toString()));
assertEquals(ServerStatus.ACTIVE.toString(), server.get("status"));
}
@@ -134,10 +134,10 @@ public class ServletTest extends IntegrationTestBase {
assertEquals(0, response.getCode());
assertEquals(4, serverList.size());
Set<Integer> portSet =
- grpcShuffleServers.stream().map(server ->
server.getGrpcPort()).collect(Collectors.toSet());
+
nettyShuffleServers.stream().map(ShuffleServer::getNettyPort).collect(Collectors.toSet());
for (int i = 0; i < serverList.size(); i++) {
assertEquals(ServerStatus.ACTIVE.toString(),
serverList.get(i).get("status"));
-
assertTrue(portSet.contains(Integer.parseInt(serverList.get(i).get("grpcPort").toString())));
+
assertTrue(portSet.contains(Integer.parseInt(serverList.get(i).get("nettyPort").toString())));
}
}
@@ -145,8 +145,8 @@ public class ServletTest extends IntegrationTestBase {
public void testLostNodesServlet() throws IOException {
try (SimpleClusterManager clusterManager =
(SimpleClusterManager) coordinatorServer.getClusterManager()) {
- ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
- ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
+ ShuffleServer shuffleServer3 = nettyShuffleServers.get(2);
+ ShuffleServer shuffleServer4 = nettyShuffleServers.get(3);
Map<String, ServerNode> servers = clusterManager.getServers();
servers.get(shuffleServer3.getId()).setTimestamp(System.currentTimeMillis() -
40000);
servers.get(shuffleServer4.getId()).setTimestamp(System.currentTimeMillis() -
40000);
@@ -168,7 +168,7 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testDecommissionedNodeServlet() {
- ShuffleServer shuffleServer = grpcShuffleServers.get(1);
+ ShuffleServer shuffleServer = nettyShuffleServers.get(1);
shuffleServer.decommission();
Awaitility.await()
.atMost(30, TimeUnit.SECONDS)
@@ -191,8 +191,8 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testUnhealthyNodesServlet() {
- ShuffleServer shuffleServer3 = grpcShuffleServers.get(2);
- ShuffleServer shuffleServer4 = grpcShuffleServers.get(3);
+ ShuffleServer shuffleServer3 = nettyShuffleServers.get(2);
+ ShuffleServer shuffleServer4 = nettyShuffleServers.get(3);
shuffleServer3.markUnhealthy();
shuffleServer4.markUnhealthy();
List<String> expectShuffleIds = Arrays.asList(shuffleServer3.getId(),
shuffleServer4.getId());
@@ -221,7 +221,7 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testDecommissionServlet() throws Exception {
- ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+ ShuffleServer shuffleServer = nettyShuffleServers.get(0);
assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
DecommissionRequest decommissionRequest = new DecommissionRequest();
decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
@@ -244,8 +244,11 @@ public class ServletTest extends IntegrationTestBase {
assertEquals(0, response.getCode());
// Register shuffle, avoid server exiting immediately.
- ShuffleServerGrpcClient shuffleServerClient =
- new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
+ ShuffleServerGrpcNettyClient shuffleServerClient =
+ new ShuffleServerGrpcNettyClient(
+ LOCALHOST,
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort());
shuffleServerClient.registerShuffle(
new RssRegisterShuffleRequest(
"testDecommissionServlet_appId", 0, Lists.newArrayList(new
PartitionRange(0, 1)), ""));
@@ -282,7 +285,7 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testDecommissionSingleNode() throws Exception {
- ShuffleServer shuffleServer = grpcShuffleServers.get(0);
+ ShuffleServer shuffleServer = nettyShuffleServers.get(0);
assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
String content =
TestUtils.httpPost(
@@ -303,8 +306,11 @@ public class ServletTest extends IntegrationTestBase {
assertEquals(0, response.getCode());
// Register shuffle, avoid server exiting immediately.
- ShuffleServerGrpcClient shuffleServerClient =
- new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
+ ShuffleServerGrpcNettyClient shuffleServerClient =
+ new ShuffleServerGrpcNettyClient(
+ LOCALHOST,
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort());
shuffleServerClient.registerShuffle(
new RssRegisterShuffleRequest(
"testDecommissionServlet_appId", 0, Lists.newArrayList(new
PartitionRange(0, 1)), ""));
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
index 48cb91886..6c91abf0a 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithLocalOfExceptionTest.java
@@ -47,7 +47,8 @@ public class ShuffleServerWithLocalOfExceptionTest extends
ShuffleReadWriteBase
CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tmpDir, ServerType.GRPC);
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC_NETTY);
shuffleServerConf.setString("rss.server.app.expired.withoutHeartbeat",
"5000");
storeShuffleServerConf(shuffleServerConf);
@@ -78,7 +79,7 @@ public class ShuffleServerWithLocalOfExceptionTest extends
ShuffleReadWriteBase
150,
shuffleServerClient,
Roaring64NavigableMap.bitmapOf());
- grpcShuffleServers.get(0).stopServer();
+ nettyShuffleServers.get(0).stopServer();
try {
memoryClientReadHandler.readShuffleData();
fail("Should throw connection exception directly.");
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
index 3d8b6e5c2..bb63370b0 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java
@@ -74,22 +74,30 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
.forEach(
subDirIndex -> {
ShuffleServerConf shuffleServerConf =
- shuffleServerConfWithoutPort(subDirIndex, tmpDir,
ServerType.GRPC);
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir,
ServerType.GRPC_NETTY);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
storeShuffleServerConf(shuffleServerConf);
});
startServersWithRandomPorts();
- shuffleServerInfo1 = new ShuffleServerInfo(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
- shuffleServerInfo2 = new ShuffleServerInfo(LOCALHOST,
grpcShuffleServers.get(1).getGrpcPort());
+ shuffleServerInfo1 =
+ new ShuffleServerInfo(
+ LOCALHOST,
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort());
+ shuffleServerInfo2 =
+ new ShuffleServerInfo(
+ LOCALHOST,
+ nettyShuffleServers.get(1).getGrpcPort(),
+ nettyShuffleServers.get(1).getNettyPort());
}
@BeforeEach
public void createClient() {
shuffleWriteClientImpl =
ShuffleClientFactory.newWriteBuilder()
- .clientType(ClientType.GRPC.name())
+ .clientType(ClientType.GRPC_NETTY.name())
.retryMax(3)
.retryIntervalMax(1000)
.heartBeatThreadNum(1)
@@ -128,7 +136,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
ShuffleServerInfo fakeShuffleServerInfo =
new ShuffleServerInfo(
"127.0.0.1-20001",
- grpcShuffleServers.get(0).getIp(),
+ nettyShuffleServers.get(0).getIp(),
generateNonExistingPorts(1).get(0));
List<ShuffleBlockInfo> blocks =
createShuffleBlockList(
@@ -168,7 +176,11 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds,
testAppId, 0, 0, 2);
Roaring64NavigableMap report =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo1,
fakeShuffleServerInfo), testAppId, 0, 0);
+ "GRPC_NETTY",
+ Sets.newHashSet(shuffleServerInfo1, fakeShuffleServerInfo),
+ testAppId,
+ 0,
+ 0);
assertEquals(blockIdBitmap, report);
}
@@ -208,12 +220,12 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
shuffleWriteClientImpl.reportShuffleResult(serverToPartitionToBlockIds,
testAppId, 1, 0, 1);
Roaring64NavigableMap bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1,
0);
assertTrue(bitmap.isEmpty());
bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1,
partitionIdx);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1,
partitionIdx);
assertEquals(5, bitmap.getLongCardinality());
for (Long b : partitionToBlocks.get(1)) {
assertTrue(bitmap.contains(b));
@@ -267,12 +279,12 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
Roaring64NavigableMap bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 0);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1,
0);
assertTrue(bitmap.isEmpty());
bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 1);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1,
1);
assertEquals(5, bitmap.getLongCardinality());
for (Long b : partitionToBlocks1.get(1)) {
assertTrue(bitmap.contains(b));
@@ -280,22 +292,22 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo1), testAppId, 1, 2);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo1), testAppId, 1,
2);
assertTrue(bitmap.isEmpty());
bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 0);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo2), testAppId, 1,
0);
assertTrue(bitmap.isEmpty());
bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 1);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo2), testAppId, 1,
1);
assertTrue(bitmap.isEmpty());
bitmap =
shuffleWriteClientImpl.getShuffleResult(
- "GRPC", Sets.newHashSet(shuffleServerInfo2), testAppId, 1, 2);
+ "GRPC_NETTY", Sets.newHashSet(shuffleServerInfo2), testAppId, 1,
2);
assertEquals(7, bitmap.getLongCardinality());
for (Long b : partitionToBlocks2.get(2)) {
assertTrue(bitmap.contains(b));
@@ -345,7 +357,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
ShuffleReadClientImpl readClient =
ShuffleClientFactory.newReadBuilder()
- .clientType(ClientType.GRPC)
+ .clientType(ClientType.GRPC_NETTY)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
@@ -369,7 +381,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
assertTrue(commitResult);
readClient =
ShuffleClientFactory.newReadBuilder()
- .clientType(ClientType.GRPC)
+ .clientType(ClientType.GRPC_NETTY)
.storageType(StorageType.LOCALFILE.name())
.appId(testAppId)
.shuffleId(0)
@@ -392,7 +404,10 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
shuffleWriteClientImpl.sendCommit(
Sets.newHashSet(
new ShuffleServerInfo(
- "127.0.0.1-20001", "fakeIp",
grpcShuffleServers.get(0).getGrpcPort())),
+ "127.0.0.1-20001",
+ "fakeIp",
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort())),
testAppId,
0,
2);
@@ -429,12 +444,13 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
@Test
public void testRetryAssgin() throws Throwable {
- int maxTryTime = grpcShuffleServers.size();
+ int maxTryTime = nettyShuffleServers.size();
AtomicInteger tryTime = new AtomicInteger();
String appId = "app-1";
RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
ShuffleAssignmentsInfo response = null;
- ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
null, ServerType.GRPC);
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(0, null, ServerType.GRPC_NETTY);
int heartbeatInterval =
shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
Thread.sleep(heartbeatInterval * 2);
shuffleWriteClientImpl.registerCoordinators(getQuorum());
@@ -454,7 +470,7 @@ public class ShuffleWithRssClientTest extends
ShuffleReadWriteBase {
.forEach(
entry -> {
if (currentTryTime < maxTryTime) {
- grpcShuffleServers.forEach(
+ nettyShuffleServers.forEach(
(ss) -> {
if (ss.getId().equals(entry.getKey().getId()))
{
try {
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
index b04f3efd7..2919bd0c3 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
@@ -89,7 +89,8 @@ public class AutoAccessTest extends IntegrationTestBase {
+
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tmpDir, ServerType.GRPC);
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC_NETTY);
storeShuffleServerConf(shuffleServerConf);
startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 8ba28e15f..72c47fe29 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -93,7 +93,7 @@ public class RssShuffleManagerTest extends
SparkIntegrationTestBase {
CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
addDynamicConf(coordinatorConf, dynamicConf);
storeCoordinatorConf(coordinatorConf);
- storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir,
ServerType.GRPC_NETTY));
startServersWithRandomPorts();
return dynamicConf;
}
@@ -146,7 +146,7 @@ public class RssShuffleManagerTest extends
SparkIntegrationTestBase {
Map<String, String> dynamicConf = startServers(dynamicConfLayout);
SparkConf conf = createSparkConf();
- updateSparkConfWithRssGrpc(conf);
+ updateSparkConfWithRssNetty(conf);
// enable stage recompute
conf.set("spark." + RssClientConfig.RSS_RESUBMIT_STAGE, "true");
// enable dynamic client conf
@@ -204,7 +204,7 @@ public class RssShuffleManagerTest extends
SparkIntegrationTestBase {
shuffleManager.configureBlockIdLayout(conf, rssConf);
ShuffleWriteClient shuffleWriteClient =
ShuffleClientFactory.newWriteBuilder()
- .clientType(ClientType.GRPC.name())
+ .clientType(ClientType.GRPC_NETTY.name())
.retryMax(3)
.retryIntervalMax(2000)
.heartBeatThreadNum(4)
@@ -228,7 +228,7 @@ public class RssShuffleManagerTest extends
SparkIntegrationTestBase {
for (int partitionId : new int[] {0, 1}) {
Roaring64NavigableMap blockIdLongs =
shuffleWriteClient.getShuffleResult(
- ClientType.GRPC.name(), servers, shuffleManager.getAppId(), 0,
partitionId);
+ ClientType.GRPC_NETTY.name(), servers,
shuffleManager.getAppId(), 0, partitionId);
List<BlockId> blockIds =
blockIdLongs.stream()
.sorted()
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
index e1431d5eb..945c73c44 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
@@ -57,7 +57,8 @@ public class SparkSQLWithDelegationShuffleManagerFallbackTest
extends SparkSQLTe
addDynamicConf(coordinatorConf, dynamicConf);
storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tmpDir, ServerType.GRPC);
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC_NETTY);
shuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 1000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
4000L);
shuffleServerConf.setString(
diff --git
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index c112f2cda..8ebdc1862 100644
---
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -89,7 +89,7 @@ public class GetReaderTest extends IntegrationTestBase {
coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times",
1);
storeCoordinatorConf(coordinatorConf);
- storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC_NETTY));
startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index bb8caba2d..92db1a623 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -100,7 +100,7 @@ public class GetReaderTest extends IntegrationTestBase {
coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times",
1);
storeCoordinatorConf(coordinatorConf);
- storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC_NETTY));
startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());