This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ff084d2852e [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry ff084d2852e is described below commit ff084d2852e62c6670e074ef423ae16c915710bc Author: Harunobu Daikoku <harunobu.daik...@agoda.com> AuthorDate: Tue Sep 26 11:07:41 2023 -0500 [SPARK-44756][CORE] Executor hangs when RetryingBlockTransferor fails to initiate retry ### What changes were proposed in this pull request? This PR fixes a bug in `RetryingBlockTransferor` that happens when retry initiation has failed. With this patch, the callers of `RetryingBlockTransferor#initiateRetry()` will catch any error and invoke the parent listener's exception handler. ### Why are the changes needed? This is needed to prevent an edge case where retry initiation fails and executor gets stuck. More details in SPARK-44756 ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested? Added a new test case in `RetryingBlockTransferorSuite` that simulates the problematic scenario. <img width="772" alt="image" src="https://github.com/apache/spark/assets/17327104/f20ec327-f5c9-4d74-b861-1ea4e05eb46b"> Closes #42426 from hdaikoku/SPARK-44756. 
Authored-by: Harunobu Daikoku <harunobu.daik...@agoda.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../network/shuffle/RetryingBlockTransferor.java | 47 ++++++++++++++++------ .../shuffle/RetryingBlockTransferorSuite.java | 34 +++++++++++++++- 2 files changed, 67 insertions(+), 14 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java index 892de991612..c628b201b20 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java @@ -144,6 +144,11 @@ public class RetryingBlockTransferor { this(conf, transferStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER); } + @VisibleForTesting + synchronized void setCurrentListener(RetryingBlockTransferListener listener) { + this.currentListener = listener; + } + /** * Initiates the transfer of all blocks provided in the constructor, with possible retries * in the event of transient IOExceptions. @@ -176,12 +181,14 @@ public class RetryingBlockTransferor { listener.getTransferType(), blockIdsToTransfer.length, numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e); - if (shouldRetry(e)) { - initiateRetry(e); - } else { - for (String bid : blockIdsToTransfer) { - listener.onBlockTransferFailure(bid, e); - } + if (shouldRetry(e) && initiateRetry(e)) { + // successfully initiated a retry + return; + } + + // retry is not possible, so fail remaining blocks + for (String bid : blockIdsToTransfer) { + listener.onBlockTransferFailure(bid, e); } } } @@ -189,8 +196,10 @@ public class RetryingBlockTransferor { /** * Lightweight method which initiates a retry in a different thread. The retry will involve * calling transferAllOutstanding() after a configured wait time. 
+ * Returns true if the retry was successfully initiated, false otherwise. */ - private synchronized void initiateRetry(Throwable e) { + @VisibleForTesting + synchronized boolean initiateRetry(Throwable e) { if (enableSaslRetries && e instanceof SaslTimeoutException) { saslRetryCount += 1; } @@ -201,10 +210,17 @@ public class RetryingBlockTransferor { listener.getTransferType(), retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime); - executorService.submit(() -> { - Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); - transferAllOutstanding(); - }); + try { + executorService.execute(() -> { + Uninterruptibles.sleepUninterruptibly(retryWaitTime, TimeUnit.MILLISECONDS); + transferAllOutstanding(); + }); + } catch (Throwable t) { + logger.error("Exception while trying to initiate retry", t); + return false; + } + + return true; } /** @@ -240,7 +256,8 @@ public class RetryingBlockTransferor { * listener. Note that in the event of a retry, we will immediately replace the 'currentListener' * field, indicating that any responses from non-current Listeners should be ignored. 
*/ - private class RetryingBlockTransferListener implements + @VisibleForTesting + class RetryingBlockTransferListener implements BlockFetchingListener, BlockPushingListener { private void handleBlockTransferSuccess(String blockId, ManagedBuffer data) { // We will only forward this success message to our parent listener if this block request is @@ -274,7 +291,11 @@ public class RetryingBlockTransferor { synchronized (RetryingBlockTransferor.this) { if (this == currentListener && outstandingBlocksIds.contains(blockId)) { if (shouldRetry(exception)) { - initiateRetry(exception); + if (!initiateRetry(exception)) { + // failed to initiate a retry, so fail this block + outstandingBlocksIds.remove(blockId); + shouldForwardFailure = true; + } } else { if (errorHandler.shouldLogError(exception)) { logger.error( diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java index 041d88c698d..eed92ced4e1 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java @@ -372,6 +372,32 @@ public class RetryingBlockTransferorSuite { assert(_retryingBlockTransferor.getRetryCount() == MAX_RETRIES); } + @Test + public void testRetryInitiationFailure() throws IOException, InterruptedException { + BlockFetchingListener listener = mock(BlockFetchingListener.class); + + List<? 
extends Map<String, Object>> interactions = Arrays.asList( + // IOException will initiate a retry, but the initiation will fail + ImmutableMap.<String, Object>builder() + .put("b0", new IOException("Connection failed or something")) + .put("b1", block1) + .build() + ); + + configureInteractions(interactions, listener); + _retryingBlockTransferor = spy(_retryingBlockTransferor); + // Simulate a failure to initiate a retry. + doReturn(false).when(_retryingBlockTransferor).initiateRetry(any()); + // Override listener, so that it delegates to the spied instance and not the original class. + _retryingBlockTransferor.setCurrentListener( + _retryingBlockTransferor.new RetryingBlockTransferListener()); + _retryingBlockTransferor.start(); + + verify(listener, timeout(5000)).onBlockTransferFailure(eq("b0"), any()); + verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1); + verifyNoMoreInteractions(listener); + } + /** * Performs a set of interactions in response to block requests from a RetryingBlockFetcher. * Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction @@ -384,6 +410,13 @@ public class RetryingBlockTransferorSuite { */ private static void performInteractions(List<? extends Map<String, Object>> interactions, BlockFetchingListener listener) + throws IOException, InterruptedException { + configureInteractions(interactions, listener); + _retryingBlockTransferor.start(); + } + + private static void configureInteractions(List<? 
extends Map<String, Object>> interactions, + BlockFetchingListener listener) throws IOException, InterruptedException { MapConfigProvider provider = new MapConfigProvider(configMap); @@ -440,6 +473,5 @@ public class RetryingBlockTransferorSuite { String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]); _retryingBlockTransferor = new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, listener); - _retryingBlockTransferor.start(); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org