This is an automated email from the ASF dual-hosted git repository. zhengchenyu pushed a commit to branch branch-0.9 in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/branch-0.9 by this push: new 72f8305a3 [#1743] fix: Add exception handling for thread pools (#1744) 72f8305a3 is described below commit 72f8305a3b23761cf86df43c0cc29ecf5d9ef8b6 Author: RickyMa <rick...@tencent.com> AuthorDate: Thu May 30 14:50:07 2024 +0800 [#1743] fix: Add exception handling for thread pools (#1744) ### What changes were proposed in this pull request? Add exception handling for thread pools. ### Why are the changes needed? Fix: https://github.com/apache/incubator-uniffle/issues/1743. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. --- .../apache/spark/shuffle/writer/DataPusher.java | 67 ++++++----- .../client/impl/ShuffleWriteClientImpl.java | 127 +++++++++++---------- 2 files changed, 103 insertions(+), 91 deletions(-) diff --git a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java index 1517b7173..e9ef2ba61 100644 --- a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java +++ b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java @@ -85,38 +85,43 @@ public class DataPusher implements Closeable { throw new RssException("RssAppId should be set."); } return CompletableFuture.supplyAsync( - () -> { - String taskId = event.getTaskId(); - List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList(); - SendShuffleDataResult result = null; - try { - result = - shuffleWriteClient.sendShuffleData( - rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId)); - putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds()); - putFailedBlockSendTracker( - taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker()); - } finally { - Set<Long> succeedBlockIds = - result.getSuccessBlockIds() == null - ? Collections.emptySet() - : result.getSuccessBlockIds(); - for (ShuffleBlockInfo block : shuffleBlockInfoList) { - block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId())); - } + () -> { + String taskId = event.getTaskId(); + List<ShuffleBlockInfo> shuffleBlockInfoList = event.getShuffleDataInfoList(); + SendShuffleDataResult result = null; + try { + result = + shuffleWriteClient.sendShuffleData( + rssAppId, shuffleBlockInfoList, () -> !isValidTask(taskId)); + putBlockId(taskToSuccessBlockIds, taskId, result.getSuccessBlockIds()); + putFailedBlockSendTracker( + taskToFailedBlockSendTracker, taskId, result.getFailedBlockSendTracker()); + } finally { + Set<Long> succeedBlockIds = + result.getSuccessBlockIds() == null + ? Collections.emptySet() + : result.getSuccessBlockIds(); + for (ShuffleBlockInfo block : shuffleBlockInfoList) { + block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId())); + } - List<Runnable> callbackChain = - Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST); - for (Runnable runnable : callbackChain) { - runnable.run(); - } - } - return shuffleBlockInfoList.stream() - .map(x -> x.getFreeMemory()) - .reduce((a, b) -> a + b) - .get(); - }, - executorService); + List<Runnable> callbackChain = + Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST); + for (Runnable runnable : callbackChain) { + runnable.run(); + } + } + return shuffleBlockInfoList.stream() + .map(x -> x.getFreeMemory()) + .reduce((a, b) -> a + b) + .get(); + }, + executorService) + .exceptionally( + ex -> { + LOGGER.error("Unexpected exceptions occurred while sending shuffle data", ex); + return null; + }); } private synchronized void putBlockId( diff --git a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java index 7912e098d..bd5981b6a 100644 --- a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java +++ b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java @@ -171,69 +171,76 @@ public class ShuffleWriteClientImpl implements ShuffleWriteClient { serverToBlocks.entrySet()) { CompletableFuture<Boolean> future = CompletableFuture.supplyAsync( - () -> { - if (needCancelRequest.get()) { - LOG.info("The upstream task has been failed. Abort this data send."); - return true; - } - ShuffleServerInfo ssi = entry.getKey(); - try { - Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = - entry.getValue(); - // todo: compact unnecessary blocks that reach replicaWrite - RssSendShuffleDataRequest request = - new RssSendShuffleDataRequest( - appId, retryMax, retryIntervalMax, shuffleIdToBlocks); - long s = System.currentTimeMillis(); - RssSendShuffleDataResponse response = - getShuffleServerClient(ssi).sendShuffleData(request); - - String logMsg = - String.format( - "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)", - serverToBlockIds.get(ssi).size(), - ssi.getId(), - System.currentTimeMillis() - s); - - if (response.getStatusCode() == StatusCode.SUCCESS) { - // mark a replica of block that has been sent - serverToBlockIds - .get(ssi) - .forEach( - blockId -> blockIdsSendSuccessTracker.get(blockId).incrementAndGet()); - if (defectiveServers != null) { - defectiveServers.remove(ssi); - } - if (LOG.isDebugEnabled()) { - LOG.debug("{} successfully.", logMsg); + () -> { + if (needCancelRequest.get()) { + LOG.info("The upstream task has been failed. Abort this data send."); + return true; } - } else { - recordFailedBlocks( - failedBlockSendTracker, serverToBlocks, ssi, response.getStatusCode()); - if (defectiveServers != null) { - defectiveServers.add(ssi); + ShuffleServerInfo ssi = entry.getKey(); + try { + Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> shuffleIdToBlocks = + entry.getValue(); + // todo: compact unnecessary blocks that reach replicaWrite + RssSendShuffleDataRequest request = + new RssSendShuffleDataRequest( + appId, retryMax, retryIntervalMax, shuffleIdToBlocks); + long s = System.currentTimeMillis(); + RssSendShuffleDataResponse response = + getShuffleServerClient(ssi).sendShuffleData(request); + + String logMsg = + String.format( + "ShuffleWriteClientImpl sendShuffleData with %s blocks to %s cost: %s(ms)", + serverToBlockIds.get(ssi).size(), + ssi.getId(), + System.currentTimeMillis() - s); + + if (response.getStatusCode() == StatusCode.SUCCESS) { + // mark a replica of block that has been sent + serverToBlockIds + .get(ssi) + .forEach( + blockId -> + blockIdsSendSuccessTracker.get(blockId).incrementAndGet()); + if (defectiveServers != null) { + defectiveServers.remove(ssi); + } + if (LOG.isDebugEnabled()) { + LOG.debug("{} successfully.", logMsg); + } + } else { + recordFailedBlocks( + failedBlockSendTracker, serverToBlocks, ssi, response.getStatusCode()); + if (defectiveServers != null) { + defectiveServers.add(ssi); + } + LOG.warn( + "{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode()); + return false; + } + } catch (Exception e) { + recordFailedBlocks( + failedBlockSendTracker, serverToBlocks, ssi, StatusCode.INTERNAL_ERROR); + if (defectiveServers != null) { + defectiveServers.add(ssi); + } + LOG.warn( + "Send: " + + serverToBlockIds.get(ssi).size() + + " blocks to [" + + ssi.getId() + + "] failed.", + e); + return false; } - LOG.warn("{}, it failed wth statusCode[{}]", logMsg, response.getStatusCode()); + return true; + }, + dataTransferPool) + .exceptionally( + ex -> { + LOG.error("Unexpected exceptions occurred while sending shuffle data", ex); return false; - } - } catch (Exception e) { - recordFailedBlocks( - failedBlockSendTracker, serverToBlocks, ssi, StatusCode.INTERNAL_ERROR); - if (defectiveServers != null) { - defectiveServers.add(ssi); - } - LOG.warn( - "Send: " - + serverToBlockIds.get(ssi).size() - + " blocks to [" - + ssi.getId() - + "] failed.", - e); - return false; - } - return true; - }, - dataTransferPool); + }); futures.add(future); }