This is an automated email from the ASF dual-hosted git repository.

zhengchenyu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/branch-0.9 by this push:
     new 72f8305a3 [#1743] fix: Add exception handling for thread pools (#1744)
72f8305a3 is described below

commit 72f8305a3b23761cf86df43c0cc29ecf5d9ef8b6
Author: RickyMa <rick...@tencent.com>
AuthorDate: Thu May 30 14:50:07 2024 +0800

    [#1743] fix: Add exception handling for thread pools (#1744)
    
    ### What changes were proposed in this pull request?
    
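    Add exception handling for thread pools: attach `exceptionally` handlers to the
    `CompletableFuture` tasks submitted in `DataPusher` and `ShuffleWriteClientImpl`, so
    that an unexpected exception is logged and the future completes with a fallback
    value (`null` and `false` respectively).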
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/1743.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing unit tests (UTs).
---
 .../apache/spark/shuffle/writer/DataPusher.java    |  67 ++++++-----
 .../client/impl/ShuffleWriteClientImpl.java        | 127 +++++++++++----------
 2 files changed, 103 insertions(+), 91 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
index 1517b7173..e9ef2ba61 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/DataPusher.java
@@ -85,38 +85,43 @@ public class DataPusher implements Closeable {
       throw new RssException("RssAppId should be set.");
     }
     return CompletableFuture.supplyAsync(
-        () -> {
-          String taskId = event.getTaskId();
-          List<ShuffleBlockInfo> shuffleBlockInfoList = 
event.getShuffleDataInfoList();
-          SendShuffleDataResult result = null;
-          try {
-            result =
-                shuffleWriteClient.sendShuffleData(
-                    rssAppId, shuffleBlockInfoList, () -> 
!isValidTask(taskId));
-            putBlockId(taskToSuccessBlockIds, taskId, 
result.getSuccessBlockIds());
-            putFailedBlockSendTracker(
-                taskToFailedBlockSendTracker, taskId, 
result.getFailedBlockSendTracker());
-          } finally {
-            Set<Long> succeedBlockIds =
-                result.getSuccessBlockIds() == null
-                    ? Collections.emptySet()
-                    : result.getSuccessBlockIds();
-            for (ShuffleBlockInfo block : shuffleBlockInfoList) {
-              
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
-            }
+            () -> {
+              String taskId = event.getTaskId();
+              List<ShuffleBlockInfo> shuffleBlockInfoList = 
event.getShuffleDataInfoList();
+              SendShuffleDataResult result = null;
+              try {
+                result =
+                    shuffleWriteClient.sendShuffleData(
+                        rssAppId, shuffleBlockInfoList, () -> 
!isValidTask(taskId));
+                putBlockId(taskToSuccessBlockIds, taskId, 
result.getSuccessBlockIds());
+                putFailedBlockSendTracker(
+                    taskToFailedBlockSendTracker, taskId, 
result.getFailedBlockSendTracker());
+              } finally {
+                Set<Long> succeedBlockIds =
+                    result.getSuccessBlockIds() == null
+                        ? Collections.emptySet()
+                        : result.getSuccessBlockIds();
+                for (ShuffleBlockInfo block : shuffleBlockInfoList) {
+                  
block.executeCompletionCallback(succeedBlockIds.contains(block.getBlockId()));
+                }
 
-            List<Runnable> callbackChain =
-                
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
-            for (Runnable runnable : callbackChain) {
-              runnable.run();
-            }
-          }
-          return shuffleBlockInfoList.stream()
-              .map(x -> x.getFreeMemory())
-              .reduce((a, b) -> a + b)
-              .get();
-        },
-        executorService);
+                List<Runnable> callbackChain =
+                    
Optional.of(event.getProcessedCallbackChain()).orElse(Collections.EMPTY_LIST);
+                for (Runnable runnable : callbackChain) {
+                  runnable.run();
+                }
+              }
+              return shuffleBlockInfoList.stream()
+                  .map(x -> x.getFreeMemory())
+                  .reduce((a, b) -> a + b)
+                  .get();
+            },
+            executorService)
+        .exceptionally(
+            ex -> {
+              LOGGER.error("Unexpected exceptions occurred while sending 
shuffle data", ex);
+              return null;
+            });
   }
 
   private synchronized void putBlockId(
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 7912e098d..bd5981b6a 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -171,69 +171,76 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
         serverToBlocks.entrySet()) {
       CompletableFuture<Boolean> future =
           CompletableFuture.supplyAsync(
-              () -> {
-                if (needCancelRequest.get()) {
-                  LOG.info("The upstream task has been failed. Abort this data 
send.");
-                  return true;
-                }
-                ShuffleServerInfo ssi = entry.getKey();
-                try {
-                  Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> 
shuffleIdToBlocks =
-                      entry.getValue();
-                  // todo: compact unnecessary blocks that reach replicaWrite
-                  RssSendShuffleDataRequest request =
-                      new RssSendShuffleDataRequest(
-                          appId, retryMax, retryIntervalMax, 
shuffleIdToBlocks);
-                  long s = System.currentTimeMillis();
-                  RssSendShuffleDataResponse response =
-                      getShuffleServerClient(ssi).sendShuffleData(request);
-
-                  String logMsg =
-                      String.format(
-                          "ShuffleWriteClientImpl sendShuffleData with %s 
blocks to %s cost: %s(ms)",
-                          serverToBlockIds.get(ssi).size(),
-                          ssi.getId(),
-                          System.currentTimeMillis() - s);
-
-                  if (response.getStatusCode() == StatusCode.SUCCESS) {
-                    // mark a replica of block that has been sent
-                    serverToBlockIds
-                        .get(ssi)
-                        .forEach(
-                            blockId -> 
blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
-                    if (defectiveServers != null) {
-                      defectiveServers.remove(ssi);
-                    }
-                    if (LOG.isDebugEnabled()) {
-                      LOG.debug("{} successfully.", logMsg);
+                  () -> {
+                    if (needCancelRequest.get()) {
+                      LOG.info("The upstream task has been failed. Abort this 
data send.");
+                      return true;
                     }
-                  } else {
-                    recordFailedBlocks(
-                        failedBlockSendTracker, serverToBlocks, ssi, 
response.getStatusCode());
-                    if (defectiveServers != null) {
-                      defectiveServers.add(ssi);
+                    ShuffleServerInfo ssi = entry.getKey();
+                    try {
+                      Map<Integer, Map<Integer, List<ShuffleBlockInfo>>> 
shuffleIdToBlocks =
+                          entry.getValue();
+                      // todo: compact unnecessary blocks that reach 
replicaWrite
+                      RssSendShuffleDataRequest request =
+                          new RssSendShuffleDataRequest(
+                              appId, retryMax, retryIntervalMax, 
shuffleIdToBlocks);
+                      long s = System.currentTimeMillis();
+                      RssSendShuffleDataResponse response =
+                          getShuffleServerClient(ssi).sendShuffleData(request);
+
+                      String logMsg =
+                          String.format(
+                              "ShuffleWriteClientImpl sendShuffleData with %s 
blocks to %s cost: %s(ms)",
+                              serverToBlockIds.get(ssi).size(),
+                              ssi.getId(),
+                              System.currentTimeMillis() - s);
+
+                      if (response.getStatusCode() == StatusCode.SUCCESS) {
+                        // mark a replica of block that has been sent
+                        serverToBlockIds
+                            .get(ssi)
+                            .forEach(
+                                blockId ->
+                                    
blockIdsSendSuccessTracker.get(blockId).incrementAndGet());
+                        if (defectiveServers != null) {
+                          defectiveServers.remove(ssi);
+                        }
+                        if (LOG.isDebugEnabled()) {
+                          LOG.debug("{} successfully.", logMsg);
+                        }
+                      } else {
+                        recordFailedBlocks(
+                            failedBlockSendTracker, serverToBlocks, ssi, 
response.getStatusCode());
+                        if (defectiveServers != null) {
+                          defectiveServers.add(ssi);
+                        }
+                        LOG.warn(
+                            "{}, it failed wth statusCode[{}]", logMsg, 
response.getStatusCode());
+                        return false;
+                      }
+                    } catch (Exception e) {
+                      recordFailedBlocks(
+                          failedBlockSendTracker, serverToBlocks, ssi, 
StatusCode.INTERNAL_ERROR);
+                      if (defectiveServers != null) {
+                        defectiveServers.add(ssi);
+                      }
+                      LOG.warn(
+                          "Send: "
+                              + serverToBlockIds.get(ssi).size()
+                              + " blocks to ["
+                              + ssi.getId()
+                              + "] failed.",
+                          e);
+                      return false;
                     }
-                    LOG.warn("{}, it failed wth statusCode[{}]", logMsg, 
response.getStatusCode());
+                    return true;
+                  },
+                  dataTransferPool)
+              .exceptionally(
+                  ex -> {
+                    LOG.error("Unexpected exceptions occurred while sending 
shuffle data", ex);
                     return false;
-                  }
-                } catch (Exception e) {
-                  recordFailedBlocks(
-                      failedBlockSendTracker, serverToBlocks, ssi, 
StatusCode.INTERNAL_ERROR);
-                  if (defectiveServers != null) {
-                    defectiveServers.add(ssi);
-                  }
-                  LOG.warn(
-                      "Send: "
-                          + serverToBlockIds.get(ssi).size()
-                          + " blocks to ["
-                          + ssi.getId()
-                          + "] failed.",
-                      e);
-                  return false;
-                }
-                return true;
-              },
-              dataTransferPool);
+                  });
       futures.add(future);
     }
 

Reply via email to