This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 41155a7f9 [#2382] improvement(test): Fix ServletTest, 
ShuffleServerFaultToleranceTest, ShuffleServerWithKerberizedHadoopTest to use 
random port (#2384)
41155a7f9 is described below

commit 41155a7f9c91c933384eee96a54a6fb90983e6f4
Author: summaryzb <[email protected]>
AuthorDate: Mon Mar 10 19:34:46 2025 +0800

    [#2382] improvement(test): Fix ServletTest, 
ShuffleServerFaultToleranceTest, ShuffleServerWithKerberizedHadoopTest to use 
random port (#2384)
    
    ### What changes were proposed in this pull request?
    Introduce a new way to start servers
    1. Store coordinator and shuffleServer conf
    2. Initialize coordinator and shuffleServer lazily by using random port in 
`startServersWithRandomPorts`
    3. Pass coordinator quorum to shuffleServer after coordinator started
    
    ### Why are the changes needed?
    Avoid port conflicts in unit tests
    Fix: #2382
    
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    UT
---
 .../apache/uniffle/test/IntegrationTestBase.java   | 122 +++++++++++++++++--
 .../java/org/apache/uniffle/test/ServletTest.java  | 131 ++++++++-------------
 .../test/ShuffleServerFaultToleranceTest.java      |  34 ++----
 .../ShuffleServerWithKerberizedHadoopTest.java     |  52 ++++----
 4 files changed, 207 insertions(+), 132 deletions(-)

diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index 42be2aa91..bd827616c 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -24,11 +24,14 @@ import java.nio.file.Files;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import com.google.common.collect.Lists;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.io.TempDir;
 
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.port.PortRegistry;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -69,6 +72,10 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
   protected static List<ShuffleServer> nettyShuffleServers = 
Lists.newArrayList();
   protected static List<CoordinatorServer> coordinators = Lists.newArrayList();
 
+  private static List<ShuffleServerConf> shuffleServerConfList = 
Lists.newArrayList();
+  private static List<ShuffleServerConf> mockShuffleServerConfList = 
Lists.newArrayList();
+  protected static List<CoordinatorConf> coordinatorConfList = 
Lists.newArrayList();
+
   /** Should not be accessed directly, use `getNextNettyServerPort` instead */
   private static final int NETTY_INITIAL_PORT = 21000;
 
@@ -90,6 +97,59 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     }
   }
 
+  public static void startServersWithRandomPorts() throws Exception {
+    final int jettyPortSize =
+        coordinatorConfList.size()
+            + shuffleServerConfList.size()
+            + mockShuffleServerConfList.size();
+    reserveJettyPorts(jettyPortSize);
+    int index = 0;
+    for (CoordinatorConf coordinatorConf : coordinatorConfList) {
+      coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 
jettyPorts.get(index));
+      index++;
+      coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
+      createCoordinatorServer(coordinatorConf);
+    }
+    for (CoordinatorServer coordinator : coordinators) {
+      coordinator.start();
+    }
+    String quorum =
+        coordinators.stream()
+            .map(CoordinatorServer::getRpcListenPort)
+            .map(port -> LOCALHOST + ":" + port)
+            .collect(Collectors.joining(","));
+
+    for (ShuffleServerConf serverConf : shuffleServerConfList) {
+      serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 
jettyPorts.get(index));
+      index++;
+      serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
+      serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
+      createShuffleServer(serverConf);
+    }
+    for (ShuffleServerConf serverConf : mockShuffleServerConfList) {
+      serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 
jettyPorts.get(index));
+      index++;
+      serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
+      serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
+      createMockedShuffleServer(serverConf);
+    }
+    for (ShuffleServer server : grpcShuffleServers) {
+      server.start();
+    }
+    for (ShuffleServer server : nettyShuffleServers) {
+      
server.getShuffleServerConf().setInteger(ShuffleServerConf.NETTY_SERVER_PORT, 
0);
+      server.start();
+    }
+  }
+
+  protected static List<Integer> jettyPorts = Lists.newArrayList();
+
+  public static void reserveJettyPorts(int numPorts) {
+    for (int i = 0; i < numPorts; i++) {
+      jettyPorts.add(PortRegistry.reservePort());
+    }
+  }
+
   @AfterAll
   public static void shutdownServers() throws Exception {
     for (CoordinatorServer coordinator : coordinators) {
@@ -101,9 +161,16 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     for (ShuffleServer shuffleServer : nettyShuffleServers) {
       shuffleServer.stopServer();
     }
-    grpcShuffleServers = Lists.newArrayList();
-    nettyShuffleServers = Lists.newArrayList();
-    coordinators = Lists.newArrayList();
+    for (int port : jettyPorts) {
+      PortRegistry.release(port);
+    }
+    grpcShuffleServers.clear();
+    nettyShuffleServers.clear();
+    coordinators.clear();
+    shuffleServerConfList.clear();
+    mockShuffleServerConfList.clear();
+    coordinatorConfList.clear();
+    jettyPorts.clear();
     ShuffleServerMetrics.clear();
     CoordinatorMetrics.clear();
   }
@@ -116,6 +183,12 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     return coordinatorConf;
   }
 
+  protected static CoordinatorConf coordinatorConfWithoutPort() {
+    CoordinatorConf coordinatorConf = new CoordinatorConf();
+    coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
+    return coordinatorConf;
+  }
+
   protected static void addDynamicConf(
       CoordinatorConf coordinatorConf, Map<String, String> dynamicConf) throws 
Exception {
     File file = createDynamicConfFile(dynamicConf);
@@ -126,19 +199,32 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
         CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC, 
5);
   }
 
+  // TODO(summaryzb) when all test use random port,
+  // https://github.com/apache/incubator-uniffle/issues/2064 should be closed, 
then this method
+  // should be removed
   protected static ShuffleServerConf getShuffleServerConf(ServerType 
serverType) throws Exception {
+    return getShuffleServerConf(
+        serverType,
+        COORDINATOR_QUORUM,
+        getNextRpcServerPort(),
+        getNextNettyServerPort(),
+        getNextJettyServerPort());
+  }
+
+  private static ShuffleServerConf getShuffleServerConf(
+      ServerType serverType, String quorum, int grpcPort, int nettyPort, int 
jettyPort) {
     ShuffleServerConf serverConf = new ShuffleServerConf();
-    serverConf.setInteger("rss.rpc.server.port", getNextRpcServerPort());
+    serverConf.setInteger("rss.rpc.server.port", grpcPort);
     serverConf.setString("rss.storage.type", 
StorageType.MEMORY_LOCALFILE_HDFS.name());
     serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
     serverConf.setString("rss.server.buffer.capacity", "671088640");
     serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
     serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
     serverConf.setString("rss.server.read.buffer.capacity", "335544320");
-    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+    serverConf.setString("rss.coordinator.quorum", quorum);
     serverConf.setString("rss.server.heartbeat.delay", "1000");
     serverConf.setString("rss.server.heartbeat.interval", "1000");
-    serverConf.setInteger("rss.jetty.http.port", getNextJettyServerPort());
+    serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, jettyPort);
     serverConf.setInteger("rss.jetty.corePool.size", 64);
     serverConf.setInteger("rss.rpc.executor.size", 10);
     serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -148,11 +234,21 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 
500L);
     serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
     if (serverType == ServerType.GRPC_NETTY) {
-      serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, 
getNextNettyServerPort());
+      serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, nettyPort);
     }
     return serverConf;
   }
 
+  protected static ShuffleServerConf shuffleServerConfWithoutPort(
+      int subDirIndex, File tmpDir, ServerType serverType) {
+    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType, "", 
0, 0, 0);
+    File dataDir1 = new File(tmpDir, subDirIndex + "_1");
+    File dataDir2 = new File(tmpDir, subDirIndex + "_2");
+    String basePath = dataDir1.getAbsolutePath() + "," + 
dataDir2.getAbsolutePath();
+    shuffleServerConf.setString("rss.storage.basePath", basePath);
+    return shuffleServerConf;
+  }
+
   public static int getNextRpcServerPort() {
     return SHUFFLE_SERVER_INITIAL_PORT + 
serverRpcPortCounter.getAndIncrement();
   }
@@ -169,6 +265,10 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     coordinators.add(new CoordinatorServer(coordinatorConf));
   }
 
+  protected static void storeCoordinatorConf(CoordinatorConf coordinatorConf) {
+    coordinatorConfList.add(coordinatorConf);
+  }
+
   protected static void createShuffleServer(ShuffleServerConf serverConf) 
throws Exception {
     ServerType serverType = serverConf.get(ShuffleServerConf.RPC_SERVER_TYPE);
     switch (serverType) {
@@ -183,6 +283,14 @@ public abstract class IntegrationTestBase extends 
HadoopTestBase {
     }
   }
 
+  protected static void storeShuffleServerConf(ShuffleServerConf serverConf) {
+    shuffleServerConfList.add(serverConf);
+  }
+
+  protected static void storeMockShuffleServerConf(ShuffleServerConf 
serverConf) {
+    mockShuffleServerConfList.add(serverConf);
+  }
+
   protected static void createMockedShuffleServer(ShuffleServerConf 
serverConf) throws Exception {
     ServerType serverType = serverConf.get(ShuffleServerConf.RPC_SERVER_TYPE);
     switch (serverType) {
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index aeaa58d97..692763e86 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -54,14 +55,14 @@ import 
org.apache.uniffle.coordinator.web.request.CancelDecommissionRequest;
 import org.apache.uniffle.coordinator.web.request.DecommissionRequest;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
-import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class ServletTest extends IntegrationTestBase {
-  private static final String URL_PREFIX = "http://127.0.0.1:12345/api/";
+  private static int coordinatorHttpPort;
+  private static final String URL_PREFIX = "http://127.0.0.1:%s/api/";;
   private static final String SINGLE_NODE_URL = URL_PREFIX + "server/nodes/%s";
   private static final String NODES_URL = URL_PREFIX + "server/nodes";
   private static final String LOSTNODES_URL = URL_PREFIX + 
"server/nodes?status=LOST";
@@ -79,68 +80,28 @@ public class ServletTest extends IntegrationTestBase {
   private static CoordinatorServer coordinatorServer;
   private ObjectMapper objectMapper = new ObjectMapper();
 
-  private static int rpcPort1;
-  private static int rpcPort2;
-  private static int rpcPort3;
-  private static int rpcPort4;
+  private static void prepareShuffleServerConf(int subDirIndex, File tmpDir) 
throws Exception {
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, ServerType.GRPC);
+    shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN, 
false);
+    storeShuffleServerConf(shuffleServerConf);
+  }
 
   @BeforeAll
   public static void setUp(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = new CoordinatorConf();
-    coordinatorConf.set(RssBaseConf.JETTY_HTTP_PORT, 12345);
+
+    CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
     coordinatorConf.set(RssBaseConf.JETTY_CORE_POOL_SIZE, 128);
-    coordinatorConf.set(RssBaseConf.RPC_SERVER_PORT, 12346);
     coordinatorConf.set(RssBaseConf.REST_AUTHORIZATION_CREDENTIALS, 
AUTHORIZATION_CREDENTIALS);
-    createCoordinatorServer(coordinatorConf);
+    storeCoordinatorConf(coordinatorConf);
 
-    ShuffleServerConf shuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
-    shuffleServerConf.set(RssBaseConf.RSS_COORDINATOR_QUORUM, 
"127.0.0.1:12346");
-    shuffleServerConf.set(ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN, 
false);
-    File dataDir1 = new File(tmpDir, "data1");
-    File dataDir2 = new File(tmpDir, "data2");
-    List<String> basePath =
-        Lists.newArrayList(dataDir1.getAbsolutePath(), 
dataDir2.getAbsolutePath());
-    shuffleServerConf.setString(RssBaseConf.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
-    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
-    rpcPort1 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
-    createShuffleServer(shuffleServerConf);
-    File dataDir3 = new File(tmpDir, "data3");
-    File dataDir4 = new File(tmpDir, "data4");
-    basePath = Lists.newArrayList(dataDir3.getAbsolutePath(), 
dataDir4.getAbsolutePath());
-    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
-    shuffleServerConf.set(
-        RssBaseConf.RPC_SERVER_PORT,
-        shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
-    shuffleServerConf.set(
-        RssBaseConf.JETTY_HTTP_PORT,
-        shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
-    rpcPort2 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
-    createShuffleServer(shuffleServerConf);
-    File dataDir5 = new File(tmpDir, "data5");
-    File dataDir6 = new File(tmpDir, "data6");
-    basePath = Lists.newArrayList(dataDir5.getAbsolutePath(), 
dataDir6.getAbsolutePath());
-    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
-    shuffleServerConf.set(
-        RssBaseConf.RPC_SERVER_PORT,
-        shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
-    shuffleServerConf.set(
-        RssBaseConf.JETTY_HTTP_PORT,
-        shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
-    rpcPort3 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
-    createShuffleServer(shuffleServerConf);
-    File dataDir7 = new File(tmpDir, "data7");
-    File dataDir8 = new File(tmpDir, "data8");
-    basePath = Lists.newArrayList(dataDir7.getAbsolutePath(), 
dataDir8.getAbsolutePath());
-    shuffleServerConf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, basePath);
-    shuffleServerConf.set(
-        RssBaseConf.RPC_SERVER_PORT,
-        shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 1);
-    shuffleServerConf.set(
-        RssBaseConf.JETTY_HTTP_PORT,
-        shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
-    rpcPort4 = shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT);
-    createShuffleServer(shuffleServerConf);
-    startServers();
+    prepareShuffleServerConf(0, tmpDir);
+    prepareShuffleServerConf(1, tmpDir);
+    prepareShuffleServerConf(2, tmpDir);
+    prepareShuffleServerConf(3, tmpDir);
+
+    startServersWithRandomPorts();
+    coordinatorHttpPort = jettyPorts.get(0);
     coordinatorServer = coordinators.get(0);
     Awaitility.await()
         .timeout(30, TimeUnit.SECONDS)
@@ -150,28 +111,34 @@ public class ServletTest extends IntegrationTestBase {
   @Test
   public void testGetSingleNode() throws Exception {
     ShuffleServer shuffleServer = grpcShuffleServers.get(0);
-    String content = TestUtils.httpGet(String.format(SINGLE_NODE_URL, 
shuffleServer.getId()));
+    String content =
+        TestUtils.httpGet(
+            String.format(SINGLE_NODE_URL, coordinatorHttpPort, 
shuffleServer.getId()));
     Response<HashMap<String, Object>> response =
         objectMapper.readValue(content, new 
TypeReference<Response<HashMap<String, Object>>>() {});
     HashMap<String, Object> server = response.getData();
     assertEquals(0, response.getCode());
-    assertEquals(rpcPort1, 
Integer.parseInt(server.get("grpcPort").toString()));
+    assertEquals(
+        grpcShuffleServers.get(0).getGrpcPort(),
+        Integer.parseInt(server.get("grpcPort").toString()));
     assertEquals(ServerStatus.ACTIVE.toString(), server.get("status"));
   }
 
   @Test
   public void testNodesServlet() throws Exception {
-    String content = TestUtils.httpGet(NODES_URL);
+    String content = TestUtils.httpGet(String.format(NODES_URL, 
coordinatorHttpPort));
     Response<List<HashMap<String, Object>>> response =
         objectMapper.readValue(
             content, new TypeReference<Response<List<HashMap<String, 
Object>>>>() {});
     List<HashMap<String, Object>> serverList = response.getData();
     assertEquals(0, response.getCode());
     assertEquals(4, serverList.size());
-    assertEquals(rpcPort1, 
Integer.parseInt(serverList.get(0).get("grpcPort").toString()));
-    assertEquals(ServerStatus.ACTIVE.toString(), 
serverList.get(0).get("status"));
-    assertEquals(rpcPort2, 
Integer.parseInt(serverList.get(1).get("grpcPort").toString()));
-    assertEquals(ServerStatus.ACTIVE.toString(), 
serverList.get(1).get("status"));
+    Set<Integer> portSet =
+        grpcShuffleServers.stream().map(server -> 
server.getGrpcPort()).collect(Collectors.toSet());
+    for (int i = 0; i < serverList.size(); i++) {
+      assertEquals(ServerStatus.ACTIVE.toString(), 
serverList.get(i).get("status"));
+      
assertTrue(portSet.contains(Integer.parseInt(serverList.get(i).get("grpcPort").toString())));
+    }
   }
 
   @Test
@@ -188,7 +155,7 @@ public class ServletTest extends IntegrationTestBase {
       List<String> shuffleIds = new ArrayList<>();
       Response<List<HashMap<String, Object>>> response =
           objectMapper.readValue(
-              TestUtils.httpGet(LOSTNODES_URL),
+              TestUtils.httpGet(String.format(LOSTNODES_URL, 
coordinatorHttpPort)),
               new TypeReference<Response<List<HashMap<String, Object>>>>() {});
       List<HashMap<String, Object>> serverList = response.getData();
       for (HashMap<String, Object> stringObjectHashMap : serverList) {
@@ -209,7 +176,8 @@ public class ServletTest extends IntegrationTestBase {
             () -> {
               Response<List<HashMap<String, Object>>> response =
                   objectMapper.readValue(
-                      TestUtils.httpGet(DECOMMISSIONEDNODES_URL),
+                      TestUtils.httpGet(
+                          String.format(DECOMMISSIONEDNODES_URL, 
coordinatorHttpPort)),
                       new TypeReference<Response<List<HashMap<String, 
Object>>>>() {});
               List<HashMap<String, Object>> serverList = response.getData();
               for (HashMap<String, Object> stringObjectHashMap : serverList) {
@@ -234,7 +202,7 @@ public class ServletTest extends IntegrationTestBase {
             () -> {
               Response<List<HashMap<String, Object>>> response =
                   objectMapper.readValue(
-                      TestUtils.httpGet(UNHEALTHYNODES_URL),
+                      TestUtils.httpGet(String.format(UNHEALTHYNODES_URL, 
coordinatorHttpPort)),
                       new TypeReference<Response<List<HashMap<String, 
Object>>>>() {});
               List<HashMap<String, Object>> serverList = response.getData();
               if (serverList.size() != 2) {
@@ -259,7 +227,7 @@ public class ServletTest extends IntegrationTestBase {
     decommissionRequest.setServerIds(Sets.newHashSet("not_exist_serverId"));
     String content =
         TestUtils.httpPost(
-            CANCEL_DECOMMISSION_URL,
+            String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
             objectMapper.writeValueAsString(decommissionRequest),
             authorizationHeader);
     Response<?> response = objectMapper.readValue(content, Response.class);
@@ -269,21 +237,22 @@ public class ServletTest extends IntegrationTestBase {
     
cancelDecommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
     content =
         TestUtils.httpPost(
-            CANCEL_DECOMMISSION_URL,
+            String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
             objectMapper.writeValueAsString(cancelDecommissionRequest),
             authorizationHeader);
     response = objectMapper.readValue(content, Response.class);
     assertEquals(0, response.getCode());
 
     // Register shuffle, avoid server exiting immediately.
-    ShuffleServerGrpcClient shuffleServerClient = new 
ShuffleServerGrpcClient(LOCALHOST, rpcPort1);
+    ShuffleServerGrpcClient shuffleServerClient =
+        new ShuffleServerGrpcClient(LOCALHOST, 
grpcShuffleServers.get(0).getGrpcPort());
     shuffleServerClient.registerShuffle(
         new RssRegisterShuffleRequest(
             "testDecommissionServlet_appId", 0, Lists.newArrayList(new 
PartitionRange(0, 1)), ""));
     decommissionRequest.setServerIds(Sets.newHashSet(shuffleServer.getId()));
     content =
         TestUtils.httpPost(
-            DECOMMISSION_URL,
+            String.format(DECOMMISSION_URL, coordinatorHttpPort),
             objectMapper.writeValueAsString(decommissionRequest),
             authorizationHeader);
     response = objectMapper.readValue(content, Response.class);
@@ -303,7 +272,7 @@ public class ServletTest extends IntegrationTestBase {
     // Cancel decommission.
     content =
         TestUtils.httpPost(
-            CANCEL_DECOMMISSION_URL,
+            String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
             objectMapper.writeValueAsString(cancelDecommissionRequest),
             authorizationHeader);
     response = objectMapper.readValue(content, Response.class);
@@ -317,7 +286,8 @@ public class ServletTest extends IntegrationTestBase {
     assertEquals(ServerStatus.ACTIVE, shuffleServer.getServerStatus());
     String content =
         TestUtils.httpPost(
-            String.format(CANCEL_DECOMMISSION_SINGLENODE_URL, 
"not_exist_serverId"),
+            String.format(
+                CANCEL_DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort, 
"not_exist_serverId"),
             null,
             authorizationHeader);
     Response<?> response = objectMapper.readValue(content, Response.class);
@@ -325,20 +295,22 @@ public class ServletTest extends IntegrationTestBase {
     assertNotNull(response.getErrMsg());
     content =
         TestUtils.httpPost(
-            String.format(CANCEL_DECOMMISSION_SINGLENODE_URL, 
shuffleServer.getId()),
+            String.format(
+                CANCEL_DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort, 
shuffleServer.getId()),
             null,
             authorizationHeader);
     response = objectMapper.readValue(content, Response.class);
     assertEquals(0, response.getCode());
 
     // Register shuffle, avoid server exiting immediately.
-    ShuffleServerGrpcClient shuffleServerClient = new 
ShuffleServerGrpcClient(LOCALHOST, rpcPort1);
+    ShuffleServerGrpcClient shuffleServerClient =
+        new ShuffleServerGrpcClient(LOCALHOST, 
grpcShuffleServers.get(0).getGrpcPort());
     shuffleServerClient.registerShuffle(
         new RssRegisterShuffleRequest(
             "testDecommissionServlet_appId", 0, Lists.newArrayList(new 
PartitionRange(0, 1)), ""));
     content =
         TestUtils.httpPost(
-            String.format(DECOMMISSION_SINGLENODE_URL, shuffleServer.getId()),
+            String.format(DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort, 
shuffleServer.getId()),
             null,
             authorizationHeader);
     response = objectMapper.readValue(content, Response.class);
@@ -358,7 +330,8 @@ public class ServletTest extends IntegrationTestBase {
     // Cancel decommission.
     content =
         TestUtils.httpPost(
-            String.format(CANCEL_DECOMMISSION_SINGLENODE_URL, 
shuffleServer.getId()),
+            String.format(
+                CANCEL_DECOMMISSION_SINGLENODE_URL, coordinatorHttpPort, 
shuffleServer.getId()),
             null,
             authorizationHeader);
     response = objectMapper.readValue(content, Response.class);
@@ -373,7 +346,7 @@ public class ServletTest extends IntegrationTestBase {
     String wrongCredentials = "dW5pZmZsZTp1bmlmZmxlMTIz1";
     String content =
         TestUtils.httpPost(
-            CANCEL_DECOMMISSION_URL,
+            String.format(CANCEL_DECOMMISSION_URL, coordinatorHttpPort),
             objectMapper.writeValueAsString(decommissionRequest),
             ImmutableMap.of("Authorization", "Basic " + wrongCredentials));
     assertEquals("Authentication Failed", content);
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
index d08bf80bd..03c40ea69 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerFaultToleranceTest.java
@@ -49,8 +49,6 @@ import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShuffleServerInfo;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.util.ByteBufUtils;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.MockedShuffleServer;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
@@ -74,15 +72,14 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
 
   @BeforeEach
   public void setupServers(@TempDir File tmpDir) throws Exception {
-    CoordinatorConf coordinatorConf = getCoordinatorConf();
-    createCoordinatorServer(coordinatorConf);
-    grpcShuffleServers.add(createServer(0, tmpDir, ServerType.GRPC));
-    grpcShuffleServers.add(createServer(1, tmpDir, ServerType.GRPC));
-    grpcShuffleServers.add(createServer(2, tmpDir, ServerType.GRPC));
-    nettyShuffleServers.add(createServer(0, tmpDir, ServerType.GRPC_NETTY));
-    nettyShuffleServers.add(createServer(1, tmpDir, ServerType.GRPC_NETTY));
-    nettyShuffleServers.add(createServer(2, tmpDir, ServerType.GRPC_NETTY));
-    startServers();
+    storeCoordinatorConf(coordinatorConfWithoutPort());
+    prepareShuffleServerConf(0, tmpDir, ServerType.GRPC);
+    prepareShuffleServerConf(1, tmpDir, ServerType.GRPC);
+    prepareShuffleServerConf(2, tmpDir, ServerType.GRPC);
+    prepareShuffleServerConf(3, tmpDir, ServerType.GRPC_NETTY);
+    prepareShuffleServerConf(4, tmpDir, ServerType.GRPC_NETTY);
+    prepareShuffleServerConf(5, tmpDir, ServerType.GRPC_NETTY);
+    startServersWithRandomPorts();
     grpcShuffleServerClients = new ArrayList<>();
     nettyShuffleServerClients = new ArrayList<>();
     for (ShuffleServer shuffleServer : grpcShuffleServers) {
@@ -286,9 +283,10 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
     return new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
   }
 
-  public static MockedShuffleServer createServer(int id, File tmpDir, 
ServerType serverType)
+  public static void prepareShuffleServerConf(int subDirIndex, File tmpDir, 
ServerType serverType)
       throws Exception {
-    ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+    ShuffleServerConf shuffleServerConf =
+        shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
     shuffleServerConf.setString(
         ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.LOCALFILE.name());
     
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
5000L);
@@ -298,18 +296,10 @@ public class ShuffleServerFaultToleranceTest extends 
ShuffleReadWriteBase {
     
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 
5000L);
     shuffleServerConf.set(ShuffleServerConf.DISK_CAPACITY, 1000000L);
     shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
-    File dataDir1 = new File(tmpDir, id + "_1");
-    File dataDir2 = new File(tmpDir, id + "_2");
-    String basePath = dataDir1.getAbsolutePath() + "," + 
dataDir2.getAbsolutePath();
     shuffleServerConf.setString(
         ShuffleServerConf.RSS_STORAGE_TYPE.key(), 
StorageType.MEMORY_LOCALFILE_HDFS.name());
     
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE, 
450L);
-    shuffleServerConf.setInteger(
-        "rss.rpc.server.port",
-        shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 20 + 
id);
-    shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
-    shuffleServerConf.setString("rss.storage.basePath", basePath);
-    return new MockedShuffleServer(shuffleServerConf);
+    storeShuffleServerConf(shuffleServerConf);
   }
 
   protected void waitFlush(String appId, int shuffleId, boolean isNettyMode)
diff --git 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index 77330fe2b..270b3d4b2 100644
--- 
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++ 
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -54,6 +54,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleBlockInfo;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.port.PortRegistry;
 import org.apache.uniffle.common.rpc.ServerType;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.RssUtils;
@@ -81,9 +82,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends 
KerberizedHadoopBase
     }
   }
 
-  private static final int COORDINATOR_RPC_PORT = 19999;
-  private static final String COORDINATOR_QUORUM = LOCALHOST + ":" + 
COORDINATOR_RPC_PORT;
-
   private ShuffleServerGrpcClient grpcShuffleServerClient;
   private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
   private static CoordinatorServer coordinatorServer;
@@ -91,22 +89,26 @@ public class ShuffleServerWithKerberizedHadoopTest extends 
KerberizedHadoopBase
   private static ShuffleServer nettyShuffleServer;
   private static ShuffleServerConf grpcShuffleServerConfig;
   private static ShuffleServerConf nettyShuffleServerConfig;
+  protected static List<Integer> jettyPorts = Lists.newArrayList();
 
   static @TempDir File tempDir;
 
-  private static ShuffleServerConf getShuffleServerConf(ServerType serverType) 
throws Exception {
+  private static ShuffleServerConf getShuffleServerConf(
+      int id, File tmpDir, int coordinatorRpcPort, ServerType serverType) 
throws Exception {
     ShuffleServerConf serverConf = new ShuffleServerConf();
-    serverConf.setInteger("rss.rpc.server.port", 
IntegrationTestBase.getNextRpcServerPort());
-    serverConf.setString("rss.storage.type", 
StorageType.MEMORY_LOCALFILE_HDFS.name());
-    serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
+    File dataDir1 = new File(tmpDir, id + "_1");
+    File dataDir2 = new File(tmpDir, id + "_2");
+    String basePath = dataDir1.getAbsolutePath() + "," + 
dataDir2.getAbsolutePath();
+    serverConf.setInteger("rss.rpc.server.port", 0);
+    serverConf.setString("rss.storage.basePath", basePath);
     serverConf.setString("rss.server.buffer.capacity", "671088640");
     serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
     serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
     serverConf.setString("rss.server.read.buffer.capacity", "335544320");
-    serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
+    serverConf.setString("rss.coordinator.quorum", LOCALHOST + ":" + 
coordinatorRpcPort);
     serverConf.setString("rss.server.heartbeat.delay", "1000");
     serverConf.setString("rss.server.heartbeat.interval", "1000");
-    serverConf.setInteger("rss.jetty.http.port", 
IntegrationTestBase.getNextJettyServerPort());
+    serverConf.setInteger("rss.jetty.http.port", jettyPorts.get(id));
     serverConf.setInteger("rss.jetty.corePool.size", 64);
     serverConf.setInteger("rss.rpc.executor.size", 10);
     serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -116,29 +118,34 @@ public class ShuffleServerWithKerberizedHadoopTest 
extends KerberizedHadoopBase
     serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
     serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
     if (serverType == ServerType.GRPC_NETTY) {
-      serverConf.setInteger(
-          ShuffleServerConf.NETTY_SERVER_PORT, 
IntegrationTestBase.getNextNettyServerPort());
+      serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, 0);
     }
     return serverConf;
   }
 
   @BeforeAll
-  public static void setup() throws Exception {
+  public static void setup(@TempDir File tempDir) throws Exception {
     testRunner = ShuffleServerWithKerberizedHadoopTest.class;
     KerberizedHadoopBase.init();
+    for (int i = 0; i < 3; i++) {
+      jettyPorts.add(PortRegistry.reservePort());
+    }
 
     CoordinatorConf coordinatorConf = new CoordinatorConf();
-    coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 19999);
-    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 19998);
+    coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
+    coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 
jettyPorts.get(3));
     coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
     coordinatorServer = new CoordinatorServer(coordinatorConf);
     coordinatorServer.start();
 
-    ShuffleServerConf grpcShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC);
+    ShuffleServerConf grpcShuffleServerConf =
+        getShuffleServerConf(0, tempDir, coordinatorServer.getRpcListenPort(), 
ServerType.GRPC);
     grpcShuffleServer = new ShuffleServer(grpcShuffleServerConf);
     grpcShuffleServer.start();
 
-    ShuffleServerConf nettyShuffleServerConf = 
getShuffleServerConf(ServerType.GRPC_NETTY);
+    ShuffleServerConf nettyShuffleServerConf =
+        getShuffleServerConf(
+            1, tempDir, coordinatorServer.getRpcListenPort(), 
ServerType.GRPC_NETTY);
     nettyShuffleServer = new ShuffleServer(nettyShuffleServerConf);
     nettyShuffleServer.start();
 
@@ -157,22 +164,19 @@ public class ShuffleServerWithKerberizedHadoopTest 
extends KerberizedHadoopBase
     if (nettyShuffleServer != null) {
       nettyShuffleServer.stopServer();
     }
+    for (int port : jettyPorts) {
+      PortRegistry.release(port);
+    }
   }
 
   @BeforeEach
   public void beforeEach() throws Exception {
     initHadoopSecurityContext();
     grpcShuffleServerClient =
-        new ShuffleServerGrpcClient(
-            LOCALHOST,
-            
getShuffleServerConf(ServerType.GRPC).getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+        new ShuffleServerGrpcClient(LOCALHOST, 
grpcShuffleServer.getGrpcPort());
     nettyShuffleServerClient =
         new ShuffleServerGrpcNettyClient(
-            LOCALHOST,
-            getShuffleServerConf(ServerType.GRPC_NETTY)
-                .getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-            getShuffleServerConf(ServerType.GRPC_NETTY)
-                .getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+            LOCALHOST, nettyShuffleServer.getGrpcPort(), 
nettyShuffleServer.getNettyPort());
   }
 
   @AfterEach


Reply via email to