This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 41155a7f9 [#2382] improvement(test): Fix ServletTest,
ShuffleServerFaultToleranceTest,ShuffleServerWithKerberizedHadoopTest to use
random port (#2384)
41155a7f9 is described below
commit 41155a7f9c91c933384eee96a54a6fb90983e6f4
Author: summaryzb <[email protected]>
AuthorDate: Mon Mar 10 19:34:46 2025 +0800
[#2382] improvement(test): Fix ServletTest,
ShuffleServerFaultToleranceTest,ShuffleServerWithKerberizedHadoopTest to use
random port (#2384)
### What changes were proposed in this pull request?
Introduce a new way to start servers:
1. Store the coordinator and shuffleServer confs.
2. Initialize the coordinator and shuffleServer lazily, using random ports, in
`startServersWithRandomPorts`.
3. Pass the coordinator quorum to each shuffleServer after the coordinators have started.
### Why are the changes needed?
Avoid port conflicts in unit tests.
Fix: #2382
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
---
.../apache/uniffle/test/IntegrationTestBase.java | 122 +++++++++++++++++--
.../java/org/apache/uniffle/test/ServletTest.java | 131 ++++++++-------------
.../test/ShuffleServerFaultToleranceTest.java | 34 ++----
.../ShuffleServerWithKerberizedHadoopTest.java | 52 ++++----
4 files changed, 207 insertions(+), 132 deletions(-)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 42be2aa91..bd827616c 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -24,11 +24,14 @@ import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.io.TempDir;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.port.PortRegistry;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -69,6 +72,10 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
protected static List<ShuffleServer> nettyShuffleServers =
Lists.newArrayList();
protected static List<CoordinatorServer> coordinators = Lists.newArrayList();
+ private static List<ShuffleServerConf> shuffleServerConfList =
Lists.newArrayList();
+ private static List<ShuffleServerConf> mockShuffleServerConfList =
Lists.newArrayList();
+ protected static List<CoordinatorConf> coordinatorConfList =
Lists.newArrayList();
+
/** Should not be accessed directly, use `getNextNettyServerPort` instead */
private static final int NETTY_INITIAL_PORT = 21000;
@@ -90,6 +97,59 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
}
}
+ public static void startServersWithRandomPorts() throws Exception {
+ final int jettyPortSize =
+ coordinatorConfList.size()
+ + shuffleServerConfList.size()
+ + mockShuffleServerConfList.size();
+ reserveJettyPorts(jettyPortSize);
+ int index = 0;
+ for (CoordinatorConf coordinatorConf : coordinatorConfList) {
+ coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT,
jettyPorts.get(index));
+ index++;
+ coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
+ createCoordinatorServer(coordinatorConf);
+ }
+ for (CoordinatorServer coordinator : coordinators) {
+ coordinator.start();
+ }
+ String quorum =
+ coordinators.stream()
+ .map(CoordinatorServer::getRpcListenPort)
+ .map(port -> LOCALHOST + ":" + port)
+ .collect(Collectors.joining(","));
+
+ for (ShuffleServerConf serverConf : shuffleServerConfList) {
+ serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT,
jettyPorts.get(index));
+ index++;
+ serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
+ serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
+ createShuffleServer(serverConf);
+ }
+ for (ShuffleServerConf serverConf : mockShuffleServerConfList) {
+ serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT,
jettyPorts.get(index));
+ index++;
+ serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
+ serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
+ createMockedShuffleServer(serverConf);
+ }
+ for (ShuffleServer server : grpcShuffleServers) {
+ server.start();
+ }
+ for (ShuffleServer server : nettyShuffleServers) {
+
server.getShuffleServerConf().setInteger(ShuffleServerConf.NETTY_SERVER_PORT,
0);
+ server.start();
+ }
+ }
+
+ protected static List<Integer> jettyPorts = Lists.newArrayList();
+
+ public static void reserveJettyPorts(int numPorts) {
+ for (int i = 0; i < numPorts; i++) {
+ jettyPorts.add(PortRegistry.reservePort());
+ }
+ }
+
@AfterAll
public static void shutdownServers() throws Exception {
for (CoordinatorServer coordinator : coordinators) {
@@ -101,9 +161,16 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
for (ShuffleServer shuffleServer : nettyShuffleServers) {
shuffleServer.stopServer();
}
- grpcShuffleServers = Lists.newArrayList();
- nettyShuffleServers = Lists.newArrayList();
- coordinators = Lists.newArrayList();
+ for (int port : jettyPorts) {
+ PortRegistry.release(port);
+ }
+ grpcShuffleServers.clear();
+ nettyShuffleServers.clear();
+ coordinators.clear();
+ shuffleServerConfList.clear();
+ mockShuffleServerConfList.clear();
+ coordinatorConfList.clear();
+ jettyPorts.clear();
ShuffleServerMetrics.clear();
CoordinatorMetrics.clear();
}
@@ -116,6 +183,12 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
return coordinatorConf;
}
+ protected static CoordinatorConf coordinatorConfWithoutPort() {
+ CoordinatorConf coordinatorConf = new CoordinatorConf();
+ coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
+ return coordinatorConf;
+ }
+
protected static void addDynamicConf(
CoordinatorConf coordinatorConf, Map<String, String> dynamicConf) throws
Exception {
File file = createDynamicConfFile(dynamicConf);
@@ -126,19 +199,32 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC,
5);
}
+ // TODO(summaryzb) when all tests use random ports,
+ // https://github.com/apache/incubator-uniffle/issues/2064 should be closed,
then this method
+ // should be removed
protected static ShuffleServerConf getShuffleServerConf(ServerType
serverType) throws Exception {
+ return getShuffleServerConf(
+ serverType,
+ COORDINATOR_QUORUM,
+ getNextRpcServerPort(),
+ getNextNettyServerPort(),
+ getNextJettyServerPort());
+ }
+
+ private static ShuffleServerConf getShuffleServerConf(
+ ServerType serverType, String quorum, int grpcPort, int nettyPort, int
jettyPort) {
ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setInteger("rss.rpc.server.port", getNextRpcServerPort());
+ serverConf.setInteger("rss.rpc.server.port", grpcPort);
serverConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE_HDFS.name());
serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
serverConf.setString("rss.server.buffer.capacity", "671088640");
serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
serverConf.setString("rss.server.read.buffer.capacity", "335544320");
- serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+ serverConf.setString("rss.coordinator.quorum", quorum);
serverConf.setString("rss.server.heartbeat.delay", "1000");
serverConf.setString("rss.server.heartbeat.interval", "1000");
- serverConf.setInteger("rss.jetty.http.port", getNextJettyServerPort());
+ serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, jettyPort);
serverConf.setInteger("rss.jetty.corePool.size", 64);
serverConf.setInteger("rss.rpc.executor.size", 10);
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -148,11 +234,21 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL,
500L);
serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
if (serverType == ServerType.GRPC_NETTY) {
- serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT,
getNextNettyServerPort());
+ serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, nettyPort);
}
return serverConf;
}
+ protected static ShuffleServerConf shuffleServerConfWithoutPort(
+ int subDirIndex, File tmpDir, ServerType serverType) {
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType, "",
0, 0, 0);
+ File dataDir1 = new File(tmpDir, subDirIndex + "_1");
+ File dataDir2 = new File(tmpDir, subDirIndex + "_2");
+ String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
+ shuffleServerConf.setString("rss.storage.basePath", basePath);
+ return shuffleServerConf;
+ }
+
public static int getNextRpcServerPort() {
return SHUFFLE_SERVER_INITIAL_PORT +
serverRpcPortCounter.getAndIncrement();
}
@@ -169,6 +265,10 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
coordinators.add(new CoordinatorServer(coordinatorConf));
}
+ protected static void storeCoordinatorConf(CoordinatorConf coordinatorConf) {
+ coordinatorConfList.add(coordinatorConf);
+ }
+
protected static void createShuffleServer(ShuffleServerConf serverConf)
throws Exception {
ServerType serverType = serverConf.get(ShuffleServerConf.RPC_SERVER_TYPE);
switch (serverType) {
@@ -183,6 +283,14 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
}
}
+ protected static void storeShuffleServerConf(ShuffleServerConf serverConf) {
+ shuffleServerConfList.add(serverConf);
+ }
+
+ protected static void storeMockShuffleServerConf(ShuffleServerConf
serverConf) {
+ mockShuffleServerConfList.add(serverConf);
+ }
+
protected static void createMockedShuffleServer(ShuffleServerConf
serverConf) throws Exception {
ServerType serverType = serverConf.get(ShuffleServerConf.RPC_SERVER_TYPE);
switch (serverType) {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index aeaa58d97..692763e86 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -54,14 +55,14 @@ import
org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServletTest extends IntegrationTestBase {
- private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";
+ private static int coordinatorHttpPort;
+ private static final String URL_PREFIX = "http://127.0.0.1:%s/api/";
private static final String SINGLE_NODE_URL = URL_PREFIX + "server/nodes/%s";
private static final String NODES_URL = URL_PREFIX + "server/nodes";
private static final String LOSTNODES_URL = URL_PREFIX +
"server/nodes?status=LOST";
@@ -79,68 +80,28 @@ public class ServletTest extends IntegrationTestBase {
private static CoordinatorServer coordinatorServer;
private ObjectMapper objectMapper = new ObjectMapper();
- private static int rpcPort1;
- private static int rpcPort2;
- private static int rpcPort3;
- private static int rpcPort4;
+ private static void prepareShuffleServerConf(int subDirIndex, File tmpDir)
throws Exception {
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, ServerType.GRPC);
+ shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN,
false);
+ storeShuffleServerConf(shuffleServerConf);
+ }
@BeforeAll
public static void setUp(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = new CoordinatorConf();
- coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, 12345);
+
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128);
- coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346);
coordinatorConf.set(RssBaseConf.REST_AUTHORIZATION_CREDENTIALS,
AUTHORIZATION_CREDENTIALS);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- shuffleServerConf.set(RssBaseConf.RSS_COORDINATOR_QUORUM,
"127.0.0.1:12346");
- shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN,
false);
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- List<String> basePath =
- Lists.newArrayList(dataDir1.getAbsolutePath(),
dataDir2.getAbsolutePath());
- shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
- rpcPort1 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
- createShuffleServer(shuffleServerConf);
- File dataDir3 = new File(tmpDir, "data3");
- File dataDir4 = new File(tmpDir, "data4");
- basePath = Lists.newArrayList(dataDir3.getAbsolutePath(),
dataDir4.getAbsolutePath());
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
- shuffleServerConf.set(
- RssBaseConf.RPC_SERVER_PORT,
- shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
- shuffleServerConf.set(
- RssBaseConf.JETTY_HTTP_PORT,
- shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
- rpcPort2 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
- createShuffleServer(shuffleServerConf);
- File dataDir5 = new File(tmpDir, "data5");
- File dataDir6 = new File(tmpDir, "data6");
- basePath = Lists.newArrayList(dataDir5.getAbsolutePath(),
dataDir6.getAbsolutePath());
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
- shuffleServerConf.set(
- RssBaseConf.RPC_SERVER_PORT,
- shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
- shuffleServerConf.set(
- RssBaseConf.JETTY_HTTP_PORT,
- shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
- rpcPort3 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
- createShuffleServer(shuffleServerConf);
- File dataDir7 = new File(tmpDir, "data7");
- File dataDir8 = new File(tmpDir, "data8");
- basePath = Lists.newArrayList(dataDir7.getAbsolutePath(),
dataDir8.getAbsolutePath());
- shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
- shuffleServerConf.set(
- RssBaseConf.RPC_SERVER_PORT,
- shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
- shuffleServerConf.set(
- RssBaseConf.JETTY_HTTP_PORT,
- shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
- rpcPort4 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
- createShuffleServer(shuffleServerConf);
- startServers();
+ prepareShuffleServerConf(0, tmpDir);
+ prepareShuffleServerConf(1, tmpDir);
+ prepareShuffleServerConf(2, tmpDir);
+ prepareShuffleServerConf(3, tmpDir);
+
+ startServersWithRandomPorts();
+ coordinatorHttpPort = jettyPorts.get(0);
coordinatorServer = coordinators.get(0);
Awaitility.await()
.timeout(30, TimeUnit.SECONDS)
@@ -150,28 +111,34 @@ public class ServletTest extends IntegrationTestBase {
@Test
public void testGetSingleNode() throws Exception {
ShuffleServer shuffleServer = grpcShuffleServers.get(0);
- String content = TestUtils.httpGet(String.format(SINGLE_NODE_URL,
shuffleServer.getId()));
+ String content =
+ TestUtils.httpGet(
+ String.format(SINGLE_NODE_URL, coordinatorHttpPort,
shuffleServer.getId()));
Response<HashMap<String, Object>> response =
objectMapper.readValue(content, new
TypeReference<Response<HashMap<String, Object>>>() {});
HashMap<String, Object> server = response.getData();
assertEquals(0, response.getCode());
- assertEquals(rpcPort1,
Integer.parseInt(server.get("grpcPort").toString()));
+ assertEquals(
+ grpcShuffleServers.get(0).getGrpcPort(),
+ Integer.parseInt(server.get("grpcPort").toString()));
assertEquals(ServerStatus.ACTIVE.toString(), server.get("status"));
}
@Test
public void testNodesServlet() throws Exception {
- String content = TestUtils.httpGet(NODES_URL);
+ String content = TestUtils.httpGet(String.format(NODES_URL,
coordinatorHttpPort));
Response<List<HashMap<String, Object>>> response =
objectMapper.readValue(
content, new TypeReference<Response<List<HashMap<String,
Object>>>>() {});
List<HashMap<String, Object>> serverList = response.getData();
assertEquals(0, response.getCode());
assertEquals(4, serverList.size());
- assertEquals(rpcPort1,
Integer.parseInt(serverList.get(0).get("grpcPort").toString()));
- assertEquals(ServerStatus.ACTIVE.toString(),
serverList.get(0).get("status"));
- assertEquals(rpcPort2,
Integer.parseInt(serverList.get(1).get("grpcPort").toString()));
- assertEquals(ServerStatus.ACTIVE.toString(),
serverList.get(1).get("status"));
+ Set<Integer> portSet =
+ grpcShuffleServers.stream().map(server ->
server.getGrpcPort()).collect(Collectors.toSet());
+ for (int i = 0; i < serverList.size(); i++) {
+ assertEquals(ServerStatus.ACTIVE.toString(),
serverList.get(i).get("status"));
+
assertTrue(portSet.contains(Integer.parseInt(serverList.get(i).get("grpcPort").toString())));
+ }
}
@Test
@@ -188,7 +155,7 @@ public class ServletTest extends IntegrationTestBase {
List<String> shuffleIds = new ArrayList<>();
Response<List<HashMap<String, Object>>> response =
objectMapper.readValue(
- TestUtils.httpGet(LOSTNODES_URL),
+ TestUtils.httpGet(String.format(LOSTNODES_URL,
coordinatorHttpPort)),
new TypeReference<Response<List<HashMap<String, Object>>>>() {});
List<HashMap<String, Object>> serverList = response.getData();
for (HashMap<String, Object> stringObjectHashMap : serverList) {
@@ -209,7 +176,8 @@ public class ServletTest extends IntegrationTestBase {
() -> {
Response<List<HashMap<String, Object>>> response =
objectMapper.readValue(
- TestUtils.httpGet(DECOMMISSIONEDNODES_URL),
+ TestUtils.httpGet(
+ String.format(DECOMMISSIONEDNODES_URL,
coordinatorHttpPort)),
new TypeReference<Response<List<HashMap<String,
Object>>>>() {});
List<HashMap<String, Object>> serverList = response.getData();
for (HashMap<String, Object> stringObjectHashMap : serverList) {
@@ -234,7 +202,7 @@ public class ServletTest extends IntegrationTestBase {
() -> {
Response<List<HashMap<String, Object>>> response =
objectMapper.readValue(
- TestUtils.httpGet(UNHEALTHYNODES_URL),
+ TestUtils.httpGet(String.format(UNHEALTHYNODES_URL,
coordinatorHttpPort)),
new TypeReference<Response<List<HashMap<String,
Object>>>>() {});
List<HashMap<String, Object>> serverList = response.getData();
if (serverList.size() != 2) {
@@ -259,7 +227,7 @@ public class ServletTest extends IntegrationTestBase {
decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
String content =
TestUtils.httpPost(
- CANCEL_DECOMMISSION_URL,
+ String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
objectMapper.writeValueAsString(decommissionRequest),
authorizationHeader);
Response<?> response = objectMapper.readValue(content, Response.class);
@@ -269,21 +237,22 @@ public class ServletTest extends IntegrationTestBase {
cancelDecommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
content =
TestUtils.httpPost(
- CANCEL_DECOMMISSION_URL,
+ String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
objectMapper.writeValueAsString(cancelDecommissionRequest),
authorizationHeader);
response = objectMapper.readValue(content, Response.class);
assertEquals(0, response.getCode());
// Register shuffle, avoid server exiting immediately.
- ShuffleServerGrpcClient shuffleServerClient = new
ShuffleServerGrpcClient(LOCALHOST, rpcPort1);
+ ShuffleServerGrpcClient shuffleServerClient =
+ new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
shuffleServerClient.registerShuffle(
new RssRegisterShuffleRequest(
"testDecommissionServlet_appId", 0, Lists.newArrayList(new
PartitionRange(0, 1)), ""));
decommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
content =
TestUtils.httpPost(
- DECOMMISSION_URL,
+ String.format(DECOMMISSION_URL, coordinatorHttpPort),
objectMapper.writeValueAsString(decommissionRequest),
authorizationHeader);
response = objectMapper.readValue(content, Response.class);
@@ -303,7 +272,7 @@ public class ServletTest extends IntegrationTestBase {
// Cancel decommission.
content =
TestUtils.httpPost(
- CANCEL_DECOMMISSION_URL,
+ String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
objectMapper.writeValueAsString(cancelDecommissionRequest),
authorizationHeader);
response = objectMapper.readValue(content, Response.class);
@@ -317,7 +286,8 @@ public class ServletTest extends IntegrationTestBase {
assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
String content =
TestUtils.httpPost(
- String.format(CANCEL_DECOMMISSION_SINGLENODE_URL,
"not_exist_serverId"),
+ String.format(
+ CANCEL_DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort,
"not_exist_serverId"),
null,
authorizationHeader);
Response<?> response = objectMapper.readValue(content, Response.class);
@@ -325,20 +295,22 @@ public class ServletTest extends IntegrationTestBase {
assertNotNull(response.getErrMsg());
content =
TestUtils.httpPost(
- String.format(CANCEL_DECOMMISSION_SINGLENODE_URL,
shuffleServer.getId()),
+ String.format(
+ CANCEL_DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort,
shuffleServer.getId()),
null,
authorizationHeader);
response = objectMapper.readValue(content, Response.class);
assertEquals(0, response.getCode());
// Register shuffle, avoid server exiting immediately.
- ShuffleServerGrpcClient shuffleServerClient = new
ShuffleServerGrpcClient(LOCALHOST, rpcPort1);
+ ShuffleServerGrpcClient shuffleServerClient =
+ new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
shuffleServerClient.registerShuffle(
new RssRegisterShuffleRequest(
"testDecommissionServlet_appId", 0, Lists.newArrayList(new
PartitionRange(0, 1)), ""));
content =
TestUtils.httpPost(
- String.format(DECOMMISSION_SINGLENODE_URL, shuffleServer.getId()),
+ String.format(DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort,
shuffleServer.getId()),
null,
authorizationHeader);
response = objectMapper.readValue(content, Response.class);
@@ -358,7 +330,8 @@ public class ServletTest extends IntegrationTestBase {
// Cancel decommission.
content =
TestUtils.httpPost(
- String.format(CANCEL_DECOMMISSION_SINGLENODE_URL,
shuffleServer.getId()),
+ String.format(
+ CANCEL_DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort,
shuffleServer.getId()),
null,
authorizationHeader);
response = objectMapper.readValue(content, Response.class);
@@ -373,7 +346,7 @@ public class ServletTest extends IntegrationTestBase {
String wrongCredentials = "dW5pZmZsZTp1bmlmZmxlMTIz1";
String content =
TestUtils.httpPost(
- CANCEL_DECOMMISSION_URL,
+ String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
objectMapper.writeValueAsString(decommissionRequest),
ImmutableMap.of("Authorization", "Basic " + wrongCredentials));
assertEquals("Authentication Failed", content);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index d08bf80bd..03c40ea69 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -49,8 +49,6 @@ import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.ByteBufUtils;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.MockedShuffleServer;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -74,15 +72,14 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
@BeforeEach
public void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- createCoordinatorServer(coordinatorConf);
- grpcShuffleServers.add(createServer(0, tmpDir, ServerType.GRPC));
- grpcShuffleServers.add(createServer(1, tmpDir, ServerType.GRPC));
- grpcShuffleServers.add(createServer(2, tmpDir, ServerType.GRPC));
- nettyShuffleServers.add(createServer(0, tmpDir, ServerType.GRPC_NETTY));
- nettyShuffleServers.add(createServer(1, tmpDir, ServerType.GRPC_NETTY));
- nettyShuffleServers.add(createServer(2, tmpDir, ServerType.GRPC_NETTY));
- startServers();
+ storeCoordinatorConf(coordinatorConfWithoutPort());
+ prepareShuffleServerConf(0, tmpDir, ServerType.GRPC);
+ prepareShuffleServerConf(1, tmpDir, ServerType.GRPC);
+ prepareShuffleServerConf(2, tmpDir, ServerType.GRPC);
+ prepareShuffleServerConf(3, tmpDir, ServerType.GRPC_NETTY);
+ prepareShuffleServerConf(4, tmpDir, ServerType.GRPC_NETTY);
+ prepareShuffleServerConf(5, tmpDir, ServerType.GRPC_NETTY);
+ startServersWithRandomPorts();
grpcShuffleServerClients = new ArrayList<>();
nettyShuffleServerClients = new ArrayList<>();
for (ShuffleServer shuffleServer : grpcShuffleServers) {
@@ -286,9 +283,10 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
return new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
}
- public static MockedShuffleServer createServer(int id, File tmpDir,
ServerType serverType)
+ public static void prepareShuffleServerConf(int subDirIndex, File tmpDir,
ServerType serverType)
throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
5000L);
@@ -298,18 +296,10 @@ public class ShuffleServerFaultToleranceTest extends
ShuffleReadWriteBase {
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
5000L);
shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 1000000L);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
- File dataDir1 = new File(tmpDir, id + "_1");
- File dataDir2 = new File(tmpDir, id + "_2");
- String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
450L);
- shuffleServerConf.setInteger(
- "rss.rpc.server.port",
- shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 20 +
id);
- shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
- shuffleServerConf.setString("rss.storage.basePath", basePath);
- return new MockedShuffleServer(shuffleServerConf);
+ storeShuffleServerConf(shuffleServerConf);
}
protected void waitFlush(String appId, int shuffleId, boolean isNettyMode)
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 77330fe2b..270b3d4b2 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -54,6 +54,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.port.PortRegistry;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RssUtils;
@@ -81,9 +82,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
}
}
- private static final int COORDINATOR_RPC_PORT = 19999;
- private static final String COORDINATOR_QUORUM = LOCALHOST + ":" +
COORDINATOR_RPC_PORT;
-
private ShuffleServerGrpcClient grpcShuffleServerClient;
private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
private static CoordinatorServer coordinatorServer;
@@ -91,22 +89,26 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
private static ShuffleServer nettyShuffleServer;
private static ShuffleServerConf grpcShuffleServerConfig;
private static ShuffleServerConf nettyShuffleServerConfig;
+ protected static List<Integer> jettyPorts = Lists.newArrayList();
static @TempDir File tempDir;
- private static ShuffleServerConf getShuffleServerConf(ServerType serverType)
throws Exception {
+ private static ShuffleServerConf getShuffleServerConf(
+ int id, File tmpDir, int coordinatorRpcPort, ServerType serverType)
throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setInteger("rss.rpc.server.port",
IntegrationTestBase.getNextRpcServerPort());
- serverConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE_HDFS.name());
- serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
+ File dataDir1 = new File(tmpDir, id + "_1");
+ File dataDir2 = new File(tmpDir, id + "_2");
+ String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
+ serverConf.setInteger("rss.rpc.server.port", 0);
+ serverConf.setString("rss.storage.basePath", basePath);
serverConf.setString("rss.server.buffer.capacity", "671088640");
serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
serverConf.setString("rss.server.read.buffer.capacity", "335544320");
- serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+ serverConf.setString("rss.coordinator.quorum", LOCALHOST + ":" +
coordinatorRpcPort);
serverConf.setString("rss.server.heartbeat.delay", "1000");
serverConf.setString("rss.server.heartbeat.interval", "1000");
- serverConf.setInteger("rss.jetty.http.port",
IntegrationTestBase.getNextJettyServerPort());
+ serverConf.setInteger("rss.jetty.http.port", jettyPorts.get(id));
serverConf.setInteger("rss.jetty.corePool.size", 64);
serverConf.setInteger("rss.rpc.executor.size", 10);
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -116,29 +118,34 @@ public class ShuffleServerWithKerberizedHadoopTest
extends KerberizedHadoopBase
serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
if (serverType == ServerType.GRPC_NETTY) {
- serverConf.setInteger(
- ShuffleServerConf.NETTY_SERVER_PORT,
IntegrationTestBase.getNextNettyServerPort());
+ serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, 0);
}
return serverConf;
}
@BeforeAll
- public static void setup() throws Exception {
+ public static void setup(@TempDir File tempDir) throws Exception {
testRunner = ShuffleServerWithKerberizedHadoopTest.class;
KerberizedHadoopBase.init();
+ for (int i = 0; i < 3; i++) {
+ jettyPorts.add(PortRegistry.reservePort());
+ }
CoordinatorConf coordinatorConf = new CoordinatorConf();
- coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 19999);
- coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 19998);
+ coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
+ coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT,
jettyPorts.get(2));
coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
coordinatorServer = new CoordinatorServer(coordinatorConf);
coordinatorServer.start();
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ ShuffleServerConf grpcShuffleServerConf =
+ getShuffleServerConf(0, tempDir, coordinatorServer.getRpcListenPort(),
ServerType.GRPC);
grpcShuffleServer = new ShuffleServer(grpcShuffleServerConf);
grpcShuffleServer.start();
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ ShuffleServerConf nettyShuffleServerConf =
+ getShuffleServerConf(
+ 1, tempDir, coordinatorServer.getRpcListenPort(),
ServerType.GRPC_NETTY);
nettyShuffleServer = new ShuffleServer(nettyShuffleServerConf);
nettyShuffleServer.start();
@@ -157,22 +164,19 @@ public class ShuffleServerWithKerberizedHadoopTest
extends KerberizedHadoopBase
if (nettyShuffleServer != null) {
nettyShuffleServer.stopServer();
}
+ for (int port : jettyPorts) {
+ PortRegistry.release(port);
+ }
}
@BeforeEach
public void beforeEach() throws Exception {
initHadoopSecurityContext();
grpcShuffleServerClient =
- new ShuffleServerGrpcClient(
- LOCALHOST,
-
getShuffleServerConf(ServerType.GRPC).getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+ new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServer.getGrpcPort());
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
- LOCALHOST,
- getShuffleServerConf(ServerType.GRPC_NETTY)
- .getInteger(ShuffleServerConf.RPC_SERVER_PORT),
- getShuffleServerConf(ServerType.GRPC_NETTY)
- .getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+ LOCALHOST, nettyShuffleServer.getGrpcPort(),
nettyShuffleServer.getNettyPort());
}
@AfterEach