This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new c6b683e  [SPARK-36378][SHUFFLE] Switch to using RPCResponse to 
communicate common block push failures to the client
c6b683e is described below

commit c6b683e5a2b547b0259c83c90e69739252eb8fbf
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Tue Aug 10 16:46:55 2021 -0500

    [SPARK-36378][SHUFFLE] Switch to using RPCResponse to communicate common 
block push failures to the client
    
    We have run performance evaluations on the version of push-based shuffle 
committed to upstream so far, and have identified a few places for further 
improvements:
    1. On the server side, we have noticed that the usage of `String.format`, 
especially when receiving a block push request, has a much higher overhead 
compared with string concatenation.
    2. On the server side, the usage of `Throwables.getStackTraceAsString` in 
`ErrorHandler.shouldRetryError` and `ErrorHandler.shouldLogError` generates 
considerable overhead.
    
    These two issues are related to how we currently handle certain common 
block push failures.
    We communicate such failures via `RPCFailure` by transmitting the 
exception stack trace.
    This generates overhead on both the server and the client side for 
creating these exceptions, and makes checking the type of a failure fragile 
and inefficient because it relies on string matching against the exception 
stack trace.
    To address these, this PR also proposes to encode each common block push 
failure as an error code and send that back to the client in a proper 
`RPCResponse` message.
    
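    Why are the changes needed?
    Improve shuffle service efficiency for push-based shuffle.
    Improve code robustness for handling block push failures.
    
    Does this PR introduce any user-facing change?
    No
    
    How was this patch tested?
    Existing unit tests.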
    
    Closes #33613 from Victsm/SPARK-36378.
    
    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Min Shen <victor....@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 3f09093a21306b0fbcb132d4c9f285e56ac6b43c)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../spark/network/client/StreamCallbackWithID.java |  10 ++
 .../network/server/BlockPushNonFatalFailure.java   | 157 +++++++++++++++++++++
 .../network/server/TransportRequestHandler.java    |  10 +-
 .../apache/spark/network/shuffle/ErrorHandler.java |  33 +----
 .../network/shuffle/ExternalBlockStoreClient.java  |   2 +-
 .../network/shuffle/OneForOneBlockPusher.java      |  85 ++++++-----
 .../network/shuffle/RemoteBlockPushResolver.java   | 147 ++++++++++++-------
 .../shuffle/protocol/BlockPushReturnCode.java      |  94 ++++++++++++
 .../shuffle/protocol/BlockTransferMessage.java     |   4 +-
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  23 +--
 .../network/shuffle/OneForOneBlockPusherSuite.java |  14 +-
 .../shuffle/RemoteBlockPushResolverSuite.java      | 105 +++++++++-----
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  |  12 +-
 .../spark/shuffle/ShuffleBlockPusherSuite.scala    |  38 ++---
 14 files changed, 547 insertions(+), 187 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
 
b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
index bd173b6..3ee524a 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/client/StreamCallbackWithID.java
@@ -17,6 +17,16 @@
 
 package org.apache.spark.network.client;
 
+import java.nio.ByteBuffer;
+
 public interface StreamCallbackWithID extends StreamCallback {
   String getID();
+
+  /**
+   * Response to return to client upon the completion of a stream. Currently 
only invoked in
+   * {@link 
org.apache.spark.network.server.TransportRequestHandler#processStreamUpload}
+   */
+  default ByteBuffer getCompletionResponse() {
+    return ByteBuffer.allocate(0);
+  }
 }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
new file mode 100644
index 0000000..5906fa2
--- /dev/null
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/BlockPushNonFatalFailure.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.server;
+
+import java.nio.ByteBuffer;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * A special RuntimeException thrown when the shuffle service experiences a 
non-fatal failure
+ * while handling block push requests for push-based shuffle. Due to the 
best-effort nature
+ * of push-based shuffle, such exceptions get thrown in certain relatively 
common cases,
+ * such as when a pushed block is received after the corresponding shuffle 
has been merge
+ * finalized, or when a pushed block experiences a merge collision. Under 
these
+ * scenarios, we throw this special RuntimeException.
+ */
+public class BlockPushNonFatalFailure extends RuntimeException {
+  /**
+   * String constant used for generating exception messages indicating a block 
to be merged
+   * arrives too late on the server side. When we get a block push failure 
because of the
+   * block arrives too late, we will not retry pushing the block nor log the 
exception on
+   * the client side.
+   */
+  public static final String TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX =
+    " is received after merged shuffle is finalized";
+
+  /**
+   * String constant used for generating exception messages indicating a block 
to be merged
+   * is a stale block push in the case of indeterminate stage retries on the 
server side.
+   * When we get a block push failure because of the block push being stale, 
we will not
+   * retry pushing the block nor log the exception on the client side.
+   */
+  public static final String STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
+    " is a stale block push from an indeterminate stage retry";
+
+  /**
+   * String constant used for generating exception messages indicating the 
server couldn't
+   * append a block after all available attempts due to collision with other 
blocks belonging
+   * to the same shuffle partition. When we get a block push failure because 
of the block
+   * couldn't be written due to this reason, we will not log the exception on 
the client side.
+   */
+  public static final String BLOCK_APPEND_COLLISION_MSG_SUFFIX =
+    " experienced merge collision on the server side";
+
+  /**
+   * The error code of the failure, encoded as a ByteBuffer to be responded 
back to the client.
+   * Instead of responding a RPCFailure with the exception stack trace as the 
payload,
+   * which makes checking the content of the exception very tedious on the 
client side,
+   * we can respond a proper RPCResponse to make it more robust and efficient. 
This
+   * field is only set on the shuffle server side when the exception is 
originally generated.
+   */
+  private ByteBuffer response;
+
+  /**
+   * The error code of the failure. This field is only set on the client side 
when a
+   * BlockPushNonFatalFailure is recreated from the error code received from 
the server.
+   */
+  private ReturnCode returnCode;
+
+  public BlockPushNonFatalFailure(ByteBuffer response, String msg) {
+    super(msg);
+    this.response = response;
+  }
+
+  public BlockPushNonFatalFailure(ReturnCode returnCode, String msg) {
+    super(msg);
+    this.returnCode = returnCode;
+  }
+
+  /**
+   * Since this type of exception is used to only convey the error code, we 
reduce the
+   * exception initialization overhead by skipping filling the stack trace.
+   */
+  @Override
+  public synchronized Throwable fillInStackTrace() {
+    return this;
+  }
+
+  public ByteBuffer getResponse() {
+    // Ensure we do not invoke this method if response is not set
+    Preconditions.checkNotNull(response);
+    return response;
+  }
+
+  public ReturnCode getReturnCode() {
+    // Ensure we do not invoke this method if returnCode is not set
+    Preconditions.checkNotNull(returnCode);
+    return returnCode;
+  }
+
+  public enum ReturnCode {
+    /**
+     * Indicate the case of a successful merge of a pushed block.
+     */
+    SUCCESS(0, ""),
+    /**
+     * Indicate a block to be merged arrives too late on the server side, i.e. 
after the
+     * corresponding shuffle has been merge finalized. When the client gets 
this code, it
+     * will not retry pushing the block.
+     */
+    TOO_LATE_BLOCK_PUSH(1, TOO_LATE_BLOCK_PUSH_MESSAGE_SUFFIX),
+    /**
+     * Indicating the server couldn't append a block after all available 
attempts due to
+     * collision with other blocks belonging to the same shuffle partition.
+     */
+    BLOCK_APPEND_COLLISION_DETECTED(2, BLOCK_APPEND_COLLISION_MSG_SUFFIX),
+    /**
+     * Indicate a block received on the server side is a stale block push in 
the case of
+     * indeterminate stage retries. When the client receives this code, it 
will not retry
+     * pushing the block.
+     */
+    STALE_BLOCK_PUSH(3, STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+
+    private final byte id;
+    // Error message suffix used to generate an error message for a given 
ReturnCode and
+    // a given block ID
+    private final String errorMsgSuffix;
+
+    ReturnCode(int id, String errorMsgSuffix) {
+      assert id < 128 : "Cannot have more than 128 block push return code";
+      this.id = (byte) id;
+      this.errorMsgSuffix = errorMsgSuffix;
+    }
+
+    public byte id() { return id; }
+  }
+
+  public static ReturnCode getReturnCode(byte id) {
+    switch (id) {
+      case 0: return ReturnCode.SUCCESS;
+      case 1: return ReturnCode.TOO_LATE_BLOCK_PUSH;
+      case 2: return ReturnCode.BLOCK_APPEND_COLLISION_DETECTED;
+      case 3: return ReturnCode.STALE_BLOCK_PUSH;
+      default: throw new IllegalArgumentException("Unknown block push return 
code: " + id);
+    }
+  }
+
+  public static String getErrorMsg(String blockId, ReturnCode errorCode) {
+    Preconditions.checkArgument(errorCode != ReturnCode.SUCCESS);
+    return "Block " + blockId + errorCode.errorMsgSuffix;
+  }
+}
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
index ab2deac..5c07f20 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportRequestHandler.java
@@ -213,7 +213,15 @@ public class TransportRequestHandler extends 
MessageHandler<RequestMessage> {
         public void onComplete(String streamId) throws IOException {
            try {
              streamHandler.onComplete(streamId);
-             callback.onSuccess(ByteBuffer.allocate(0));
+             callback.onSuccess(streamHandler.getCompletionResponse());
+           } catch (BlockPushNonFatalFailure ex) {
+             // Respond with an RPC message carrying the error code instead 
of encoding the
+             // exception in an RPCFailure. This type of exception gets 
thrown more frequently
+             // than a regular exception on the shuffle server side due to the 
best-effort nature
+             // of push-based shuffle and requires special handling on the 
client side. Using a
+             // proper RPCResponse is more efficient.
+             callback.onSuccess(ex.getResponse());
+             streamHandler.onFailure(streamId, ex);
            } catch (Exception ex) {
              IOException ioExc = new IOException("Failure post-processing 
complete stream;" +
                " failing this rpc and leaving channel active", ex);
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 0149ad7..271d762 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -23,6 +23,9 @@ import java.net.ConnectException;
 import com.google.common.base.Throwables;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+
+import static 
org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode.*;
 
 /**
  * Plugs into {@link RetryingBlockTransferor} to further control when an 
exception should be retried
@@ -54,27 +57,6 @@ public interface ErrorHandler {
    */
   class BlockPushErrorHandler implements ErrorHandler {
     /**
-     * String constant used for generating exception messages indicating a 
block to be merged
-     * arrives too late or stale block push in the case of indeterminate stage 
retries on the
-     * server side, and also for later checking such exceptions on the client 
side. When we get
-     * a block push failure because of the block push being stale or arrives 
too late, we will
-     * not retry pushing the block nor log the exception on the client side.
-     */
-    public static final String TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX =
-      "received after merged shuffle is finalized or stale block push as 
shuffle blocks of a"
-        + " higher shuffleMergeId for the shuffle is being pushed";
-
-    /**
-     * String constant used for generating exception messages indicating the 
server couldn't
-     * append a block after all available attempts due to collision with other 
blocks belonging
-     * to the same shuffle partition, and also for later checking such 
exceptions on the client
-     * side. When we get a block push failure because of the block couldn't be 
written due to
-     * this reason, we will not log the exception on the client side.
-     */
-    public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
-      "Couldn't find an opportunity to write block";
-
-    /**
      * String constant used for generating exception messages indicating the 
server encountered
      * IOExceptions multiple times, greater than the configured threshold, 
while trying to merged
      * shuffle blocks of the same shuffle partition. When the client receives 
this this response,
@@ -105,16 +87,15 @@ public interface ErrorHandler {
         return false;
       }
 
-      String errorStackTrace = Throwables.getStackTraceAsString(t);
       // If the block is too late or stale block push, there is no need to 
retry it
-      return 
!errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX);
+      return !(t instanceof BlockPushNonFatalFailure &&
+        (((BlockPushNonFatalFailure) t).getReturnCode() == TOO_LATE_BLOCK_PUSH 
||
+          ((BlockPushNonFatalFailure) t).getReturnCode() == STALE_BLOCK_PUSH));
     }
 
     @Override
     public boolean shouldLogError(Throwable t) {
-      String errorStackTrace = Throwables.getStackTraceAsString(t);
-      return 
!(errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) ||
-        errorStackTrace.contains(TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+      return !(t instanceof BlockPushNonFatalFailure);
     }
   }
 
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 826402c..4c0e9f3 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -218,7 +218,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
           public void onSuccess(int numChunks, ManagedBuffer buffer) {
             logger.trace("Successfully got merged block meta for shuffleId {} 
shuffleMergeId {}"
               + " reduceId {}", shuffleId, shuffleMergeId, reduceId);
-            listener.onSuccess(shuffleId, reduceId, shuffleMergeId,
+            listener.onSuccess(shuffleId, shuffleMergeId, reduceId,
               new MergedBlockMeta(numChunks, buffer));
           }
 
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
index f9d313c..8885dc9 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Map;
 
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -28,6 +29,10 @@ import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 
 /**
@@ -77,42 +82,58 @@ public class OneForOneBlockPusher {
 
     @Override
     public void onSuccess(ByteBuffer response) {
-      // On receipt of a successful block push
-      listener.onBlockPushSuccess(blockId, new 
NioManagedBuffer(ByteBuffer.allocate(0)));
+      BlockPushReturnCode pushResponse =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(response);
+      // If the return code is not SUCCESS, the server has responded some 
error code. Handle
+      // the error accordingly.
+      ReturnCode returnCode = 
BlockPushNonFatalFailure.getReturnCode(pushResponse.returnCode);
+      if (returnCode != ReturnCode.SUCCESS) {
+        String blockId = pushResponse.failureBlockId;
+        Preconditions.checkArgument(!blockId.isEmpty());
+        checkAndFailRemainingBlocks(index, new 
BlockPushNonFatalFailure(returnCode,
+          BlockPushNonFatalFailure.getErrorMsg(blockId, returnCode)));
+      } else {
+        // On receipt of a successful block push
+        listener.onBlockPushSuccess(blockId, new 
NioManagedBuffer(ByteBuffer.allocate(0)));
+      }
     }
 
     @Override
     public void onFailure(Throwable e) {
-      // Since block push is best effort, i.e., if we encounter a block push 
failure that's still
-      // retriable according to ErrorHandler (not a connection exception and 
the block is not too
-      // late), we should not fail all remaining block pushes even though
-      // RetryingBlockTransferor might consider this failure not retriable 
(exceeding max retry
-      // count etc). The best effort nature makes block push tolerable of a 
partial completion.
-      // Thus, we only fail the block that's actually failed in this case. 
Note that, on the
-      // RetryingBlockTransferor side, if retry is initiated, it would still 
invalidate the
-      // previous active retry listener, and retry pushing all outstanding 
blocks. However, since
-      // the blocks to be pushed are preloaded into memory and the first 
attempt of pushing these
-      // blocks might have already succeeded, retry pushing all the 
outstanding blocks should be
-      // very cheap (on the client side, the block data is in memory; on the 
server side, the block
-      // will be recognized as a duplicate which triggers noop handling). 
Here, by failing only the
-      // one block that's actually failed, we are essentially preventing 
forwarding unnecessary
-      // block push failures to the parent listener of the retry listener.
-      //
-      // Take the following as an example. For the common exception during 
block push handling,
-      // i.e. block collision, it is considered as retriable by ErrorHandler 
but not retriable
-      // by RetryingBlockTransferor. When we encounter a failure of this type, 
we only fail the
-      // one block encountering this issue not the remaining blocks in the 
same batch. On the
-      // RetryingBlockTransferor side, since this exception is considered as 
not retriable, it
-      // would immediately invoke parent listener's onBlockTransferFailure. 
However, the remaining
-      // blocks in the same batch would remain current and active and they 
won't be impacted by
-      // this exception.
-      if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 
1);
-        failRemainingBlocks(targetBlockId, e);
-      } else {
-        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, 
blockIds.length);
-        failRemainingBlocks(targetBlockId, e);
-      }
+      checkAndFailRemainingBlocks(index, e);
+    }
+  }
+
+  private void checkAndFailRemainingBlocks(int index, Throwable e) {
+    // Since block push is best effort, i.e., if we encounter a block push 
failure that's still
+    // retriable according to ErrorHandler (not a connection exception and the 
block is not too
+    // late), we should not fail all remaining block pushes even though
+    // RetryingBlockTransferor might consider this failure not retriable 
(exceeding max retry
+    // count etc). The best effort nature makes block push tolerable of a 
partial completion.
+    // Thus, we only fail the block that's actually failed in this case. Note 
that, on the
+    // RetryingBlockTransferor side, if retry is initiated, it would still 
invalidate the
+    // previous active retry listener, and retry pushing all outstanding 
blocks. However, since
+    // the blocks to be pushed are preloaded into memory and the first attempt 
of pushing these
+    // blocks might have already succeeded, retry pushing all the outstanding 
blocks should be
+    // very cheap (on the client side, the block data is in memory; on the 
server side, the block
+    // will be recognized as a duplicate which triggers noop handling). Here, 
by failing only the
+    // one block that's actually failed, we are essentially preventing 
forwarding unnecessary
+    // block push failures to the parent listener of the retry listener.
+    //
+    // Take the following as an example. For the common exception during block 
push handling,
+    // i.e. block collision, it is considered as retriable by ErrorHandler but 
not retriable
+    // by RetryingBlockTransferor. When we encounter a failure of this type, 
we only fail the
+    // one block encountering this issue not the remaining blocks in the same 
batch. On the
+    // RetryingBlockTransferor side, since this exception is considered as not 
retriable, it
+    // would immediately invoke parent listener's onBlockTransferFailure. 
However, the remaining
+    // blocks in the same batch would remain current and active and they won't 
be impacted by
+    // this exception.
+    if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
+      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
+      failRemainingBlocks(targetBlockId, e);
+    } else {
+      String[] targetBlockId = Arrays.copyOfRange(blockIds, index, 
blockIds.length);
+      failRemainingBlocks(targetBlockId, e);
     }
   }
 
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 4f26ddf..80174d1 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -56,6 +56,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -81,6 +84,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   private static final int UNDEFINED_ATTEMPT_ID = -1;
   // Shuffles of determinate stages will have shuffleMergeId set to 0
   private static final int DETERMINATE_SHUFFLE_MERGE_ID = 0;
+  private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = 
createErrorHandler();
+  // ByteBuffer to respond to client upon a successful merge of a pushed block
+  private static final ByteBuffer SUCCESS_RESPONSE =
+    new BlockPushReturnCode(ReturnCode.SUCCESS.id(), 
"").toByteBuffer().asReadOnlyBuffer();
 
   // ConcurrentHashMap doesn't allow null for keys or values which is why this 
is required.
   // Marker to identify finalized indeterminate shuffle partitions in the case 
of indeterminate
@@ -101,7 +108,6 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   private final TransportConf conf;
   private final int minChunkSize;
   private final int ioExceptionsThresholdDuringMerge;
-  private final ErrorHandler.BlockPushErrorHandler errorHandler;
 
   @SuppressWarnings("UnstableApiUsage")
   private final LoadingCache<File, ShuffleIndexInformation> indexCache;
@@ -125,7 +131,19 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       .maximumWeight(conf.mergedIndexCacheSize())
       .weigher((Weigher<File, ShuffleIndexInformation>) (file, indexInfo) -> 
indexInfo.getSize())
       .build(indexCacheLoader);
-    this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
+  }
+
+  @VisibleForTesting
+  protected static ErrorHandler.BlockPushErrorHandler createErrorHandler() {
+    return new ErrorHandler.BlockPushErrorHandler() {
+      // Explicitly use a shuffle service side error handler for handling 
exceptions.
+      // BlockPushNonException on the server side only has the response field 
set. It
+      // might require different handling logic compared with a client side 
error handler.
+      @Override
+      public boolean shouldLogError(Throwable t) {
+        return !(t instanceof BlockPushNonFatalFailure);
+      }
+    };
   }
 
   @VisibleForTesting
@@ -146,7 +164,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
       int shuffleMergeId,
-      int reduceId) throws StaleBlockPushException {
+      int reduceId,
+      String blockId) throws BlockPushNonFatalFailure {
     ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> shuffles = 
appShuffleInfo.shuffles;
     AppShuffleMergePartitionsInfo shufflePartitionsWithMergeId =
       shuffles.compute(shuffleId, (id, appShuffleMergePartitionsInfo) -> {
@@ -158,7 +177,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           // In that case the block is considered late. In the case of 
indeterminate stages, most
           // recent shuffleMergeId finalized would be pointing to 
INDETERMINATE_SHUFFLE_FINALIZED
           if (dataFile.exists()) {
-            return null;
+            throw new BlockPushNonFatalFailure(new BlockPushReturnCode(
+              ReturnCode.TOO_LATE_BLOCK_PUSH.id(), blockId).toByteBuffer(),
+              BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
           } else {
             logger.info("Creating a new attempt for shuffle blocks push 
request for shuffle {}"
               + " with shuffleMergeId {} for application {}_{}", shuffleId, 
shuffleMergeId,
@@ -170,10 +191,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           // current incoming one
           int latestShuffleMergeId = 
appShuffleMergePartitionsInfo.shuffleMergeId;
           if (latestShuffleMergeId > shuffleMergeId) {
-            throw new StaleBlockPushException(String.format("Rejecting shuffle 
blocks push request"
-              + " for shuffle %s with shuffleMergeId %s for application %s_%s 
as a higher"
-              + " shuffleMergeId %s request is already seen", shuffleId, 
shuffleMergeId,
-              appShuffleInfo.appId, appShuffleInfo.attemptId, 
latestShuffleMergeId));
+            throw new BlockPushNonFatalFailure(
+              new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), 
blockId).toByteBuffer(),
+              BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.STALE_BLOCK_PUSH));
           } else if (latestShuffleMergeId == shuffleMergeId) {
             return appShuffleMergePartitionsInfo;
           } else {
@@ -194,7 +214,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     // It only gets here when the shuffle is already finalized.
     if (null == shufflePartitionsWithMergeId ||
         INDETERMINATE_SHUFFLE_FINALIZED == 
shufflePartitionsWithMergeId.shuffleMergePartitions) {
-      return null;
+      throw new BlockPushNonFatalFailure(
+        new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), 
blockId).toByteBuffer(),
+        BlockPushNonFatalFailure.getErrorMsg(blockId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
     }
 
     Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
@@ -379,9 +401,6 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
     AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
-    final String streamId = String.format("%s_%d_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, 
msg.shuffleMergeId,
-      msg.mapIndex, msg.reduceId);
     if (appShuffleInfo.attemptId != msg.appAttemptId) {
       // If this Block belongs to a former application attempt, it is 
considered late,
       // as only the blocks from the current application attempt will be merged
@@ -391,14 +410,20 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           + "with the current attempt id %s stored in shuffle service for 
application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
+    // Use string concatenation here to avoid the overhead with String.format 
on every
+    // pushed block.
+    final String streamId = OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX + 
"_"
+      + msg.shuffleId + "_" + msg.shuffleMergeId + "_" + msg.mapIndex + "_" + 
msg.reduceId;
     // Retrieve merged shuffle file metadata
     AppShufflePartitionInfo partitionInfoBeforeCheck;
+    BlockPushNonFatalFailure failure = null;
     try {
       partitionInfoBeforeCheck = 
getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId,
-        msg.shuffleMergeId, msg.reduceId);
-    } catch(StaleBlockPushException sbp) {
+        msg.shuffleMergeId, msg.reduceId, streamId);
+    } catch (BlockPushNonFatalFailure bpf) {
       // Set partitionInfoBeforeCheck to null so that stale block push gets 
handled.
       partitionInfoBeforeCheck = null;
+      failure = bpf;
     }
     // Here partitionInfo will be null in 3 cases:
     // 1) The request is received for a block that has already been merged, 
this is possible due
@@ -442,17 +467,15 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     // getting killed. When this happens, we need to distinguish the duplicate 
blocks as they
     // arrive. More details on this is explained in later comments.
 
-    // Track if the block is received after shuffle merge finalized or from an 
older
-    // shuffleMergeId attempt.
-    final boolean isStaleBlockOrTooLate = partitionInfoBeforeCheck == null;
     // Check if the given block is already merged by checking the bitmap 
against the given map
     // index
-    final AppShufflePartitionInfo partitionInfo = isStaleBlockOrTooLate ? null 
:
+    final AppShufflePartitionInfo partitionInfo = failure != null ? null :
       partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null : 
partitionInfoBeforeCheck;
     if (partitionInfo != null) {
       return new PushBlockStreamCallback(
         this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
     } else {
+      final BlockPushNonFatalFailure finalFailure = failure;
       // For a duplicate block or a block which is late or stale block from an 
older
       // shuffleMergeId, respond back with a callback that handles them 
differently.
       return new StreamCallbackWithID() {
@@ -469,11 +492,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 
         @Override
         public void onComplete(String streamId) {
-          if (isStaleBlockOrTooLate) {
-            // Throw an exception here so the block data is drained from 
channel and server
-            // responds RpcFailure to the client.
-            throw new RuntimeException(String.format("Block %s %s", streamId,
-              
ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+          // Throw non-fatal failure here so the block data is drained from 
channel and server
+          // responds the error code to the client.
+          if (finalFailure != null) {
+            throw finalFailure;
           }
           // For duplicate block that is received before the shuffle merge 
finalizes, the
           // server should respond success to the client.
@@ -482,6 +504,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         @Override
         public void onFailure(String streamId, Throwable cause) {
         }
+
+        @Override
+        public ByteBuffer getCompletionResponse() {
+          return SUCCESS_RESPONSE.duplicate();
+        }
       };
     }
   }
@@ -675,6 +702,11 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       return streamId;
     }
 
+    @Override
+    public ByteBuffer getCompletionResponse() {
+      return SUCCESS_RESPONSE.duplicate();
+    }
+
     /**
      * Write a ByteBuffer to the merged shuffle file. Here we keep track of 
the length of the
      * block data written to file. In case of failure during writing block to 
file, we use the
@@ -752,20 +784,26 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
 
     /**
+     * If appShuffleMergePartitionsInfo's shuffleMergeId is
+     * greater than the request shuffleMergeId then it is a stale block push.
+     */
+    private boolean isStale(
+        AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
+        int shuffleMergeId) {
+      return appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId;
+    }
+
+    /**
      * If appShuffleMergePartitionsInfo is null or shuffleMergePartitions is 
set to
      * INDETERMINATE_SHUFFLE_FINALIZED or if the reduceId is not in the map 
then the
-     * shuffle is already finalized. Therefore the block push is too late. If
-     * appShuffleMergePartitionsInfo's shuffleMergeId is
-     * greater than the request shuffleMergeId then it is a stale block push.
+     * shuffle is already finalized. Therefore the block push is too late.
      */
-    private boolean isStaleOrTooLate(
+    private boolean isTooLate(
         AppShuffleMergePartitionsInfo appShuffleMergePartitionsInfo,
-        int shuffleMergeId,
         int reduceId) {
       return null == appShuffleMergePartitionsInfo ||
         INDETERMINATE_SHUFFLE_FINALIZED == 
appShuffleMergePartitionsInfo.shuffleMergePartitions ||
-          appShuffleMergePartitionsInfo.shuffleMergeId > shuffleMergeId ||
-          
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
+        
!appShuffleMergePartitionsInfo.shuffleMergePartitions.containsKey(reduceId);
     }
 
     @Override
@@ -785,8 +823,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       // to disk as well. This way, we avoid having to buffer the entirety of 
every blocks in
       // memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
-        if 
(isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-            partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
+        AppShuffleMergePartitionsInfo info = 
appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+        if (isStale(info, partitionInfo.shuffleMergeId) ||
+            isTooLate(info, partitionInfo.reduceId)) {
           deferredBufs = null;
           return;
         }
@@ -857,12 +896,19 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         // was not received yet or this was the latest stage attempt (or 
latest shuffleMergeId)
         // generating shuffle output for the shuffle ID. By the time we finish 
reading this
         // message, the block request is either stale or too late. We should 
thus respond
-        // RpcFailure to the client.
-        if 
(isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-            partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
+        // the error code to the client.
+        AppShuffleMergePartitionsInfo info = 
appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+        if (isTooLate(info, partitionInfo.reduceId)) {
           deferredBufs = null;
-          throw new RuntimeException(String.format("Block %s is %s", streamId,
-            
ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+          throw new BlockPushNonFatalFailure(
+            new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), 
streamId).toByteBuffer(),
+            BlockPushNonFatalFailure.getErrorMsg(streamId, 
ReturnCode.TOO_LATE_BLOCK_PUSH));
+        }
+        if (isStale(info, partitionInfo.shuffleMergeId)) {
+          deferredBufs = null;
+          throw new BlockPushNonFatalFailure(
+            new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), 
streamId).toByteBuffer(),
+            BlockPushNonFatalFailure.getErrorMsg(streamId, 
ReturnCode.STALE_BLOCK_PUSH));
         }
 
         // Check if we can commit this block
@@ -911,9 +957,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           }
         } else {
           deferredBufs = null;
-          throw new RuntimeException(String.format("%s %s to merged shuffle",
-            
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX,
-            streamId));
+          throw new BlockPushNonFatalFailure(
+            new 
BlockPushReturnCode(ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), streamId)
+              .toByteBuffer(), BlockPushNonFatalFailure.getErrorMsg(
+                streamId, ReturnCode.BLOCK_APPEND_COLLISION_DETECTED));
         }
       }
       isWriting = false;
@@ -921,7 +968,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 
     @Override
     public void onFailure(String streamId, Throwable throwable) throws 
IOException {
-      if (mergeManager.errorHandler.shouldLogError(throwable)) {
+      if (ERROR_HANDLER.shouldLogError(throwable)) {
         logger.error("Encountered issue when merging {}", streamId, throwable);
       } else {
         logger.debug("Encountered issue when merging {}", streamId, throwable);
@@ -932,13 +979,15 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       // an opportunity to write the block data to disk, we should also ignore 
here.
       if (isWriting) {
         synchronized (partitionInfo) {
-          if 
(!isStaleOrTooLate(appShuffleInfo.shuffles.get(partitionInfo.shuffleId),
-              partitionInfo.shuffleMergeId, partitionInfo.reduceId)) {
-              logger.debug("{} encountered failure", partitionInfo);
-              partitionInfo.setCurrentMapIndex(-1);
-            }
+          AppShuffleMergePartitionsInfo info =
+            appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+          if (!isTooLate(info, partitionInfo.reduceId) &&
+              !isStale(info, partitionInfo.shuffleMergeId)) {
+            logger.debug("{} encountered failure", partitionInfo);
+            partitionInfo.setCurrentMapIndex(-1);
           }
         }
+      }
       isWriting = false;
     }
 
@@ -1356,10 +1405,4 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       return pos;
     }
   }
-
-  public static class StaleBlockPushException extends RuntimeException {
-    public StaleBlockPushException(String message) {
-      super(message);
-    }
-  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
new file mode 100644
index 0000000..0455d67
--- /dev/null
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockPushReturnCode.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Objects;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
+import org.apache.spark.network.protocol.Encoders;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+
+/**
+ * Error code indicating a non-fatal failure of a block push request.
+ * Due to the best-effort nature of push-based shuffle, these failures
+ * do not impact the completion of the block push process. The list of
+ * such errors is in
+ * {@link org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode}.
+ *
+ * @since 3.2.0
+ */
+public class BlockPushReturnCode extends BlockTransferMessage {
+  public final byte returnCode;
+  // Block ID of the block that experiences a non-fatal block push failure.
+  // Will be an empty string for any successfully pushed block.
+  public final String failureBlockId;
+
+  public BlockPushReturnCode(byte returnCode, String failureBlockId) {
+    
Preconditions.checkNotNull(BlockPushNonFatalFailure.getReturnCode(returnCode));
+    this.returnCode = returnCode;
+    this.failureBlockId = failureBlockId;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.PUSH_BLOCK_RETURN_CODE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(returnCode, failureBlockId);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+      .append("returnCode", returnCode)
+      .append("failureBlockId", failureBlockId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof BlockPushReturnCode) {
+      BlockPushReturnCode o = (BlockPushReturnCode) other;
+      return returnCode == o.returnCode && Objects.equals(failureBlockId, 
o.failureBlockId);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 1 + Encoders.Strings.encodedLength(failureBlockId);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeByte(returnCode);
+    Encoders.Strings.encode(buf, failureBlockId);
+  }
+
+  public static BlockPushReturnCode decode(ByteBuf buf) {
+    byte type = buf.readByte();
+    String failureBlockId = Encoders.Strings.decode(buf);
+    return new BlockPushReturnCode(type, failureBlockId);
+  }
+}
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 453791d..ad959c7e 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -49,7 +49,8 @@ public abstract class BlockTransferMessage implements 
Encodable {
     HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
     FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), 
LOCAL_DIRS_FOR_EXECUTORS(11),
     PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14),
-    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), 
CORRUPTION_CAUSE(17);
+    FETCH_SHUFFLE_BLOCK_CHUNKS(15), DIAGNOSE_CORRUPTION(16), 
CORRUPTION_CAUSE(17),
+    PUSH_BLOCK_RETURN_CODE(18);
 
     private final byte id;
 
@@ -86,6 +87,7 @@ public abstract class BlockTransferMessage implements 
Encodable {
         case 15: return FetchShuffleBlockChunks.decode(buf);
         case 16: return DiagnoseCorruption.decode(buf);
         case 17: return CorruptionCause.decode(buf);
+        case 18: return BlockPushReturnCode.decode(buf);
         default: throw new IllegalArgumentException("Unknown message type: " + 
type);
       }
     }
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
index c8066d1..56c9a97 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -21,6 +21,9 @@ import java.net.ConnectException;
 
 import org.junit.Test;
 
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
+
 import static org.junit.Assert.*;
 
 /**
@@ -31,11 +34,13 @@ public class ErrorHandlerSuite {
   @Test
   public void testErrorRetry() {
     ErrorHandler.BlockPushErrorHandler pushHandler = new 
ErrorHandler.BlockPushErrorHandler();
-    assertFalse(pushHandler.shouldRetryError(new RuntimeException(new 
IllegalArgumentException(
-      
ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.STALE_BLOCK_PUSH, "")));
     assertFalse(pushHandler.shouldRetryError(new RuntimeException(new 
ConnectException())));
-    assertTrue(pushHandler.shouldRetryError(new RuntimeException(new 
IllegalArgumentException(
-      
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertTrue(pushHandler.shouldRetryError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
     assertTrue(pushHandler.shouldRetryError(new Throwable()));
 
     ErrorHandler.BlockFetchErrorHandler fetchHandler = new 
ErrorHandler.BlockFetchErrorHandler();
@@ -46,10 +51,12 @@ public class ErrorHandlerSuite {
   @Test
   public void testErrorLogging() {
     ErrorHandler.BlockPushErrorHandler pushHandler = new 
ErrorHandler.BlockPushErrorHandler();
-    assertFalse(pushHandler.shouldLogError(new RuntimeException(new 
IllegalArgumentException(
-      
ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))));
-    assertFalse(pushHandler.shouldLogError(new RuntimeException(new 
IllegalArgumentException(
-      
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.STALE_BLOCK_PUSH, "")));
+    assertFalse(pushHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")));
     assertTrue(pushHandler.shouldLogError(new Throwable()));
 
     ErrorHandler.BlockFetchErrorHandler fetchHandler = new 
ErrorHandler.BlockFetchErrorHandler();
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
index d2fd5d9..2aadb77 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -36,7 +36,10 @@ import org.apache.spark.network.buffer.NettyManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 
 
@@ -140,15 +143,16 @@ public class OneForOneBlockPusherSuite {
       BlockTransferMessage message = 
BlockTransferMessage.Decoder.fromByteBuffer(header);
       RpcResponseCallback callback = (RpcResponseCallback) 
invocation.getArguments()[2];
       Map.Entry<String, ManagedBuffer> entry = blockIterator.next();
+      String blockId = entry.getKey();
       ManagedBuffer block = entry.getValue();
       if (block != null && block.nioByteBuffer().capacity() > 0) {
-        callback.onSuccess(header);
+        callback.onSuccess(new BlockPushReturnCode(ReturnCode.SUCCESS.id(), 
"").toByteBuffer());
       } else if (block != null) {
-        callback.onFailure(new RuntimeException("Failed " + entry.getKey()
-          + 
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX));
+        callback.onSuccess(new BlockPushReturnCode(
+          ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(), 
blockId).toByteBuffer());
       } else {
-        callback.onFailure(new RuntimeException("Quick fail " + entry.getKey()
-          + 
ErrorHandler.BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX));
+        callback.onFailure(new BlockPushNonFatalFailure(
+          ReturnCode.TOO_LATE_BLOCK_PUSH, ""));
       }
       assertEquals(msgIterator.next(), message);
       return null;
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 6bf39c8..46d6366 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -48,7 +48,10 @@ import static org.junit.Assert.*;
 
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.server.BlockPushNonFatalFailure;
 import 
org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
+import org.apache.spark.network.shuffle.protocol.BlockPushReturnCode;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -103,6 +106,18 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  @Test
+  public void testErrorLogging() {
+    ErrorHandler.BlockPushErrorHandler errorHandler = 
RemoteBlockPushResolver.createErrorHandler();
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH, "")));
+    assertFalse(errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, 
"")));
+    assertTrue(errorHandler.shouldLogError(new Throwable()));
+  }
+
   @Test(expected = RuntimeException.class)
   public void testNoIndexFile() {
     try {
@@ -286,7 +301,7 @@ public class RemoteBlockPushResolverSuite {
     validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new 
int[][]{{0}});
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testBlockReceivedAfterMergeFinalize() throws IOException {
     ByteBuffer[] blocks = new ByteBuffer[]{
       ByteBuffer.wrap(new byte[4]),
@@ -304,13 +319,15 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
     try {
       stream1.onComplete(stream1.getID());
-    } catch (RuntimeException re) {
-      assertEquals("Block shufflePush_0_0_1_0 received after merged shuffle is 
finalized or stale"
-        + " block push as shuffle blocks of a higher shuffleMergeId for the 
shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.TOO_LATE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
       MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0, 0);
       validateChunks(TEST_APP, 0, 0, 0, blockMeta, new int[]{9}, new 
int[][]{{0}});
-      throw re;
+      throw e;
     }
   }
 
@@ -348,7 +365,7 @@ public class RemoteBlockPushResolverSuite {
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testCollision() throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -362,15 +379,17 @@ public class RemoteBlockPushResolverSuite {
     // Since stream2 didn't get any opportunity it will throw couldn't find 
opportunity error
     try {
       stream2.onComplete(stream2.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_1_0 to 
merged shuffle",
-        re.getMessage());
-      throw re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream2.getID());
+      throw e;
     }
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() 
throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -387,14 +406,16 @@ public class RemoteBlockPushResolverSuite {
     // This should be deferred
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
     // Since this stream didn't get any opportunity it will throw couldn't 
find opportunity error
-    RuntimeException failedEx = null;
+    BlockPushNonFatalFailure failedEx = null;
     try {
       stream3.onComplete(stream3.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_2_0 to 
merged shuffle",
-        re.getMessage());
-      failedEx = re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream3.getID());
+      failedEx = e;
     }
     // stream 1 now completes
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
@@ -871,7 +892,7 @@ public class RemoteBlockPushResolverSuite {
     removeApplication(TEST_APP);
   }
 
-  @Test(expected = RuntimeException.class)
+  @Test(expected = BlockPushNonFatalFailure.class)
   public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() 
throws IOException {
     StreamCallbackWithID stream1 =
       pushResolver.receiveBlockDataAsStream(
@@ -895,14 +916,16 @@ public class RemoteBlockPushResolverSuite {
         new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 2, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
-    RuntimeException failedEx = null;
+    BlockPushNonFatalFailure failedEx = null;
     try {
       stream3.onComplete(stream3.getID());
-    } catch (RuntimeException re) {
-      assertEquals(
-        "Couldn't find an opportunity to write block shufflePush_0_0_2_0 to 
merged shuffle",
-        re.getMessage());
-      failedEx = re;
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      
assertEquals(BlockPushNonFatalFailure.ReturnCode.BLOCK_APPEND_COLLISION_DETECTED.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream3.getID());
+      failedEx = e;
     }
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
@@ -1072,10 +1095,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle 
is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for 
the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
@@ -1099,10 +1124,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle 
is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for 
the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
@@ -1153,10 +1180,12 @@ public class RemoteBlockPushResolverSuite {
     try {
       // stream 1 push should be rejected as it is from an older shuffleMergeId
       stream1.onComplete(stream1.getID());
-    } catch(RuntimeException re) {
-      assertEquals("Block shufflePush_0_1_0_0 is received after merged shuffle 
is finalized or"
-        + " stale block push as shuffle blocks of a higher shuffleMergeId for 
the shuffle is being"
-          + " pushed", re.getMessage());
+    } catch (BlockPushNonFatalFailure e) {
+      BlockPushReturnCode errorCode =
+        (BlockPushReturnCode) 
BlockTransferMessage.Decoder.fromByteBuffer(e.getResponse());
+      assertEquals(BlockPushNonFatalFailure.ReturnCode.STALE_BLOCK_PUSH.id(),
+        errorCode.returnCode);
+      assertEquals(errorCode.failureBlockId, stream1.getID());
     }
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index ecaa4f0..e6af767 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -24,8 +24,6 @@ import java.util.concurrent.ExecutorService
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
 
-import com.google.common.base.Throwables
-
 import org.apache.spark.{ShuffleDependency, SparkConf, SparkEnv}
 import org.apache.spark.annotation.Since
 import org.apache.spark.internal.Logging
@@ -33,6 +31,8 @@ import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.BlockPushingListener
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
@@ -78,10 +78,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) extends Logging {
         if (t.getCause != null && t.getCause.isInstanceOf[FileNotFoundException]) {
           return false
         }
-        val errorStackTraceString = Throwables.getStackTraceAsString(t)
         // If the block is too late or the invalid block push, there is no need to retry it
-        !errorStackTraceString.contains(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)
+        !(t.isInstanceOf[BlockPushNonFatalFailure] &&
+          (t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
+            == ReturnCode.TOO_LATE_BLOCK_PUSH ||
+            t.asInstanceOf[BlockPushNonFatalFailure].getReturnCode
+            == ReturnCode.STALE_BLOCK_PUSH))
       }
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 26cdad8..6f9b5e4 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -33,8 +33,9 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
 import org.apache.spark.network.buffer.ManagedBuffer
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.server.BlockPushNonFatalFailure.ReturnCode
 import org.apache.spark.network.shuffle.{BlockPushingListener, BlockStoreClient}
-import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.serializer.JavaSerializer
 import org.apache.spark.shuffle.ShuffleBlockPusher.PushRequest
@@ -219,13 +220,15 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     val pusher = new ShuffleBlockPusher(conf)
     val errorHandler = pusher.createErrorHandler()
     assert(
-      !errorHandler.shouldRetryError(new RuntimeException(
-        new IllegalArgumentException(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))))
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
+    assert(
+      !errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.STALE_BLOCK_PUSH, "")))
     assert(errorHandler.shouldRetryError(new RuntimeException(new ConnectException())))
     assert(
-      errorHandler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
-        BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))))
+      errorHandler.shouldRetryError(new BlockPushNonFatalFailure(
+        ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
     assert (errorHandler.shouldRetryError(new Throwable()))
   }
 
@@ -233,12 +236,13 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
     val pusher = new ShuffleBlockPusher(conf)
     val errorHandler = pusher.createErrorHandler()
     assert(
-      !errorHandler.shouldLogError(new RuntimeException(
-        new IllegalArgumentException(
-          BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX))))
-    assert(!errorHandler.shouldLogError(new RuntimeException(
-      new IllegalArgumentException(
-        BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))))
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.TOO_LATE_BLOCK_PUSH, "")))
+    assert(
+      !errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+        ReturnCode.STALE_BLOCK_PUSH, "")))
+    assert(!errorHandler.shouldLogError(new BlockPushNonFatalFailure(
+      ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, "")))
     assert(errorHandler.shouldLogError(new Throwable()))
   }
 
@@ -255,9 +259,8 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
           if (failBlock) {
             failBlock = false
             // Fail the first block with the collision exception.
-            blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
-              new IllegalArgumentException(
-                BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX)))
+            blockPushListener.onBlockPushFailure(blockId, new BlockPushNonFatalFailure(
+              ReturnCode.BLOCK_APPEND_COLLISION_DETECTED, ""))
           } else {
             pushedBlocks += blockId
             blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))
@@ -285,9 +288,8 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with BeforeAndAfterEach {
           if (failBlock) {
             failBlock = false
             // Fail the first block with the too late exception.
-            blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
-              new IllegalArgumentException(
-                BlockPushErrorHandler.TOO_LATE_OR_STALE_BLOCK_PUSH_MESSAGE_SUFFIX)))
+            blockPushListener.onBlockPushFailure(blockId, new BlockPushNonFatalFailure(
+              ReturnCode.TOO_LATE_BLOCK_PUSH, ""))
           } else {
             pushedBlocks += blockId
             blockPushListener.onBlockPushSuccess(blockId, mock(classOf[ManagedBuffer]))

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to