This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ff084d2852e [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry
ff084d2852e is described below

commit ff084d2852e62c6670e074ef423ae16c915710bc
Author: Harunobu Daikoku <harunobu.daik...@agoda.com>
AuthorDate: Tue Sep 26 11:07:41 2023 -0500

    [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry
    
    ### What changes were proposed in this pull request?
    This PR fixes a bug in `RetryingBlockTransferor` that occurs when initiating a retry fails.
    
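    With this patch, `RetryingBlockTransferor#initiateRetry()` catches any error raised while
    scheduling the retry and returns whether the retry was successfully initiated. If it was not,
    its callers fail the affected blocks through the parent listener's `onBlockTransferFailure`
    handler instead of leaving them outstanding.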
    
    ### Why are the changes needed?
    This is needed to prevent an edge case where retry initiation fails and the executor gets stuck.
    
    More details in SPARK-44756
    
    ### Does this PR introduce _any_ user-facing change?
    N/A
    
    ### How was this patch tested?
    Added a new test case in `RetryingBlockTransferorSuite` that simulates the problematic scenario (a retry whose initiation fails).
    
    <img width="772" alt="image" src="https://github.com/apache/spark/assets/17327104/f20ec327-f5c9-4d74-b861-1ea4e05eb46b">
    
    Closes #42426 from hdaikoku/SPARK-44756.
    
    Authored-by: Harunobu Daikoku <harunobu.daik...@agoda.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/RetryingBlockTransferor.java   | 47 ++++++++++++++++------
 .../shuffle/RetryingBlockTransferorSuite.java      | 34 +++++++++++++++-
 2 files changed, 67 insertions(+), 14 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
index 892de991612..c628b201b20 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
@@ -144,6 +144,11 @@ public class RetryingBlockTransferor {
     this(conf, transferStarter, blockIds, listener, 
ErrorHandler.NOOP_ERROR_HANDLER);
   }
 
+  @VisibleForTesting
+  synchronized void setCurrentListener(RetryingBlockTransferListener listener) 
{
+    this.currentListener = listener;
+  }
+
   /**
    * Initiates the transfer of all blocks provided in the constructor, with 
possible retries
    * in the event of transient IOExceptions.
@@ -176,12 +181,14 @@ public class RetryingBlockTransferor {
         listener.getTransferType(), blockIdsToTransfer.length,
         numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
-      if (shouldRetry(e)) {
-        initiateRetry(e);
-      } else {
-        for (String bid : blockIdsToTransfer) {
-          listener.onBlockTransferFailure(bid, e);
-        }
+      if (shouldRetry(e) && initiateRetry(e)) {
+        // successfully initiated a retry
+        return;
+      }
+
+      // retry is not possible, so fail remaining blocks
+      for (String bid : blockIdsToTransfer) {
+        listener.onBlockTransferFailure(bid, e);
       }
     }
   }
@@ -189,8 +196,10 @@ public class RetryingBlockTransferor {
   /**
    * Lightweight method which initiates a retry in a different thread. The 
retry will involve
    * calling transferAllOutstanding() after a configured wait time.
+   * Returns true if the retry was successfully initiated, false otherwise.
    */
-  private synchronized void initiateRetry(Throwable e) {
+  @VisibleForTesting
+  synchronized boolean initiateRetry(Throwable e) {
     if (enableSaslRetries && e instanceof SaslTimeoutException) {
       saslRetryCount += 1;
     }
@@ -201,10 +210,17 @@ public class RetryingBlockTransferor {
       listener.getTransferType(), retryCount, maxRetries, 
outstandingBlocksIds.size(),
       retryWaitTime);
 
-    executorService.submit(() -> {
-      Uninterruptibles.sleepUninterruptibly(retryWaitTime, 
TimeUnit.MILLISECONDS);
-      transferAllOutstanding();
-    });
+    try {
+      executorService.execute(() -> {
+        Uninterruptibles.sleepUninterruptibly(retryWaitTime, 
TimeUnit.MILLISECONDS);
+        transferAllOutstanding();
+      });
+    } catch (Throwable t) {
+      logger.error("Exception while trying to initiate retry", t);
+      return false;
+    }
+
+    return true;
   }
 
   /**
@@ -240,7 +256,8 @@ public class RetryingBlockTransferor {
    * listener. Note that in the event of a retry, we will immediately replace 
the 'currentListener'
    * field, indicating that any responses from non-current Listeners should be 
ignored.
    */
-  private class RetryingBlockTransferListener implements
+  @VisibleForTesting
+  class RetryingBlockTransferListener implements
       BlockFetchingListener, BlockPushingListener {
     private void handleBlockTransferSuccess(String blockId, ManagedBuffer 
data) {
       // We will only forward this success message to our parent listener if 
this block request is
@@ -274,7 +291,11 @@ public class RetryingBlockTransferor {
       synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) 
{
           if (shouldRetry(exception)) {
-            initiateRetry(exception);
+            if (!initiateRetry(exception)) {
+              // failed to initiate a retry, so fail this block
+              outstandingBlocksIds.remove(blockId);
+              shouldForwardFailure = true;
+            }
           } else {
             if (errorHandler.shouldLogError(exception)) {
               logger.error(
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
index 041d88c698d..eed92ced4e1 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
@@ -372,6 +372,32 @@ public class RetryingBlockTransferorSuite {
     assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES);
   }
 
+  @Test
+  public void testRetryInitiationFailure() throws IOException, 
InterruptedException {
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+
+    List<? extends Map<String, Object>> interactions = Arrays.asList(
+        // IOException will initiate a retry, but the initiation will fail
+        ImmutableMap.<String, Object>builder()
+            .put("b0", new IOException("Connection failed or something"))
+            .put("b1", block1)
+            .build()
+    );
+
+    configureInteractions(interactions, listener);
+    _retryingBlockTransferor = spy(_retryingBlockTransferor);
+    // Simulate a failure to initiate a retry.
+    doReturn(false).when(_retryingBlockTransferor).initiateRetry(any());
+    // Override listener, so that it delegates to the spied instance and not 
the original class.
+    _retryingBlockTransferor.setCurrentListener(
+        _retryingBlockTransferor.new RetryingBlockTransferListener());
+    _retryingBlockTransferor.start();
+
+    verify(listener, timeout(5000)).onBlockTransferFailure(eq("b0"), any());
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
+    verifyNoMoreInteractions(listener);
+  }
+
   /**
    * Performs a set of interactions in response to block requests from a 
RetryingBlockFetcher.
    * Each interaction is a Map from BlockId to either ManagedBuffer or 
Exception. This interaction
@@ -384,6 +410,13 @@ public class RetryingBlockTransferorSuite {
    */
   private static void performInteractions(List<? extends Map<String, Object>> 
interactions,
                                           BlockFetchingListener listener)
+      throws IOException, InterruptedException {
+    configureInteractions(interactions, listener);
+    _retryingBlockTransferor.start();
+  }
+
+  private static void configureInteractions(List<? extends Map<String, 
Object>> interactions,
+                                          BlockFetchingListener listener)
     throws IOException, InterruptedException {
 
     MapConfigProvider provider = new MapConfigProvider(configMap);
@@ -440,6 +473,5 @@ public class RetryingBlockTransferorSuite {
     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
     _retryingBlockTransferor =
         new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, 
listener);
-    _retryingBlockTransferor.start();
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to