This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new a4e56d37 [#554] infer rss base storage conf from env (#555)
a4e56d37 is described below

commit a4e56d37753741e1368156d50e0d0ffe00253b53
Author: advancedxy <[email protected]>
AuthorDate: Wed Feb 8 16:02:20 2023 +0800

    [#554] infer rss base storage conf from env (#555)
    
    ### What changes were proposed in this pull request?
    new method `getConfiguredLocalDirs` is added to retrieve rss base storage 
path from env first
    
    ### Why are the changes needed?
    Fixes #554
    
    ### Does this PR introduce _any_ user-facing change?
    RSS admin may specify rss base path for shuffle server when deploying
    
    ### How was this patch tested?
    Added UT.
---
 .../org/apache/uniffle/common/config/RssConf.java  |  5 +++++
 .../org/apache/uniffle/common/util/RssUtils.java   | 12 ++++++++++++
 .../apache/uniffle/common/util/RssUtilsTest.java   | 22 ++++++++++++++++++++++
 .../apache/uniffle/server/LocalStorageChecker.java |  3 ++-
 .../apache/uniffle/server/ShuffleFlushManager.java |  3 ++-
 .../server/storage/LocalStorageManager.java        |  2 +-
 6 files changed, 44 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
index 9e1c1f78..f7f832a1 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssConf.java
@@ -640,4 +640,9 @@ public class RssConf implements Cloneable {
   public String toString() {
     return this.settings.toString();
   }
+
+  public String getEnv(String key) {
+    return System.getenv(key);
+  }
+
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java 
b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
index 2e2c4394..c45c4fe1 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/RssUtils.java
@@ -31,6 +31,7 @@ import java.net.InetAddress;
 import java.net.InterfaceAddress;
 import java.net.NetworkInterface;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -48,11 +49,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
 public class RssUtils {
 
   private static final Logger LOGGER = LoggerFactory.getLogger(RssUtils.class);
+  public static final String RSS_LOCAL_DIR_KEY = "RSS_LOCAL_DIRS";
 
   private RssUtils() {
   }
@@ -291,4 +295,12 @@ public class RssUtils {
     }
     return taskIdBitmap;
   }
+
+  public static List<String> getConfiguredLocalDirs(RssConf conf) {
+    if (conf.getEnv(RSS_LOCAL_DIR_KEY) != null) {
+      return Arrays.asList(conf.getEnv(RSS_LOCAL_DIR_KEY).split(","));
+    } else {
+      return conf.get(RssBaseConf.RSS_STORAGE_BASE_PATH);
+    }
+  }
 }
diff --git 
a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java 
b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
index 4d5e0ca4..759dd09c 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/RssUtilsTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.common.util;
 import java.net.Inet4Address;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -34,11 +35,14 @@ import org.junit.jupiter.api.Test;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
 import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.config.RssConf;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static 
uk.org.webcompere.systemstubs.SystemStubs.withEnvironmentVariable;
@@ -177,6 +181,24 @@ public class RssUtilsTest {
     assertEquals(serverToPartitions.get(server4), Sets.newHashSet(2, 4));
   }
 
+  @Test
+  public void testGetConfiguredLocalDirs() throws Exception {
+    RssConf conf = new RssConf();
+    withEnvironmentVariable(RssUtils.RSS_LOCAL_DIR_KEY, "/path/a").execute(() 
-> {
+      assertEquals(Collections.singletonList("/path/a"), 
RssUtils.getConfiguredLocalDirs(conf));
+    });
+
+    withEnvironmentVariable(RssUtils.RSS_LOCAL_DIR_KEY, 
"/path/a,/path/b").execute(() -> {
+      assertEquals(Arrays.asList("/path/a", "/path/b"), 
RssUtils.getConfiguredLocalDirs(conf));
+    });
+
+    withEnvironmentVariable(RssUtils.RSS_LOCAL_DIR_KEY, null).execute(() -> {
+      assertNull(RssUtils.getConfiguredLocalDirs(conf));
+      conf.set(RssBaseConf.RSS_STORAGE_BASE_PATH, Arrays.asList("/path/a", 
"/path/b"));
+      assertEquals(Arrays.asList("/path/a", "/path/b"), 
RssUtils.getConfiguredLocalDirs(conf));
+    });
+  }
+
   // Copy from ClientUtils
   private Long getBlockId(long partitionId, long taskAttemptId, long 
atomicInt) {
     return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH + 
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git 
a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java 
b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
index 89019393..e833441a 100644
--- a/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
+++ b/server/src/main/java/org/apache/uniffle/server/LocalStorageChecker.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.RandomUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.storage.common.LocalStorage;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
@@ -47,7 +48,7 @@ public class LocalStorageChecker extends Checker {
 
   public LocalStorageChecker(ShuffleServerConf conf, List<LocalStorage> 
storages) {
     super(conf);
-    List<String> basePaths = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+    List<String> basePaths = RssUtils.getConfiguredLocalDirs(conf);
     if (CollectionUtils.isEmpty(basePaths)) {
       throw new IllegalArgumentException("The base path cannot be empty");
     }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
index 0d690493..7c5b2df6 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleFlushManager.java
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.uniffle.common.ShuffleDataDistributionType;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.config.RssBaseConf;
+import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.storage.MultiStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
@@ -83,7 +84,7 @@ public class ShuffleFlushManager {
     this.maxConcurrencyOfSingleOnePartition =
         
shuffleServerConf.get(ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION);
 
-    storageBasePaths = 
shuffleServerConf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+    storageBasePaths = RssUtils.getConfiguredLocalDirs(shuffleServerConf);
     pendingEventTimeoutSec = 
shuffleServerConf.getLong(ShuffleServerConf.PENDING_EVENT_TIMEOUT_SEC);
     threadPoolExecutor = createFlushEventExecutor();
     startEventProcessor();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index e8801b7f..6ea3ef97 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -85,7 +85,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
   @VisibleForTesting
   LocalStorageManager(ShuffleServerConf conf) {
     super(conf);
-    storageBasePaths = conf.get(ShuffleServerConf.RSS_STORAGE_BASE_PATH);
+    storageBasePaths = RssUtils.getConfiguredLocalDirs(conf);
     if (CollectionUtils.isEmpty(storageBasePaths)) {
       throw new IllegalArgumentException("Base path dirs must not be empty");
     }

Reply via email to