This is an automated email from the ASF dual-hosted git repository.
yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3874f154c [#2388] improvement(test): mr, tez, spark integration test
use random port (#2397)
3874f154c is described below
commit 3874f154c95338d0d304b3037c04c075dbebb913
Author: summaryzb <[email protected]>
AuthorDate: Mon Mar 17 19:56:33 2025 +0800
[#2388] improvement(test): mr, tez, spark integration test use random port
(#2397)
### What changes were proposed in this pull request?
- Integration test of mr, tez, spark use random port
- Re-enable `AccessClusterTest`
### Why are the changes needed?
Fix: #2388
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT
---
.../apache/uniffle/common/port/PortRegistry.java | 178 ---------------------
.../org/apache/uniffle/test/AccessClusterTest.java | 53 +++---
.../uniffle/test/CoordinatorAdminServiceTest.java | 2 +-
.../apache/uniffle/test/CoordinatorGrpcTest.java | 6 +-
.../apache/uniffle/test/IntegrationTestBase.java | 127 +++------------
.../java/org/apache/uniffle/test/QuorumTest.java | 7 +-
.../java/org/apache/uniffle/test/ServletTest.java | 2 +-
.../ShuffleServerWithKerberizedHadoopTest.java | 12 +-
.../apache/uniffle/test/MRIntegrationTestBase.java | 18 ++-
.../org/apache/uniffle/test/AutoAccessTest.java | 18 ++-
.../uniffle/test/DynamicFetchClientConfTest.java | 16 +-
.../org/apache/uniffle/test/FailingTasksTest.java | 20 +--
.../test/RSSStageDynamicServerReWriteTest.java | 60 +++----
.../apache/uniffle/test/RSSStageResubmitTest.java | 19 +--
.../RepartitionWithHadoopHybridStorageRssTest.java | 32 +---
.../test/RepartitionWithLocalFileRssTest.java | 16 +-
.../RepartitionWithMemoryHybridStorageRssTest.java | 31 ++--
.../uniffle/test/RepartitionWithMemoryRssTest.java | 28 ++--
.../apache/uniffle/test/RssShuffleManagerTest.java | 8 +-
.../test/ShuffleUnregisterWithHadoopTest.java | 22 ++-
.../test/ShuffleUnregisterWithLocalfileTest.java | 29 ++--
.../org/apache/uniffle/test/SimpleTestBase.java | 19 +--
...thLocalForMultiPartLocalStorageManagerTest.java | 62 +++----
.../uniffle/test/SparkClientWithLocalTest.java | 37 ++---
.../uniffle/test/SparkIntegrationTestBase.java | 4 +-
...QLWithDelegationShuffleManagerFallbackTest.java | 17 +-
.../SparkSQLWithDelegationShuffleManagerTest.java | 29 ++--
.../uniffle/test/SparkSQLWithMemoryLocalTest.java | 15 +-
.../org/apache/uniffle/test/GetReaderTest.java | 16 +-
.../apache/uniffle/test/AQERepartitionTest.java | 14 +-
.../org/apache/uniffle/test/AQESkewedJoinTest.java | 19 ++-
.../ContinuousSelectPartitionStrategyTest.java | 51 ++----
.../org/apache/uniffle/test/GetReaderTest.java | 12 +-
.../test/GetShuffleReportForMultiPartTest.java | 51 ++----
.../apache/uniffle/test/MapSideCombineTest.java | 15 +-
.../test/PartitionBlockDataReassignBasicTest.java | 29 ++--
.../PartitionBlockDataReassignMultiTimesTest.java | 30 ++--
.../uniffle/test/TezIntegrationTestBase.java | 18 ++-
.../uniffle/test/TezWordCountWithFailuresTest.java | 16 +-
39 files changed, 355 insertions(+), 803 deletions(-)
diff --git
a/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java
b/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java
deleted file mode 100644
index 2a02390a4..000000000
--- a/common/src/test/java/org/apache/uniffle/common/port/PortRegistry.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.common.port;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.ServerSocket;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
-import java.nio.channels.OverlappingFileLockException;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Registry for reserving ports during tests.
- *
- * <p>The registry reserves ports by taking file locks on files in the port
coordination directory.
- * This doesn't prevent external processes from stealing our ports, but it
will prevent us from
- * conflicting with ourselves. We can then run tests in a dockerized
environment to completely
- * prevent conflicts.
- *
- * <p>The default coordination directory is determined by the "user.dir" jvm
property. The
- * coordination directory can be overridden by setting the
UNIFFLE_PORT_COORDINATION_DIR environment
- * variable.
- */
-public final class PortRegistry {
- private static final String PORT_COORDINATION_DIR_PROPERTY =
"UNIFFLE_PORT_COORDINATION_DIR";
-
- private static final Registry INSTANCE = new Registry();
-
- private PortRegistry() {} // Class should not be instantiated.
-
- /**
- * Reserves a free port so that other tests will not take it.
- *
- * @return the free port
- */
- public static int reservePort() {
- return INSTANCE.reservePort();
- }
-
- /** @param port the port to release */
- public static void release(int port) {
- INSTANCE.release(port);
- }
-
- /** Clears the registry. */
- public static void clear() {
- INSTANCE.clear();
- }
-
- /**
- * @return a port that is currently free. This does not reserve the port, so
the port may be taken
- * by the time this method returns.
- */
- public static int getFreePort() {
- int port;
- try {
- ServerSocket socket = new ServerSocket(0);
- port = socket.getLocalPort();
- socket.close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- return port;
- }
-
- private static class Registry {
- // Map from port number to the reservation for that port.
- private final Map<Integer, Reservation> reserved = new
ConcurrentHashMap<>();
- private final File coordinationDir;
-
- private Registry() {
- String dir = System.getenv(PORT_COORDINATION_DIR_PROPERTY);
- if (dir == null) {
- dir = System.getProperty("user.dir");
- }
- coordinationDir = new File(dir, ".port_coordination");
- coordinationDir.mkdirs();
- }
-
- /**
- * Reserves a free port so that other tests will not take it.
- *
- * @return the free port
- */
- public int reservePort() {
- for (int i = 0; i < 1000; i++) {
- int port = getFreePort();
- if (lockPort(port)) {
- return port;
- }
- }
- throw new RuntimeException("Failed to acquire port");
- }
-
- /**
- * Attempts to lock the given port.
- *
- * @param port the port to lock
- * @return whether the locking succeeded
- */
- public boolean lockPort(int port) {
- File portFile = portFile(port);
- try {
- FileChannel channel = new RandomAccessFile(portFile,
"rw").getChannel();
- FileLock lock = channel.tryLock();
- if (lock == null) {
- channel.close();
- return false;
- }
- reserved.put(port, new Reservation(portFile, lock));
- return true;
- } catch (IOException | OverlappingFileLockException e) {
- return false;
- }
- }
-
- /** @param port the port to release */
- public void release(int port) {
- Reservation r = reserved.remove(port);
- if (r != null) {
- // If delete fails, we may leave a file behind. However, the file will
be unlocked, so
- // another process can still take the port.
- r.file.delete();
- try {
- r.lock.release();
- r.lock.channel().close();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
- }
-
- /** Clears the registry. */
- public void clear() {
- new HashSet<>(reserved.keySet()).forEach(this::release);
- }
-
- /**
- * Creates a file in coordination dir to lock the port.
- *
- * @param port the port to lock
- * @return the created file
- */
- public File portFile(int port) {
- return new File(coordinationDir, Integer.toString(port));
- }
-
- /** Resources used to reserve a port. */
- private static class Reservation {
- private final File file;
- private final FileLock lock;
-
- private Reservation(File file, FileLock lock) {
- this.file = file;
- this.lock = lock;
- }
- }
- }
-}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
index 9e92d1a95..1a962bcb9 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/AccessClusterTest.java
@@ -29,15 +29,12 @@ import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
-import org.apache.uniffle.client.api.CoordinatorClient;
-import org.apache.uniffle.client.factory.CoordinatorClientFactory;
import org.apache.uniffle.client.request.RssAccessClusterRequest;
import org.apache.uniffle.client.response.RssAccessClusterResponse;
-import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.Constants;
@@ -46,13 +43,13 @@ import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.access.checker.AccessChecker;
+import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;
import org.apache.uniffle.server.ShuffleServer;
import org.apache.uniffle.server.ShuffleServerConf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@Disabled("flaky test")
public class AccessClusterTest extends CoordinatorTestBase {
public static class MockedAccessChecker implements AccessChecker {
@@ -83,20 +80,27 @@ public class AccessClusterTest extends CoordinatorTestBase {
}
}
+ @AfterEach
+ public void afterEach() throws Exception {
+ shutdownServers();
+ CoordinatorMetrics.clear();
+ }
+
@Test
public void testUsingCustomExtraProperties() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setString(
"rss.coordinator.access.checkers",
"org.apache.uniffle.test.AccessClusterTest$MockedAccessChecker");
- createCoordinatorServer(coordinatorConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
// case1: empty map
String accessID = "acessid";
RssAccessClusterRequest request =
new RssAccessClusterRequest(
accessID, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
"user");
+ createClient();
RssAccessClusterResponse response =
coordinatorClient.accessCluster(request);
assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
@@ -125,8 +129,6 @@ public class AccessClusterTest extends CoordinatorTestBase {
"user");
response = coordinatorClient.accessCluster(request);
assertEquals(StatusCode.SUCCESS, response.getStatusCode());
-
- shutdownServers();
}
@Test
@@ -140,23 +142,23 @@ public class AccessClusterTest extends
CoordinatorTestBase {
printWriter.flush();
printWriter.close();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold",
2);
coordinatorConf.setString("rss.coordinator.access.candidates.path",
cfgFile.getAbsolutePath());
coordinatorConf.setString(
"rss.coordinator.access.checkers",
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
+
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(shuffleServerConf);
- startServers();
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir,
ServerType.GRPC));
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
String accessId = "111111";
RssAccessClusterRequest request =
new RssAccessClusterRequest(
accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
"user");
+ createClient();
RssAccessClusterResponse response =
coordinatorClient.accessCluster(request);
assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by
AccessCandidatesChecker"));
@@ -168,32 +170,19 @@ public class AccessClusterTest extends
CoordinatorTestBase {
response = coordinatorClient.accessCluster(request);
assertEquals(StatusCode.ACCESS_DENIED, response.getStatusCode());
assertTrue(response.getMessage().startsWith("Denied by
AccessClusterLoadChecker"));
-
- shuffleServerConf.setInteger(
- "rss.rpc.server.port",
shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + 2);
- shuffleServerConf.setInteger(
- "rss.jetty.http.port",
shuffleServerConf.getInteger(ShuffleServerConf.JETTY_HTTP_PORT) + 1);
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tempDir, ServerType.GRPC);
+ shuffleServerConf.setString("rss.coordinator.quorum", getQuorum());
ShuffleServer shuffleServer = new ShuffleServer(shuffleServerConf);
shuffleServer.start();
+ // this make sure the server can be shutdown
+ grpcShuffleServers.add(shuffleServer);
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
- CoordinatorClient client =
- CoordinatorClientFactory.getInstance()
- .createCoordinatorClient(ClientType.GRPC, LOCALHOST,
COORDINATOR_PORT_1 + 13);
- request =
- new RssAccessClusterRequest(
- accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
"user");
- response = client.accessCluster(request);
- assertEquals(StatusCode.INTERNAL_ERROR, response.getStatusCode());
- assertTrue(response.getMessage().startsWith("UNAVAILABLE: io exception"));
-
request =
new RssAccessClusterRequest(
accessId, Sets.newHashSet(Constants.SHUFFLE_SERVER_VERSION), 2000,
"user");
response = coordinatorClient.accessCluster(request);
assertEquals(StatusCode.SUCCESS, response.getStatusCode());
assertTrue(response.getMessage().startsWith("SUCCESS"));
- shuffleServer.stopServer();
- shutdownServers();
}
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
index 17e87e0b0..9d46d5d5d 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorAdminServiceTest.java
@@ -52,7 +52,7 @@ public class CoordinatorAdminServiceTest extends
IntegrationTestBase {
@BeforeEach
public void createClient() {
- String hostUrl = String.format("http://%s:%d", LOCALHOST,
jettyPorts.get(0));
+ String hostUrl = String.format("http://%s:%d", LOCALHOST,
coordinators.get(0).getJettyPort());
adminRestApi = new
AdminRestApi(UniffleRestClient.builder(hostUrl).build());
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
index cacc46c54..455236bb9 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/CoordinatorGrpcTest.java
@@ -143,7 +143,6 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
CoordinatorTestUtils.waitForRegister(coordinatorClient, 2);
grpcShuffleServers.get(0).stopServer();
- List<Integer> ports = reserveJettyPorts(1);
ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tmpDir, ServerType.GRPC);
shuffleServerConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY,
"RSS_ENV_KEY");
String baseDir =
shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0);
@@ -154,7 +153,7 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
() -> {
shuffleServerConf.setString("rss.coordinator.quorum",
getQuorum());
shuffleServerConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
- shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT,
ports.get(0));
+ shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
grpcShuffleServers.set(0, ss);
@@ -298,7 +297,6 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
assertTrue(node.getTags().contains(Constants.SHUFFLE_SERVER_VERSION));
assertTrue(scm.getTagToNodes().get(Constants.SHUFFLE_SERVER_VERSION).contains(node));
- List<Integer> ports = reserveJettyPorts(1);
ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tempDir, ServerType.GRPC);
shuffleServerConf.set(ShuffleServerConf.STORAGE_MEDIA_PROVIDER_ENV_KEY,
"RSS_ENV_KEY");
String baseDir =
shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH).get(0);
@@ -311,7 +309,7 @@ public class CoordinatorGrpcTest extends
CoordinatorTestBase {
shuffleServerConf.set(ShuffleServerConf.TAGS,
Lists.newArrayList("SSD"));
shuffleServerConf.setString("rss.coordinator.quorum",
getQuorum());
shuffleServerConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
- shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT,
ports.get(0));
+ shuffleServerConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
ShuffleServer ss = new ShuffleServer(shuffleServerConf);
ss.start();
grpcShuffleServers.set(0, ss);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index d3a9b8ffa..cdb3f8937 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -26,7 +26,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import com.google.common.collect.Lists;
@@ -36,7 +35,6 @@ import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.factory.ShuffleServerClientFactory;
import org.apache.uniffle.common.config.RssBaseConf;
-import org.apache.uniffle.common.port.PortRegistry;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -51,12 +49,6 @@ import org.apache.uniffle.storage.util.StorageType;
public abstract class IntegrationTestBase extends HadoopTestBase {
- /** Should not be accessed directly, use `getNextRpcServerPort` instead */
- private static final int SHUFFLE_SERVER_INITIAL_PORT = 20001;
-
- /** Should not be accessed directly, use `getNextJettyServerPort` instead */
- private static final int JETTY_SERVER_INITIAL_PORT = 18080;
-
protected static final String LOCALHOST;
static {
@@ -67,12 +59,6 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
}
}
- protected static final int COORDINATOR_PORT_1 = 19999;
- protected static final int COORDINATOR_PORT_2 = 20030;
- protected static final int JETTY_PORT_1 = 19998;
- protected static final int JETTY_PORT_2 = 20040;
- protected static final String COORDINATOR_QUORUM = LOCALHOST + ":" +
COORDINATOR_PORT_1;
-
protected static List<ShuffleServer> grpcShuffleServers =
Lists.newArrayList();
protected static List<ShuffleServer> nettyShuffleServers =
Lists.newArrayList();
protected static List<CoordinatorServer> coordinators = Lists.newArrayList();
@@ -81,27 +67,8 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
private static List<ShuffleServerConf> mockShuffleServerConfList =
Lists.newArrayList();
protected static List<CoordinatorConf> coordinatorConfList =
Lists.newArrayList();
- /** Should not be accessed directly, use `getNextNettyServerPort` instead */
- private static final int NETTY_INITIAL_PORT = 21000;
-
- private static AtomicInteger serverRpcPortCounter = new AtomicInteger();
- private static AtomicInteger nettyPortCounter = new AtomicInteger();
- private static AtomicInteger jettyPortCounter = new AtomicInteger();
-
static @TempDir File tempDir;
- public static void startServers() throws Exception {
- for (CoordinatorServer coordinator : coordinators) {
- coordinator.start();
- }
- for (ShuffleServer shuffleServer : grpcShuffleServers) {
- shuffleServer.start();
- }
- for (ShuffleServer shuffleServer : nettyShuffleServers) {
- shuffleServer.start();
- }
- }
-
public static String getQuorum() {
return coordinators.stream()
.map(CoordinatorServer::getRpcListenPort)
@@ -111,11 +78,22 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
public static List<Integer> generateNonExistingPorts(int num) {
Set<Integer> portExistsSet = Sets.newHashSet();
- jettyPorts.forEach(port -> portExistsSet.add(port));
- coordinators.forEach(server ->
portExistsSet.add(server.getRpcListenPort()));
- grpcShuffleServers.forEach(server ->
portExistsSet.add(server.getGrpcPort()));
- nettyShuffleServers.forEach(server ->
portExistsSet.add(server.getGrpcPort()));
- nettyShuffleServers.forEach(server ->
portExistsSet.add(server.getNettyPort()));
+ coordinators.forEach(
+ server -> {
+ portExistsSet.add(server.getJettyPort());
+ portExistsSet.add(server.getRpcListenPort());
+ });
+ grpcShuffleServers.forEach(
+ server -> {
+ portExistsSet.add(server.getJettyPort());
+ portExistsSet.add(server.getGrpcPort());
+ });
+ nettyShuffleServers.forEach(
+ server -> {
+ portExistsSet.add(server.getJettyPort());
+ portExistsSet.add(server.getGrpcPort());
+ portExistsSet.add(server.getNettyPort());
+ });
int i = 0;
List<Integer> fakePorts = new ArrayList<>(num);
while (i < num) {
@@ -129,15 +107,8 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
}
public static void startServersWithRandomPorts() throws Exception {
- final int jettyPortSize =
- coordinatorConfList.size()
- + shuffleServerConfList.size()
- + mockShuffleServerConfList.size();
- reserveJettyPorts(jettyPortSize);
- int index = 0;
for (CoordinatorConf coordinatorConf : coordinatorConfList) {
- coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT,
jettyPorts.get(index));
- index++;
+ coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0);
coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
createCoordinatorServer(coordinatorConf);
}
@@ -147,15 +118,13 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
String quorum = getQuorum();
for (ShuffleServerConf serverConf : shuffleServerConfList) {
- serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT,
jettyPorts.get(index));
- index++;
+ serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
createShuffleServer(serverConf);
}
for (ShuffleServerConf serverConf : mockShuffleServerConfList) {
- serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT,
jettyPorts.get(index));
- index++;
+ serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
serverConf.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
serverConf.setString(RssBaseConf.RSS_COORDINATOR_QUORUM, quorum);
createMockedShuffleServer(serverConf);
@@ -169,18 +138,6 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
}
}
- protected static List<Integer> jettyPorts = Lists.newArrayList();
-
- public static List<Integer> reserveJettyPorts(int numPorts) {
- List<Integer> ports = new ArrayList<>(numPorts);
- for (int i = 0; i < numPorts; i++) {
- int port = PortRegistry.reservePort();
- jettyPorts.add(port);
- ports.add(port);
- }
- return ports;
- }
-
@AfterAll
public static void shutdownServers() throws Exception {
for (CoordinatorServer coordinator : coordinators) {
@@ -192,29 +149,17 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
for (ShuffleServer shuffleServer : nettyShuffleServers) {
shuffleServer.stopServer();
}
- for (int port : jettyPorts) {
- PortRegistry.release(port);
- }
grpcShuffleServers.clear();
nettyShuffleServers.clear();
coordinators.clear();
shuffleServerConfList.clear();
mockShuffleServerConfList.clear();
coordinatorConfList.clear();
- jettyPorts.clear();
ShuffleServerMetrics.clear();
CoordinatorMetrics.clear();
ShuffleServerClientFactory.getInstance().cleanupCache();
}
- protected static CoordinatorConf getCoordinatorConf() {
- CoordinatorConf coordinatorConf = new CoordinatorConf();
- coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT,
COORDINATOR_PORT_1);
- coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, JETTY_PORT_1);
- coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
- return coordinatorConf;
- }
-
protected static CoordinatorConf coordinatorConfWithoutPort() {
CoordinatorConf coordinatorConf = new CoordinatorConf();
coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
@@ -231,22 +176,9 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
CoordinatorConf.COORDINATOR_DYNAMIC_CLIENT_CONF_UPDATE_INTERVAL_SEC,
5);
}
- // TODO(summaryzb) when all test use random port,
- // https://github.com/apache/incubator-uniffle/issues/2064 should be closed,
then this method
- // should be removed
- protected static ShuffleServerConf getShuffleServerConf(ServerType
serverType) throws Exception {
- return getShuffleServerConf(
- serverType,
- COORDINATOR_QUORUM,
- getNextRpcServerPort(),
- getNextNettyServerPort(),
- getNextJettyServerPort());
- }
-
- private static ShuffleServerConf getShuffleServerConf(
- ServerType serverType, String quorum, int grpcPort, int nettyPort, int
jettyPort) {
+ private static ShuffleServerConf getShuffleServerConf(ServerType serverType,
String quorum) {
ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setInteger("rss.rpc.server.port", grpcPort);
+ serverConf.setInteger("rss.rpc.server.port", 0);
serverConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE_HDFS.name());
serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
serverConf.setString("rss.server.buffer.capacity", "671088640");
@@ -256,7 +188,6 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
serverConf.setString("rss.coordinator.quorum", quorum);
serverConf.setString("rss.server.heartbeat.delay", "1000");
serverConf.setString("rss.server.heartbeat.interval", "1000");
- serverConf.setInteger(RssBaseConf.JETTY_HTTP_PORT, jettyPort);
serverConf.setInteger("rss.jetty.corePool.size", 64);
serverConf.setInteger("rss.rpc.executor.size", 10);
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -266,14 +197,14 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL,
500L);
serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
if (serverType == ServerType.GRPC_NETTY) {
- serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, nettyPort);
+ serverConf.setInteger(ShuffleServerConf.NETTY_SERVER_PORT, 0);
}
return serverConf;
}
protected static ShuffleServerConf shuffleServerConfWithoutPort(
int subDirIndex, File tmpDir, ServerType serverType) {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType, "",
0, 0, 0);
+ ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType, "");
if (tmpDir != null) {
File dataDir1 = new File(tmpDir, subDirIndex + "_1");
File dataDir2 = new File(tmpDir, subDirIndex + "_2");
@@ -283,18 +214,6 @@ public abstract class IntegrationTestBase extends
HadoopTestBase {
return shuffleServerConf;
}
- public static int getNextRpcServerPort() {
- return SHUFFLE_SERVER_INITIAL_PORT +
serverRpcPortCounter.getAndIncrement();
- }
-
- public static int getNextJettyServerPort() {
- return JETTY_SERVER_INITIAL_PORT + jettyPortCounter.getAndIncrement();
- }
-
- public static int getNextNettyServerPort() {
- return NETTY_INITIAL_PORT + nettyPortCounter.getAndIncrement();
- }
-
protected static void createCoordinatorServer(CoordinatorConf
coordinatorConf) throws Exception {
coordinators.add(new CoordinatorServer(coordinatorConf));
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
index 5de2693d2..1cfec60ef 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/QuorumTest.java
@@ -586,15 +586,12 @@ public class QuorumTest extends ShuffleReadWriteBase {
assertEquals(0, result.getFailedBlockIds().size());
assertEquals(blockIdBitmap, succBlockIdBitmap);
- // tricky to reserve port, it'll be released after test
- @SuppressWarnings("checkstyle:VariableDeclarationUsageDistance")
- List<Integer> ports = reserveJettyPorts(2);
// when one server is restarted, getShuffleResult should success
grpcShuffleServers.get(1).stopServer();
ShuffleServerConf shuffleServerConf1 = buildServerConf(5, tmpDir);
shuffleServerConf1.setString("rss.coordinator.quorum", getQuorum());
shuffleServerConf1.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
- shuffleServerConf1.setInteger(RssBaseConf.JETTY_HTTP_PORT, ports.get(0));
+ shuffleServerConf1.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
grpcShuffleServers.set(1, new MockedShuffleServer(shuffleServerConf1));
grpcShuffleServers.get(1).start();
report =
@@ -611,7 +608,7 @@ public class QuorumTest extends ShuffleReadWriteBase {
ShuffleServerConf shuffleServerConf2 = buildServerConf(5, tmpDir);
shuffleServerConf2.setString("rss.coordinator.quorum", getQuorum());
shuffleServerConf2.setInteger(RssBaseConf.RPC_SERVER_PORT, 0);
- shuffleServerConf2.setInteger(RssBaseConf.JETTY_HTTP_PORT, ports.get(1));
+ shuffleServerConf2.setInteger(RssBaseConf.JETTY_HTTP_PORT, 0);
grpcShuffleServers.set(2, new MockedShuffleServer(shuffleServerConf2));
grpcShuffleServers.get(2).start();
try {
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
index 692763e86..2ba14bf14 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ServletTest.java
@@ -101,8 +101,8 @@ public class ServletTest extends IntegrationTestBase {
prepareShuffleServerConf(3, tmpDir);
startServersWithRandomPorts();
- coordinatorHttpPort = jettyPorts.get(0);
coordinatorServer = coordinators.get(0);
+ coordinatorHttpPort = coordinatorServer.getJettyPort();
Awaitility.await()
.timeout(30, TimeUnit.SECONDS)
.until(() -> coordinatorServer.getClusterManager().list().size() == 4);
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
index bcbc29f55..ee0eda249 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHadoopTest.java
@@ -54,7 +54,6 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
-import org.apache.uniffle.common.port.PortRegistry;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RssUtils;
@@ -87,7 +86,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
private static CoordinatorServer coordinatorServer;
private static ShuffleServer grpcShuffleServer;
private static ShuffleServer nettyShuffleServer;
- protected static List<Integer> jettyPorts = Lists.newArrayList();
private static ShuffleServerConf getShuffleServerConf(
int id, File tmpDir, int coordinatorRpcPort, ServerType serverType)
throws Exception {
@@ -104,7 +102,7 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
serverConf.setString("rss.coordinator.quorum", LOCALHOST + ":" +
coordinatorRpcPort);
serverConf.setString("rss.server.heartbeat.delay", "1000");
serverConf.setString("rss.server.heartbeat.interval", "1000");
- serverConf.setInteger("rss.jetty.http.port", jettyPorts.get(id));
+ serverConf.setInteger("rss.jetty.http.port", 0);
serverConf.setInteger("rss.jetty.corePool.size", 64);
serverConf.setInteger("rss.rpc.executor.size", 10);
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
@@ -123,13 +121,10 @@ public class ShuffleServerWithKerberizedHadoopTest
extends KerberizedHadoopBase
public static void setup(@TempDir File tempDir) throws Exception {
testRunner = ShuffleServerWithKerberizedHadoopTest.class;
KerberizedHadoopBase.init();
- for (int i = 0; i < 3; i++) {
- jettyPorts.add(PortRegistry.reservePort());
- }
CoordinatorConf coordinatorConf = new CoordinatorConf();
coordinatorConf.setInteger(CoordinatorConf.RPC_SERVER_PORT, 0);
- coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT,
jettyPorts.get(3));
+ coordinatorConf.setInteger(CoordinatorConf.JETTY_HTTP_PORT, 0);
coordinatorConf.setInteger(CoordinatorConf.RPC_EXECUTOR_SIZE, 10);
coordinatorServer = new CoordinatorServer(coordinatorConf);
coordinatorServer.start();
@@ -157,9 +152,6 @@ public class ShuffleServerWithKerberizedHadoopTest extends
KerberizedHadoopBase
if (nettyShuffleServer != null) {
nettyShuffleServer.stopServer();
}
- for (int port : jettyPorts) {
- PortRegistry.release(port);
- }
}
@BeforeEach
diff --git
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
index 0c6e32652..c41157caa 100644
---
a/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
+++
b/integration-test/mr/src/test/java/org/apache/uniffle/test/MRIntegrationTestBase.java
@@ -214,7 +214,7 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
+ localFile.getName()
+ ","
+ MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH);
- jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
+ jobConf.set(RssMRConfig.RSS_COORDINATOR_QUORUM, getQuorum());
updateRssConfiguration(jobConf, clientType);
runMRApp(jobConf, getTestTool(), getTestArgs());
fs.delete(newPath, true);
@@ -230,18 +230,20 @@ public class MRIntegrationTestBase extends
IntegrationTestBase {
protected static void setupServers(Map<String, String> dynamicConf,
ShuffleServerConf serverConf)
throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ storeCoordinatorConf(coordinatorConf);
+ ShuffleServerConf grpcShuffleServerConf =
+ shuffleServerConfWithoutPort(0, null, ServerType.GRPC);
+ ShuffleServerConf nettyShuffleServerConf =
+ shuffleServerConfWithoutPort(1, null, ServerType.GRPC_NETTY);
if (serverConf != null) {
grpcShuffleServerConf.addAll(serverConf);
nettyShuffleServerConf.addAll(serverConf);
}
- createShuffleServer(grpcShuffleServerConf);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeShuffleServerConf(grpcShuffleServerConf);
+ storeShuffleServerConf(nettyShuffleServerConf);
+ startServersWithRandomPorts();
}
protected static Map<String, String> getDynamicConf() {
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
index b8d31c5b6..b04f3efd7 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/AutoAccessTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;
@@ -31,6 +32,7 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.ShuffleManager;
import org.apache.spark.shuffle.sort.SortShuffleManager;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -46,14 +48,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class AutoAccessTest extends IntegrationTestBase {
@Test
- public void test() throws Exception {
+ public void test(@TempDir File tmpDir) throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.DelegationRssShuffleManager");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
+
sparkConf.set("spark.mock.2", "no-overwrite-conf");
sparkConf.set(RssSparkConfig.RSS_REMOTE_STORAGE_PATH.key(),
"overwrite-path");
sparkConf.set("spark.shuffle.service.enabled", "true");
-
String cfgFile = HDFS_URI + "/test/client_conf";
Path path = new Path(cfgFile);
FSDataOutputStream out = fs.create(path);
@@ -75,7 +76,7 @@ public class AutoAccessTest extends IntegrationTestBase {
printWriter1.flush();
printWriter1.close();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled",
true);
coordinatorConf.setString("rss.coordinator.dynamicClientConf.path",
cfgFile);
coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec",
1);
@@ -86,13 +87,14 @@ public class AutoAccessTest extends IntegrationTestBase {
"rss.coordinator.access.checkers",
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
+
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker");
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(shuffleServerConf);
- startServers();
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tmpDir, ServerType.GRPC);
+ storeShuffleServerConf(shuffleServerConf);
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
assertFalse(sparkConf.contains("spark.mock.1"));
assertEquals("no-overwrite-conf", sparkConf.get("spark.mock.2"));
assertTrue(sparkConf.getBoolean("spark.shuffle.service.enabled", true));
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
index 3bf18a5d8..886e28c65 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/DynamicFetchClientConfTest.java
@@ -42,7 +42,6 @@ public class DynamicFetchClientConfTest extends
IntegrationTestBase {
public void test() throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
sparkConf.set("spark.mock.2", "no-overwrite-conf");
sparkConf.set("spark.shuffle.service.enabled", "true");
@@ -62,14 +61,15 @@ public class DynamicFetchClientConfTest extends
IntegrationTestBase {
}
sparkConf.set("spark.mock.2", "no-overwrite-conf");
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled",
true);
coordinatorConf.setString("rss.coordinator.dynamicClientConf.path",
cfgFile);
coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec",
10);
- createCoordinatorServer(coordinatorConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
assertFalse(sparkConf.contains("spark.mock.1"));
assertEquals("no-overwrite-conf", sparkConf.get("spark.mock.2"));
@@ -95,7 +95,6 @@ public class DynamicFetchClientConfTest extends
IntegrationTestBase {
shutdownServers();
sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
sparkConf.set("spark.mock.2", "no-overwrite-conf");
sparkConf.set("spark.shuffle.service.enabled", "true");
@@ -106,13 +105,14 @@ public class DynamicFetchClientConfTest extends
IntegrationTestBase {
printWriter.println(" spark.mock.3 true ");
printWriter.flush();
printWriter.close();
- coordinatorConf = getCoordinatorConf();
+ coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled",
true);
coordinatorConf.setString("rss.coordinator.dynamicClientConf.path",
cfgFile);
coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec",
10);
- createCoordinatorServer(coordinatorConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(3, TimeUnit.SECONDS);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
Exception expectException = null;
try {
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
index a10382a79..9edddc03b 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/FailingTasksTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
@@ -30,10 +31,10 @@ import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
// This test has all tasks fail twice, the third attempt succeeds.
@@ -44,20 +45,21 @@ import org.apache.uniffle.storage.util.StorageType;
public class FailingTasksTest extends SparkTaskFailureIntegrationTestBase {
@BeforeAll
- public static void setupServers() throws Exception {
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
shutdownServers();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(
RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir,
ServerType.GRPC_NETTY));
+
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
index 444250f56..26f43542d 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageDynamicServerReWriteTest.java
@@ -39,56 +39,42 @@ import org.apache.uniffle.storage.util.StorageType;
public class RSSStageDynamicServerReWriteTest extends
SparkTaskFailureIntegrationTestBase {
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- createServer(0, tmpDir, true, ServerType.GRPC);
- createServer(1, tmpDir, false, ServerType.GRPC);
- createServer(2, tmpDir, false, ServerType.GRPC);
- createServer(3, tmpDir, true, ServerType.GRPC_NETTY);
- createServer(4, tmpDir, false, ServerType.GRPC_NETTY);
- createServer(5, tmpDir, false, ServerType.GRPC_NETTY);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+ prepareServerConf(0, tmpDir, true, ServerType.GRPC);
+ prepareServerConf(1, tmpDir, false, ServerType.GRPC);
+ prepareServerConf(2, tmpDir, false, ServerType.GRPC);
+ prepareServerConf(3, tmpDir, true, ServerType.GRPC_NETTY);
+ prepareServerConf(4, tmpDir, false, ServerType.GRPC_NETTY);
+ prepareServerConf(5, tmpDir, false, ServerType.GRPC_NETTY);
+ startServersWithRandomPorts();
+
+ // Set the sending block data timeout for the first shuffleServer
+ ((MockedGrpcServer) grpcShuffleServers.get(2).getServer())
+ .getService()
+ .enableMockSendDataFailed(true);
+
+ ((MockedGrpcServer) nettyShuffleServers.get(2).getServer())
+ .getService()
+ .enableMockSendDataFailed(true);
}
- public static void createServer(int id, File tmpDir, boolean abnormalFlag,
ServerType serverType)
- throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ public static void prepareServerConf(
+ int id, File tmpDir, boolean abnormalFlag, ServerType serverType) {
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(id,
tmpDir, serverType);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 8000);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
- File dataDir1 = new File(tmpDir, id + "_1");
- File dataDir2 = new File(tmpDir, id + "_2");
- String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE.name());
shuffleServerConf.set(ShuffleServerConf.SERVER_RPC_AUDIT_LOG_ENABLED,
false);
- shuffleServerConf.setInteger(
- "rss.rpc.server.port",
- shuffleServerConf.getInteger(ShuffleServerConf.RPC_SERVER_PORT) + id);
- shuffleServerConf.setInteger("rss.jetty.http.port", 19081 + id * 100);
- shuffleServerConf.setString("rss.storage.basePath", basePath);
if (abnormalFlag) {
- createMockedShuffleServer(shuffleServerConf);
- // Set the sending block data timeout for the first shuffleServer
- switch (serverType) {
- case GRPC:
- ((MockedGrpcServer) grpcShuffleServers.get(0).getServer())
- .getService()
- .enableMockSendDataFailed(true);
- break;
- case GRPC_NETTY:
- ((MockedGrpcServer) nettyShuffleServers.get(0).getServer())
- .getService()
- .enableMockSendDataFailed(true);
- break;
- default:
- throw new UnsupportedOperationException("Unsupported server type " +
serverType);
- }
+ storeMockShuffleServerConf(shuffleServerConf);
} else {
- createShuffleServer(shuffleServerConf);
+ storeShuffleServerConf(shuffleServerConf);
}
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
index 2ea1176ab..b9453ff9a 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RSSStageResubmitTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.util.List;
import java.util.Map;
@@ -27,20 +28,20 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.client.util.RssClientConfig;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
public class RSSStageResubmitTest extends SparkTaskFailureIntegrationTestBase {
@BeforeAll
- public static void setupServers() throws Exception {
- final CoordinatorConf coordinatorConf = getCoordinatorConf();
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ final CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
@@ -48,13 +49,13 @@ public class RSSStageResubmitTest extends
SparkTaskFailureIntegrationTestBase {
RssSparkConfig.SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_RESUBMIT_STAGE, "true");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createMockedShuffleServer(grpcShuffleServerConf);
+ storeCoordinatorConf(coordinatorConf);
+
+ storeMockShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC));
+ storeMockShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir,
ServerType.GRPC_NETTY));
+
+ startServersWithRandomPorts();
enableFirstReadRequest(2 * maxTaskFailures);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createMockedShuffleServer(nettyShuffleServerConf);
- startServers();
}
private static void enableFirstReadRequest(int failCount) {
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java
index 64626237a..a435c632d 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithHadoopHybridStorageRssTest.java
@@ -18,8 +18,6 @@
package org.apache.uniffle.test;
import java.io.File;
-import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -53,34 +51,20 @@ public class RepartitionWithHadoopHybridStorageRssTest
extends RepartitionTest {
LOG.info("use off heap: " + useOffHeap);
dynamicConf.put(
RssSparkConfig.RSS_CLIENT_OFF_HEAP_MEMORY_ENABLE.key(),
String.valueOf(useOffHeap));
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- // local storage config
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- List<String> grpcBasePath =
- Arrays.asList(dataDir1.getAbsolutePath(), dataDir2.getAbsolutePath());
- ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
- createShuffleServer(grpcShuffleServerConf);
+ storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+ storeShuffleServerConf(buildShuffleServerConf(1, tmpDir,
ServerType.GRPC_NETTY));
- // local storage config
- File dataDir3 = new File(tmpDir, "data3");
- File dataDir4 = new File(tmpDir, "data4");
- List<String> nettyBasePath =
- Arrays.asList(dataDir3.getAbsolutePath(), dataDir4.getAbsolutePath());
- ShuffleServerConf nettyShuffleServerConf =
- buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
- createShuffleServer(nettyShuffleServerConf);
-
- startServers();
+ startServersWithRandomPorts();
}
private static ShuffleServerConf buildShuffleServerConf(
- ServerType serverType, List<String> basePath) throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, basePath);
+ int subDirIndex, File tmpDir, ServerType serverType) {
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE_HDFS.name());
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
1024L * 1024L);
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
index 4f30c4997..2bdc29f19 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
@@ -42,31 +42,33 @@ public class RepartitionWithLocalFileRssTest extends
RepartitionTest {
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ ShuffleServerConf grpcShuffleServerConf =
+ shuffleServerConfWithoutPort(0, null, ServerType.GRPC);
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
String grpcBasePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
grpcShuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
grpcShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE,
true);
grpcShuffleServerConf.setString("rss.storage.basePath", grpcBasePath);
- createShuffleServer(grpcShuffleServerConf);
+ storeShuffleServerConf(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ ShuffleServerConf nettyShuffleServerConf =
+ shuffleServerConfWithoutPort(1, null, ServerType.GRPC_NETTY);
File dataDir3 = new File(tmpDir, "data3");
File dataDir4 = new File(tmpDir, "data4");
String nettyBasePath = dataDir3.getAbsolutePath() + "," +
dataDir4.getAbsolutePath();
nettyShuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
nettyShuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE,
true);
nettyShuffleServerConf.setString("rss.storage.basePath", nettyBasePath);
- createShuffleServer(nettyShuffleServerConf);
+ storeShuffleServerConf(nettyShuffleServerConf);
- startServers();
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
index 02545b46f..50f1fb0b0 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryHybridStorageRssTest.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.test;
import java.io.File;
-import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.Maps;
@@ -35,36 +34,24 @@ import org.apache.uniffle.storage.util.StorageType;
public class RepartitionWithMemoryHybridStorageRssTest extends RepartitionTest
{
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(
RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- // local storage config
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- String grpcBasePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
- ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
+ storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+ storeShuffleServerConf(buildShuffleServerConf(1, tmpDir,
ServerType.GRPC_NETTY));
- File dataDir3 = new File(tmpDir, "data3");
- File dataDir4 = new File(tmpDir, "data4");
- String nettyBasePath = dataDir3.getAbsolutePath() + "," +
dataDir4.getAbsolutePath();
- ShuffleServerConf nettyShuffleServerConf =
- buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
-
- createShuffleServer(grpcShuffleServerConf);
- createShuffleServer(nettyShuffleServerConf);
-
- startServers();
+ startServersWithRandomPorts();
}
- private static ShuffleServerConf buildShuffleServerConf(ServerType
serverType, String basePath)
- throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
+ private static ShuffleServerConf buildShuffleServerConf(
+ int subDirIndex, File tmpDir, ServerType serverType) {
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
shuffleServerConf.setLong(ShuffleServerConf.FLUSH_COLD_STORAGE_THRESHOLD_SIZE,
1024L * 1024L);
return shuffleServerConf;
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
index 3444a1cee..df53d947e 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithMemoryRssTest.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.test;
import java.io.File;
-import java.util.Arrays;
import java.util.Map;
import com.google.common.collect.Maps;
@@ -37,26 +36,17 @@ public class RepartitionWithMemoryRssTest extends
RepartitionTest {
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.set(CoordinatorConf.COORDINATOR_APP_EXPIRED, 5000L);
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- String grpcBasePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
- ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
+ storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+ storeShuffleServerConf(buildShuffleServerConf(1, tmpDir,
ServerType.GRPC_NETTY));
- File dataDir3 = new File(tmpDir, "data3");
- File dataDir4 = new File(tmpDir, "data4");
- String nettyBasePath = dataDir3.getAbsolutePath() + "," +
dataDir4.getAbsolutePath();
- ShuffleServerConf nettyShuffleServerConf =
- buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
- createShuffleServer(grpcShuffleServerConf);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ startServersWithRandomPorts();
}
@Test
@@ -72,14 +62,14 @@ public class RepartitionWithMemoryRssTest extends
RepartitionTest {
runSparkApp(sparkConf, fileName);
}
- private static ShuffleServerConf buildShuffleServerConf(ServerType grpc,
String basePath)
- throws Exception {
- ShuffleServerConf grpcShuffleServerConf = getShuffleServerConf(grpc);
+ private static ShuffleServerConf buildShuffleServerConf(
+ int subDirIndex, File tmpDir, ServerType serverType) {
+ ShuffleServerConf grpcShuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
grpcShuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL,
5000L);
grpcShuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
4000L);
grpcShuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
- grpcShuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
grpcShuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(),
"512mb");
return grpcShuffleServerConf;
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
index 9589293e0..8ba28e15f 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RssShuffleManagerTest.java
@@ -90,11 +90,11 @@ public class RssShuffleManagerTest extends
SparkIntegrationTestBase {
RssClientConf.BLOCKID_TASK_ATTEMPT_ID_BITS.key(),
String.valueOf(dynamicConfLayout.taskAttemptIdBits));
}
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- createShuffleServer(getShuffleServerConf(ServerType.GRPC));
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tempDir,
ServerType.GRPC));
+ startServersWithRandomPorts();
return dynamicConf;
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
index dff36b63e..8fbce3cd4 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHadoopTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.util.Map;
import scala.Tuple2;
@@ -31,6 +32,7 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
@@ -43,20 +45,24 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShuffleUnregisterWithHadoopTest extends SparkIntegrationTestBase {
@BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ storeCoordinatorConf(coordinatorConf);
+
+ ShuffleServerConf grpcShuffleServerConf =
+ shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
+ storeShuffleServerConf(grpcShuffleServerConf);
grpcShuffleServerConf.setString("rss.storage.type",
StorageType.HDFS.name());
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ ShuffleServerConf nettyShuffleServerConf =
+ shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY);
nettyShuffleServerConf.setString("rss.storage.type",
StorageType.HDFS.name());
- createShuffleServer(grpcShuffleServerConf);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeShuffleServerConf(nettyShuffleServerConf);
+
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
index fbaabf703..f0420b4af 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -31,6 +31,7 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.common.rpc.ServerType;
@@ -44,20 +45,31 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class ShuffleUnregisterWithLocalfileTest extends
SparkIntegrationTestBase {
@BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ storeCoordinatorConf(coordinatorConf);
+
+ ShuffleServerConf grpcShuffleServerConf =
+ shuffleServerConfWithoutPort(0, tmpDir, ServerType.GRPC);
grpcShuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ grpcShuffleServerConf.setString(
+ RssBaseConf.RSS_STORAGE_BASE_PATH.key(),
+ grpcShuffleServerConf.get(RssBaseConf.RSS_STORAGE_BASE_PATH).get(0));
+ storeShuffleServerConf(grpcShuffleServerConf);
+
+ ShuffleServerConf nettyShuffleServerConf =
+ shuffleServerConfWithoutPort(1, tmpDir, ServerType.GRPC_NETTY);
nettyShuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
- createShuffleServer(grpcShuffleServerConf);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ nettyShuffleServerConf.setString(
+ RssBaseConf.RSS_STORAGE_BASE_PATH.key(),
+ nettyShuffleServerConf.get(RssBaseConf.RSS_STORAGE_BASE_PATH).get(0));
+ storeShuffleServerConf(nettyShuffleServerConf);
+
+ startServersWithRandomPorts();
}
@Override
@@ -94,7 +106,6 @@ public class ShuffleUnregisterWithLocalfileTest extends
SparkIntegrationTestBase
.get(RssBaseConf.RSS_STORAGE_BASE_PATH)
.get(0);
String appPath = new File(path).listFiles()[0].getAbsolutePath();
-
String shufflePath = appPath + "/0";
assertTrue(new File(shufflePath).exists());
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
index 08084ecdb..90e126b30 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SimpleTestBase.java
@@ -17,33 +17,34 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.util.Map;
import com.google.common.collect.Maps;
import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
public abstract class SimpleTestBase extends SparkIntegrationTestBase {
@BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(
RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir,
ServerType.GRPC_NETTY));
+
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
index bebee47ea..f965a1bb8 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalForMultiPartLocalStorageManagerTest.java
@@ -57,65 +57,40 @@ import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
public class SparkClientWithLocalForMultiPartLocalStorageManagerTest extends
ShuffleReadWriteBase {
-
- private static File GRPC_DATA_DIR1;
- private static File GRPC_DATA_DIR2;
- private static File NETTY_DATA_DIR1;
- private static File NETTY_DATA_DIR2;
private ShuffleServerGrpcClient grpcShuffleServerClient;
private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
- private static ShuffleServerConf grpcShuffleServerConfig;
- private static ShuffleServerConf nettyShuffleServerConfig;
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- createCoordinatorServer(coordinatorConf);
-
- GRPC_DATA_DIR1 = new File(tmpDir, "data1");
- GRPC_DATA_DIR2 = new File(tmpDir, "data2");
- String grpcBasePath = GRPC_DATA_DIR1.getAbsolutePath() + "," +
GRPC_DATA_DIR2.getAbsolutePath();
- ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(grpcBasePath, ServerType.GRPC);
- grpcShuffleServerConf.set(
- ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
- MultiPartLocalStorageManager.class.getName());
- createShuffleServer(grpcShuffleServerConf);
-
- NETTY_DATA_DIR1 = new File(tmpDir, "netty_data1");
- NETTY_DATA_DIR2 = new File(tmpDir, "netty_data2");
- String nettyBasePath =
- NETTY_DATA_DIR1.getAbsolutePath() + "," +
NETTY_DATA_DIR2.getAbsolutePath();
- ShuffleServerConf nettyShuffleServerConf =
- buildShuffleServerConf(nettyBasePath, ServerType.GRPC_NETTY);
- nettyShuffleServerConf.set(
- ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
- MultiPartLocalStorageManager.class.getName());
- createShuffleServer(nettyShuffleServerConf);
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
+ storeCoordinatorConf(coordinatorConf);
- startServers();
+ storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+ storeShuffleServerConf(buildShuffleServerConf(1, tmpDir,
ServerType.GRPC_NETTY));
- grpcShuffleServerConfig = grpcShuffleServerConf;
- nettyShuffleServerConfig = nettyShuffleServerConf;
+ startServersWithRandomPorts();
}
- private static ShuffleServerConf buildShuffleServerConf(String basePath,
ServerType serverType)
- throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ private static ShuffleServerConf buildShuffleServerConf(
+ int subDirIndex, File tmpDir, ServerType serverType) {
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
- shuffleServerConf.setString("rss.storage.basePath", basePath);
+ shuffleServerConf.set(
+ ShuffleServerConf.SERVER_LOCAL_STORAGE_MANAGER_CLASS,
+ MultiPartLocalStorageManager.class.getName());
return shuffleServerConf;
}
@BeforeEach
public void createClient() throws Exception {
grpcShuffleServerClient =
- new ShuffleServerGrpcClient(
- LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+ new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
LOCALHOST,
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort());
}
@AfterEach
@@ -130,12 +105,11 @@ public class
SparkClientWithLocalForMultiPartLocalStorageManagerTest extends Shu
? Lists.newArrayList(
new ShuffleServerInfo(
LOCALHOST,
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)))
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort()))
: Lists.newArrayList(
new ShuffleServerInfo(
- LOCALHOST,
-
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
+ LOCALHOST, LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort()));
return ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
index 11e60540e..6cd4a3141 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkClientWithLocalTest.java
@@ -51,7 +51,6 @@ import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.BlockId;
import org.apache.uniffle.common.util.BlockIdLayout;
import org.apache.uniffle.common.util.RssUtils;
-import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
@@ -68,19 +67,16 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
private static File NETTY_DATA_DIR2;
private ShuffleServerGrpcClient grpcShuffleServerClient;
private ShuffleServerGrpcNettyClient nettyShuffleServerClient;
- private static ShuffleServerConf grpcShuffleServerConfig;
- private static ShuffleServerConf nettyShuffleServerConfig;
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConfWithoutPort());
GRPC_DATA_DIR1 = new File(tmpDir, "data1");
GRPC_DATA_DIR2 = new File(tmpDir, "data2");
String grpcBasePath = GRPC_DATA_DIR1.getAbsolutePath() + "," +
GRPC_DATA_DIR2.getAbsolutePath();
ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(grpcBasePath, ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
+ storeShuffleServerConf(grpcShuffleServerConf);
NETTY_DATA_DIR1 = new File(tmpDir, "netty_data1");
NETTY_DATA_DIR2 = new File(tmpDir, "netty_data2");
@@ -88,32 +84,27 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
NETTY_DATA_DIR1.getAbsolutePath() + "," +
NETTY_DATA_DIR2.getAbsolutePath();
ShuffleServerConf nettyShuffleServerConf =
buildShuffleServerConf(nettyBasePath, ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
+ storeShuffleServerConf(nettyShuffleServerConf);
- startServers();
-
- grpcShuffleServerConfig = grpcShuffleServerConf;
- nettyShuffleServerConfig = nettyShuffleServerConf;
+ startServersWithRandomPorts();
}
- private static ShuffleServerConf buildShuffleServerConf(String basePath,
ServerType serverType)
- throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ private static ShuffleServerConf buildShuffleServerConf(String basePath,
ServerType serverType) {
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
null, serverType);
shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
shuffleServerConf.setString("rss.storage.basePath", basePath);
return shuffleServerConf;
}
@BeforeEach
- public void createClient() throws Exception {
+ public void createClient() {
grpcShuffleServerClient =
- new ShuffleServerGrpcClient(
- LOCALHOST,
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT));
+ new ShuffleServerGrpcClient(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort());
nettyShuffleServerClient =
new ShuffleServerGrpcNettyClient(
LOCALHOST,
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT));
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort());
}
@AfterEach
@@ -128,12 +119,10 @@ public class SparkClientWithLocalTest extends
ShuffleReadWriteBase {
? Lists.newArrayList(
new ShuffleServerInfo(
LOCALHOST,
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT),
-
nettyShuffleServerConfig.getInteger(ShuffleServerConf.NETTY_SERVER_PORT)))
+ nettyShuffleServers.get(0).getGrpcPort(),
+ nettyShuffleServers.get(0).getNettyPort()))
: Lists.newArrayList(
- new ShuffleServerInfo(
- LOCALHOST,
-
grpcShuffleServerConfig.getInteger(ShuffleServerConf.RPC_SERVER_PORT)));
+ new ShuffleServerInfo(LOCALHOST,
grpcShuffleServers.get(0).getGrpcPort()));
return ShuffleClientFactory.newReadBuilder()
.clientType(isNettyMode ? ClientType.GRPC_NETTY : ClientType.GRPC)
.storageType(StorageType.LOCALFILE.name())
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 9c5926370..64956d5e5 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -122,7 +122,7 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "2m");
sparkConf.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(),
"128k");
sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
"30000");
sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(),
"1000");
@@ -144,7 +144,7 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "2m");
sparkConf.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(),
"128k");
sparkConf.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "256k");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(),
"30000");
sparkConf.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10");
sparkConf.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(),
"1000");
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
index dda999b37..e1431d5eb 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerFallbackTest.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.test;
import java.io.File;
-import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -45,7 +44,7 @@ public class SparkSQLWithDelegationShuffleManagerFallbackTest
extends SparkSQLTe
.getClassLoader()
.getResource("candidates"))
.getFile();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
@@ -56,19 +55,17 @@ public class
SparkSQLWithDelegationShuffleManagerFallbackTest extends SparkSQLTe
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ storeCoordinatorConf(coordinatorConf);
+
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
tmpDir, ServerType.GRPC);
shuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 1000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
4000L);
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
shuffleServerConf.setString(
ShuffleServerConf.RSS_STORAGE_TYPE.key(),
StorageType.LOCALFILE.name());
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(),
"512mb");
- createShuffleServer(shuffleServerConf);
- startServers();
+ storeShuffleServerConf(shuffleServerConf);
+
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
index 5b8d9f15e..86a67014a 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithDelegationShuffleManagerTest.java
@@ -18,7 +18,6 @@
package org.apache.uniffle.test;
import java.io.File;
-import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
@@ -45,7 +44,7 @@ public class SparkSQLWithDelegationShuffleManagerTest extends
SparkSQLTest {
.getClassLoader()
.getResource("candidates"))
.getFile();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setString(
CoordinatorConf.COORDINATOR_ACCESS_CHECKERS.key(),
"org.apache.uniffle.coordinator.access.checker.AccessCandidatesChecker,"
@@ -56,31 +55,21 @@ public class SparkSQLWithDelegationShuffleManagerTest
extends SparkSQLTest {
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- File dataDir1 = new File(tmpDir, "data1");
- File dataDir2 = new File(tmpDir, "data2");
- String grpcBasePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
- ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(ServerType.GRPC, grpcBasePath);
- createShuffleServer(grpcShuffleServerConf);
+ storeShuffleServerConf(buildShuffleServerConf(0, tmpDir, ServerType.GRPC));
+ storeShuffleServerConf(buildShuffleServerConf(1, tmpDir,
ServerType.GRPC_NETTY));
- File dataDir3 = new File(tmpDir, "data3");
- File dataDir4 = new File(tmpDir, "data4");
- String nettyBasePath = dataDir3.getAbsolutePath() + "," +
dataDir4.getAbsolutePath();
- ShuffleServerConf nettyShuffleServerConf =
- buildShuffleServerConf(ServerType.GRPC_NETTY, nettyBasePath);
- createShuffleServer(nettyShuffleServerConf);
-
- startServers();
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(1L, TimeUnit.SECONDS);
}
- private static ShuffleServerConf buildShuffleServerConf(ServerType
serverType, String basePath)
- throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ private static ShuffleServerConf buildShuffleServerConf(
+ int subDirIndex, File tmpDir, ServerType serverType) {
+ ShuffleServerConf shuffleServerConf =
+ shuffleServerConfWithoutPort(subDirIndex, tmpDir, serverType);
shuffleServerConf.set(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL, 1000L);
shuffleServerConf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT,
4000L);
- shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(basePath));
shuffleServerConf.setString(ShuffleServerConf.SERVER_BUFFER_CAPACITY.key(),
"512mb");
return shuffleServerConf;
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
index 927b7d5a1..fdcc430bc 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkSQLWithMemoryLocalTest.java
@@ -39,27 +39,24 @@ public class SparkSQLWithMemoryLocalTest extends
SparkSQLTest {
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- ShuffleServerConf grpcShuffleServerConf =
buildShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
+ storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC));
+ storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC_NETTY));
- ShuffleServerConf nettyShuffleServerConf =
buildShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
-
- startServers();
+ startServersWithRandomPorts();
}
private static ShuffleServerConf buildShuffleServerConf(ServerType
serverType) throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
null, serverType);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
shuffleServerConf.setString("rss.storage.basePath", basePath);
diff --git
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 59e341569..c112f2cda 100644
---
a/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark2/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.Collections;
@@ -39,12 +40,12 @@ import org.apache.spark.shuffle.RssSparkConfig;
import org.apache.spark.shuffle.reader.RssShuffleReader;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -54,11 +55,10 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class GetReaderTest extends IntegrationTestBase {
@Test
- public void test() throws Exception {
+ public void test(@TempDir File tmpDir) throws Exception {
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.shuffle.manager",
"org.apache.spark.shuffle.RssShuffleManager");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
sparkConf.setMaster("local[4]");
final String remoteStorage1 = "hdfs://h1/p1";
final String remoteStorage2 = "hdfs://h2/p2";
@@ -79,7 +79,7 @@ public class GetReaderTest extends IntegrationTestBase {
printWriter.flush();
printWriter.close();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled",
true);
coordinatorConf.setString("rss.coordinator.dynamicClientConf.path",
cfgFile);
coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec",
1);
@@ -87,13 +87,13 @@ public class GetReaderTest extends IntegrationTestBase {
coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold",
1);
coordinatorConf.setLong("rss.coordinator.remote.storage.schedule.time",
200);
coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times",
1);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(shuffleServerConf);
- startServers();
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC));
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).getOrCreate();
JavaSparkContext jsc1 = new JavaSparkContext(sparkSession.sparkContext());
JavaPairRDD<String, Tuple2<Integer, Integer>> javaPairRDD1 =
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
index 5603dbfb3..a78364af2 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQERepartitionTest.java
@@ -36,7 +36,6 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,18 +44,17 @@ public class AQERepartitionTest extends
SparkIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(
RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(1, null,
ServerType.GRPC_NETTY));
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
index 09860fccd..78e936eb0 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/AQESkewedJoinTest.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.test;
+import java.io.File;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
@@ -34,10 +35,9 @@ import org.apache.spark.sql.functions;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.apache.uniffle.common.rpc.ServerType;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -45,14 +45,13 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class AQESkewedJoinTest extends SparkIntegrationTestBase {
@BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ storeCoordinatorConf(coordinatorConfWithoutPort());
+
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, tmpDir,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(1, tmpDir,
ServerType.GRPC_NETTY));
+
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
index 24b9d70b5..eb20effa6 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/ContinuousSelectPartitionStrategyTest.java
@@ -45,7 +45,6 @@ import
org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStra
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.MockedShuffleServerGrpcService;
import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
@@ -57,11 +56,9 @@ public class ContinuousSelectPartitionStrategyTest extends
SparkIntegrationTestB
private static final int replicateWrite = 3;
private static final int replicateRead = 2;
- static @TempDir File tempDir;
-
@BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ public static void setupServers(@TempDir File tmpDir) throws Exception {
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(
@@ -71,47 +68,21 @@ public class ContinuousSelectPartitionStrategyTest extends
SparkIntegrationTestB
CoordinatorConf.COORDINATOR_SELECT_PARTITION_STRATEGY,
AbstractAssignmentStrategy.SelectPartitionStrategyName.CONTINUOUS);
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
// Create multi shuffle servers
- createShuffleServers();
- startServers();
- }
+ createShuffleServers(tmpDir);
+ startServersWithRandomPorts();
- private static void createShuffleServers() throws Exception {
- for (int i = 0; i < 3; i++) {
- // Copy from IntegrationTestBase#getShuffleServerConf
- ShuffleServerConf grpcServerConf = buildShuffleServerConf(i,
ServerType.GRPC);
- createMockedShuffleServer(grpcServerConf);
- ShuffleServerConf nettyServerConf = buildShuffleServerConf(i,
ServerType.GRPC_NETTY);
- createMockedShuffleServer(nettyServerConf);
- }
enableRecordGetShuffleResult();
}
- private static ShuffleServerConf buildShuffleServerConf(int i, ServerType
serverType) {
- ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setInteger("rss.rpc.server.port",
IntegrationTestBase.getNextRpcServerPort());
- serverConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE_HDFS.name());
- serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
- serverConf.setString("rss.server.buffer.capacity",
String.valueOf(671088640 - i));
- serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
- serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
- serverConf.setString("rss.server.read.buffer.capacity", "335544320");
- serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
- serverConf.setString("rss.server.heartbeat.delay", "1000");
- serverConf.setString("rss.server.heartbeat.interval", "1000");
- serverConf.setInteger("rss.jetty.http.port",
IntegrationTestBase.getNextJettyServerPort());
- serverConf.setInteger("rss.jetty.corePool.size", 64);
- serverConf.setInteger("rss.rpc.executor.size", 10);
- serverConf.setString("rss.server.hadoop.dfs.replication", "2");
- serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L *
1024L);
- serverConf.setBoolean("rss.server.health.check.enable", false);
- serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
- if (serverType == ServerType.GRPC_NETTY) {
- serverConf.setInteger(
- ShuffleServerConf.NETTY_SERVER_PORT,
IntegrationTestBase.getNextNettyServerPort());
+ private static void createShuffleServers(File tmpDir) {
+ int index = 0;
+ for (int i = 0; i < 3; i++) {
+ storeMockShuffleServerConf(shuffleServerConfWithoutPort(index++, tmpDir,
ServerType.GRPC));
+ storeMockShuffleServerConf(
+ shuffleServerConfWithoutPort(index++, tmpDir,
ServerType.GRPC_NETTY));
}
- return serverConf;
}
private static void enableRecordGetShuffleResult() {
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
index 986416e84..bb8caba2d 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetReaderTest.java
@@ -57,7 +57,6 @@ import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -73,7 +72,6 @@ public class GetReaderTest extends IntegrationTestBase {
sparkConf.set(
"spark.shuffle.sort.io.plugin.class",
"org.apache.spark.shuffle.RssShuffleDataIo");
sparkConf.set("spark.serializer",
"org.apache.spark.serializer.KryoSerializer");
- sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(),
COORDINATOR_QUORUM);
sparkConf.setMaster("local[4]");
final String remoteStorage1 = "hdfs://h1/p1";
final String remoteStorage2 = "hdfs://h2/p2";
@@ -92,7 +90,7 @@ public class GetReaderTest extends IntegrationTestBase {
printWriter.flush();
printWriter.close();
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setBoolean("rss.coordinator.dynamicClientConf.enabled",
true);
coordinatorConf.setString("rss.coordinator.dynamicClientConf.path",
cfgFile);
coordinatorConf.setInteger("rss.coordinator.dynamicClientConf.updateIntervalSec",
1);
@@ -100,12 +98,12 @@ public class GetReaderTest extends IntegrationTestBase {
coordinatorConf.setInteger("rss.coordinator.access.loadChecker.serverNum.threshold",
1);
coordinatorConf.setLong("rss.coordinator.remote.storage.schedule.time",
200);
coordinatorConf.setInteger("rss.coordinator.remote.storage.schedule.access.times",
1);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
- ShuffleServerConf shuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(shuffleServerConf);
- startServers();
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC));
+ startServersWithRandomPorts();
Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+ sparkConf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), getQuorum());
SparkSession sparkSession =
SparkSession.builder().config(sparkConf).getOrCreate();
JavaSparkContext jsc1 = new JavaSparkContext(sparkSession.sparkContext());
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
index cd5510e48..be7200273 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java
@@ -53,7 +53,6 @@ import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.MockedShuffleServerGrpcService;
import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
@@ -65,57 +64,29 @@ public class GetShuffleReportForMultiPartTest extends
SparkIntegrationTestBase {
private static final int replicateWrite = 3;
private static final int replicateRead = 2;
- static @TempDir File tempDir;
-
@BeforeAll
- public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ public static void setupServers(@TempDir File tmpdir) throws Exception {
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(
RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
// Create multi shuffle servers
- createShuffleServers();
- startServers();
- }
+ createShuffleServers(tmpdir);
+ startServersWithRandomPorts();
- private static void createShuffleServers() throws Exception {
- for (int i = 0; i < 4; i++) {
- // Copy from IntegrationTestBase#getShuffleServerConf
- ShuffleServerConf grpcServerConf =
buildShuffleServerConf(ServerType.GRPC);
- createMockedShuffleServer(grpcServerConf);
- ShuffleServerConf nettyServerConf =
buildShuffleServerConf(ServerType.GRPC_NETTY);
- createMockedShuffleServer(nettyServerConf);
- }
enableRecordGetShuffleResult();
}
- private static ShuffleServerConf buildShuffleServerConf(ServerType
serverType) {
- ShuffleServerConf serverConf = new ShuffleServerConf();
- serverConf.setInteger("rss.rpc.server.port",
IntegrationTestBase.getNextRpcServerPort());
- serverConf.setString("rss.storage.type",
StorageType.MEMORY_LOCALFILE_HDFS.name());
- serverConf.setString("rss.storage.basePath", tempDir.getAbsolutePath());
- serverConf.setString("rss.server.buffer.capacity", "671088640");
- serverConf.setString("rss.server.memory.shuffle.highWaterMark", "50.0");
- serverConf.setString("rss.server.memory.shuffle.lowWaterMark", "0.0");
- serverConf.setString("rss.server.read.buffer.capacity", "335544320");
- serverConf.setString("rss.coordinator.quorum", COORDINATOR_QUORUM);
- serverConf.setString("rss.server.heartbeat.delay", "1000");
- serverConf.setString("rss.server.heartbeat.interval", "1000");
- serverConf.setInteger("rss.jetty.http.port",
IntegrationTestBase.getNextJettyServerPort());
- serverConf.setInteger("rss.jetty.corePool.size", 64);
- serverConf.setInteger("rss.rpc.executor.size", 10);
- serverConf.setString("rss.server.hadoop.dfs.replication", "2");
- serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L *
1024L);
- serverConf.setBoolean("rss.server.health.check.enable", false);
- serverConf.set(ShuffleServerConf.RPC_SERVER_TYPE, serverType);
- if (serverType == ServerType.GRPC_NETTY) {
- serverConf.setInteger(
- ShuffleServerConf.NETTY_SERVER_PORT,
IntegrationTestBase.getNextNettyServerPort());
+ private static void createShuffleServers(File tmpdir) throws Exception {
+ int index = 0;
+ for (int i = 0; i < 4; i++) {
+ storeMockShuffleServerConf(shuffleServerConfWithoutPort(index++, tmpdir,
ServerType.GRPC));
+ storeMockShuffleServerConf(
+ shuffleServerConfWithoutPort(index++, tmpdir,
ServerType.GRPC_NETTY));
}
- return serverConf;
}
private static void enableRecordGetShuffleResult() {
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
index e2aac274a..dcca62a8d 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/MapSideCombineTest.java
@@ -36,8 +36,6 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.StorageType;
import org.apache.uniffle.common.rpc.ServerType;
-import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.test.listener.WriteAndReadMetricsSparkListener;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -46,13 +44,12 @@ public class MapSideCombineTest extends
SparkIntegrationTestBase {
@BeforeAll
public static void setupServers() throws Exception {
- CoordinatorConf coordinatorConf = getCoordinatorConf();
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeCoordinatorConf(coordinatorConfWithoutPort());
+
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(1, null,
ServerType.GRPC_NETTY));
+
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
index 4dd2bab8e..c0fbb3a22 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignBasicTest.java
@@ -51,31 +51,25 @@ public class PartitionBlockDataReassignBasicTest extends
SparkSQLTest {
LOGGER.info("Setup servers");
// for coordinator
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
// for shuffle-server
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- ShuffleServerConf grpcShuffleServerConf1 =
buildShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf1);
-
- ShuffleServerConf grpcShuffleServerConf2 =
buildShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf2);
-
- ShuffleServerConf grpcShuffleServerConf3 =
buildShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(grpcShuffleServerConf3);
-
- ShuffleServerConf grpcShuffleServerConf4 =
buildShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(grpcShuffleServerConf4);
-
- startServers();
+ for (int i = 0; i < 3; i++) {
+ storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC));
+ }
+ for (int i = 0; i < 2; i++) {
+ storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC_NETTY));
+ }
+ startServersWithRandomPorts();
// simulate one server without enough buffer for grpc
ShuffleServer grpcServer = grpcShuffleServers.get(0);
@@ -88,9 +82,8 @@ public class PartitionBlockDataReassignBasicTest extends
SparkSQLTest {
bufferManager.setUsedMemory(bufferManager.getCapacity() + 100);
}
- protected static ShuffleServerConf buildShuffleServerConf(ServerType
serverType)
- throws Exception {
- ShuffleServerConf shuffleServerConf = getShuffleServerConf(serverType);
+ protected static ShuffleServerConf buildShuffleServerConf(ServerType
serverType) {
+ ShuffleServerConf shuffleServerConf = shuffleServerConfWithoutPort(0,
null, serverType);
shuffleServerConf.setLong("rss.server.heartbeat.interval", 5000);
shuffleServerConf.setLong("rss.server.app.expired.withoutHeartbeat", 4000);
shuffleServerConf.setString("rss.storage.basePath", basePath);
diff --git
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
index a01b695e3..1b9d1072d 100644
---
a/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
+++
b/integration-test/spark3/src/test/java/org/apache/uniffle/test/PartitionBlockDataReassignMultiTimesTest.java
@@ -31,7 +31,6 @@ import org.apache.uniffle.coordinator.CoordinatorConf;
import
org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
import org.apache.uniffle.server.MockedGrpcServer;
import org.apache.uniffle.server.ShuffleServer;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.storage.util.StorageType;
@@ -46,7 +45,7 @@ public class PartitionBlockDataReassignMultiTimesTest extends
PartitionBlockData
@BeforeAll
public static void setupServers(@TempDir File tmpDir) throws Exception {
// for coordinator
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
coordinatorConf.setLong("rss.coordinator.app.expired", 5000);
coordinatorConf.set(
COORDINATOR_ASSIGNMENT_STRATEGY,
AssignmentStrategyFactory.StrategyName.BASIC);
@@ -54,31 +53,20 @@ public class PartitionBlockDataReassignMultiTimesTest
extends PartitionBlockData
Map<String, String> dynamicConf = Maps.newHashMap();
dynamicConf.put(RssSparkConfig.RSS_STORAGE_TYPE.key(),
StorageType.MEMORY_LOCALFILE.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
+ storeCoordinatorConf(coordinatorConf);
// for shuffle-server
File dataDir1 = new File(tmpDir, "data1");
File dataDir2 = new File(tmpDir, "data2");
basePath = dataDir1.getAbsolutePath() + "," + dataDir2.getAbsolutePath();
- // grpc server.
- ShuffleServerConf grpcShuffleServerConf1 =
buildShuffleServerConf(ServerType.GRPC);
- createMockedShuffleServer(grpcShuffleServerConf1);
-
- ShuffleServerConf grpcShuffleServerConf2 =
buildShuffleServerConf(ServerType.GRPC);
- createMockedShuffleServer(grpcShuffleServerConf2);
-
- ShuffleServerConf grpcShuffleServerConf3 =
buildShuffleServerConf(ServerType.GRPC);
- createMockedShuffleServer(grpcShuffleServerConf3);
-
- // netty server.
- ShuffleServerConf grpcShuffleServerConf4 =
buildShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(grpcShuffleServerConf4);
-
- ShuffleServerConf grpcShuffleServerConf5 =
buildShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(grpcShuffleServerConf5);
-
- startServers();
+ for (int i = 0; i < 3; i++) {
+ storeMockShuffleServerConf(buildShuffleServerConf(ServerType.GRPC));
+ }
+ for (int i = 0; i < 2; i++) {
+ storeShuffleServerConf(buildShuffleServerConf(ServerType.GRPC_NETTY));
+ }
+ startServersWithRandomPorts();
}
@Override
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
index d077e3608..d46eb9460 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezIntegrationTestBase.java
@@ -79,23 +79,25 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
protected static void setupServers(ShuffleServerConf serverConf) throws
Exception {
LOG.info("Starting coordinators and shuffle servers");
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
+ storeCoordinatorConf(coordinatorConf);
+ ShuffleServerConf grpcShuffleServerConf =
+ shuffleServerConfWithoutPort(0, null, ServerType.GRPC);
if (serverConf != null) {
grpcShuffleServerConf.addAll(serverConf);
}
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
+ storeShuffleServerConf(grpcShuffleServerConf);
+ ShuffleServerConf nettyShuffleServerConf =
+ shuffleServerConfWithoutPort(0, null, ServerType.GRPC_NETTY);
if (serverConf != null) {
nettyShuffleServerConf.addAll(serverConf);
}
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeShuffleServerConf(nettyShuffleServerConf);
+ startServersWithRandomPorts();
}
@AfterAll
@@ -182,7 +184,7 @@ public class TezIntegrationTestBase extends
IntegrationTestBase {
appConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx384m");
appConf.setInt(TezConfiguration.TEZ_TASK_RESOURCE_MEMORY_MB, 512);
appConf.set(TezConfiguration.TEZ_TASK_LAUNCH_CMD_OPTS, " -Xmx384m");
- appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
+ appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, getQuorum());
appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
appConf.set(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
diff --git
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
index a11b250b6..2e3cac9a6 100644
---
a/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
+++
b/integration-test/tez/src/test/java/org/apache/uniffle/test/TezWordCountWithFailuresTest.java
@@ -62,7 +62,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.rpc.ServerType;
import org.apache.uniffle.coordinator.CoordinatorConf;
-import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.tez.common.RssTezConfig.RSS_AVOID_RECOMPUTE_SUCCEEDED_TASK;
@@ -97,17 +96,16 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
miniTezCluster.start();
}
LOG.info("Starting coordinators and shuffle servers");
- CoordinatorConf coordinatorConf = getCoordinatorConf();
+ CoordinatorConf coordinatorConf = coordinatorConfWithoutPort();
Map<String, String> dynamicConf = new HashMap<>();
dynamicConf.put(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_PATH.key(),
HDFS_URI + "rss/test");
dynamicConf.put(RssTezConfig.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
addDynamicConf(coordinatorConf, dynamicConf);
- createCoordinatorServer(coordinatorConf);
- ShuffleServerConf grpcShuffleServerConf =
getShuffleServerConf(ServerType.GRPC);
- createShuffleServer(grpcShuffleServerConf);
- ShuffleServerConf nettyShuffleServerConf =
getShuffleServerConf(ServerType.GRPC_NETTY);
- createShuffleServer(nettyShuffleServerConf);
- startServers();
+ storeCoordinatorConf(coordinatorConf);
+
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC));
+ storeShuffleServerConf(shuffleServerConfWithoutPort(0, null,
ServerType.GRPC_NETTY));
+ startServersWithRandomPorts();
}
@AfterAll
@@ -257,7 +255,7 @@ public class TezWordCountWithFailuresTest extends
IntegrationTestBase {
appConf.setBoolean(TEZ_AM_NODE_BLACKLISTING_ENABLED, true);
appConf.setInt(TEZ_AM_NODE_BLACKLISTING_IGNORE_THRESHOLD, 99);
appConf.setInt(TEZ_AM_MAX_TASK_FAILURES_PER_NODE, maxFailures);
- appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, COORDINATOR_QUORUM);
+ appConf.set(RssTezConfig.RSS_COORDINATOR_QUORUM, getQuorum());
appConf.set(RssTezConfig.RSS_CLIENT_TYPE, clientType.name());
appConf.set(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,