This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c4aa54e  [SPARK-36266][SHUFFLE] Rename classes in shuffle RPC used for 
block push operations
c4aa54e is described below

commit c4aa54ed4e8cf7942335bfafdeacf57b5d148f2a
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Mon Jul 26 17:39:19 2021 -0500

    [SPARK-36266][SHUFFLE] Rename classes in shuffle RPC used for block push 
operations
    
    ### What changes were proposed in this pull request?
    This is a follow-up to #29855 according to the 
[comments](https://github.com/apache/spark/pull/29855/files#r505536514)
    In this PR, the following changes are made:
    
    1. A new `BlockPushingListener` interface is created specifically for block 
push. The existing `BlockFetchingListener` interface is left as is, since it 
might be used by external shuffle solutions. These 2 interfaces are unified 
under `BlockTransferListener` to enable code reuse.
    2. `RetryingBlockFetcher`, `BlockFetchStarter`, and 
`RetryingBlockFetchListener` are renamed to `RetryingBlockTransferor`, 
`BlockTransferStarter`, and `RetryingBlockTransferListener` respectively. This 
makes their names more generic to be reused across both block fetch and push.
    3. Comments in `OneForOneBlockPusher` are further clarified to better 
explain how we handle retries for block push.
    
    ### Why are the changes needed?
    To make code cleaner without sacrificing backward compatibility.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #33340 from Victsm/SPARK-32915-followup.
    
    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Min Shen <victor....@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/BlockFetchingListener.java     |  19 ++-
 ...hingListener.java => BlockPushingListener.java} |  30 +++-
 .../spark/network/shuffle/BlockStoreClient.java    |   2 +-
 ...ingListener.java => BlockTransferListener.java} |  22 ++-
 .../apache/spark/network/shuffle/ErrorHandler.java |   4 +-
 .../network/shuffle/ExternalBlockStoreClient.java  |  30 ++--
 .../network/shuffle/OneForOneBlockPusher.java      |  41 +++--
 ...ckFetcher.java => RetryingBlockTransferor.java} | 176 +++++++++++++--------
 .../network/shuffle/OneForOneBlockPusherSuite.java |  36 ++---
 ...uite.java => RetryingBlockTransferorSuite.java} |  44 +++---
 .../network/netty/NettyBlockTransferService.scala  |  14 +-
 .../apache/spark/shuffle/ShuffleBlockPusher.scala  |   8 +-
 .../netty/NettyBlockTransferServiceSuite.scala     |   2 +-
 .../spark/shuffle/ShuffleBlockPusherSuite.scala    |  38 ++---
 14 files changed, 288 insertions(+), 178 deletions(-)

diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
index 138fd53..0be913e 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
@@ -17,11 +17,9 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.util.EventListener;
-
 import org.apache.spark.network.buffer.ManagedBuffer;
 
-public interface BlockFetchingListener extends EventListener {
+public interface BlockFetchingListener extends BlockTransferListener {
   /**
    * Called once per successfully fetched block. After this call returns, data 
will be released
    * automatically. If the data will be passed to another thread, the receiver 
should retain()
@@ -33,4 +31,19 @@ public interface BlockFetchingListener extends EventListener 
{
    * Called at least once per block upon failures.
    */
   void onBlockFetchFailure(String blockId, Throwable exception);
+
+  @Override
+  default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
+    onBlockFetchSuccess(blockId, data);
+  }
+
+  @Override
+  default void onBlockTransferFailure(String blockId, Throwable exception) {
+    onBlockFetchFailure(blockId, exception);
+  }
+
+  @Override
+  default String getTransferType() {
+    return "fetch";
+  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
similarity index 57%
copy from 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
copy to 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
index 138fd53..1421b22 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockPushingListener.java
@@ -17,20 +17,38 @@
 
 package org.apache.spark.network.shuffle;
 
-import java.util.EventListener;
-
 import org.apache.spark.network.buffer.ManagedBuffer;
 
-public interface BlockFetchingListener extends EventListener {
+/**
+ * Callback to handle block push success and failure. This interface and
+ * {@link BlockFetchingListener} are unified under {@link 
BlockTransferListener} to allow
+ * code reuse for handling block push and fetch retry.
+ */
+public interface BlockPushingListener extends BlockTransferListener  {
   /**
-   * Called once per successfully fetched block. After this call returns, data 
will be released
+   * Called once per successfully pushed block. After this call returns, data 
will be released
    * automatically. If the data will be passed to another thread, the receiver 
should retain()
    * and release() the buffer on their own, or copy the data to a new buffer.
    */
-  void onBlockFetchSuccess(String blockId, ManagedBuffer data);
+  void onBlockPushSuccess(String blockId, ManagedBuffer data);
 
   /**
    * Called at least once per block upon failures.
    */
-  void onBlockFetchFailure(String blockId, Throwable exception);
+  void onBlockPushFailure(String blockId, Throwable exception);
+
+  @Override
+  default void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
+    onBlockPushSuccess(blockId, data);
+  }
+
+  @Override
+  default void onBlockTransferFailure(String blockId, Throwable exception) {
+    onBlockPushFailure(blockId, exception);
+  }
+
+  @Override
+  default String getTransferType() {
+    return "push";
+  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index 238d26e..b685213 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -155,7 +155,7 @@ public abstract class BlockStoreClient implements Closeable 
{
       int port,
       String[] blockIds,
       ManagedBuffer[] buffers,
-      BlockFetchingListener listener) {
+      BlockPushingListener listener) {
     throw new UnsupportedOperationException();
   }
 
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java
similarity index 58%
copy from 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
copy to 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java
index 138fd53..e019dab 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockFetchingListener.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockTransferListener.java
@@ -21,16 +21,24 @@ import java.util.EventListener;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
 
-public interface BlockFetchingListener extends EventListener {
+/**
+ * This interface unifies both {@link BlockFetchingListener} and {@link 
BlockPushingListener}
+ * under a single interface to allow code reuse, while also keeping the 
existing public interface
+ * to facilitate backward compatibility.
+ */
+public interface BlockTransferListener extends EventListener {
+  /**
+   * Called once per successfully transferred block.
+   */
+  void onBlockTransferSuccess(String blockId, ManagedBuffer data);
+
   /**
-   * Called once per successfully fetched block. After this call returns, data 
will be released
-   * automatically. If the data will be passed to another thread, the receiver 
should retain()
-   * and release() the buffer on their own, or copy the data to a new buffer.
+   * Called at least once per block upon transfer failures.
    */
-  void onBlockFetchSuccess(String blockId, ManagedBuffer data);
+  void onBlockTransferFailure(String blockId, Throwable exception);
 
   /**
-   * Called at least once per block upon failures.
+   * Return a string indicating the type of the listener such as fetch, push, 
or something else
    */
-  void onBlockFetchFailure(String blockId, Throwable exception);
+  String getTransferType();
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index 2e15671..a758875 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -25,9 +25,9 @@ import com.google.common.base.Throwables;
 import org.apache.spark.annotation.Evolving;
 
 /**
- * Plugs into {@link RetryingBlockFetcher} to further control when an 
exception should be retried
+ * Plugs into {@link RetryingBlockTransferor} to further control when an 
exception should be retried
  * and logged.
- * Note: {@link RetryingBlockFetcher} will delegate the exception to this 
handler only when
+ * Note: {@link RetryingBlockTransferor} will delegate the exception to this 
handler only when
  * - remaining retries < max retries
  * - exception is an IOException
  *
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 63bf787..f88915b 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -95,13 +95,15 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
     logger.debug("External shuffle fetch from {}:{} (executor id {})", host, 
port, execId);
     try {
       int maxRetries = conf.maxIORetries();
-      RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
+      RetryingBlockTransferor.BlockTransferStarter blockFetchStarter =
           (inputBlockId, inputListener) -> {
             // Unless this client is closed.
             if (clientFactory != null) {
+              assert inputListener instanceof BlockFetchingListener :
+                "Expecting a BlockFetchingListener, but got " + 
inputListener.getClass();
               TransportClient client = clientFactory.createClient(host, port, 
maxRetries > 0);
-              new OneForOneBlockFetcher(client, appId, execId,
-                inputBlockId, inputListener, conf, 
downloadFileManager).start();
+              new OneForOneBlockFetcher(client, appId, execId, inputBlockId,
+                (BlockFetchingListener) inputListener, conf, 
downloadFileManager).start();
             } else {
               logger.info("This clientFactory was closed. Skipping further 
block fetch retries.");
             }
@@ -110,7 +112,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
       if (maxRetries > 0) {
         // Note this Fetcher will correctly handle maxRetries == 0; we avoid 
it just in case there's
         // a bug in this code. We should remove the if statement once we're 
sure of the stability.
-        new RetryingBlockFetcher(conf, blockFetchStarter, blockIds, 
listener).start();
+        new RetryingBlockTransferor(conf, blockFetchStarter, blockIds, 
listener).start();
       } else {
         blockFetchStarter.createAndStart(blockIds, listener);
       }
@@ -128,7 +130,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
       int port,
       String[] blockIds,
       ManagedBuffer[] buffers,
-      BlockFetchingListener listener) {
+      BlockPushingListener listener) {
     checkInit();
     assert blockIds.length == buffers.length : "Number of block ids and 
buffers do not match.";
 
@@ -138,15 +140,21 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
     }
     logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, 
port);
     try {
-      RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
+      RetryingBlockTransferor.BlockTransferStarter blockPushStarter =
           (inputBlockId, inputListener) -> {
-            TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockPusher(client, appId, conf.appAttemptId(), 
inputBlockId,
-              inputListener, buffersWithId).start();
+            if (clientFactory != null) {
+              assert inputListener instanceof BlockPushingListener :
+                "Expecting a BlockPushingListener, but got " + 
inputListener.getClass();
+              TransportClient client = clientFactory.createClient(host, port);
+              new OneForOneBlockPusher(client, appId, conf.appAttemptId(), 
inputBlockId,
+                (BlockPushingListener) inputListener, buffersWithId).start();
+            } else {
+              logger.info("This clientFactory was closed. Skipping further 
block push retries.");
+            }
           };
       int maxRetries = conf.maxIORetries();
       if (maxRetries > 0) {
-        new RetryingBlockFetcher(
+        new RetryingBlockTransferor(
           conf, blockPushStarter, blockIds, listener, 
PUSH_ERROR_HANDLER).start();
       } else {
         blockPushStarter.createAndStart(blockIds, listener);
@@ -154,7 +162,7 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
     } catch (Exception e) {
       logger.error("Exception while beginning pushBlocks", e);
       for (String blockId : blockIds) {
-        listener.onBlockFetchFailure(blockId, e);
+        listener.onBlockPushFailure(blockId, e);
       }
     }
   }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
index b8b32e2..0e1c59f 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -47,7 +47,7 @@ public class OneForOneBlockPusher {
   private final String appId;
   private final int appAttemptId;
   private final String[] blockIds;
-  private final BlockFetchingListener listener;
+  private final BlockPushingListener listener;
   private final Map<String, ManagedBuffer> buffers;
 
   public OneForOneBlockPusher(
@@ -55,7 +55,7 @@ public class OneForOneBlockPusher {
       String appId,
       int appAttemptId,
       String[] blockIds,
-      BlockFetchingListener listener,
+      BlockPushingListener listener,
       Map<String, ManagedBuffer> buffers) {
     this.client = client;
     this.appId = appId;
@@ -78,21 +78,34 @@ public class OneForOneBlockPusher {
     @Override
     public void onSuccess(ByteBuffer response) {
       // On receipt of a successful block push
-      listener.onBlockFetchSuccess(blockId, new 
NioManagedBuffer(ByteBuffer.allocate(0)));
+      listener.onBlockPushSuccess(blockId, new 
NioManagedBuffer(ByteBuffer.allocate(0)));
     }
 
     @Override
     public void onFailure(Throwable e) {
-      // Since block push is best effort, i.e., if we encountered a block push 
failure that's not
-      // retriable or exceeding the max retires, we should not fail all 
remaining block pushes.
-      // The best effort nature makes block push tolerable of a partial 
completion. Thus, we only
-      // fail the block that's actually failed. Not that, on the 
RetryingBlockFetcher side, once
-      // retry is initiated, it would still invalidate the previous active 
retry listener, and
-      // retry all outstanding blocks. We are preventing forwarding 
unnecessary block push failures
-      // to the parent listener of the retry listener. The only exceptions 
would be if the block
-      // push failure is due to block arriving on the server side after merge 
finalization, or the
-      // client fails to establish connection to the server side. In both 
cases, we would fail all
-      // remaining blocks.
+      // Since block push is best effort, i.e., if we encounter a block push 
failure that's still
+      // retriable according to ErrorHandler (not a connection exception and 
the block is not too
+      // late), we should not fail all remaining block pushes even though
+      // RetryingBlockTransferor might consider this failure not retriable 
(exceeding max retry
+      // count etc). The best effort nature makes block push tolerant of a 
partial completion.
+      // Thus, we only fail the block that's actually failed in this case. 
Note that, on the
+      // RetryingBlockTransferor side, if retry is initiated, it would still 
invalidate the
+      // previous active retry listener, and retry pushing all outstanding 
blocks. However, since
+      // the blocks to be pushed are preloaded into memory and the first 
attempt of pushing these
+      // blocks might have already succeeded, retry pushing all the 
outstanding blocks should be
+      // very cheap (on the client side, the block data is in memory; on the 
server side, the block
+      // will be recognized as a duplicate which triggers noop handling). 
Here, by failing only the
+      // one block that's actually failed, we are essentially preventing 
forwarding unnecessary
+      // block push failures to the parent listener of the retry listener.
+      //
+      // Take the following as an example. For the common exception during 
block push handling,
+      // i.e. block collision, it is considered as retriable by ErrorHandler 
but not retriable
+      // by RetryingBlockTransferor. When we encounter a failure of this type, 
we only fail the
+      // one block encountering this issue not the remaining blocks in the 
same batch. On the
+      // RetryingBlockTransferor side, since this exception is considered as 
not retriable, it
+      // would immediately invoke parent listener's onBlockTransferFailure. 
However, the remaining
+      // blocks in the same batch would remain current and active and they 
won't be impacted by
+      // this exception.
       if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
         String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 
1);
         failRemainingBlocks(targetBlockId, e);
@@ -106,7 +119,7 @@ public class OneForOneBlockPusher {
   private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
     for (String blockId : failedBlockIds) {
       try {
-        listener.onBlockFetchFailure(blockId, e);
+        listener.onBlockPushFailure(blockId, e);
       } catch (Exception e2) {
         logger.error("Error in block push failure callback", e2);
       }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
similarity index 51%
rename from 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
rename to 
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
index 43bde16..512e4a5 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockTransferor.java
@@ -34,44 +34,44 @@ import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
 
 /**
- * Wraps another BlockFetcher with the ability to automatically retry fetches 
which fail due to
- * IOExceptions, which we hope are due to transient network conditions.
+ * Wraps another BlockFetcher or BlockPusher with the ability to automatically 
retry block
+ * transfers which fail due to IOExceptions, which we hope are due to 
transient network conditions.
  *
- * This fetcher provides stronger guarantees regarding the parent 
BlockFetchingListener. In
+ * This transferor provides stronger guarantees regarding the parent 
BlockTransferListener. In
  * particular, the listener will be invoked exactly once per blockId, with a 
success or failure.
  */
-public class RetryingBlockFetcher {
+public class RetryingBlockTransferor {
 
   /**
-   * Used to initiate the first fetch for all blocks, and subsequently for 
retrying the fetch on any
-   * remaining blocks.
+   * Used to initiate the first transfer for all blocks, and subsequently for 
retrying the
+   * transfer on any remaining blocks.
    */
-  public interface BlockFetchStarter {
+  public interface BlockTransferStarter {
     /**
-     * Creates a new BlockFetcher to fetch the given block ids which may do 
some synchronous
-     * bootstrapping followed by fully asynchronous block fetching.
-     * The BlockFetcher must eventually invoke the Listener on every input 
blockId, or else this
-     * method must throw an exception.
+     * Creates a new BlockFetcher or BlockPusher to transfer the given block 
ids which may do
+     * some synchronous bootstrapping followed by fully asynchronous block 
transferring.
+     * The BlockFetcher or BlockPusher must eventually invoke the Listener on 
every input blockId,
+     * or else this method must throw an exception.
      *
      * This method should always attempt to get a new TransportClient from the
      * {@link org.apache.spark.network.client.TransportClientFactory} in order 
to fix connection
      * issues.
      */
-    void createAndStart(String[] blockIds, BlockFetchingListener listener)
+    void createAndStart(String[] blockIds, BlockTransferListener listener)
          throws IOException, InterruptedException;
   }
 
   /** Shared executor service used for waiting and retrying. */
   private static final ExecutorService executorService = 
Executors.newCachedThreadPool(
-    NettyUtils.createThreadFactory("Block Fetch Retry"));
+    NettyUtils.createThreadFactory("Block Transfer Retry"));
 
-  private static final Logger logger = 
LoggerFactory.getLogger(RetryingBlockFetcher.class);
+  private static final Logger logger = 
LoggerFactory.getLogger(RetryingBlockTransferor.class);
 
-  /** Used to initiate new Block Fetches on our remaining blocks. */
-  private final BlockFetchStarter fetchStarter;
+  /** Used to initiate new block transfers on our remaining blocks. */
+  private final BlockTransferStarter transferStarter;
 
-  /** Parent listener which we delegate all successful or permanently failed 
block fetches to. */
-  private final BlockFetchingListener listener;
+  /** Parent listener which we delegate all successful or permanently failed 
block transfers to. */
+  private final BlockTransferListener listener;
 
   /** Max number of times we are allowed to retry. */
   private final int maxRetries;
@@ -86,80 +86,82 @@ public class RetryingBlockFetcher {
   private int retryCount = 0;
 
   /**
-   * Set of all block ids which have not been fetched successfully or with a 
non-IO Exception.
+   * Set of all block ids which have not been transferred successfully or with 
a non-IO Exception.
    * A retry involves requesting every outstanding block. Note that since this 
is a LinkedHashSet,
    * input ordering is preserved, so we always request blocks in the same 
order the user provided.
    */
   private final LinkedHashSet<String> outstandingBlocksIds;
 
   /**
-   * The BlockFetchingListener that is active with our current BlockFetcher.
+   * The BlockTransferListener that is active with our current BlockFetcher or BlockPusher.
    * When we start a retry, we immediately replace this with a new Listener, 
which causes all any
    * old Listeners to ignore all further responses.
    */
-  private RetryingBlockFetchListener currentListener;
+  private RetryingBlockTransferListener currentListener;
 
   private final ErrorHandler errorHandler;
 
-  public RetryingBlockFetcher(
+  public RetryingBlockTransferor(
       TransportConf conf,
-      RetryingBlockFetcher.BlockFetchStarter fetchStarter,
+      BlockTransferStarter transferStarter,
       String[] blockIds,
-      BlockFetchingListener listener,
+      BlockTransferListener listener,
       ErrorHandler errorHandler) {
-    this.fetchStarter = fetchStarter;
+    this.transferStarter = transferStarter;
     this.listener = listener;
     this.maxRetries = conf.maxIORetries();
     this.retryWaitTime = conf.ioRetryWaitTimeMs();
     this.outstandingBlocksIds = Sets.newLinkedHashSet();
     Collections.addAll(outstandingBlocksIds, blockIds);
-    this.currentListener = new RetryingBlockFetchListener();
+    this.currentListener = new RetryingBlockTransferListener();
     this.errorHandler = errorHandler;
   }
 
-  public RetryingBlockFetcher(
+  public RetryingBlockTransferor(
       TransportConf conf,
-      BlockFetchStarter fetchStarter,
+      BlockTransferStarter transferStarter,
       String[] blockIds,
       BlockFetchingListener listener) {
-    this(conf, fetchStarter, blockIds, listener, 
ErrorHandler.NOOP_ERROR_HANDLER);
+    this(conf, transferStarter, blockIds, listener, 
ErrorHandler.NOOP_ERROR_HANDLER);
   }
 
   /**
-   * Initiates the fetch of all blocks provided in the constructor, with 
possible retries in the
-   * event of transient IOExceptions.
+   * Initiates the transfer of all blocks provided in the constructor, with 
possible retries
+   * in the event of transient IOExceptions.
    */
   public void start() {
-    fetchAllOutstanding();
+    transferAllOutstanding();
   }
 
   /**
-   * Fires off a request to fetch all blocks that have not been fetched 
successfully or permanently
-   * failed (i.e., by a non-IOException).
+   * Fires off a request to transfer all blocks that have not been transferred 
successfully or
+   * permanently failed (i.e., by a non-IOException).
    */
-  private void fetchAllOutstanding() {
+  private void transferAllOutstanding() {
     // Start by retrieving our shared state within a synchronized block.
-    String[] blockIdsToFetch;
+    String[] blockIdsToTransfer;
     int numRetries;
-    RetryingBlockFetchListener myListener;
+    RetryingBlockTransferListener myListener;
     synchronized (this) {
-      blockIdsToFetch = outstandingBlocksIds.toArray(new 
String[outstandingBlocksIds.size()]);
+      blockIdsToTransfer = outstandingBlocksIds.toArray(new 
String[outstandingBlocksIds.size()]);
       numRetries = retryCount;
       myListener = currentListener;
     }
 
-    // Now initiate the fetch on all outstanding blocks, possibly initiating a 
retry if that fails.
+    // Now initiate the transfer on all outstanding blocks, possibly 
initiating a retry if that
+    // fails.
     try {
-      fetchStarter.createAndStart(blockIdsToFetch, myListener);
+      transferStarter.createAndStart(blockIdsToTransfer, myListener);
     } catch (Exception e) {
-      logger.error(String.format("Exception while beginning fetch of %s 
outstanding blocks %s",
-        blockIdsToFetch.length, numRetries > 0 ? "(after " + numRetries + " 
retries)" : ""), e);
+      logger.error(String.format("Exception while beginning %s of %s 
outstanding blocks %s",
+        listener.getTransferType(), blockIdsToTransfer.length,
+        numRetries > 0 ? "(after " + numRetries + " retries)" : ""), e);
 
       if (shouldRetry(e)) {
         initiateRetry();
       } else {
-        for (String bid : blockIdsToFetch) {
-          listener.onBlockFetchFailure(bid, e);
+        for (String bid : blockIdsToTransfer) {
+          listener.onBlockTransferFailure(bid, e);
         }
       }
     }
@@ -167,23 +169,24 @@ public class RetryingBlockFetcher {
 
   /**
    * Lightweight method which initiates a retry in a different thread. The 
retry will involve
-   * calling fetchAllOutstanding() after a configured wait time.
+   * calling transferAllOutstanding() after a configured wait time.
    */
   private synchronized void initiateRetry() {
     retryCount += 1;
-    currentListener = new RetryingBlockFetchListener();
+    currentListener = new RetryingBlockTransferListener();
 
-    logger.info("Retrying fetch ({}/{}) for {} outstanding blocks after {} ms",
-      retryCount, maxRetries, outstandingBlocksIds.size(), retryWaitTime);
+    logger.info("Retrying {} ({}/{}) for {} outstanding blocks after {} ms",
+      listener.getTransferType(), retryCount, maxRetries, 
outstandingBlocksIds.size(),
+      retryWaitTime);
 
     executorService.submit(() -> {
       Uninterruptibles.sleepUninterruptibly(retryWaitTime, 
TimeUnit.MILLISECONDS);
-      fetchAllOutstanding();
+      transferAllOutstanding();
     });
   }
 
   /**
-   * Returns true if we should retry due a block fetch failure. We will retry 
if and only if
+   * Returns true if we should retry due to a block transfer failure. We will 
retry if and only if
    * the exception was an IOException and we haven't retried 'maxRetries' 
times already.
    */
   private synchronized boolean shouldRetry(Throwable e) {
@@ -194,17 +197,17 @@ public class RetryingBlockFetcher {
   }
 
   /**
-   * Our RetryListener intercepts block fetch responses and forwards them to 
our parent listener.
-   * Note that in the event of a retry, we will immediately replace the 
'currentListener' field,
-   * indicating that any responses from non-current Listeners should be 
ignored.
+   * Our RetryListener intercepts block transfer responses and forwards them 
to our parent
+   * listener. Note that in the event of a retry, we will immediately replace 
the 'currentListener'
+   * field, indicating that any responses from non-current Listeners should be 
ignored.
    */
-  private class RetryingBlockFetchListener implements BlockFetchingListener {
-    @Override
-    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+  private class RetryingBlockTransferListener implements
+      BlockFetchingListener, BlockPushingListener {
+    private void handleBlockTransferSuccess(String blockId, ManagedBuffer 
data) {
       // We will only forward this success message to our parent listener if 
this block request is
       // outstanding and we are still the active listener.
       boolean shouldForwardSuccess = false;
-      synchronized (RetryingBlockFetcher.this) {
+      synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) 
{
           outstandingBlocksIds.remove(blockId);
           shouldForwardSuccess = true;
@@ -213,28 +216,27 @@ public class RetryingBlockFetcher {
 
       // Now actually invoke the parent listener, outside of the synchronized 
block.
       if (shouldForwardSuccess) {
-        listener.onBlockFetchSuccess(blockId, data);
+        listener.onBlockTransferSuccess(blockId, data);
       }
     }
 
-    @Override
-    public void onBlockFetchFailure(String blockId, Throwable exception) {
+    private void handleBlockTransferFailure(String blockId, Throwable 
exception) {
       // We will only forward this failure to our parent listener if this 
block request is
-      // outstanding, we are still the active listener, AND we cannot retry 
the fetch.
+      // outstanding, we are still the active listener, AND we cannot retry 
the transfer.
       boolean shouldForwardFailure = false;
-      synchronized (RetryingBlockFetcher.this) {
+      synchronized (RetryingBlockTransferor.this) {
         if (this == currentListener && outstandingBlocksIds.contains(blockId)) 
{
           if (shouldRetry(exception)) {
             initiateRetry();
           } else {
             if (errorHandler.shouldLogError(exception)) {
               logger.error(
-                String.format("Failed to fetch block %s, and will not retry 
(%s retries)",
-                  blockId, retryCount), exception);
+                String.format("Failed to %s block %s, and will not retry (%s 
retries)",
+                  listener.getTransferType(), blockId, retryCount), exception);
             } else {
               logger.debug(
-                String.format("Failed to fetch block %s, and will not retry 
(%s retries)",
-                  blockId, retryCount), exception);
+                String.format("Failed to %s block %s, and will not retry (%s 
retries)",
+                  listener.getTransferType(), blockId, retryCount), exception);
             }
             outstandingBlocksIds.remove(blockId);
             shouldForwardFailure = true;
@@ -244,8 +246,48 @@ public class RetryingBlockFetcher {
 
       // Now actually invoke the parent listener, outside of the synchronized 
block.
       if (shouldForwardFailure) {
-        listener.onBlockFetchFailure(blockId, exception);
+        listener.onBlockTransferFailure(blockId, exception);
       }
     }
+
+    @Override
+    public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
+      handleBlockTransferSuccess(blockId, data);
+    }
+
+    @Override
+    public void onBlockFetchFailure(String blockId, Throwable exception) {
+      handleBlockTransferFailure(blockId, exception);
+    }
+
+    @Override
+    public void onBlockPushSuccess(String blockId, ManagedBuffer data) {
+      handleBlockTransferSuccess(blockId, data);
+    }
+
+    @Override
+    public void onBlockPushFailure(String blockId, Throwable exception) {
+      handleBlockTransferFailure(blockId, exception);
+    }
+
+    // RetryingBlockTransferListener's onBlockTransferSuccess and 
onBlockTransferFailure
+    // shouldn't be invoked. We only invoke these 2 methods on the parent 
listener.
+    @Override
+    public void onBlockTransferSuccess(String blockId, ManagedBuffer data) {
+      throw new RuntimeException(
+        "Invocation on RetryingBlockTransferListener.onBlockTransferSuccess is 
unexpected.");
+    }
+
+    @Override
+    public void onBlockTransferFailure(String blockId, Throwable exception) {
+      throw new RuntimeException(
+        "Invocation on RetryingBlockTransferListener.onBlockTransferFailure is 
unexpected.");
+    }
+
+    @Override
+    public String getTransferType() {
+      throw new RuntimeException(
+        "Invocation on RetryingBlockTransferListener.getTransferType is 
unexpected.");
+    }
   }
 }
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
index e41198f..f709a56 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -48,12 +48,12 @@ public class OneForOneBlockPusherSuite {
     blocks.put("shufflePush_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new 
byte[1])));
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
 
-    BlockFetchingListener listener = pushBlocks(
+    BlockPushingListener listener = pushBlocks(
       blocks,
       blockIds,
       Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));
 
-    verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
+    verify(listener).onBlockPushSuccess(eq("shufflePush_0_0_0"), any());
   }
 
   @Test
@@ -64,16 +64,16 @@ public class OneForOneBlockPusherSuite {
     blocks.put("shufflePush_0_2_0", new 
NettyManagedBuffer(Unpooled.wrappedBuffer(new byte[23])));
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
 
-    BlockFetchingListener listener = pushBlocks(
+    BlockPushingListener listener = pushBlocks(
       blocks,
       blockIds,
       Arrays.asList(new PushBlockStream("app-id",0,  0, 0, 0, 0),
         new PushBlockStream("app-id", 0, 0, 1, 0, 1),
         new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
-    verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), 
any());
-    verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), 
any());
-    verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_2_0"), 
any());
+    verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), 
any());
+    verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_1_0"), 
any());
+    verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_2_0"), 
any());
   }
 
   @Test
@@ -84,16 +84,16 @@ public class OneForOneBlockPusherSuite {
     blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new 
byte[0])));
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
 
-    BlockFetchingListener listener = pushBlocks(
+    BlockPushingListener listener = pushBlocks(
       blocks,
       blockIds,
       Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
         new PushBlockStream("app-id", 0, 0, 1, 0, 1),
         new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
-    verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), 
any());
-    verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), 
any());
-    verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_2_0"), 
any());
+    verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), 
any());
+    verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_1_0"), 
any());
+    verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_2_0"), 
any());
   }
 
   @Test
@@ -104,18 +104,18 @@ public class OneForOneBlockPusherSuite {
     blocks.put("shufflePush_0_2_0", new NioManagedBuffer(ByteBuffer.wrap(new 
byte[0])));
     String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
 
-    BlockFetchingListener listener = pushBlocks(
+    BlockPushingListener listener = pushBlocks(
       blocks,
       blockIds,
       Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
         new PushBlockStream("app-id", 0, 0, 1, 0, 1),
         new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
-    verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), 
any());
-    verify(listener, 
times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
-    verify(listener, times(0)).onBlockFetchFailure(eq("shufflePush_0_0_0"), 
any());
-    verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), 
any());
-    verify(listener, times(2)).onBlockFetchFailure(eq("shufflePush_0_2_0"), 
any());
+    verify(listener, times(1)).onBlockPushSuccess(eq("shufflePush_0_0_0"), 
any());
+    verify(listener, 
times(0)).onBlockPushSuccess(not(eq("shufflePush_0_0_0")), any());
+    verify(listener, times(0)).onBlockPushFailure(eq("shufflePush_0_0_0"), 
any());
+    verify(listener, times(1)).onBlockPushFailure(eq("shufflePush_0_1_0"), 
any());
+    verify(listener, times(2)).onBlockPushFailure(eq("shufflePush_0_2_0"), 
any());
   }
 
   /**
@@ -123,12 +123,12 @@ public class OneForOneBlockPusherSuite {
    * If a block is an empty byte, a server side retriable exception will be 
thrown.
    * If a block is null, a non-retriable exception will be thrown.
    */
-  private static BlockFetchingListener pushBlocks(
+  private static BlockPushingListener pushBlocks(
       LinkedHashMap<String, ManagedBuffer> blocks,
       String[] blockIds,
       Iterable<BlockTransferMessage> expectMessages) {
     TransportClient client = mock(TransportClient.class);
-    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    BlockPushingListener listener = mock(BlockPushingListener.class);
     OneForOneBlockPusher pusher =
       new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, 
blocks);
 
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
similarity index 85%
rename from 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
rename to 
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
index 6f90df5..1b44b06 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockFetcherSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RetryingBlockTransferorSuite.java
@@ -38,13 +38,13 @@ import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.buffer.NioManagedBuffer;
 import org.apache.spark.network.util.MapConfigProvider;
 import org.apache.spark.network.util.TransportConf;
-import static 
org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;
+import static 
org.apache.spark.network.shuffle.RetryingBlockTransferor.BlockTransferStarter;
 
 /**
  * Tests retry logic by throwing IOExceptions and ensuring that subsequent 
attempts are made to
  * fetch the lost blocks.
  */
-public class RetryingBlockFetcherSuite {
+public class RetryingBlockTransferorSuite {
 
   private final ManagedBuffer block0 = new 
NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
   private final ManagedBuffer block1 = new 
NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
@@ -64,8 +64,8 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener).onBlockFetchSuccess("b0", block0);
-    verify(listener).onBlockFetchSuccess("b1", block1);
+    verify(listener).onBlockTransferSuccess("b0", block0);
+    verify(listener).onBlockTransferSuccess("b1", block1);
     verifyNoMoreInteractions(listener);
   }
 
@@ -83,8 +83,9 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener).onBlockFetchFailure(eq("b0"), any());
-    verify(listener).onBlockFetchSuccess("b1", block1);
+    verify(listener).onBlockTransferFailure(eq("b0"), any());
+    verify(listener).onBlockTransferSuccess("b1", block1);
+    verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
   }
 
@@ -106,8 +107,9 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
+    verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
   }
 
@@ -128,8 +130,9 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
+    verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
   }
 
@@ -156,8 +159,9 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b1", block1);
+    verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
   }
 
@@ -188,8 +192,9 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any());
+    verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
   }
 
@@ -218,9 +223,10 @@ public class RetryingBlockFetcherSuite {
 
     performInteractions(interactions, listener);
 
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
-    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), any());
-    verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b0", block0);
+    verify(listener, timeout(5000)).onBlockTransferFailure(eq("b1"), any());
+    verify(listener, timeout(5000)).onBlockTransferSuccess("b2", block2);
+    verify(listener, atLeastOnce()).getTransferType();
     verifyNoMoreInteractions(listener);
   }
 
@@ -243,7 +249,7 @@ public class RetryingBlockFetcherSuite {
       "spark.shuffle.io.maxRetries", "2",
       "spark.shuffle.io.retryWait", "0"));
     TransportConf conf = new TransportConf("shuffle", provider);
-    BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);
+    BlockTransferStarter fetchStarter = mock(BlockTransferStarter.class);
 
     Stubber stub = null;
 
@@ -293,6 +299,6 @@ public class RetryingBlockFetcherSuite {
     assertNotNull(stub);
     stub.when(fetchStarter).createAndStart(any(), any());
     String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
-    new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, 
listener).start();
+    new RetryingBlockTransferor(conf, fetchStarter, blockIdArray, 
listener).start();
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
index 8288498..4e0beea 100644
--- 
a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
+++ 
b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
@@ -36,7 +36,7 @@ import org.apache.spark.network.buffer.{ManagedBuffer, 
NioManagedBuffer}
 import org.apache.spark.network.client.{RpcResponseCallback, 
TransportClientBootstrap}
 import org.apache.spark.network.crypto.{AuthClientBootstrap, 
AuthServerBootstrap}
 import org.apache.spark.network.server._
-import org.apache.spark.network.shuffle.{BlockFetchingListener, 
DownloadFileManager, OneForOneBlockFetcher, RetryingBlockFetcher}
+import org.apache.spark.network.shuffle.{BlockFetchingListener, 
BlockTransferListener, DownloadFileManager, OneForOneBlockFetcher, 
RetryingBlockTransferor}
 import org.apache.spark.network.shuffle.protocol.{UploadBlock, 
UploadBlockStream}
 import org.apache.spark.network.util.JavaUtils
 import org.apache.spark.rpc.RpcEndpointRef
@@ -116,13 +116,15 @@ private[spark] class NettyBlockTransferService(
     }
     try {
       val maxRetries = transportConf.maxIORetries()
-      val blockFetchStarter = new RetryingBlockFetcher.BlockFetchStarter {
+      val blockFetchStarter = new RetryingBlockTransferor.BlockTransferStarter 
{
         override def createAndStart(blockIds: Array[String],
-            listener: BlockFetchingListener): Unit = {
+            listener: BlockTransferListener): Unit = {
+          assert(listener.isInstanceOf[BlockFetchingListener],
+            s"Expecting a BlockFetchingListener, but got ${listener.getClass}")
           try {
             val client = clientFactory.createClient(host, port, maxRetries > 0)
-            new OneForOneBlockFetcher(client, appId, execId, blockIds, 
listener,
-              transportConf, tempFileManager).start()
+            new OneForOneBlockFetcher(client, appId, execId, blockIds,
+              listener.asInstanceOf[BlockFetchingListener], transportConf, 
tempFileManager).start()
           } catch {
             case e: IOException =>
               Try {
@@ -140,7 +142,7 @@ private[spark] class NettyBlockTransferService(
       if (maxRetries > 0) {
         // Note this Fetcher will correctly handle maxRetries == 0; we avoid 
it just in case there's
         // a bug in this code. We should remove the if statement once we're 
sure of the stability.
-        new RetryingBlockFetcher(transportConf, blockFetchStarter, blockIds, 
listener).start()
+        new RetryingBlockTransferor(transportConf, blockFetchStarter, 
blockIds, listener).start()
       } else {
         blockFetchStarter.createAndStart(blockIds, listener)
       }
diff --git 
a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala 
b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
index 53687bb..544c753 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleBlockPusher.scala
@@ -33,7 +33,7 @@ import org.apache.spark.internal.config._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, 
ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.SparkTransportConf
-import org.apache.spark.network.shuffle.BlockFetchingListener
+import org.apache.spark.network.shuffle.BlockPushingListener
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.shuffle.ShuffleBlockPusher._
@@ -205,7 +205,7 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) 
extends Logging {
     val blockIds = request.blocks.map(_._1.toString)
     val remainingBlocks = new HashSet[String]() ++= blockIds
 
-    val blockPushListener = new BlockFetchingListener {
+    val blockPushListener = new BlockPushingListener {
       // Initiating a connection and pushing blocks to a remote shuffle 
service is always handled by
       // the block-push-threads. We should not initiate the connection 
creation in the
       // blockPushListener callbacks which are invoked by the netty eventloop 
because:
@@ -224,12 +224,12 @@ private[spark] class ShuffleBlockPusher(conf: SparkConf) 
extends Logging {
         })
       }
 
-      override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): 
Unit = {
+      override def onBlockPushSuccess(blockId: String, data: ManagedBuffer): 
Unit = {
         logTrace(s"Push for block $blockId to $address successful.")
         handleResult(PushResult(blockId, null))
       }
 
-      override def onBlockFetchFailure(blockId: String, exception: Throwable): 
Unit = {
+      override def onBlockPushFailure(blockId: String, exception: Throwable): 
Unit = {
         // check the message or it's cause to see it needs to be logged.
         if (!errorHandler.shouldLogError(exception)) {
           logTrace(s"Pushing block $blockId to $address failed.", exception)
diff --git 
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
 
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
index c8a8f37..3a6bc47 100644
--- 
a/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/network/netty/NettyBlockTransferServiceSuite.scala
@@ -114,7 +114,7 @@ class NettyBlockTransferServiceSuite
 
     val listener = mock(classOf[BlockFetchingListener])
     var hitExecutorDeadException = false
-    when(listener.onBlockFetchFailure(any(), 
any(classOf[ExecutorDeadException])))
+    when(listener.onBlockTransferFailure(any(), 
any(classOf[ExecutorDeadException])))
       .thenAnswer(_ => {hitExecutorDeadException = true})
 
     service0 = createService(port, driverEndpointRef)
diff --git 
a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala 
b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
index 6a07fef..2800be1 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleBlockPusherSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark._
 import org.apache.spark.network.buffer.ManagedBuffer
-import org.apache.spark.network.shuffle.{BlockFetchingListener, 
BlockStoreClient}
+import org.apache.spark.network.shuffle.{BlockPushingListener, 
BlockStoreClient}
 import org.apache.spark.network.shuffle.ErrorHandler.BlockPushErrorHandler
 import org.apache.spark.network.util.TransportConf
 import org.apache.spark.serializer.JavaSerializer
@@ -75,9 +75,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
         val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
         pushedBlocks ++= blocks
         val managedBuffers = 
invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
-        val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
         (blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
-          blockFetchListener.onBlockFetchSuccess(blockId, buffer)
+          blockPushListener.onBlockPushSuccess(blockId, buffer)
         })
       })
   }
@@ -164,24 +164,24 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
   test("Hit maxBlocksInFlightPerAddress limit so that the blocks are 
deferred") {
     conf.set("spark.reducer.maxBlocksInFlightPerAddress", "2")
     var blockPendingResponse : String = null
-    var listener : BlockFetchingListener = null
+    var listener : BlockPushingListener = null
     when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
       .thenAnswer((invocation: InvocationOnMock) => {
         val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
         pushedBlocks ++= blocks
         val managedBuffers = 
invocation.getArguments()(3).asInstanceOf[Array[ManagedBuffer]]
-        val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
         // Expecting 2 blocks
         assert(blocks.length == 2)
         if (blockPendingResponse == null) {
           blockPendingResponse = blocks(1)
-          listener = blockFetchListener
+          listener = blockPushListener
           // Respond with success only for the first block which will cause 
all the rest of the
           // blocks to be deferred
-          blockFetchListener.onBlockFetchSuccess(blocks(0), managedBuffers(0))
+          blockPushListener.onBlockPushSuccess(blocks(0), managedBuffers(0))
         } else {
           (blocks, managedBuffers).zipped.foreach((blockId, buffer) => {
-            blockFetchListener.onBlockFetchSuccess(blockId, buffer)
+            blockPushListener.onBlockPushSuccess(blockId, buffer)
           })
         }
       })
@@ -193,7 +193,7 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
       .pushBlocks(any(), any(), any(), any(), any())
     assert(pushedBlocks.length == 2)
     // this will trigger push of deferred blocks
-    listener.onBlockFetchSuccess(blockPendingResponse, 
mock(classOf[ManagedBuffer]))
+    listener.onBlockPushSuccess(blockPendingResponse, 
mock(classOf[ManagedBuffer]))
     pusher.runPendingTasks()
     verify(shuffleClient, times(4))
       .pushBlocks(any(), any(), any(), any(), any())
@@ -248,17 +248,17 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
       .thenAnswer((invocation: InvocationOnMock) => {
         val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
-        val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
         blocks.foreach(blockId => {
           if (failBlock) {
             failBlock = false
             // Fail the first block with the collision exception.
-            blockFetchListener.onBlockFetchFailure(blockId, new 
RuntimeException(
+            blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
               new IllegalArgumentException(
                 
BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX)))
           } else {
             pushedBlocks += blockId
-            blockFetchListener.onBlockFetchSuccess(blockId, 
mock(classOf[ManagedBuffer]))
+            blockPushListener.onBlockPushSuccess(blockId, 
mock(classOf[ManagedBuffer]))
           }
         })
       })
@@ -278,16 +278,16 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
       .thenAnswer((invocation: InvocationOnMock) => {
         val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
-        val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
         blocks.foreach(blockId => {
           if (failBlock) {
             failBlock = false
             // Fail the first block with the too late exception.
-            blockFetchListener.onBlockFetchFailure(blockId, new 
RuntimeException(
+            blockPushListener.onBlockPushFailure(blockId, new RuntimeException(
               new 
IllegalArgumentException(BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX)))
           } else {
             pushedBlocks += blockId
-            blockFetchListener.onBlockFetchSuccess(blockId, 
mock(classOf[ManagedBuffer]))
+            blockPushListener.onBlockPushSuccess(blockId, 
mock(classOf[ManagedBuffer]))
           }
         })
       })
@@ -307,9 +307,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
       .thenAnswer((invocation: InvocationOnMock) => {
         val blocks = invocation.getArguments()(2).asInstanceOf[Array[String]]
         pushedBlocks ++= blocks
-        val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
         blocks.foreach(blockId => {
-          blockFetchListener.onBlockFetchFailure(
+          blockPushListener.onBlockPushFailure(
             blockId, new RuntimeException(new ConnectException()))
         })
       })
@@ -332,9 +332,9 @@ class ShuffleBlockPusherSuite extends SparkFunSuite with 
BeforeAndAfterEach {
     when(shuffleClient.pushBlocks(any(), any(), any(), any(), any()))
       .thenAnswer((invocation: InvocationOnMock) => {
         val pushedBlocks = 
invocation.getArguments()(2).asInstanceOf[Array[String]]
-        val blockFetchListener = 
invocation.getArguments()(4).asInstanceOf[BlockFetchingListener]
+        val blockPushListener = 
invocation.getArguments()(4).asInstanceOf[BlockPushingListener]
         pushedBlocks.foreach(blockId => {
-          blockFetchListener.onBlockFetchFailure(
+          blockPushListener.onBlockPushFailure(
             blockId, new IOException("Failed to send RPC",
               new FileNotFoundException("file not found")))
         })

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to