This is an automated email from the ASF dual-hosted git repository. roryqi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new c75e727a1 [#2065] improvement(test): Fix QuorumTest to use random port (#2066) c75e727a1 is described below commit c75e727a1f6bbdc0fa95ba9ad2f931c84c68abeb Author: maobaolong <baoloong...@tencent.com> AuthorDate: Wed Sep 11 19:09:46 2024 +0800 [#2065] improvement(test): Fix QuorumTest to use random port (#2066) ### What changes were proposed in this pull request? As title. ### Why are the changes needed? Fix: #2065 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. --- .../uniffle/coordinator/CoordinatorServer.java | 7 +++- .../java/org/apache/uniffle/test/QuorumTest.java | 37 +++++++++++++--------- 2 files changed, 28 insertions(+), 16 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java index 847afda83..74d34bac6 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorServer.java @@ -72,6 +72,7 @@ public class CoordinatorServer { private GRPCMetrics grpcMetrics; private MetricReporter metricReporter; private String id; + private int rpcListenPort; public CoordinatorServer(CoordinatorConf coordinatorConf) throws Exception { this.startTimeMs = System.currentTimeMillis(); @@ -106,7 +107,7 @@ public class CoordinatorServer { LOG.info( "{} version: {}", this.getClass().getSimpleName(), Constants.VERSION_AND_REVISION_SHORT); jettyServer.start(); - server.start(); + rpcListenPort = server.start(); if (metricReporter != null) { metricReporter.start(); } @@ -280,4 +281,8 @@ public class CoordinatorServer { public long getStartTimeMs() { return startTimeMs; } + + public int getRpcListenPort() { + return rpcListenPort; + } } diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java index 35037470b..ff0fe3477 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java @@ -85,32 +85,45 @@ public class QuorumTest extends ShuffleReadWriteBase { .readBufferSize(1000); } - public static MockedShuffleServer createServer(int id, File tmpDir) throws Exception { + public static MockedShuffleServer createServer(int id, File tmpDir, int coordinatorRpcPort) + throws Exception { ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); + shuffleServerConf.setInteger("rss.rpc.server.port", 0); shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000); shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000); File dataDir1 = new File(tmpDir, id + "_1"); File dataDir2 = new File(tmpDir, id + "_2"); String basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath(); shuffleServerConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE.name()); - shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100); + shuffleServerConf.setInteger("rss.jetty.http.port", 0); shuffleServerConf.setString("rss.storage.basePath", basePath); + shuffleServerConf.setString("rss.coordinator.quorum", LOCALHOST + ":" + coordinatorRpcPort); return new MockedShuffleServer(shuffleServerConf); } @BeforeEach public void initCluster(@TempDir File tmpDir) throws Exception { CoordinatorConf coordinatorConf = getCoordinatorConf(); + coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0); + coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0); createCoordinatorServer(coordinatorConf); + for (CoordinatorServer coordinator : coordinators) { + coordinator.start(); + } + ShuffleServerConf shuffleServerConf = getShuffleServerConf(ServerType.GRPC); shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000); - grpcShuffleServers.add(createServer(0, tmpDir)); - grpcShuffleServers.add(createServer(1, tmpDir)); - grpcShuffleServers.add(createServer(2, tmpDir)); - grpcShuffleServers.add(createServer(3, tmpDir)); - grpcShuffleServers.add(createServer(4, tmpDir)); + grpcShuffleServers.add(createServer(0, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(1, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(2, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(3, tmpDir, coordinators.get(0).getRpcListenPort())); + grpcShuffleServers.add(createServer(4, tmpDir, coordinators.get(0).getRpcListenPort())); + + for (ShuffleServer shuffleServer : grpcShuffleServers) { + shuffleServer.start(); + } shuffleServerInfo0 = new ShuffleServerInfo( @@ -137,12 +150,6 @@ public class QuorumTest extends ShuffleReadWriteBase { String.format("127.0.0.1-%s", grpcShuffleServers.get(4).getGrpcPort()), grpcShuffleServers.get(4).getIp(), grpcShuffleServers.get(4).getGrpcPort()); - for (CoordinatorServer coordinator : coordinators) { - coordinator.start(); - } - for (ShuffleServer shuffleServer : grpcShuffleServers) { - shuffleServer.start(); - } // simulator of failed servers fakedShuffleServerInfo0 = @@ -643,7 +650,7 @@ public class QuorumTest extends ShuffleReadWriteBase { // when one server is restarted, getShuffleResult should success grpcShuffleServers.get(1).stopServer(); - grpcShuffleServers.set(1, createServer(1, tmpDir)); + grpcShuffleServers.set(1, createServer(1, tmpDir, coordinators.get(0).getRpcListenPort())); grpcShuffleServers.get(1).start(); report = shuffleWriteClientImpl.getShuffleResult( @@ -656,7 +663,7 @@ public class QuorumTest extends ShuffleReadWriteBase { // when two servers are restarted, getShuffleResult should fail grpcShuffleServers.get(2).stopServer(); - grpcShuffleServers.set(2, createServer(2, tmpDir)); + grpcShuffleServers.set(2, createServer(2, tmpDir, coordinators.get(0).getRpcListenPort())); grpcShuffleServers.get(2).start(); try { report =