This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a1bc07e [#642]feat(server): better default options for shuffle server 
(#662)
9a1bc07e is described below

commit 9a1bc07ee48aec7b92d6fd9175e90d65e0e8310e
Author: advancedxy <[email protected]>
AuthorDate: Mon Feb 27 21:42:46 2023 +0800

    [#642]feat(server): better default options for shuffle server (#662)
    
    ### What changes were proposed in this pull request?
    1. `rss.server.buffer.capacity` uses JVM heap size * ratio(0.6) by default
    2. `rss.server.read.buffer.capacity` uses JVM heap size * ratio(0.2) by 
default
    3. `rss.server.disk.capacity` uses disk space * ratio(0.9) by default
    
    ### Why are the changes needed?
    Fix: #642
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Three new configurations are introduced, users can specify ratio values
    for buffer, read buffer and disk capacity
    
    ### How was this patch tested?
    New UTs.
---
 docs/server_guide.md                               | 47 ++++++++++++----------
 .../apache/uniffle/server/ShuffleServerConf.java   | 25 +++++++++++-
 .../server/buffer/ShuffleBufferManager.java        | 13 ++++++
 .../server/storage/LocalStorageManager.java        |  2 +
 .../server/buffer/ShuffleBufferManagerTest.java    | 18 +++++++++
 .../uniffle/storage/common/LocalStorage.java       | 13 ++++--
 .../uniffle/storage/common/LocalStorageTest.java   | 13 ++++++
 7 files changed, 104 insertions(+), 27 deletions(-)

diff --git a/docs/server_guide.md b/docs/server_guide.md
index 0f2cc017..fe20f342 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -64,28 +64,31 @@ This document will introduce how to deploy Uniffle shuffle 
servers.
    ```
 
 ## Configuration
-|Property Name|Default|Description|
-|---|---|---|
-|rss.coordinator.quorum|-|Coordinator quorum|
-|rss.rpc.server.port|-|RPC port for Shuffle server|
-|rss.jetty.http.port|-|Http port for Shuffle server|
-|rss.server.buffer.capacity|-|Max memory of buffer manager for shuffle server|
-|rss.server.memory.shuffle.highWaterMark.percentage|75.0|Threshold of spill 
data to storage, percentage of rss.server.buffer.capacity|
-|rss.server.memory.shuffle.lowWaterMark.percentage|25.0|Threshold of keep data 
in memory, percentage of rss.server.buffer.capacity|
-|rss.server.read.buffer.capacity|-|Max size of buffer for reading data|
-|rss.server.heartbeat.interval|10000|Heartbeat interval to Coordinator (ms)|
-|rss.server.flush.threadPool.size|10|Thread pool for flush data to file|
-|rss.server.commit.timeout|600000|Timeout when commit shuffle data (ms)|
-|rss.storage.type|-|Supports MEMORY_LOCALFILE, MEMORY_HDFS, 
MEMORY_LOCALFILE_HDFS|
-|rss.server.flush.cold.storage.threshold.size|64M| The threshold of data size 
for LOACALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used|
-|rss.server.tags|-|The comma-separated list of tags to indicate the shuffle 
server's attributes. It will be used as the assignment basis for the 
coordinator|
-|rss.server.single.buffer.flush.enabled|false|Whether single buffer flush when 
size exceeded rss.server.single.buffer.flush.threshold|
-|rss.server.single.buffer.flush.threshold|64M|The threshold of single shuffle 
buffer flush|
-|rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If 
it's negative, it will use the default disk whole space|
-|rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for 
`MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If 
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` 
will be used.|
-|rss.server.leak.shuffledata.check.interval|3600000|The interval of leak 
shuffle data check (ms)|
-|rss.server.max.concurrency.of.single.partition.writer|1|The max concurrency 
of single partition writer, the data partition file number is equal to this 
value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.|
-|rss.metrics.reporter.class|-|The class of metrics reporter.|
+| Property Name                                         | Default | 
Description                                                                     
                                                                                
                                                                                
                                                                                
                                                         |
+|-------------------------------------------------------|---------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| rss.coordinator.quorum                                | -       | 
Coordinator quorum                                                              
                                                                                
                                                                                
                                                                                
                                                         |
+| rss.rpc.server.port                                   | -       | RPC port 
for Shuffle server                                                              
                                                                                
                                                                                
                                                                                
                                                |
+| rss.jetty.http.port                                   | -       | Http port 
for Shuffle server                                                              
                                                                                
                                                                                
                                                                                
                                               |
+| rss.server.buffer.capacity                            | -1      | Max memory 
of buffer manager for shuffle server. If negative, JVM heap size * buffer.ratio 
is used                                                                         
                                                                                
                                                                                
                                              |
+| rss.server.buffer.capacity.ratio                      | 0.6     | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size * 
ratio                                                                           
                                                                                
                                                                                
                                                       |
+| rss.server.memory.shuffle.highWaterMark.percentage    | 75.0    | Threshold 
of spill data to storage, percentage of rss.server.buffer.capacity              
                                                                                
                                                                                
                                                                                
                                               |
+| rss.server.memory.shuffle.lowWaterMark.percentage     | 25.0    | Threshold 
of keep data in memory, percentage of rss.server.buffer.capacity                
                                                                                
                                                                                
                                                                                
                                               |
+| rss.server.read.buffer.capacity                       | -1      | Max size 
of buffer for reading data. If negative, JVM heap size * read.buffer.ratio is 
used                                                                            
                                                                                
                                                                                
                                                  |
+| rss.server.read.buffer.capacity.ratio                 | 0.2     | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size * ratio                                                                    
                                                                                
                                                                                
                                                        |
+| rss.server.heartbeat.interval                         | 10000   | Heartbeat 
interval to Coordinator (ms)                                                    
                                                                                
                                                                                
                                                                                
                                               |
+| rss.server.flush.threadPool.size                      | 10      | Thread 
pool for flush data to file                                                     
                                                                                
                                                                                
                                                                                
                                                  |
+| rss.server.commit.timeout                             | 600000  | Timeout 
when commit shuffle data (ms)                                                   
                                                                                
                                                                                
                                                                                
                                                 |
+| rss.storage.type                                      | -       | Supports 
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS                            
                                                                                
                                                                                
                                                                                
                                                |
+| rss.server.flush.cold.storage.threshold.size          | 64M     | The 
threshold of data size for LOCALFILE and HDFS if MEMORY_LOCALFILE_HDFS is used 
                                                                                
                                                                                
                                                                                
                                                     |
+| rss.server.tags                                       | -       | The 
comma-separated list of tags to indicate the shuffle server's attributes. It 
will be used as the assignment basis for the coordinator                        
                                                                                
                                                                                
                                                        |
+| rss.server.single.buffer.flush.enabled                | false   | Whether 
single buffer flush when size exceeded rss.server.single.buffer.flush.threshold 
                                                                                
                                                                                
                                                                                
                                                 |
+| rss.server.single.buffer.flush.threshold              | 64M     | The 
threshold of single shuffle buffer flush                                        
                                                                                
                                                                                
                                                                                
                                                     |
+| rss.server.disk.capacity                              | -1      | Disk 
capacity that shuffle server can use. If negative, it will use disk whole space 
* ratio                                                                         
                                                                                
                                                                                
                                                    |
+| rss.server.disk.capacity.ratio                        | 0.9     | When 
`rss.server.disk.capacity` is negative, disk whole space * ratio is used        
                                                                                
                                                                                
                                                                                
                                                    |
+| rss.server.multistorage.fallback.strategy.class       | -       | The 
fallback strategy for `MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If 
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy` 
will be used. |
+| rss.server.leak.shuffledata.check.interval            | 3600000 | The 
interval of leak shuffle data check (ms)                                        
                                                                                
                                                                                
                                                                                
                                                     |
+| rss.server.max.concurrency.of.single.partition.writer | 1       | The max 
concurrency of single partition writer, the data partition file number is equal 
to this value. Default value is 1. This config could improve the writing speed, 
especially for huge partition.                                                  
                                                                                
                                                 |
+| rss.metrics.reporter.class                            | -       | The class 
of metrics reporter.                                                            
                                                                                
                                                                                
                                                                                
                                               |
 
 ### Advanced Configurations
 |Property Name|Default| Description                                            
                                                                                
                                                     |
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 6d501f18..5cf52049 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -35,15 +35,29 @@ public class ShuffleServerConf extends RssBaseConf {
   public static final ConfigOption<Long> SERVER_BUFFER_CAPACITY = ConfigOptions
       .key("rss.server.buffer.capacity")
       .longType()
-      .noDefaultValue()
+      .defaultValue(-1L)
       .withDescription("Max memory of buffer manager for shuffle server");
 
+  public static final ConfigOption<Double> SERVER_BUFFER_CAPACITY_RATIO = 
ConfigOptions
+          .key("rss.server.buffer.capacity.ratio")
+          .doubleType()
+          .defaultValue(0.6)
+          .withDescription("JVM heap size * ratio for the maximum memory of 
buffer manager for shuffle server, this "
+              + "is only effective when `rss.server.buffer.capacity` is not 
explicitly set");
+
   public static final ConfigOption<Long> SERVER_READ_BUFFER_CAPACITY = 
ConfigOptions
       .key("rss.server.read.buffer.capacity")
       .longType()
-      .defaultValue(10000L)
+      .defaultValue(-1L)
       .withDescription("Max size of buffer for reading data");
 
+  public static final ConfigOption<Double> SERVER_READ_BUFFER_CAPACITY_RATIO = 
ConfigOptions
+          .key("rss.server.read.buffer.capacity.ratio")
+          .doubleType()
+          .defaultValue(0.2)
+          .withDescription("JVM heap size * ratio for read buffer size, this 
is only effective when "
+              + "`rss.server.read.buffer.capacity` is not explicitly 
set");
+
   public static final ConfigOption<Long> SERVER_HEARTBEAT_DELAY = ConfigOptions
       .key("rss.server.heartbeat.delay")
       .longType()
@@ -181,6 +195,13 @@ public class ShuffleServerConf extends RssBaseConf {
       .withDescription("Disk capacity that shuffle server can use. "
           + "If it's negative, it will use the default whole space");
 
+  public static final ConfigOption<Double> DISK_CAPACITY_RATIO = ConfigOptions
+          .key("rss.server.disk.capacity.ratio")
+          .doubleType()
+          .defaultValue(0.9)
+          .withDescription("The maximum ratio of disk that could be used as 
shuffle server. This is only effective "
+              + "when `rss.server.disk.capacity` is not explicitly set");
+
   public static final ConfigOption<Long> SHUFFLE_EXPIRED_TIMEOUT_MS = 
ConfigOptions
       .key("rss.server.shuffle.expired.timeout.ms")
       .longType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 4b4fc22a..497b3430 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -77,8 +77,16 @@ public class ShuffleBufferManager {
   protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = 
Maps.newConcurrentMap();
 
   public ShuffleBufferManager(ShuffleServerConf conf, ShuffleFlushManager 
shuffleFlushManager) {
+    long heapSize = Runtime.getRuntime().maxMemory();
     this.capacity = 
conf.getSizeAsBytes(ShuffleServerConf.SERVER_BUFFER_CAPACITY);
+    if (this.capacity < 0) {
+      this.capacity = (long) (heapSize * 
conf.getDouble(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO));
+    }
     this.readCapacity = 
conf.getSizeAsBytes(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY);
+    if (this.readCapacity < 0) {
+      this.readCapacity = (long) (heapSize * 
conf.getDouble(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO));
+    }
+    LOG.info("Init shuffle buffer manager with capacity: {}, read buffer 
capacity: {}.", capacity, readCapacity);
     this.shuffleFlushManager = shuffleFlushManager;
     this.bufferPool = new ConcurrentHashMap<>();
     this.retryNum = 
conf.getInteger(ShuffleServerConf.SERVER_MEMORY_REQUEST_RETRY_MAX);
@@ -437,6 +445,11 @@ public class ShuffleBufferManager {
     return capacity;
   }
 
+  @VisibleForTesting
+  public long getReadCapacity() {
+    return readCapacity;
+  }
+
   @VisibleForTesting
   public void resetSize() {
     usedMemory = new AtomicLong(0L);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 6ea3ef97..53f1a7ef 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -92,6 +92,7 @@ public class LocalStorageManager extends SingleStorageManager 
{
     this.partitionsOfStorage = Maps.newConcurrentMap();
     long shuffleExpiredTimeoutMs = 
conf.get(ShuffleServerConf.SHUFFLE_EXPIRED_TIMEOUT_MS);
     long capacity = conf.getSizeAsBytes(ShuffleServerConf.DISK_CAPACITY);
+    double ratio = conf.getDouble(ShuffleServerConf.DISK_CAPACITY_RATIO);
     double highWaterMarkOfWrite = 
conf.get(ShuffleServerConf.HIGH_WATER_MARK_OF_WRITE);
     double lowWaterMarkOfWrite = 
conf.get(ShuffleServerConf.LOW_WATER_MARK_OF_WRITE);
     if (highWaterMarkOfWrite < lowWaterMarkOfWrite) {
@@ -117,6 +118,7 @@ public class LocalStorageManager extends 
SingleStorageManager {
           localStorageArray[idx] = LocalStorage.newBuilder()
               .basePath(storagePath)
               .capacity(capacity)
+              .ratio(ratio)
               .lowWaterMarkOfWrite(lowWaterMarkOfWrite)
               .highWaterMarkOfWrite(highWaterMarkOfWrite)
               .shuffleExpiredTimeoutMs(shuffleExpiredTimeoutMs)
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 670e684e..a5faffa6 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -629,4 +629,22 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     // `shuffleBufferManager.getUsedMemory()` and 
`shuffleBufferManager.getInFlushSize()`.
     Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> 
shuffleBufferManager.getUsedMemory() == 0);
   }
+
+  @Test
+  public void bufferManagerInitTest() {
+    ShuffleServerConf serverConf = new ShuffleServerConf();
+    shuffleBufferManager = new ShuffleBufferManager(serverConf, 
mockShuffleFlushManager);
+    double ratio = 
ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO.defaultValue();
+    double readRatio = 
ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO.defaultValue();
+    assertEquals((long) (Runtime.getRuntime().maxMemory() * ratio), 
shuffleBufferManager.getCapacity());
+    assertEquals((long) (Runtime.getRuntime().maxMemory() * readRatio), 
shuffleBufferManager.getReadCapacity());
+    ratio = 0.6;
+    readRatio = 0.1;
+    serverConf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY_RATIO, ratio);
+    serverConf.set(ShuffleServerConf.SERVER_READ_BUFFER_CAPACITY_RATIO, 
readRatio);
+    shuffleBufferManager = new ShuffleBufferManager(serverConf, 
mockShuffleFlushManager);
+    assertEquals((long) (Runtime.getRuntime().maxMemory() * ratio), 
shuffleBufferManager.getCapacity());
+    assertEquals((long) (Runtime.getRuntime().maxMemory() * readRatio), 
shuffleBufferManager.getReadCapacity());
+
+  }
 }
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 1def5bfc..44c08082 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -87,9 +87,10 @@ public class LocalStorage extends AbstractStorage {
       throw new RuntimeException(ioe);
     }
     if (capacity < 0L) {
-      this.capacity = baseFolder.getTotalSpace();
-      LOG.info("Make the disk capacity the total space when 
\"rss.server.disk.capacity\" is not specified "
-          + "or less than 0");
+      long totalSpace = baseFolder.getTotalSpace();
+      this.capacity = (long) (totalSpace * builder.ratio);
+      LOG.info("The `rss.server.disk.capacity` is not specified or is negative, 
the "
+          + "ratio(`rss.server.disk.capacity.ratio`:{}) * disk space({}) is 
used, ", builder.ratio, totalSpace);
     } else {
       long freeSpace = baseFolder.getFreeSpace();
       if (freeSpace < capacity) {
@@ -336,6 +337,7 @@ public class LocalStorage extends AbstractStorage {
 
   public static class Builder {
     private long capacity;
+    private double ratio;
     private double lowWaterMarkOfWrite;
     private double highWaterMarkOfWrite;
     private double cleanupThreshold;
@@ -352,6 +354,11 @@ public class LocalStorage extends AbstractStorage {
       return this;
     }
 
+    public Builder ratio(double ratio) {
+      this.ratio = ratio;
+      return this;
+    }
+
     public Builder lowWaterMarkOfWrite(double lowWaterMarkOfWrite) {
       this.lowWaterMarkOfWrite = lowWaterMarkOfWrite;
       return this;
diff --git 
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java 
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
index 574ffc06..4aa7e2e3 100644
--- 
a/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
+++ 
b/storage/src/test/java/org/apache/uniffle/storage/common/LocalStorageTest.java
@@ -94,6 +94,19 @@ public class LocalStorageTest {
     assertTrue(item.canWrite());
   }
 
+  @Test
+  public void getCapacityInitTest() {
+    LocalStorage item = 
LocalStorage.newBuilder().basePath(testBaseDir.getAbsolutePath())
+            .cleanupThreshold(50)
+            .highWaterMarkOfWrite(95)
+            .lowWaterMarkOfWrite(80)
+            .capacity(-1)
+            .ratio(0.1)
+            .cleanIntervalMs(5000)
+            .build();
+    assertEquals((long) (testBaseDir.getTotalSpace() * 0.1), 
item.getCapacity());
+  }
+
   @Test
   public void baseDirectoryInitTest() throws IOException {
     // empty and writable base dir

Reply via email to