This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d5e689c32 [#2592] fix(spark): Ignore failure when reporting shuffle
read metrics to driver (#2593)
d5e689c32 is described below
commit d5e689c32aaaa69465fe92d0c79a773321ffd1b9
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Aug 27 10:15:00 2025 +0800
[#2592] fix(spark): Ignore failure when reporting shuffle read metrics to
driver (#2593)
### What changes were proposed in this pull request?
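Ignore failures when reporting shuffle read metrics to the driver: the report call is now wrapped in a try/catch, so any exception it throws is logged instead of propagating out of the shuffle reader.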
### Why are the changes needed?
Fixes #2592.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Not needed; the change only wraps the existing metrics-report call in a try/catch that logs the exception.
---
.../spark/shuffle/reader/RssShuffleReader.java | 52 ++++++++++++----------
1 file changed, 28 insertions(+), 24 deletions(-)
diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index b53d67f62..4113f0627 100644
--- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -367,30 +367,34 @@ public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
if (managerClientSupplier != null) {
ShuffleManagerClient client = managerClientSupplier.get();
if (client != null) {
- RssReportShuffleReadMetricResponse response =
- client.reportShuffleReadMetric(
- new RssReportShuffleReadMetricRequest(
- context.stageId(),
- shuffleId,
- context.taskAttemptId(),
- shuffleServerReadCostTracker.list().entrySet().stream()
- .collect(
- Collectors.toMap(
- Map.Entry::getKey,
- x ->
- new
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
- x.getValue().getDurationMillis(),
- x.getValue().getReadBytes(),
-
x.getValue().getMemoryReadDurationMillis(),
- x.getValue().getMemoryReadBytes(),
-
x.getValue().getLocalfileReadDurationMillis(),
- x.getValue().getLocalfileReadBytes(),
-
x.getValue().getHadoopReadLocalFileDurationMillis(),
-
x.getValue().getHadoopReadLocalFileBytes()))),
- isShuffleReadFailed,
- shuffleReadReason));
- if (response != null && response.getStatusCode() !=
StatusCode.SUCCESS) {
- LOG.error("Errors on reporting shuffle read metrics to driver");
+ try {
+ RssReportShuffleReadMetricResponse response =
+ client.reportShuffleReadMetric(
+ new RssReportShuffleReadMetricRequest(
+ context.stageId(),
+ shuffleId,
+ context.taskAttemptId(),
+ shuffleServerReadCostTracker.list().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey,
+ x ->
+ new
RssReportShuffleReadMetricRequest.TaskShuffleReadMetric(
+ x.getValue().getDurationMillis(),
+ x.getValue().getReadBytes(),
+
x.getValue().getMemoryReadDurationMillis(),
+ x.getValue().getMemoryReadBytes(),
+
x.getValue().getLocalfileReadDurationMillis(),
+ x.getValue().getLocalfileReadBytes(),
+
x.getValue().getHadoopReadLocalFileDurationMillis(),
+
x.getValue().getHadoopReadLocalFileBytes()))),
+ isShuffleReadFailed,
+ shuffleReadReason));
+ if (response != null && response.getStatusCode() !=
StatusCode.SUCCESS) {
+ LOG.error("Errors on reporting shuffle read metrics to driver");
+ }
+ } catch (Exception e) {
+ LOG.error("Errors on post shuffle read metric to driver", e);
}
}
}