This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a15396cb [CELEBORN-1145] Separate clientPushBufferMaxSize from 
CelebornInputStreamImpl
8a15396cb is described below

commit 8a15396cb64140fff83f157f5858862ea44a7205
Author: exmy <[email protected]>
AuthorDate: Thu Nov 30 18:56:03 2023 +0800

    [CELEBORN-1145] Separate clientPushBufferMaxSize from 
CelebornInputStreamImpl
    
    ### What changes were proposed in this pull request?
    The `clientPushBufferMaxSize` config is also used by 
`CelebornInputStreamImpl`, but it is a push-side config and should not be used 
by the fetch side. This PR introduces a fetch config to replace it.
    
    ### Why are the changes needed?
    
    As above
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, a new config `celeborn.client.fetch.buffer.size` is introduced.
    
    ### How was this patch tested?
    
    Pass CI
    
    Closes #2118 from exmy/celeborn-1145.
    
    Authored-by: exmy <[email protected]>
    Signed-off-by: Cheng Pan <[email protected]>
---
 .../org/apache/celeborn/client/read/CelebornInputStream.java  |  8 ++++----
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala  | 11 +++++++++++
 docs/configuration/client.md                                  |  1 +
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8484406c6..bb1e95ce9 100644
--- 
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++ 
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -182,15 +182,15 @@ public abstract class CelebornInputStream extends 
InputStream {
       this.fetchExcludedWorkerExpireTimeout = 
conf.clientFetchExcludedWorkerExpireTimeout();
       this.fetchExcludedWorkers = fetchExcludedWorkers;
 
-      int blockSize = conf.clientPushBufferMaxSize();
+      int bufferSize = conf.clientFetchBufferSize();
       if (shuffleCompressionEnabled) {
         int headerLen = Decompressor.getCompressionHeaderLength(conf);
-        blockSize += headerLen;
-        compressedBuf = new byte[blockSize];
+        bufferSize += headerLen;
+        compressedBuf = new byte[bufferSize];
 
         decompressor = Decompressor.getDecompressor(conf);
       }
-      rawDataBuf = new byte[blockSize];
+      rawDataBuf = new byte[bufferSize];
 
       if (conf.clientPushReplicateEnabled()) {
         fetchChunkMaxRetry = conf.clientFetchMaxRetriesForEachReplica() * 2;
diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 422d0f555..75742b48a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -793,6 +793,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
   //               Shuffle Client Fetch                  //
   // //////////////////////////////////////////////////////
   def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
+  def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
   def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
   def clientFetchMaxRetriesForEachReplica: Int = 
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
   def clientFetchThrowsFetchFailure: Boolean = 
get(CLIENT_FETCH_THROWS_FETCH_FAILURE)
@@ -3299,6 +3300,16 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("600s")
 
+  val CLIENT_FETCH_BUFFER_SIZE: ConfigEntry[Long] =
+    buildConf("celeborn.client.fetch.buffer.size")
+      .categories("client")
+      .version("0.4.0")
+      .doc("Size of reducer partition buffer memory for shuffle reader. The 
fetched data " +
+        "will be buffered in memory before consuming. For performance 
consideration keep " +
+        s"this buffer size not less than 
`${CLIENT_PUSH_BUFFER_MAX_SIZE.key}`.")
+      .bytesConf(ByteUnit.BYTE)
+      .createWithDefaultString("64k")
+
   val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
     buildConf("celeborn.client.fetch.maxReqsInFlight")
       .withAlternative("celeborn.fetch.maxReqsInFlight")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 4871ac550..c46103e98 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -26,6 +26,7 @@ license: |
 | celeborn.client.eagerlyCreateInputStream.threads | 32 | Threads count for 
streamCreatorPool in CelebornShuffleReader. | 0.3.1 | 
 | celeborn.client.excludePeerWorkerOnFailure.enabled | true | When true, 
Celeborn will exclude partition's peer worker on failure when push data to 
replica failed. | 0.3.0 | 
 | celeborn.client.excludedWorker.expireTimeout | 180s | Timeout for 
LifecycleManager to clear reserved excluded workers. Defaults to 1.5 * 
`celeborn.master.heartbeat.worker.timeout` to cover the worker heartbeat timeout 
check period. | 0.3.0 | 
+| celeborn.client.fetch.buffer.size | 64k | Size of reducer partition buffer 
memory for shuffle reader. The fetched data will be buffered in memory before 
consuming. For performance consideration keep this buffer size not less than 
`celeborn.client.push.buffer.max.size`. | 0.4.0 | 
 | celeborn.client.fetch.dfsReadChunkSize | 8m | Max chunk size for 
DfsPartitionReader. | 0.3.1 | 
 | celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to 
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 | 
 | celeborn.client.fetch.excludedWorker.expireTimeout | &lt;value of 
celeborn.client.excludedWorker.expireTimeout&gt; | ShuffleClient is a static 
object and will be used for the whole lifecycle of an Executor, so we give an 
expire timeout for excluded workers to avoid transient worker issues. | 0.3.0 | 

Reply via email to