This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9e5cd616a improvement(spark): Involve shuffle result report time into
shuffle write time metrics (#2361)
9e5cd616a is described below
commit 9e5cd616a51e06d9f4e99546253f692b4018fc09
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Feb 7 17:02:24 2025 +0800
improvement(spark): Involve shuffle result report time into shuffle write
time metrics (#2361)
### What changes were proposed in this pull request?
1. Include the shuffle result report time in the shuffle write time metrics
2. Log how long each shuffle result report to a shuffle server takes
### Why are the changes needed?
To calculate the shuffle write time more precisely.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed.
Co-authored-by: Junfan Zhang <[email protected]>
---
.../spark/shuffle/writer/RssShuffleWriter.java | 7 ++++--
.../spark/shuffle/writer/RssShuffleWriter.java | 6 +++--
.../client/impl/ShuffleWriteClientImpl.java | 29 ++++++++++------------
3 files changed, 22 insertions(+), 20 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index aa4ff9f89..1cd8113c0 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -492,11 +492,14 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
bitmapSplitNum,
recordReportFailedShuffleservers,
enableWriteFailureRetry);
+ long reportDuration = System.currentTimeMillis() - start;
LOG.info(
- "Report shuffle result for task[{}] with bitmapNum[{}] cost {} ms",
+ "Reported all shuffle result for shuffleId[{}] task[{}] with
bitmapNum[{}] cost {} ms",
+ shuffleId,
taskAttemptId,
bitmapSplitNum,
- (System.currentTimeMillis() - start));
+ reportDuration);
+
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration));
MapStatus mapStatus = MapStatus$.MODULE$.apply(blockManagerId,
partitionLengths);
return Option.apply(mapStatus);
} else {
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 48cbf3efa..09d88c1ca 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -818,12 +818,14 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
bitmapSplitNum,
recordReportFailedShuffleservers,
enableWriteFailureRetry);
+ long reportDuration = System.currentTimeMillis() - start;
LOG.info(
- "Report shuffle result for shuffleId[{}] task[{}] with
bitmapNum[{}] cost {} ms",
+ "Reported all shuffle result for shuffleId[{}] task[{}] with
bitmapNum[{}] cost {} ms",
shuffleId,
taskAttemptId,
bitmapSplitNum,
- (System.currentTimeMillis() - start));
+ reportDuration);
+
shuffleWriteMetrics.incWriteTime(TimeUnit.MILLISECONDS.toNanos(reportDuration));
// todo: we can replace the dummy host and port with the real shuffle
server which we prefer
// to read
final BlockManagerId blockManagerId =
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index dfd3efe30..3d3bb925d 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -756,27 +756,24 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
bitmapNum);
ShuffleServerInfo ssi = entry.getKey();
try {
+ long start = System.currentTimeMillis();
RssReportShuffleResultResponse response =
getShuffleServerClient(ssi).reportShuffleResult(request);
if (response.getStatusCode() == StatusCode.SUCCESS) {
LOG.info(
- "Report shuffle result to "
- + ssi
- + " for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "] successfully");
+ "Reported shuffle result to {} for appId[{}], shuffleId[{}]
successfully that cost {} ms",
+ ssi,
+ appId,
+ shuffleId,
+ System.currentTimeMillis() - start);
} else {
- LOG.warn(
- "Report shuffle result to "
- + ssi
- + " for appId["
- + appId
- + "], shuffleId["
- + shuffleId
- + "] failed with "
- + response.getStatusCode());
+ LOG.info(
+ "Reported shuffle result to {} for appId[{}], shuffleId[{}]
failed with [{}] that cost {} ms",
+ ssi,
+ appId,
+ shuffleId,
+ response.getStatusCode(),
+ System.currentTimeMillis() - start);
recordFailedBlockIds(blockReportTracker, requestBlockIds);
if (enableWriteFailureRetry) {
// The failed Shuffle Server is recorded and corresponding
exceptions are raised only