This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 884921b6 [ISSUE-285][Improvement] Only use HDFS and LOCALFILE
storageType in the test (#360)
884921b6 is described below
commit 884921b6d7c5452f6f3548ebc87840071c6be81f
Author: Tingting Tian <[email protected]>
AuthorDate: Tue Nov 29 22:58:06 2022 +0800
[ISSUE-285][Improvement] Only use HDFS and LOCALFILE storageType in the
test (#360)
### What changes were proposed in this pull request?
We add some config options:
1) rss.test.mode.enable, for rss server;
2) mapreduce.rss.test.mode.enable, for mr client;
3) spark.rss.test.mode.enable, for spark client.
When HDFS or LOCALFILE storageType is used in the client or shuffle server,
we throw an exception unless test mode is enabled.
### Why are the changes needed?
HDFS and LOCALFILE storageType have poor performance, but they are useful
for tests. We don't recommend using them outside of tests.
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
Test locally
---
.../src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java | 3 +++
.../org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java | 7 ++++---
.../src/main/java/org/apache/spark/shuffle/RssSparkConfig.java | 5 +++++
.../java/org/apache/spark/shuffle/RssSparkShuffleUtils.java | 4 ++++
.../apache/spark/shuffle/DelegationRssShuffleManagerTest.java | 2 ++
.../org/apache/spark/shuffle/writer/RssShuffleWriterTest.java | 2 ++
.../apache/spark/shuffle/DelegationRssShuffleManagerTest.java | 2 ++
.../org/apache/spark/shuffle/writer/RssShuffleWriterTest.java | 2 ++
.../main/java/org/apache/uniffle/client/util/ClientUtils.java | 8 ++++++++
.../java/org/apache/uniffle/client/util/RssClientConfig.java | 1 +
.../java/org/apache/uniffle/common/config/RssBaseConf.java | 6 ++++++
.../test/java/org/apache/uniffle/test/IntegrationTestBase.java | 1 +
.../uniffle/test/ShuffleServerWithKerberizedHdfsTest.java | 1 +
.../apache/uniffle/test/RepartitionWithLocalFileRssTest.java | 1 +
.../java/org/apache/uniffle/test/SparkIntegrationTestBase.java | 1 +
.../src/main/java/org/apache/uniffle/server/ShuffleServer.java | 10 ++++++++++
.../server/ShuffleFlushManagerOnKerberizedHdfsTest.java | 1 +
.../test/java/org/apache/uniffle/server/ShuffleServerTest.java | 1 +
.../java/org/apache/uniffle/server/ShuffleTaskManagerTest.java | 8 ++++++++
19 files changed, 63 insertions(+), 3 deletions(-)
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
index 96bec024..f9feb85f 100644
--- a/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
+++ b/client-mr/src/main/java/org/apache/hadoop/mapreduce/RssMRConfig.java
@@ -184,6 +184,9 @@ public class RssMRConfig {
public static final Set<String> RSS_MANDATORY_CLUSTER_CONF =
ImmutableSet.of(RSS_STORAGE_TYPE, RSS_REMOTE_STORAGE_PATH);
+ //Whether enable test mode for the MR Client
+ public static final String RSS_TEST_MODE_ENABLE = MR_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_TEST_MODE_ENABLE;
+
public static RssConf toRssConf(JobConf jobConf) {
RssConf rssConf = new RssConf();
for (Map.Entry<String, String> entry : jobConf) {
diff --git
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
index 1fb6a96a..9d6c38ec 100644
---
a/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
+++
b/client-mr/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java
@@ -131,9 +131,6 @@ public class RssMRAppMaster extends MRAppMaster {
}
assignmentTags.add(Constants.SHUFFLE_SERVER_VERSION);
- ApplicationAttemptId applicationAttemptId =
RssMRUtils.getApplicationAttemptId();
- String appId = applicationAttemptId.toString();
-
final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
@@ -161,6 +158,10 @@ public class RssMRAppMaster extends MRAppMaster {
}
String storageType = RssMRUtils.getString(extraConf, conf,
RssMRConfig.RSS_STORAGE_TYPE);
+ boolean testMode = RssMRUtils.getBoolean(extraConf, conf,
RssMRConfig.RSS_TEST_MODE_ENABLE, false);
+ ClientUtils.validateTestModeConf(testMode, storageType);
+ ApplicationAttemptId applicationAttemptId =
RssMRUtils.getApplicationAttemptId();
+ String appId = applicationAttemptId.toString();
RemoteStorageInfo defaultRemoteStorage =
new RemoteStorageInfo(conf.get(RssMRConfig.RSS_REMOTE_STORAGE_PATH,
""));
RemoteStorageInfo remoteStorage = ClientUtils.fetchRemoteStorage(
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index f14459ca..9da401d2 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -81,6 +81,11 @@ public class RssSparkConfig {
new ConfigBuilder("spark.rss.test"))
.createWithDefault(false);
+ public static final ConfigEntry<Boolean> RSS_TEST_MODE_ENABLE =
createBooleanBuilder(
+ new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_TEST_MODE_ENABLE)
+ .doc("Whether enable test mode for the Spark Client"))
+ .createWithDefault(false);
+
public static final ConfigEntry<String> RSS_REMOTE_STORAGE_PATH =
createStringBuilder(
new ConfigBuilder(SPARK_RSS_CONFIG_PREFIX +
RssClientConfig.RSS_REMOTE_STORAGE_PATH))
.createWithDefault("");
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
index 6b4cc29f..358b0ae1 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkShuffleUtils.java
@@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.CoordinatorClient;
import org.apache.uniffle.client.factory.CoordinatorClientFactory;
+import org.apache.uniffle.client.util.ClientUtils;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.exception.RssException;
@@ -114,6 +115,9 @@ public class RssSparkShuffleUtils {
throw new IllegalArgumentException(msg);
}
+ String storageType = sparkConf.get(RssSparkConfig.RSS_STORAGE_TYPE.key());
+ boolean testMode =
sparkConf.getBoolean(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), false);
+ ClientUtils.validateTestModeConf(testMode, storageType);
int retryMax = sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_MAX);
long retryIntervalMax =
sparkConf.get(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX);
long sendCheckTimeout =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS);
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 12897489..9295d5b3 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -84,6 +84,7 @@ public class DelegationRssShuffleManagerTest {
conf.set(RssSparkConfig.RSS_DYNAMIC_CLIENT_CONF_ENABLED.key(), "false");
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
assertCreateSortShuffleManager(conf);
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
assertCreateRssShuffleManager(conf);
@@ -152,6 +153,7 @@ public class DelegationRssShuffleManagerTest {
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
assertCreateRssShuffleManager(conf);
CoordinatorClient mockCoordinatorClient = mock(CoordinatorClient.class);
diff --git
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 0189b79f..25de2e35 100644
---
a/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark2/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -70,6 +70,7 @@ public class RssShuffleWriterTest {
conf.setAppName("testApp")
.setMaster("local[2]")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
+ .set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10")
.set(RssSparkConfig.RSS_CLIENT_RETRY_INTERVAL_MAX.key(), "1000")
@@ -135,6 +136,7 @@ public class RssShuffleWriterTest {
SparkConf conf = new SparkConf();
conf.setAppName("testApp").setMaster("local[2]")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
+ .set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "32")
.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), "32")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "64")
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
index 222dbc00..d9271476 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/DelegationRssShuffleManagerTest.java
@@ -86,6 +86,7 @@ public class DelegationRssShuffleManagerTest {
assertCreateSortShuffleManager(conf);
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
assertCreateRssShuffleManager(conf);
conf = new SparkConf();
@@ -152,6 +153,7 @@ public class DelegationRssShuffleManagerTest {
conf.set(RssSparkConfig.RSS_ACCESS_ID.key(), "mockId");
conf.set(RssSparkConfig.RSS_COORDINATOR_QUORUM.key(), "m1:8001,m2:8002");
conf.set("spark.rss.storage.type", StorageType.LOCALFILE.name());
+ conf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE, true);
assertCreateRssShuffleManager(conf);
CoordinatorClient mockCoordinatorClient = mock(CoordinatorClient.class);
diff --git
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
index 1e821997..bb92f079 100644
---
a/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
+++
b/client-spark/spark3/src/test/java/org/apache/spark/shuffle/writer/RssShuffleWriterTest.java
@@ -70,6 +70,7 @@ public class RssShuffleWriterTest {
conf.setAppName("testApp")
.setMaster("local[2]")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
+ .set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_TIMEOUT_MS.key(), "10000")
.set(RssSparkConfig.RSS_CLIENT_RETRY_MAX.key(), "10")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
@@ -140,6 +141,7 @@ public class RssShuffleWriterTest {
.set(RssSparkConfig.RSS_WRITER_SERIALIZER_BUFFER_SIZE.key(), "32")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SIZE.key(), "32")
.set(RssSparkConfig.RSS_TEST_FLAG.key(), "true")
+ .set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SEGMENT_SIZE.key(), "64")
.set(RssSparkConfig.RSS_CLIENT_SEND_CHECK_INTERVAL_MS.key(), "1000")
.set(RssSparkConfig.RSS_WRITER_BUFFER_SPILL_SIZE.key(), "128")
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
index eb2ee933..803d65bc 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/ClientUtils.java
@@ -114,4 +114,12 @@ public class ClientUtils {
}
}
}
+
+ public static void validateTestModeConf(boolean testMode, String
storageType) {
+ if (!testMode && (StorageType.LOCALFILE.name().equals(storageType)
+ || (StorageType.HDFS.name()).equals(storageType))) {
+ throw new IllegalArgumentException("RSS storage type about LOCALFILE and
HDFS should be used in test mode, "
+ + "because of the poor performance of these two types.");
+ }
+ }
}
diff --git
a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
index fd69221b..5ab3a6fb 100644
--- a/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
+++ b/client/src/main/java/org/apache/uniffle/client/util/RssClientConfig.java
@@ -60,6 +60,7 @@ public class RssClientConfig {
public static final String RSS_CLIENT_READ_BUFFER_SIZE_DEFAULT_VALUE = "14m";
// The tags specified by rss client to determine server assignment.
public static final String RSS_CLIENT_ASSIGNMENT_TAGS =
"rss.client.assignment.tags";
+ public static final String RSS_TEST_MODE_ENABLE = "rss.test.mode.enable";
public static final String RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL =
"rss.client.assignment.retry.interval";
public static final long RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE
= 65000;
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
index dee1d062..ec28417f 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssBaseConf.java
@@ -193,6 +193,12 @@ public class RssBaseConf extends RssConf {
.defaultValue(60L)
.withDescription("The kerberos authentication relogin interval. unit:
sec");
+ public static final ConfigOption<Boolean> RSS_TEST_MODE_ENABLE =
ConfigOptions
+ .key("rss.test.mode.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether enable test mode for the shuffle server.");
+
public boolean loadCommonConf(Map<String, String> properties) {
if (properties == null) {
return false;
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
index ee58ade8..c8867a98 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/IntegrationTestBase.java
@@ -120,6 +120,7 @@ public abstract class IntegrationTestBase extends
HdfsTestBase {
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L *
1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
+ serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
return serverConf;
}
diff --git
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
index 4bce65c3..00bd8a6e 100644
---
a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
+++
b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleServerWithKerberizedHdfsTest.java
@@ -101,6 +101,7 @@ public class ShuffleServerWithKerberizedHdfsTest extends
KerberizedHdfsBase {
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L *
1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.HDFS.name());
+ serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
return serverConf;
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
index f97f043e..7a0fe5f4 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java
@@ -52,6 +52,7 @@ public class RepartitionWithLocalFileRssTest extends
RepartitionTest {
File dataDir2 = new File(tmpDir, "data2");
String basePath = dataDir1.getAbsolutePath() + "," +
dataDir2.getAbsolutePath();
shuffleServerConf.setString("rss.storage.type",
StorageType.LOCALFILE.name());
+ shuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
shuffleServerConf.setString("rss.storage.basePath", basePath);
createShuffleServer(shuffleServerConf);
startServers();
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
index 3ef2dd6e..5347aad9 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java
@@ -107,6 +107,7 @@ public abstract class SparkIntegrationTestBase extends
IntegrationTestBase {
sparkConf.set(RssSparkConfig.RSS_INDEX_READ_LIMIT.key(), "100");
sparkConf.set(RssSparkConfig.RSS_CLIENT_READ_BUFFER_SIZE.key(), "1m");
sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000");
+ sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true");
}
protected void verifyTestResult(Map expected, Map actual) {
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index c0e92aa0..10d59f8a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -44,12 +44,15 @@ import org.apache.uniffle.common.web.JettyServer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.server.storage.StorageManagerFactory;
+import org.apache.uniffle.storage.util.StorageType;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_ENABLE;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_KEYTAB_FILE;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_PRINCIPAL;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KERBEROS_RELOGIN_INTERVAL_SEC;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_SECURITY_HADOOP_KRB5_CONF_FILE;
+import static org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_TYPE;
+import static
org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;
/**
* Server that manages startup/shutdown of a {@code Greeter} server.
@@ -143,6 +146,13 @@ public class ShuffleServer {
}
private void initialization() throws Exception {
+ boolean testMode = shuffleServerConf.getBoolean(RSS_TEST_MODE_ENABLE);
+ String storageType = shuffleServerConf.getString(RSS_STORAGE_TYPE);
+ if (!testMode && (StorageType.LOCALFILE.name().equals(storageType)
+ || (StorageType.HDFS.name()).equals(storageType))) {
+ throw new IllegalArgumentException("RSS storage type about LOCALFILE and
HDFS should be used in test mode, "
+ + "because of the poor performance of these two types.");
+ }
ip = RssUtils.getHostIp();
if (ip == null) {
throw new RuntimeException("Couldn't acquire host Ip");
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
index 69d55fb0..2c730aa8 100644
---
a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerOnKerberizedHdfsTest.java
@@ -66,6 +66,7 @@ public class ShuffleFlushManagerOnKerberizedHdfsTest extends
KerberizedHdfsBase
ShuffleServerMetrics.register();
shuffleServerConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Collections.emptyList());
shuffleServerConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.HDFS.name());
+ shuffleServerConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
LogManager.getRootLogger().setLevel(Level.INFO);
initHadoopSecurityContext();
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
index bc0639a5..9a9e7c19 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleServerTest.java
@@ -36,6 +36,7 @@ public class ShuffleServerTest {
ShuffleServerConf serverConf = new ShuffleServerConf();
serverConf.setInteger(ShuffleServerConf.RPC_SERVER_PORT, 9527);
serverConf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE.name());
+ serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
serverConf.setInteger(ShuffleServerConf.JETTY_HTTP_PORT, 9528);
serverConf.setString(ShuffleServerConf.RSS_COORDINATOR_QUORUM,
"localhost:0");
serverConf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList("/tmp/null"));
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 90b67a94..03d1e1b5 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -83,6 +83,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
ShuffleServer shuffleServer = new ShuffleServer(conf);
@@ -137,6 +138,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE,
50.0);
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED, 3000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
@@ -267,6 +269,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
String confFile = ClassLoader.getSystemResource("server.conf").getFile();
ShuffleServerConf conf = new ShuffleServerConf(confFile);
String storageBasePath = HDFS_URI + "rss/clearTest";
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
@@ -343,6 +346,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, "LOCALFILE");
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
java.nio.file.Path path1 =
Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
java.nio.file.Path path2 =
Files.createTempDirectory("removeShuffleDataWithLocalfileTest");
conf.setString(ShuffleServerConf.RSS_STORAGE_BASE_PATH.key(),
@@ -399,6 +403,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
@@ -551,6 +556,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
@@ -618,6 +624,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE,
0.0);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(storageBasePath));
conf.set(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.HDFS.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
@@ -655,6 +662,7 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH,
Arrays.asList(tempDir.getAbsolutePath()));
conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.LOCALFILE.name());
+ conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
// make sure not to check leak shuffle data automatically
conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL,
600 * 1000L);