This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 8a15396cb [CELEBORN-1145] Separate clientPushBufferMaxSize from
CelebornInputStreamImpl
8a15396cb is described below
commit 8a15396cb64140fff83f157f5858862ea44a7205
Author: exmy <[email protected]>
AuthorDate: Thu Nov 30 18:56:03 2023 +0800
[CELEBORN-1145] Separate clientPushBufferMaxSize from
CelebornInputStreamImpl
### What changes were proposed in this pull request?
The `clientPushBufferMaxSize` config is also used by
`CelebornInputStreamImpl`. It is a push-side config and should not be used
by the fetch side. This PR introduces a dedicated fetch-side config,
`celeborn.client.fetch.buffer.size`, to replace it.
### Why are the changes needed?
As above: the fetch-side buffer size should be configurable independently of push-side settings.
### Does this PR introduce _any_ user-facing change?
Yes, a new config `celeborn.client.fetch.buffer.size` (default `64k`) is introduced.
### How was this patch tested?
Passed CI.
Closes #2118 from exmy/celeborn-1145.
Authored-by: exmy <[email protected]>
Signed-off-by: Cheng Pan <[email protected]>
---
.../org/apache/celeborn/client/read/CelebornInputStream.java | 8 ++++----
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++
docs/configuration/client.md | 1 +
3 files changed, 16 insertions(+), 4 deletions(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
index 8484406c6..bb1e95ce9 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/CelebornInputStream.java
@@ -182,15 +182,15 @@ public abstract class CelebornInputStream extends
InputStream {
this.fetchExcludedWorkerExpireTimeout =
conf.clientFetchExcludedWorkerExpireTimeout();
this.fetchExcludedWorkers = fetchExcludedWorkers;
- int blockSize = conf.clientPushBufferMaxSize();
+ int bufferSize = conf.clientFetchBufferSize();
if (shuffleCompressionEnabled) {
int headerLen = Decompressor.getCompressionHeaderLength(conf);
- blockSize += headerLen;
- compressedBuf = new byte[blockSize];
+ bufferSize += headerLen;
+ compressedBuf = new byte[bufferSize];
decompressor = Decompressor.getDecompressor(conf);
}
- rawDataBuf = new byte[blockSize];
+ rawDataBuf = new byte[bufferSize];
if (conf.clientPushReplicateEnabled()) {
fetchChunkMaxRetry = conf.clientFetchMaxRetriesForEachReplica() * 2;
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 422d0f555..75742b48a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -793,6 +793,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
+ def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def clientFetchMaxRetriesForEachReplica: Int =
get(CLIENT_FETCH_MAX_RETRIES_FOR_EACH_REPLICA)
def clientFetchThrowsFetchFailure: Boolean =
get(CLIENT_FETCH_THROWS_FETCH_FAILURE)
@@ -3299,6 +3300,16 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("600s")
+ val CLIENT_FETCH_BUFFER_SIZE: ConfigEntry[Long] =
+ buildConf("celeborn.client.fetch.buffer.size")
+ .categories("client")
+ .version("0.4.0")
+ .doc("Size of reducer partition buffer memory for shuffle reader. The
fetched data " +
+ "will be buffered in memory before consuming. For performance
consideration keep " +
+ s"this buffer size not less than
`${CLIENT_PUSH_BUFFER_MAX_SIZE.key}`.")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("64k")
+
val CLIENT_FETCH_MAX_REQS_IN_FLIGHT: ConfigEntry[Int] =
buildConf("celeborn.client.fetch.maxReqsInFlight")
.withAlternative("celeborn.fetch.maxReqsInFlight")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 4871ac550..c46103e98 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -26,6 +26,7 @@ license: |
| celeborn.client.eagerlyCreateInputStream.threads | 32 | Threads count for
streamCreatorPool in CelebornShuffleReader. | 0.3.1 |
| celeborn.client.excludePeerWorkerOnFailure.enabled | true | When true,
Celeborn will exclude partition's peer worker on failure when push data to
replica failed. | 0.3.0 |
| celeborn.client.excludedWorker.expireTimeout | 180s | Timeout time for
LifecycleManager to clear reserved excluded worker. Default to be 1.5 *
`celeborn.master.heartbeat.worker.timeout`to cover worker heartbeat timeout
check period | 0.3.0 |
+| celeborn.client.fetch.buffer.size | 64k | Size of reducer partition buffer
memory for shuffle reader. The fetched data will be buffered in memory before
consuming. For performance consideration keep this buffer size not less than
`celeborn.client.push.buffer.max.size`. | 0.4.0 |
| celeborn.client.fetch.dfsReadChunkSize | 8m | Max chunk size for
DfsPartitionReader. | 0.3.1 |
| celeborn.client.fetch.excludeWorkerOnFailure.enabled | false | Whether to
enable shuffle client-side fetch exclude workers on failure. | 0.3.0 |
| celeborn.client.fetch.excludedWorker.expireTimeout | <value of
celeborn.client.excludedWorker.expireTimeout> | ShuffleClient is a static
object, it will be used in the whole lifecycle of Executor,We give a expire
time for excluded workers to avoid a transient worker issues. | 0.3.0 |