This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 79e9fca0a [CELEBORN-1918] Add batchOpenStream time to fetch wait time
79e9fca0a is described below
commit 79e9fca0a56148a1294bd33e4631ddec2ff59742
Author: zhengtao <[email protected]>
AuthorDate: Sun Mar 30 21:51:53 2025 -0700
[CELEBORN-1918] Add batchOpenStream time to fetch wait time
### What changes were proposed in this pull request?
1. Add the RPC times and batchOpenStream time to the read wait time.
2. Remove the updateFileGroup RPC time from the batchOpenStream time log.
3. Reduce the sleep time while waiting for inputStream creation, since it
costs a lot of sleeping time for tiny shuffle reads.
### Why are the changes needed?
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Tested in cluster.
Closes #3180 from zaynt4606/clb1918.
Authored-by: zhengtao <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit 99ca4dffe87b27e87f1563333f7e238409db5ea2)
Signed-off-by: Wang, Fei <[email protected]>
---
.../spark/shuffle/celeborn/CelebornShuffleReader.scala | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
diff --git
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
index ea4254bc5..99a5bb0fc 100644
---
a/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
+++
b/client-spark/spark-3/src/main/scala/org/apache/spark/shuffle/celeborn/CelebornShuffleReader.scala
@@ -75,8 +75,8 @@ class CelebornShuffleReader[K, C](
override def read(): Iterator[Product2[K, C]] = {
+ val startTime = System.currentTimeMillis()
val serializerInstance = newSerializerInstance(dep)
-
val shuffleId = SparkUtils.celebornShuffleId(shuffleClient, handle,
context, false)
shuffleIdTracker.track(handle.shuffleId, shuffleId)
logDebug(
@@ -104,7 +104,6 @@ class CelebornShuffleReader[K, C](
}
}
- val startTime = System.currentTimeMillis()
val fetchTimeoutMs = conf.clientFetchTimeoutMs
val localFetchEnabled = conf.enableReadLocalShuffleFile
val localHostAddress = Utils.localHostName(conf)
@@ -121,6 +120,7 @@ class CelebornShuffleReader[K, C](
case e: Throwable => throw e
}
+ val batchOpenStreamStartTime = System.currentTimeMillis()
// host-port -> (TransportClient, PartitionLocation Array,
PbOpenStreamList)
val workerRequestMap = new util.HashMap[
String,
@@ -197,7 +197,9 @@ class CelebornShuffleReader[K, C](
// wait for all futures to complete
futures.foreach(f => f.get())
val end = System.currentTimeMillis()
- logInfo(s"BatchOpenStream for $partCnt cost ${end - startTime}ms")
+ // readTime should include batchOpenStreamTime, getShuffleId Rpc time and
updateFileGroup Rpc time
+ metricsCallback.incReadTime(end - startTime)
+ logInfo(s"BatchOpenStream for $partCnt cost ${end -
batchOpenStreamStartTime}ms")
val streams = new ConcurrentHashMap[Integer, CelebornInputStream]()
@@ -266,14 +268,15 @@ class CelebornShuffleReader[K, C](
}
}
if (sleepCnt == 0) {
- logInfo(s"inputStream for partition: $partitionId is null,
sleeping...")
+ logInfo(s"inputStream for partition: $partitionId is null,
sleeping 5ms")
}
sleepCnt += 1
- Thread.sleep(50)
+ Thread.sleep(5)
inputStream = streams.get(partitionId)
}
if (sleepCnt > 0) {
- logInfo(s"inputStream for partition: $partitionId is not null, sleep
count: $sleepCnt")
+ logInfo(
+ s"inputStream for partition: $partitionId is not null, sleep
$sleepCnt times for ${5 * sleepCnt} ms")
}
metricsCallback.incReadTime(
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startFetchWait))