This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a4e56d37 [#554] infer rss base storage conf from env (#555)
a4e56d37 is described below
commit a4e56d37753741e1368156d50e0d0ffe00253b53
Author: advancedxy <[email protected]>
AuthorDate: Wed Feb 8 16:02:20 2023 +0800
[#554] infer rss base storage conf from env (#555)
### What changes were proposed in this pull request?
new method `getConfiguredLocalDirs` is added to retrieve rss base storage
path from env first
### Why are the changes needed?
Fixes #554
### Does this PR introduce _any_ user-facing change?
RSS admin may specify rss base path for shuffle server when deploying
### How was this patch tested?
Added UT.
---
.../org/apache/uniffle/common/config/RssConf.java | 5 +++++
.../org/apache/uniffle/common/util/RssUtils.java | 12 ++++++++++++
.../apache/uniffle/common/util/RssUtilsTest.java | 22 ++++++++++++++++++++++
.../apache/uniffle/server/LocalStorageChecker.java | 3 ++-
.../apache/uniffle/server/ShuffleFlushManager.java | 3 ++-
.../server/storage/LocalStorageManager.java | 2 +-
6 files changed, 44 insertions(+), 3 deletions(-)
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 9e1c1f78..f7f832a1 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -640,4 +640,9 @@ public class RssConf implements Cloneable {
public String toString() {
return this.settings.toString();
}
+
+ public String getEnv(String key) {
+ return System.getenv(key);
+ }
+
}
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 2e2c4394..c45c4fe1 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,6 +31,7 @@ import java.net.InetAddress;
import java.net.InterfaceAddress;
import java.net.NetworkInterface;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
@@ -48,11 +49,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
public class RssUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(RssUtils.class);
+ public static final String RSS_LOCAL_DIR_KEY = "RSS_LOCAL_DIRS";
private RssUtils() {
}
@@ -291,4 +295,12 @@ public class RssUtils {
}
return taskIdBitmap;
}
+
+ public static List<String> getConfiguredLocalDirs(RssConf conf) {
+ if (conf.getEnv(RSS_LOCAL_DIR_KEY) != null) {
+ return Arrays.asList(conf.getEnv(RSS_LOCAL_DIR_KEY).split(","));
+ } else {
+ return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
+ }
+ }
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 4d5e0ca4..759dd09c 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.util;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -34,11 +35,14 @@ import org.junit.jupiter.api.Test;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static
uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariable;
@@ -177,6 +181,24 @@ public class RssUtilsTest {
assertEquals(serverToPartitions.get(server4), Sets.newHashSet(2, 4));
}
+ @Test
+ public void testGetConfiguredLocalDirs() throws Exception {
+ RssConf conf = new RssConf();
+ withEnvironmentVariable(RssUtils.RSS_LOCAL_DIR_KEY, "/path/a").execute(()
-> {
+ assertEquals(Collections.singletonList("/path/a"),
RssUtils.getConfiguredLocalDirs(conf));
+ });
+
+ withEnvironmentVariable(RssUtils.RSS_LOCAL_DIR_KEY,
"/path/a,/path/b").execute(() -> {
+ assertEquals(Arrays.asList("/path/a", "/path/b"),
RssUtils.getConfiguredLocalDirs(conf));
+ });
+
+ withEnvironmentVariable(RssUtils.RSS_LOCAL_DIR_KEY, null).execute(() -> {
+ assertNull(RssUtils.getConfiguredLocalDirs(conf));
+ conf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/path/a",
"/path/b"));
+ assertEquals(Arrays.asList("/path/a", "/path/b"),
RssUtils.getConfiguredLocalDirs(conf));
+ });
+ }
+
// Copy from ClientUtils
private Long getBlockId(long partitionId, long taskAttemptId, long
atomicInt) {
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 89019393..e833441a 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
@@ -47,7 +48,7 @@ public class LocalStorageChecker extends Checker {
public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage>
storages) {
super(conf);
- List<String> basePaths = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+ List<String> basePaths = RssUtils.getConfiguredLocalDirs(conf);
if (CollectionUtils.isEmpty(basePaths)) {
throw new IllegalArgumentException("The base path cannot be empty");
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 0d690493..7c5b2df6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShufflePartitionedBlock;
import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.server.storage.MultiStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
@@ -83,7 +84,7 @@ public class ShuffleFlushManager {
this.maxConcurrencyOfSingleOnePartition =
shuffleServerConf.get(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION);
- storageBasePaths =
shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+ storageBasePaths = RssUtils.getConfiguredLocalDirs(shuffleServerConf);
pendingEventTimeoutSec =
shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
threadPoolExecutor = createFlushEventExecutor();
startEventProcessor();
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index e8801b7f..6ea3ef97 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -85,7 +85,7 @@ public class LocalStorageManager extends SingleStorageManager
{
@VisibleForTesting
LocalStorageManager(ShuffleServerConf conf) {
super(conf);
- storageBasePaths = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+ storageBasePaths = RssUtils.getConfiguredLocalDirs(conf);
if (CollectionUtils.isEmpty(storageBasePaths)) {
throw new IllegalArgumentException("Base path dirs must not be empty");
}