This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new 774d64edc [MINOR] feat(spark): Support reporting more error messages from client when failing to send blocks (#1914) 774d64edc is described below commit 774d64edc42214f8b34ccbd6b321769c6e9c3035 Author: maobaolong <baoloong...@tencent.com> AuthorDate: Thu Jul 18 02:50:19 2024 +0800 [MINOR] feat(spark): Support reporting more error messages from client when failing to send blocks (#1914) ### What changes were proposed in this pull request? Support reporting more error messages rather than only `Fail to send the block`. ### Why are the changes needed? Let users know the error code so users can solve issues by themselves. ### Does this PR introduce _any_ user-facing change? No, just output more error messages. ### How was this patch tested? In our env. --- .../spark/shuffle/writer/RssShuffleWriter.java | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java index 6660a5e7b..870141c4b 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java @@ -101,6 +101,7 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { private static final Logger LOG = LoggerFactory.getLogger(RssShuffleWriter.class); private static final String DUMMY_HOST = "dummy_host"; private static final int DUMMY_PORT = 99999; + public static final String DEFAULT_ERROR_MESSAGE = "Default Error Message"; private final String appId; private final int shuffleId; @@ -513,23 +514,32 @@ public class RssShuffleWriter<K, V, C> extends ShuffleWriter<K, V> { if (blockFailSentRetryEnabled) { collectFailedBlocksToResend(); } else { - if (hasAnyBlockFailure()) { - throw new RssSendFailedException("Fail to send the block"); + String errorMsg = getFirstBlockFailure(); + if (errorMsg != null) { + throw new RssSendFailedException("Fail to send the block. Error: " + errorMsg); } } } - private boolean hasAnyBlockFailure() { + private String getFirstBlockFailure() { Set<Long> failedBlockIds = shuffleManager.getFailedBlockIds(taskId); if (!failedBlockIds.isEmpty()) { + List<TrackingBlockStatus> trackingBlockStatues = + shuffleManager + .getBlockIdsFailedSendTracker(taskId) + .getFailedBlockStatus(failedBlockIds.iterator().next()); + String errorMsg = DEFAULT_ERROR_MESSAGE; + if (CollectionUtils.isNotEmpty(trackingBlockStatues)) { + errorMsg = trackingBlockStatues.get(0).getStatusCode().name(); + } LOG.error( "Errors on sending blocks for task[{}]. {} blocks can't be sent to remote servers: {}", taskId, failedBlockIds.size(), shuffleManager.getBlockIdsFailedSendTracker(taskId).getFaultyShuffleServers()); - return true; + return errorMsg; } - return false; + return null; } private void collectFailedBlocksToResend() {