This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 29ab16989 [CELEBORN-2056] Make the wait time for the client to read
non shuffle partitions configurable
29ab16989 is described below
commit 29ab16989db68a9839d0c61ce6c8effd3f9ceaa9
Author: duanhao-jk <[email protected]>
AuthorDate: Thu Jul 24 23:20:34 2025 -0700
[CELEBORN-2056] Make the wait time for the client to read non shuffle
partitions configurable
### What changes were proposed in this pull request?
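Added a client configuration, `celeborn.client.fetch.pollChunk.wait`, that controls how long the client waits on each poll for a fetched chunk (previously hard-coded to 500 ms).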
### Why are the changes needed?
When a task's shuffle data is relatively small and many of its shuffle partitions are empty, the client spends a lot of time waiting needlessly in this poll.
### Does this PR introduce _any_ user-facing change?
Yes. It adds a new client configuration, `celeborn.client.fetch.pollChunk.wait` (default `500ms`).
### How was this patch tested?
Validated in a production environment.
Closes #3358 from dh20/celeborn_add-20250707.
Lead-authored-by: duanhao-jk <[email protected]>
Co-authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
---
.../apache/celeborn/client/read/WorkerPartitionReader.java | 4 +++-
.../main/scala/org/apache/celeborn/common/CelebornConf.scala | 12 ++++++++++++
docs/configuration/client.md | 1 +
3 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
index df6f3902c..b6fb2209f 100644
---
a/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
+++
b/client/src/main/java/org/apache/celeborn/client/read/WorkerPartitionReader.java
@@ -75,6 +75,7 @@ public class WorkerPartitionReader implements PartitionReader
{
private int fetchChunkRetryCnt;
private int fetchChunkMaxRetry;
private final boolean testFetch;
+ private Long pollChunkWaitTime;
private Optional<PartitionReaderCheckpointMetadata>
partitionReaderCheckpointMetadata;
@@ -97,6 +98,7 @@ public class WorkerPartitionReader implements PartitionReader
{
fetchMaxReqsInFlight = conf.clientFetchMaxReqsInFlight();
results = new LinkedBlockingQueue<>();
fetchTimeoutMs = conf.clientFetchTimeoutMs();
+ pollChunkWaitTime = conf.clientFetchPollChunkWaitTime();
inflightRequestCount = 0;
this.metricsCallback = metricsCallback;
// only add the buffer to results queue if this reader is not closed.
@@ -192,7 +194,7 @@ public class WorkerPartitionReader implements
PartitionReader {
while (chunk == null) {
checkException();
Long startFetchWait = System.nanoTime();
- chunk = results.poll(500, TimeUnit.MILLISECONDS);
+ chunk = results.poll(pollChunkWaitTime, TimeUnit.MILLISECONDS);
metricsCallback.incReadTime(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait));
}
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 49a78f64a..21f98745d 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1002,6 +1002,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
// Shuffle Client Fetch //
// //////////////////////////////////////////////////////
def clientFetchTimeoutMs: Long = get(CLIENT_FETCH_TIMEOUT)
+ def clientFetchPollChunkWaitTime: Long =
get(CLIENT_FETCH_POLL_CHUNK_WAIT_TIME)
def clientFetchBufferSize: Int = get(CLIENT_FETCH_BUFFER_SIZE).toInt
def clientFetchMaxReqsInFlight: Int = get(CLIENT_FETCH_MAX_REQS_IN_FLIGHT)
def isPartitionReaderCheckpointEnabled: Boolean =
@@ -4884,6 +4885,17 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("600s")
+ val CLIENT_FETCH_POLL_CHUNK_WAIT_TIME: ConfigEntry[Long] =
+ buildConf("celeborn.client.fetch.pollChunk.wait")
+ .categories("client")
+ .version("0.6.1")
+ .doc("The waiting time for shuffle client to read the empty chunk on the
work side." +
+ "when there are many empty chunk in the shuffle partition of a small
task," +
+ "the current value can be set small to avoid long waiting times and
the illusion of the" +
+ "task getting stuck")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefault(500)
+
val CLIENT_FETCH_BUFFER_SIZE: ConfigEntry[Long] =
buildConf("celeborn.client.fetch.buffer.size")
.categories("client")
diff --git a/docs/configuration/client.md b/docs/configuration/client.md
index 1dab16526..f9f14fe65 100644
--- a/docs/configuration/client.md
+++ b/docs/configuration/client.md
@@ -35,6 +35,7 @@ license: |
| celeborn.client.fetch.excludedWorker.expireTimeout | <value of
celeborn.client.excludedWorker.expireTimeout> | false | ShuffleClient is a
static object, it will be used in the whole lifecycle of Executor, We give a
expire time for excluded workers to avoid a transient worker issues. | 0.3.0 |
|
| celeborn.client.fetch.maxReqsInFlight | 3 | false | Amount of in-flight
chunk fetch request. | 0.3.0 | celeborn.fetch.maxReqsInFlight |
| celeborn.client.fetch.maxRetriesForEachReplica | 3 | false | Max retry times
of fetch chunk on each replica | 0.3.0 |
celeborn.fetch.maxRetriesForEachReplica,celeborn.fetch.maxRetries |
+| celeborn.client.fetch.pollChunk.wait | 500ms | false | The waiting time for
shuffle client to read the empty chunk on the work side.when there are many
empty chunk in the shuffle partition of a small task,the current value can be
set small to avoid long waiting times and the illusion of thetask getting stuck
| 0.6.1 | |
| celeborn.client.fetch.timeout | 600s | false | Timeout for a task to open
stream and fetch chunk. | 0.3.0 | celeborn.fetch.timeout |
| celeborn.client.flink.compression.enabled | true | false | Whether to
compress data in Flink plugin. | 0.3.0 |
remote-shuffle.job.enable-data-compression |
| celeborn.client.flink.inputGate.concurrentReadings | 2147483647 | false |
Max concurrent reading channels for a input gate. | 0.3.0 |
remote-shuffle.job.concurrent-readings-per-gate |