This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 82eea13  [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
82eea13 is described below

commit 82eea13c7686fb4bfbe8fb4185db81438d2ea884
Author: Min Shen <ms...@linkedin.com>
AuthorDate: Thu Oct 15 12:34:52 2020 -0500

    [SPARK-32915][CORE] Network-layer and shuffle RPC layer changes to support push shuffle blocks
    
    ### What changes were proposed in this pull request?
    
    This is the first patch for SPIP SPARK-30602 for push-based shuffle.
    Summary of changes:
    * Introduce a new API in ExternalBlockStoreClient to push blocks to a remote shuffle service (a caller-side sketch follows this list).
    * Leveraging the streaming upload functionality in SPARK-6237, it also enables the ExternalBlockHandler to delegate the handling of block push requests to MergedShuffleFileManager.
    * Propose the API for MergedShuffleFileManager, which defines the core logic on the shuffle service side for handling block push requests. The actual implementation of this API is deferred to a later PR to restrict the size of this one.
    * Introduce OneForOneBlockPusher to enable pushing blocks to remote shuffle services in the shuffle RPC layer.
    * New protocols in the shuffle RPC layer to support these functionalities.
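
    For illustration, a minimal caller-side sketch of the new push API (hypothetical host, port, and block ids; assumes an initialized ExternalBlockStoreClient named blockStoreClient; not part of this patch):

        String[] blockIds = { "shufflePush_0_1_2", "shufflePush_0_2_2" };  // hypothetical ids
        ManagedBuffer[] buffers = {
          new NioManagedBuffer(ByteBuffer.wrap(new byte[4])),
          new NioManagedBuffer(ByteBuffer.wrap(new byte[8]))
        };
        // Pushes are asynchronous and best-effort; per-block status arrives on the listener.
        blockStoreClient.pushBlocks("shuffle-host", 7337, blockIds, buffers,
          new BlockFetchingListener() {
            @Override
            public void onBlockFetchSuccess(String blockId, ManagedBuffer data) {
              // block accepted by the remote merge manager
            }
            @Override
            public void onBlockFetchFailure(String blockId, Throwable t) {
              // best-effort: a failed push does not fail the shuffle
            }
          });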
    
    ### Why are the changes needed?
    
    Refer to the SPIP in SPARK-30602
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added unit tests.
    The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
    We have already verified the functionality and the improved performance as documented in the SPIP doc.
    
    Closes #29855 from Victsm/SPARK-32915.
    
    Lead-authored-by: Min Shen <ms...@linkedin.com>
    Co-authored-by: Chandni Singh <chsi...@linkedin.com>
    Co-authored-by: Ye Zhou <yez...@linkedin.com>
    Co-authored-by: Chandni Singh <singh.chan...@gmail.com>
    Co-authored-by: Min Shen <victor....@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 common/network-common/pom.xml                      |   4 +
 .../apache/spark/network/protocol/Encoders.java    |  63 ++++++++
 common/network-shuffle/pom.xml                     |   9 ++
 .../spark/network/shuffle/BlockStoreClient.java    |  21 +++
 .../apache/spark/network/shuffle/ErrorHandler.java |  85 +++++++++++
 .../network/shuffle/ExternalBlockHandler.java      | 104 +++++++++++++-
 .../network/shuffle/ExternalBlockStoreClient.java  |  52 ++++++-
 .../spark/network/shuffle/MergedBlockMeta.java     |  64 +++++++++
 .../network/shuffle/MergedShuffleFileManager.java  | 116 +++++++++++++++
 .../network/shuffle/OneForOneBlockPusher.java      | 123 ++++++++++++++++
 .../network/shuffle/RetryingBlockFetcher.java      |  27 +++-
 .../shuffle/protocol/BlockTransferMessage.java     |   6 +-
 .../shuffle/protocol/FinalizeShuffleMerge.java     |  84 +++++++++++
 .../network/shuffle/protocol/MergeStatuses.java    | 118 +++++++++++++++
 .../network/shuffle/protocol/PushBlockStream.java  |  95 ++++++++++++
 .../spark/network/shuffle/ErrorHandlerSuite.java   |  51 +++++++
 .../network/shuffle/ExternalBlockHandlerSuite.java |  40 +++++-
 .../network/shuffle/OneForOneBlockPusherSuite.java | 159 +++++++++++++++++++++
 .../ExternalShuffleServiceMetricsSuite.scala       |   3 +-
 .../yarn/YarnShuffleServiceMetricsSuite.scala      |   2 +-
 .../network/yarn/YarnShuffleServiceSuite.scala     |   1 +
 21 files changed, 1212 insertions(+), 15 deletions(-)

diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 9d5bc9a..d328a7d 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -91,6 +91,10 @@
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-crypto</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
diff --git a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
index 490915f..4fa191b 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/protocol/Encoders.java
@@ -17,9 +17,11 @@
 
 package org.apache.spark.network.protocol;
 
+import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 
 import io.netty.buffer.ByteBuf;
+import org.roaringbitmap.RoaringBitmap;
 
 /** Provides a canonical set of Encoders for simple types. */
 public class Encoders {
@@ -44,6 +46,40 @@ public class Encoders {
     }
   }
 
+  /** Bitmaps are encoded with their serialization length followed by the serialization bytes. */
+  public static class Bitmaps {
+    public static int encodedLength(RoaringBitmap b) {
+      // Compress the bitmap before serializing it. Note that since BlockTransferMessage
+      // needs to invoke encodedLength first to figure out the length for the ByteBuf, it
+      // guarantees that the bitmap will always be compressed before being serialized.
+      b.trim();
+      b.runOptimize();
+      return b.serializedSizeInBytes();
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap b) {
+      int encodedLength = b.serializedSizeInBytes();
+      // RoaringBitmap requires nio ByteBuffer for serde. We expose the netty ByteBuf as a nio
+      // ByteBuffer. Here, we need to explicitly manage the index so we can write into the
+      // ByteBuffer, and the write is reflected in the underlying ByteBuf.
+      b.serialize(buf.nioBuffer(buf.writerIndex(), encodedLength));
+      buf.writerIndex(buf.writerIndex() + encodedLength);
+    }
+
+    public static RoaringBitmap decode(ByteBuf buf) {
+      RoaringBitmap bitmap = new RoaringBitmap();
+      try {
+        bitmap.deserialize(buf.nioBuffer());
+        // RoaringBitmap deserialize does not advance the reader index of the underlying ByteBuf.
+        // Manually update the index here.
+        buf.readerIndex(buf.readerIndex() + bitmap.serializedSizeInBytes());
+      } catch (IOException e) {
+        throw new RuntimeException("Exception while decoding bitmap", e);
+      }
+      return bitmap;
+    }
+  }
+
   /** Byte arrays are encoded with their length followed by bytes. */
   public static class ByteArrays {
     public static int encodedLength(byte[] arr) {
@@ -135,4 +171,31 @@ public class Encoders {
       return longs;
     }
   }
+
+  /** Bitmap arrays are encoded with the number of bitmaps followed by per-Bitmap encoding. */
+  public static class BitmapArrays {
+    public static int encodedLength(RoaringBitmap[] bitmaps) {
+      int totalLength = 4;
+      for (RoaringBitmap b : bitmaps) {
+        totalLength += Bitmaps.encodedLength(b);
+      }
+      return totalLength;
+    }
+
+    public static void encode(ByteBuf buf, RoaringBitmap[] bitmaps) {
+      buf.writeInt(bitmaps.length);
+      for (RoaringBitmap b : bitmaps) {
+        Bitmaps.encode(buf, b);
+      }
+    }
+
+    public static RoaringBitmap[] decode(ByteBuf buf) {
+      int numBitmaps = buf.readInt();
+      RoaringBitmap[] bitmaps = new RoaringBitmap[numBitmaps];
+      for (int i = 0; i < bitmaps.length; i++) {
+        bitmaps[i] = Bitmaps.decode(buf);
+      }
+      return bitmaps;
+    }
+  }
 }
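
As a usage sketch of the new bitmap encoders above (illustrative only, not part of the patch; the class and variable names are made up):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.spark.network.protocol.Encoders;
    import org.roaringbitmap.RoaringBitmap;

    public class BitmapRoundTrip {
      public static void main(String[] args) {
        RoaringBitmap bitmap = RoaringBitmap.bitmapOf(1, 5, 42);
        // encodedLength() must run before encode(): it trims and run-optimizes
        // the bitmap, so the size it reports matches what encode() then writes.
        ByteBuf buf = Unpooled.buffer(Encoders.Bitmaps.encodedLength(bitmap));
        Encoders.Bitmaps.encode(buf, bitmap);
        RoaringBitmap decoded = Encoders.Bitmaps.decode(buf);
        System.out.println(bitmap.equals(decoded));  // true
      }
    }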
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 00f1def..a4a1ff9 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -57,6 +57,10 @@
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.roaringbitmap</groupId>
+      <artifactId>RoaringBitmap</artifactId>
+    </dependency>
 
     <!-- Test dependencies -->
     <dependency>
@@ -93,6 +97,11 @@
       <artifactId>mockito-core</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-io</groupId>
+      <artifactId>commons-io</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
index e762bd20..37befcd 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
@@ -29,6 +29,7 @@ import com.codahale.metrics.MetricSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientFactory;
@@ -135,4 +136,24 @@ public abstract class BlockStoreClient implements Closeable {
       hostLocalDirsCompletable.completeExceptionally(e);
     }
   }
+
+  /**
+   * Push a sequence of shuffle blocks in a best-effort manner to a remote node asynchronously.
+   * These shuffle blocks, along with blocks pushed by other clients, will be merged into
+   * per-shuffle partition merged shuffle files on the destination node.
+   *
+   * @param host the host of the remote node.
+   * @param port the port of the remote node.
+   * @param blockIds block ids to be pushed
+   * @param buffers buffers to be pushed
+   * @param listener the listener to receive block push status.
+   */
+  public void pushBlocks(
+      String host,
+      int port,
+      String[] blockIds,
+      ManagedBuffer[] buffers,
+      BlockFetchingListener listener) {
+    throw new UnsupportedOperationException();
+  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
new file mode 100644
index 0000000..308b0b7
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+import com.google.common.base.Throwables;
+
+/**
+ * Plugs into {@link RetryingBlockFetcher} to further control when an exception should be retried
+ * and logged.
+ * Note: {@link RetryingBlockFetcher} will delegate the exception to this handler only when
+ * - remaining retries < max retries
+ * - exception is an IOException
+ */
+
+public interface ErrorHandler {
+
+  boolean shouldRetryError(Throwable t);
+
+  default boolean shouldLogError(Throwable t) {
+    return true;
+  }
+
+  /**
+   * A no-op error handler instance.
+   */
+  ErrorHandler NOOP_ERROR_HANDLER = t -> true;
+
+  /**
+   * The error handler for pushing shuffle blocks to remote shuffle services.
+   */
+  class BlockPushErrorHandler implements ErrorHandler {
+    /**
+     * String constant used for generating exception messages indicating a block to be merged
+     * arrives too late on the server side, and also for later checking such exceptions on the
+     * client side. When a block push fails because the block arrived too late, we will not
+     * retry pushing the block nor log the exception on the client side.
+     */
+    public static final String TOO_LATE_MESSAGE_SUFFIX =
+      "received after merged shuffle is finalized";
+
+    /**
+     * String constant used for generating exception messages indicating the server couldn't
+     * append a block after all available attempts due to collision with other blocks belonging
+     * to the same shuffle partition, and also for later checking such exceptions on the client
+     * side. When a block push fails because the block couldn't be written for this reason,
+     * we will not log the exception on the client side.
+     */
+    public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
+      "Couldn't find an opportunity to write block";
+
+    @Override
+    public boolean shouldRetryError(Throwable t) {
+      // If it is a connection timeout or a connection closed exception, no need to retry.
+      if (t.getCause() != null && t.getCause() instanceof ConnectException) {
+        return false;
+      }
+      // If the block is too late, there is no need to retry it
+      return !Throwables.getStackTraceAsString(t).contains(TOO_LATE_MESSAGE_SUFFIX);
+    }
+
+    @Override
+    public boolean shouldLogError(Throwable t) {
+      String errorStackTrace = Throwables.getStackTraceAsString(t);
+      return !errorStackTrace.contains(BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX) &&
+        !errorStackTrace.contains(TOO_LATE_MESSAGE_SUFFIX);
+    }
+  }
+}
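
A quick sketch of how this handler classifies push failures (illustrative only; it mirrors the behavior exercised by the ErrorHandlerSuite added below):

    import org.apache.spark.network.shuffle.ErrorHandler;

    public class PushErrorTriage {
      public static void main(String[] args) {
        ErrorHandler handler = new ErrorHandler.BlockPushErrorHandler();

        // Block arrived after the merge was finalized: neither retried nor logged.
        Throwable tooLate = new RuntimeException(
          ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX);
        System.out.println(handler.shouldRetryError(tooLate)); // false
        System.out.println(handler.shouldLogError(tooLate));   // false

        // Append collision on the server: retried, but not logged as an error.
        Throwable collision = new RuntimeException(
          ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX);
        System.out.println(handler.shouldRetryError(collision)); // true
        System.out.println(handler.shouldLogError(collision));   // false
      }
    }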
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 33865a2..321b253 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -32,6 +32,7 @@ import com.codahale.metrics.MetricSet;
 import com.codahale.metrics.Timer;
 import com.codahale.metrics.Counter;
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.spark.network.client.StreamCallbackWithID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,11 +62,21 @@ public class ExternalBlockHandler extends RpcHandler {
   final ExternalShuffleBlockResolver blockManager;
   private final OneForOneStreamManager streamManager;
   private final ShuffleMetrics metrics;
+  private final MergedShuffleFileManager mergeManager;
 
   public ExternalBlockHandler(TransportConf conf, File registeredExecutorFile)
     throws IOException {
     this(new OneForOneStreamManager(),
-      new ExternalShuffleBlockResolver(conf, registeredExecutorFile));
+      new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
+      new NoOpMergedShuffleFileManager());
+  }
+
+  public ExternalBlockHandler(
+      TransportConf conf,
+      File registeredExecutorFile,
+      MergedShuffleFileManager mergeManager) throws IOException {
+    this(new OneForOneStreamManager(),
+      new ExternalShuffleBlockResolver(conf, registeredExecutorFile), mergeManager);
   }
 
   @VisibleForTesting
@@ -78,9 +89,19 @@ public class ExternalBlockHandler extends RpcHandler {
   public ExternalBlockHandler(
       OneForOneStreamManager streamManager,
       ExternalShuffleBlockResolver blockManager) {
+    this(streamManager, blockManager, new NoOpMergedShuffleFileManager());
+  }
+
+  /** Enables mocking out the StreamManager, BlockManager, and MergeManager. */
+  @VisibleForTesting
+  public ExternalBlockHandler(
+      OneForOneStreamManager streamManager,
+      ExternalShuffleBlockResolver blockManager,
+      MergedShuffleFileManager mergeManager) {
     this.metrics = new ShuffleMetrics();
     this.streamManager = streamManager;
     this.blockManager = blockManager;
+    this.mergeManager = mergeManager;
   }
 
   @Override
@@ -89,6 +110,21 @@ public class ExternalBlockHandler extends RpcHandler {
     handleMessage(msgObj, client, callback);
   }
 
+  @Override
+  public StreamCallbackWithID receiveStream(
+      TransportClient client,
+      ByteBuffer messageHeader,
+      RpcResponseCallback callback) {
+    BlockTransferMessage msgObj = BlockTransferMessage.Decoder.fromByteBuffer(messageHeader);
+    if (msgObj instanceof PushBlockStream) {
+      PushBlockStream message = (PushBlockStream) msgObj;
+      checkAuth(client, message.appId);
+      return mergeManager.receiveBlockDataAsStream(message);
+    } else {
+      throw new UnsupportedOperationException("Unexpected message with 
#receiveStream: " + msgObj);
+    }
+  }
+
   protected void handleMessage(
       BlockTransferMessage msgObj,
       TransportClient client,
@@ -139,6 +175,7 @@ public class ExternalBlockHandler extends RpcHandler {
         RegisterExecutor msg = (RegisterExecutor) msgObj;
         checkAuth(client, msg.appId);
         blockManager.registerExecutor(msg.appId, msg.execId, msg.executorInfo);
+        mergeManager.registerExecutor(msg.appId, msg.executorInfo.localDirs);
         callback.onSuccess(ByteBuffer.wrap(new byte[0]));
       } finally {
         responseDelayContext.stop();
@@ -156,6 +193,20 @@ public class ExternalBlockHandler extends RpcHandler {
      Map<String, String[]> localDirs = blockManager.getLocalDirs(msg.appId, msg.execIds);
       callback.onSuccess(new LocalDirsForExecutors(localDirs).toByteBuffer());
 
+    } else if (msgObj instanceof FinalizeShuffleMerge) {
+      final Timer.Context responseDelayContext =
+          metrics.finalizeShuffleMergeLatencyMillis.time();
+      FinalizeShuffleMerge msg = (FinalizeShuffleMerge) msgObj;
+      try {
+        checkAuth(client, msg.appId);
+        MergeStatuses statuses = mergeManager.finalizeShuffleMerge(msg);
+        callback.onSuccess(statuses.toByteBuffer());
+      } catch (IOException e) {
+        throw new RuntimeException(String.format("Error while finalizing shuffle merge "
+          + "for application %s shuffle %d", msg.appId, msg.shuffleId), e);
+      } finally {
+        responseDelayContext.stop();
+      }
     } else {
       throw new UnsupportedOperationException("Unexpected message: " + msgObj);
     }
@@ -225,6 +276,8 @@ public class ExternalBlockHandler extends RpcHandler {
     private final Timer openBlockRequestLatencyMillis = new Timer();
     // Time latency for executor registration latency in ms
     private final Timer registerExecutorRequestLatencyMillis = new Timer();
+    // Time latency for processing finalize shuffle merge request in ms
+    private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
     // Block transfer rate in byte per second
     private final Meter blockTransferRateBytes = new Meter();
     // Number of active connections to the shuffle service
@@ -236,6 +289,7 @@ public class ExternalBlockHandler extends RpcHandler {
       allMetrics = new HashMap<>();
       allMetrics.put("openBlockRequestLatencyMillis", 
openBlockRequestLatencyMillis);
       allMetrics.put("registerExecutorRequestLatencyMillis", 
registerExecutorRequestLatencyMillis);
+      allMetrics.put("finalizeShuffleMergeLatencyMillis", 
finalizeShuffleMergeLatencyMillis);
       allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
       allMetrics.put("registeredExecutorsSize",
                     (Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
@@ -373,6 +427,54 @@ public class ExternalBlockHandler extends RpcHandler {
     }
   }
 
+  /**
+   * Dummy implementation of merged shuffle file manager. Suitable for when push-based shuffle
+   * is not enabled.
+   */
+  private static class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
+
+    @Override
+    public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");
+    }
+
+    @Override
+    public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public void registerApplication(String appId, String user) {
+      // No-op. Do nothing.
+    }
+
+    @Override
+    public void registerExecutor(String appId, String[] localDirs) {
+      // No-op. Do nothing.
+    }
+
+    @Override
+    public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");
+    }
+
+    @Override
+    public ManagedBuffer getMergedBlockData(
+        String appId, int shuffleId, int reduceId, int chunkId) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");
+    }
+
+    @Override
+    public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block merge");
+    }
+
+    @Override
+    public String[] getMergedBlockDirs(String appId) {
+      throw new UnsupportedOperationException("Cannot handle shuffle block 
merge");
+    }
+  }
+
   @Override
   public void channelActive(TransportClient client) {
     metrics.activeConnections.inc();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index 76e23e7..eca35ed 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -20,21 +20,24 @@ package org.apache.spark.network.shuffle;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 import com.codahale.metrics.MetricSet;
 import com.google.common.collect.Lists;
+
+import org.apache.spark.network.TransportContext;
+import org.apache.spark.network.buffer.ManagedBuffer;
 import org.apache.spark.network.client.RpcResponseCallback;
 import org.apache.spark.network.client.TransportClient;
 import org.apache.spark.network.client.TransportClientBootstrap;
-import org.apache.spark.network.shuffle.protocol.*;
-
-import org.apache.spark.network.TransportContext;
 import org.apache.spark.network.crypto.AuthClientBootstrap;
 import org.apache.spark.network.sasl.SecretKeyHolder;
 import org.apache.spark.network.server.NoOpRpcHandler;
+import org.apache.spark.network.shuffle.protocol.*;
 import org.apache.spark.network.util.TransportConf;
 
 /**
@@ -43,6 +46,8 @@ import org.apache.spark.network.util.TransportConf;
  * (via BlockTransferService), which has the downside of losing the data if we lose the executors.
  */
 public class ExternalBlockStoreClient extends BlockStoreClient {
+  private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
+
   private final TransportConf conf;
   private final boolean authEnabled;
   private final SecretKeyHolder secretKeyHolder;
@@ -90,12 +95,12 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     try {
       int maxRetries = conf.maxIORetries();
       RetryingBlockFetcher.BlockFetchStarter blockFetchStarter =
-          (blockIds1, listener1) -> {
+          (inputBlockId, inputListener) -> {
             // Unless this client is closed.
             if (clientFactory != null) {
              TransportClient client = clientFactory.createClient(host, port, maxRetries > 0);
               new OneForOneBlockFetcher(client, appId, execId,
-                blockIds1, listener1, conf, downloadFileManager).start();
+                inputBlockId, inputListener, conf, downloadFileManager).start();
             } else {
               logger.info("This clientFactory was closed. Skipping further 
block fetch retries.");
             }
@@ -117,6 +122,43 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
   }
 
   @Override
+  public void pushBlocks(
+      String host,
+      int port,
+      String[] blockIds,
+      ManagedBuffer[] buffers,
+      BlockFetchingListener listener) {
+    checkInit();
+    assert blockIds.length == buffers.length : "Number of block ids and buffers do not match.";
+
+    Map<String, ManagedBuffer> buffersWithId = new HashMap<>();
+    for (int i = 0; i < blockIds.length; i++) {
+      buffersWithId.put(blockIds[i], buffers[i]);
+    }
+    logger.debug("Push {} shuffle blocks to {}:{}", blockIds.length, host, 
port);
+    try {
+      RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
+          (inputBlockId, inputListener) -> {
+            TransportClient client = clientFactory.createClient(host, port);
+            new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId)
+              .start();
+          };
+      int maxRetries = conf.maxIORetries();
+      if (maxRetries > 0) {
+        new RetryingBlockFetcher(
+          conf, blockPushStarter, blockIds, listener, PUSH_ERROR_HANDLER).start();
+      } else {
+        blockPushStarter.createAndStart(blockIds, listener);
+      }
+    } catch (Exception e) {
+      logger.error("Exception while beginning pushBlocks", e);
+      for (String blockId : blockIds) {
+        listener.onBlockFetchFailure(blockId, e);
+      }
+    }
+  }
+
+  @Override
   public MetricSet shuffleMetrics() {
     checkInit();
     return clientFactory.getAllMetrics();
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlockMeta.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlockMeta.java
new file mode 100644
index 0000000..e9d9e53
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedBlockMeta.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Preconditions;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Contains meta information for a merged block. Currently this information constitutes:
+ * 1. Number of chunks in a merged shuffle block.
+ * 2. Bitmaps for each chunk in the merged block. A chunk bitmap contains all the mapIds that were
+ *    merged to that merged block chunk.
+ */
+public class MergedBlockMeta {
+  private final int numChunks;
+  private final ManagedBuffer chunksBitmapBuffer;
+
+  public MergedBlockMeta(int numChunks, ManagedBuffer chunksBitmapBuffer) {
+    this.numChunks = numChunks;
+    this.chunksBitmapBuffer = Preconditions.checkNotNull(chunksBitmapBuffer);
+  }
+
+  public int getNumChunks() {
+    return numChunks;
+  }
+
+  public ManagedBuffer getChunksBitmapBuffer() {
+    return chunksBitmapBuffer;
+  }
+
+  public RoaringBitmap[] readChunkBitmaps() throws IOException {
+    ByteBuf buf = Unpooled.wrappedBuffer(chunksBitmapBuffer.nioByteBuffer());
+    List<RoaringBitmap> bitmaps = new ArrayList<>();
+    while (buf.isReadable()) {
+      bitmaps.add(Encoders.Bitmaps.decode(buf));
+    }
+    assert (bitmaps.size() == numChunks);
+    return bitmaps.toArray(new RoaringBitmap[0]);
+  }
+}
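
A small construction/read-back sketch for this class (illustrative only; the two-chunk bitmap contents and class name are made up):

    import java.io.IOException;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.spark.network.buffer.NettyManagedBuffer;
    import org.apache.spark.network.protocol.Encoders;
    import org.apache.spark.network.shuffle.MergedBlockMeta;
    import org.roaringbitmap.RoaringBitmap;

    public class MergedBlockMetaSketch {
      public static void main(String[] args) throws IOException {
        RoaringBitmap chunk0 = RoaringBitmap.bitmapOf(0, 1);  // mapIds merged into chunk 0
        RoaringBitmap chunk1 = RoaringBitmap.bitmapOf(2, 3);  // mapIds merged into chunk 1
        ByteBuf buf = Unpooled.buffer(
          Encoders.Bitmaps.encodedLength(chunk0) + Encoders.Bitmaps.encodedLength(chunk1));
        Encoders.Bitmaps.encode(buf, chunk0);
        Encoders.Bitmaps.encode(buf, chunk1);
        MergedBlockMeta meta = new MergedBlockMeta(2, new NettyManagedBuffer(buf));
        RoaringBitmap[] bitmaps = meta.readChunkBitmaps();  // decodes both chunk bitmaps
        System.out.println(bitmaps.length);  // 2
      }
    }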
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
new file mode 100644
index 0000000..ef4dbb2
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.io.IOException;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+
+/**
+ * The MergedShuffleFileManager is used to process push-based shuffle when enabled. It works
+ * alongside {@link ExternalBlockHandler} and serves as the handler for
+ * {@link org.apache.spark.network.server.RpcHandler#receiveStream}, where it processes the
+ * remotely pushed streams of shuffle blocks to merge them into merged shuffle files. Right
+ * now, support for push-based shuffle is only implemented for external shuffle service in
+ * YARN mode.
+ */
+public interface MergedShuffleFileManager {
+  /**
+   * Provides the stream callback used to process a remotely pushed block. The callback is
+   * used by the {@link org.apache.spark.network.client.StreamInterceptor} installed on the
+   * channel to process the block data in the channel outside of the message frame.
+   *
+   * @param msg metadata of the remotely pushed blocks. This is processed inside the message frame
+   * @return A stream callback to process the block data in streaming fashion as it arrives
+   */
+  StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg);
+
+  /**
+   * Handles the request to finalize shuffle merge for a given shuffle.
+   *
+   * @param msg contains appId and shuffleId to uniquely identify a shuffle to be finalized
+   * @return The statuses of the merged shuffle partitions for the given shuffle on this
+   *         shuffle service
+   * @throws IOException
+   */
+  MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException;
+
+  /**
+   * Registers an application when it starts. It also stores the username which is necessary
+   * for generating the host local directories for merged shuffle files.
+   * Right now, this is invoked by YarnShuffleService.
+   *
+   * @param appId application ID
+   * @param user username
+   */
+  void registerApplication(String appId, String user);
+
+  /**
+   * Registers an executor with its local dir list when it starts. This provides the specific path
+   * so MergedShuffleFileManager knows where to store and look for shuffle data for a
+   * given application. It is invoked by the RPC call when an executor tries to register with the
+   * local shuffle service.
+   *
+   * @param appId application ID
+   * @param localDirs The list of local dirs that this executor gets granted from NodeManager
+   */
+  void registerExecutor(String appId, String[] localDirs);
+
+  /**
+   * Invoked when an application finishes. This cleans up any remaining metadata associated with
+   * this application, and optionally deletes the application specific directory path.
+   *
+   * @param appId application ID
+   * @param cleanupLocalDirs flag indicating whether MergedShuffleFileManager should handle
+   *                         deletion of local dirs itself.
+   */
+  void applicationRemoved(String appId, boolean cleanupLocalDirs);
+
+  /**
+   * Get the buffer for a given merged shuffle chunk when serving merged shuffle to reducers.
+   *
+   * @param appId application ID
+   * @param shuffleId shuffle ID
+   * @param reduceId reducer ID
+   * @param chunkId merged shuffle file chunk ID
+   * @return The {@link ManagedBuffer} for the given merged shuffle chunk
+   */
+  ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId, int chunkId);
+
+  /**
+   * Get the meta information of a merged block.
+   *
+   * @param appId application ID
+   * @param shuffleId shuffle ID
+   * @param reduceId reducer ID
+   * @return meta information of a merged block
+   */
+  MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId);
+
+  /**
+   * Get the local directories which store the merged shuffle files.
+   *
+   * @param appId application ID
+   */
+  String[] getMergedBlockDirs(String appId);
+}
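
For illustration, a service enabling push-based shuffle would hand its own implementation to ExternalBlockHandler via the new constructor added above (the class name, factory method, and null DB file below are assumptions for the sketch, not part of this patch):

    import java.io.IOException;
    import org.apache.spark.network.shuffle.ExternalBlockHandler;
    import org.apache.spark.network.shuffle.MergedShuffleFileManager;
    import org.apache.spark.network.util.MapConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    public class PushEnabledHandlerSketch {
      // Builds a handler that routes PushBlockStream/FinalizeShuffleMerge messages
      // to the supplied merge manager instead of the default no-op one.
      static ExternalBlockHandler create(MergedShuffleFileManager mergeManager) throws IOException {
        TransportConf conf = new TransportConf("shuffle", MapConfigProvider.EMPTY);
        // null registeredExecutorFile: skip persisting executor registrations
        return new ExternalBlockHandler(conf, null, mergeManager);
      }
    }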
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
new file mode 100644
index 0000000..407b248
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+/**
+ * Similar to {@link OneForOneBlockFetcher}, but for pushing blocks to a remote shuffle service
+ * to be merged instead of fetching them from remote shuffle services. This is used by
+ * ShuffleWriter when the block push process is initiated. The supplied BlockFetchingListener
+ * is used to handle the success or failure of pushing each block.
+ */
+public class OneForOneBlockPusher {
+  private static final Logger logger = LoggerFactory.getLogger(OneForOneBlockPusher.class);
+  private static final ErrorHandler PUSH_ERROR_HANDLER = new ErrorHandler.BlockPushErrorHandler();
+
+  private final TransportClient client;
+  private final String appId;
+  private final String[] blockIds;
+  private final BlockFetchingListener listener;
+  private final Map<String, ManagedBuffer> buffers;
+
+  public OneForOneBlockPusher(
+      TransportClient client,
+      String appId,
+      String[] blockIds,
+      BlockFetchingListener listener,
+      Map<String, ManagedBuffer> buffers) {
+    this.client = client;
+    this.appId = appId;
+    this.blockIds = blockIds;
+    this.listener = listener;
+    this.buffers = buffers;
+  }
+
+  private class BlockPushCallback implements RpcResponseCallback {
+
+    private int index;
+    private String blockId;
+
+    BlockPushCallback(int index, String blockId) {
+      this.index = index;
+      this.blockId = blockId;
+    }
+
+    @Override
+    public void onSuccess(ByteBuffer response) {
+      // On receipt of a successful block push
+      listener.onBlockFetchSuccess(blockId, new NioManagedBuffer(ByteBuffer.allocate(0)));
+    }
+
+    @Override
+    public void onFailure(Throwable e) {
+      // Since block push is best effort, i.e., if we encounter a block push failure that's not
+      // retriable or exceeds the max retries, we should not fail all remaining block pushes.
+      // The best effort nature makes block push tolerant of partial completion. Thus, we only
+      // fail the block that actually failed. Note that, on the RetryingBlockFetcher side, once
+      // retry is initiated, it would still invalidate the previous active retry listener, and
+      // retry all outstanding blocks. We are preventing forwarding unnecessary block push failures
+      // to the parent listener of the retry listener. The only exceptions would be if the block
+      // push failure is due to the block arriving on the server side after merge finalization, or
+      // the client failing to establish a connection to the server side. In both cases, we would
+      // fail all remaining blocks.
+      if (PUSH_ERROR_HANDLER.shouldRetryError(e)) {
+        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, index + 1);
+        failRemainingBlocks(targetBlockId, e);
+      } else {
+        String[] targetBlockId = Arrays.copyOfRange(blockIds, index, blockIds.length);
+        failRemainingBlocks(targetBlockId, e);
+      }
+    }
+  }
+
+  private void failRemainingBlocks(String[] failedBlockIds, Throwable e) {
+    for (String blockId : failedBlockIds) {
+      try {
+        listener.onBlockFetchFailure(blockId, e);
+      } catch (Exception e2) {
+        logger.error("Error in block push failure callback", e2);
+      }
+    }
+  }
+
+  /**
+   * Begins the block pushing process, calling the listener with every block pushed.
+   */
+  public void start() {
+    logger.debug("Start pushing {} blocks", blockIds.length);
+    for (int i = 0; i < blockIds.length; i++) {
+      assert buffers.containsKey(blockIds[i]) : "Could not find the block buffer for block "
+        + blockIds[i];
+      ByteBuffer header = new PushBlockStream(appId, blockIds[i], i).toByteBuffer();
+      client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
+        new BlockPushCallback(i, blockIds[i]));
+    }
+  }
+}
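
A minimal usage sketch (illustrative; assumes an already-created TransportClient and listener, with made-up block ids and app id):

    import java.nio.ByteBuffer;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.buffer.NioManagedBuffer;
    import org.apache.spark.network.client.TransportClient;
    import org.apache.spark.network.shuffle.BlockFetchingListener;
    import org.apache.spark.network.shuffle.OneForOneBlockPusher;

    public class PusherSketch {
      static void pushTwo(TransportClient client, BlockFetchingListener listener) {
        String[] blockIds = { "shufflePush_0_0_0", "shufflePush_0_1_0" };  // hypothetical ids
        Map<String, ManagedBuffer> buffers = new HashMap<>();
        buffers.put(blockIds[0], new NioManagedBuffer(ByteBuffer.wrap(new byte[4])));
        buffers.put(blockIds[1], new NioManagedBuffer(ByteBuffer.wrap(new byte[8])));
        // Each block is sent as a stream upload with a PushBlockStream header;
        // per-block success/failure is reported through the listener.
        new OneForOneBlockPusher(client, "app-1", blockIds, listener, buffers).start();
      }
    }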
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
index 6bf3da9..43bde16 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RetryingBlockFetcher.java
@@ -99,11 +99,14 @@ public class RetryingBlockFetcher {
    */
   private RetryingBlockFetchListener currentListener;
 
+  private final ErrorHandler errorHandler;
+
   public RetryingBlockFetcher(
       TransportConf conf,
       RetryingBlockFetcher.BlockFetchStarter fetchStarter,
       String[] blockIds,
-      BlockFetchingListener listener) {
+      BlockFetchingListener listener,
+      ErrorHandler errorHandler) {
     this.fetchStarter = fetchStarter;
     this.listener = listener;
     this.maxRetries = conf.maxIORetries();
@@ -111,6 +114,15 @@ public class RetryingBlockFetcher {
     this.outstandingBlocksIds = Sets.newLinkedHashSet();
     Collections.addAll(outstandingBlocksIds, blockIds);
     this.currentListener = new RetryingBlockFetchListener();
+    this.errorHandler = errorHandler;
+  }
+
+  public RetryingBlockFetcher(
+      TransportConf conf,
+      BlockFetchStarter fetchStarter,
+      String[] blockIds,
+      BlockFetchingListener listener) {
+    this(conf, fetchStarter, blockIds, listener, ErrorHandler.NOOP_ERROR_HANDLER);
   }
 
   /**
@@ -178,7 +190,7 @@ public class RetryingBlockFetcher {
     boolean isIOException = e instanceof IOException
       || (e.getCause() != null && e.getCause() instanceof IOException);
     boolean hasRemainingRetries = retryCount < maxRetries;
-    return isIOException && hasRemainingRetries;
+    return isIOException && hasRemainingRetries && errorHandler.shouldRetryError(e);
   }
 
   /**
@@ -215,8 +227,15 @@ public class RetryingBlockFetcher {
           if (shouldRetry(exception)) {
             initiateRetry();
           } else {
-            logger.error(String.format("Failed to fetch block %s, and will not retry (%s retries)",
-              blockId, retryCount), exception);
+            if (errorHandler.shouldLogError(exception)) {
+              logger.error(
+                String.format("Failed to fetch block %s, and will not retry (%s retries)",
+                  blockId, retryCount), exception);
+            } else {
+              logger.debug(
+                String.format("Failed to fetch block %s, and will not retry (%s retries)",
+                  blockId, retryCount), exception);
+            }
             outstandingBlocksIds.remove(blockId);
             shouldForwardFailure = true;
           }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
index 89d8dfe..7f50581 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/BlockTransferMessage.java
@@ -47,7 +47,8 @@ public abstract class BlockTransferMessage implements Encodable {
   public enum Type {
    OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
    HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), REMOVE_BLOCKS(7), BLOCKS_REMOVED(8),
-    FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11);
+    FETCH_SHUFFLE_BLOCKS(9), GET_LOCAL_DIRS_FOR_EXECUTORS(10), LOCAL_DIRS_FOR_EXECUTORS(11),
+    PUSH_BLOCK_STREAM(12), FINALIZE_SHUFFLE_MERGE(13), MERGE_STATUSES(14);
 
     private final byte id;
 
@@ -78,6 +79,9 @@ public abstract class BlockTransferMessage implements Encodable {
         case 9: return FetchShuffleBlocks.decode(buf);
         case 10: return GetLocalDirsForExecutors.decode(buf);
         case 11: return LocalDirsForExecutors.decode(buf);
+        case 12: return PushBlockStream.decode(buf);
+        case 13: return FinalizeShuffleMerge.decode(buf);
+        case 14: return MergeStatuses.decode(buf);
        default: throw new IllegalArgumentException("Unknown message type: " + type);
       }
     }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
new file mode 100644
index 0000000..9058575
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Request to finalize merge for a given shuffle.
+ * Returns {@link MergeStatuses}
+ */
+public class FinalizeShuffleMerge extends BlockTransferMessage {
+  public final String appId;
+  public final int shuffleId;
+
+  public FinalizeShuffleMerge(
+      String appId,
+      int shuffleId) {
+    this.appId = appId;
+    this.shuffleId = shuffleId;
+  }
+
+  @Override
+  protected BlockTransferMessage.Type type() {
+    return Type.FINALIZE_SHUFFLE_MERGE;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, shuffleId);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("shuffleId", shuffleId)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof FinalizeShuffleMerge) {
+      FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
+      return Objects.equal(appId, o.appId)
+        && shuffleId == o.shuffleId;
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId) + 4;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    buf.writeInt(shuffleId);
+  }
+
+  public static FinalizeShuffleMerge decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    int shuffleId = buf.readInt();
+    return new FinalizeShuffleMerge(appId, shuffleId);
+  }
+}
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
new file mode 100644
index 0000000..f57e8b3
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/MergeStatuses.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import java.util.Arrays;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+import org.roaringbitmap.RoaringBitmap;
+
+import org.apache.spark.network.protocol.Encoders;
+
+/**
+ * Result returned by an ExternalShuffleService to the DAGScheduler. This represents the result
+ * of all the remote shuffle block merge operations performed by an ExternalShuffleService
+ * for a given shuffle ID. It includes the shuffle ID, an array of bitmaps each representing
+ * the set of mapper partition blocks that are merged for a given reducer partition, an array
+ * of reducer IDs, and an array of merged shuffle partition sizes. The 3 arrays list information
+ * about all the reducer partitions merged by the ExternalShuffleService in the same order.
+ */
+public class MergeStatuses extends BlockTransferMessage {
+  /** Shuffle ID **/
+  public final int shuffleId;
+  /**
+   * Array of bitmaps tracking the set of mapper partition blocks merged for each
+   * reducer partition
+   */
+  public final RoaringBitmap[] bitmaps;
+  /** Array of reducer IDs **/
+  public final int[] reduceIds;
+  /**
+   * Array of merged shuffle partition block sizes. Each represents the total size of all
+   * merged shuffle partition blocks for one reducer partition.
+   */
+  public final long[] sizes;
+
+  public MergeStatuses(
+      int shuffleId,
+      RoaringBitmap[] bitmaps,
+      int[] reduceIds,
+      long[] sizes) {
+    this.shuffleId = shuffleId;
+    this.bitmaps = bitmaps;
+    this.reduceIds = reduceIds;
+    this.sizes = sizes;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.MERGE_STATUSES;
+  }
+
+  @Override
+  public int hashCode() {
+    int objectHashCode = Objects.hashCode(shuffleId);
+    return (objectHashCode * 41 + Arrays.hashCode(reduceIds) * 41
+      + Arrays.hashCode(bitmaps) * 41 + Arrays.hashCode(sizes));
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("shuffleId", shuffleId)
+      .add("reduceId size", reduceIds.length)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof MergeStatuses) {
+      MergeStatuses o = (MergeStatuses) other;
+      return Objects.equal(shuffleId, o.shuffleId)
+        && Arrays.equals(bitmaps, o.bitmaps)
+        && Arrays.equals(reduceIds, o.reduceIds)
+        && Arrays.equals(sizes, o.sizes);
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return 4 // int
+      + Encoders.BitmapArrays.encodedLength(bitmaps)
+      + Encoders.IntArrays.encodedLength(reduceIds)
+      + Encoders.LongArrays.encodedLength(sizes);
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    buf.writeInt(shuffleId);
+    Encoders.BitmapArrays.encode(buf, bitmaps);
+    Encoders.IntArrays.encode(buf, reduceIds);
+    Encoders.LongArrays.encode(buf, sizes);
+  }
+
+  public static MergeStatuses decode(ByteBuf buf) {
+    int shuffleId = buf.readInt();
+    RoaringBitmap[] bitmaps = Encoders.BitmapArrays.decode(buf);
+    int[] reduceIds = Encoders.IntArrays.decode(buf);
+    long[] sizes = Encoders.LongArrays.decode(buf);
+    return new MergeStatuses(shuffleId, bitmaps, reduceIds, sizes);
+  }
+}
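
An encode/decode sketch for this message (illustrative values only, not from the patch):

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.spark.network.shuffle.protocol.MergeStatuses;
    import org.roaringbitmap.RoaringBitmap;

    public class MergeStatusesRoundTrip {
      public static void main(String[] args) {
        MergeStatuses statuses = new MergeStatuses(
          0,                                                        // shuffle ID
          new RoaringBitmap[] { RoaringBitmap.bitmapOf(0, 1, 2) },  // maps merged per reducer
          new int[] { 7 },                                          // reducer IDs
          new long[] { 1024L });                                    // merged partition sizes
        // encodedLength() sizes the buffer and compresses the bitmaps up front.
        ByteBuf buf = Unpooled.buffer(statuses.encodedLength());
        statuses.encode(buf);
        System.out.println(statuses.equals(MergeStatuses.decode(buf)));  // true
      }
    }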
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
new file mode 100644
index 0000000..7eab5a6
--- /dev/null
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import com.google.common.base.Objects;
+import io.netty.buffer.ByteBuf;
+
+import org.apache.spark.network.protocol.Encoders;
+
+// Needed by ScalaDoc. See SPARK-7726
+import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;
+
+
+/**
+ * Request to push a block to a remote shuffle service to be merged in push-based shuffle.
+ * The remote shuffle service will also include this message when responding to push requests.
+ */
+public class PushBlockStream extends BlockTransferMessage {
+  public final String appId;
+  public final String blockId;
+  // Similar to the chunkIndex in StreamChunkId, indicating the index of a block in a batch of
+  // blocks to be pushed.
+  public final int index;
+
+  public PushBlockStream(String appId, String blockId, int index) {
+    this.appId = appId;
+    this.blockId = blockId;
+    this.index = index;
+  }
+
+  @Override
+  protected Type type() {
+    return Type.PUSH_BLOCK_STREAM;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(appId, blockId, index);
+  }
+
+  @Override
+  public String toString() {
+    return Objects.toStringHelper(this)
+      .add("appId", appId)
+      .add("blockId", blockId)
+      .add("index", index)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other != null && other instanceof PushBlockStream) {
+      PushBlockStream o = (PushBlockStream) other;
+      return Objects.equal(appId, o.appId)
+        && Objects.equal(blockId, o.blockId)
+        && index == o.index;
+    }
+    return false;
+  }
+
+  @Override
+  public int encodedLength() {
+    return Encoders.Strings.encodedLength(appId)
+      + Encoders.Strings.encodedLength(blockId) + 4;
+  }
+
+  @Override
+  public void encode(ByteBuf buf) {
+    Encoders.Strings.encode(buf, appId);
+    Encoders.Strings.encode(buf, blockId);
+    buf.writeInt(index);
+  }
+
+  public static PushBlockStream decode(ByteBuf buf) {
+    String appId = Encoders.Strings.decode(buf);
+    String blockId = Encoders.Strings.decode(buf);
+    int index = buf.readInt();
+    return new PushBlockStream(appId, blockId, index);
+  }
+}
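
A round-trip sketch of the wire format for the new messages (illustrative; the ids are made up):

    import java.nio.ByteBuffer;
    import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
    import org.apache.spark.network.shuffle.protocol.PushBlockStream;

    public class ProtocolRoundTrip {
      public static void main(String[] args) {
        PushBlockStream msg = new PushBlockStream("app-1", "shufflePush_0_1_2", 0);
        // toByteBuffer() prepends the one-byte type tag (PUSH_BLOCK_STREAM = 12)
        // that the Decoder switches on.
        ByteBuffer wire = msg.toByteBuffer();
        BlockTransferMessage decoded = BlockTransferMessage.Decoder.fromByteBuffer(wire);
        System.out.println(msg.equals(decoded));  // true
      }
    }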
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
new file mode 100644
index 0000000..992e776
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ErrorHandlerSuite.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.net.ConnectException;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+/**
+ * Test suite for {@link ErrorHandler}
+ */
+public class ErrorHandlerSuite {
+
+  @Test
+  public void testPushErrorRetry() {
+    ErrorHandler.BlockPushErrorHandler handler = new ErrorHandler.BlockPushErrorHandler();
+    assertFalse(handler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
+      ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX))));
+    assertFalse(handler.shouldRetryError(new RuntimeException(new ConnectException())));
+    assertTrue(handler.shouldRetryError(new RuntimeException(new IllegalArgumentException(
+      ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertTrue(handler.shouldRetryError(new Throwable()));
+  }
+
+  @Test
+  public void testPushErrorLogging() {
+    ErrorHandler.BlockPushErrorHandler handler = new 
ErrorHandler.BlockPushErrorHandler();
+    assertFalse(handler.shouldLogError(new RuntimeException(new 
IllegalArgumentException(
+      ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX))));
+    assertFalse(handler.shouldLogError(new RuntimeException(new 
IllegalArgumentException(
+      
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX))));
+    assertTrue(handler.shouldLogError(new Throwable()));
+  }
+}
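
As a hedged sketch of how a push client might consult these two predicates (the surrounding try/catch, the pushBlock() and retryLater() hooks, and the logger are illustrative only, not code from this patch):

    ErrorHandler.BlockPushErrorHandler handler = new ErrorHandler.BlockPushErrorHandler();
    try {
      pushBlock();  // hypothetical single push attempt
    } catch (Exception e) {
      if (handler.shouldLogError(e)) {
        logger.error("Block push failed", e);  // suppressed for expected cases such as too-late pushes
      }
      if (handler.shouldRetryError(e)) {
        retryLater();  // hypothetical retry hook; append collisions are retriable, too-late pushes are not
      }
    }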
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index 455351f..680b8d7 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
 
@@ -25,6 +26,7 @@ import com.codahale.metrics.Timer;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.roaringbitmap.RoaringBitmap;
 
 import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.any;
@@ -39,6 +41,8 @@ import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FetchShuffleBlocks;
+import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
+import org.apache.spark.network.shuffle.protocol.MergeStatuses;
 import org.apache.spark.network.shuffle.protocol.OpenBlocks;
 import org.apache.spark.network.shuffle.protocol.RegisterExecutor;
 import org.apache.spark.network.shuffle.protocol.StreamHandle;
@@ -50,6 +54,7 @@ public class ExternalBlockHandlerSuite {
   OneForOneStreamManager streamManager;
   ExternalShuffleBlockResolver blockResolver;
   RpcHandler handler;
+  MergedShuffleFileManager mergedShuffleManager;
   ManagedBuffer[] blockMarkers = {
     new NioManagedBuffer(ByteBuffer.wrap(new byte[3])),
     new NioManagedBuffer(ByteBuffer.wrap(new byte[7]))
@@ -59,17 +64,20 @@ public class ExternalBlockHandlerSuite {
   public void beforeEach() {
     streamManager = mock(OneForOneStreamManager.class);
     blockResolver = mock(ExternalShuffleBlockResolver.class);
-    handler = new ExternalBlockHandler(streamManager, blockResolver);
+    mergedShuffleManager = mock(MergedShuffleFileManager.class);
+    handler = new ExternalBlockHandler(streamManager, blockResolver, mergedShuffleManager);
   }
 
   @Test
   public void testRegisterExecutor() {
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
 
-    ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
+    String[] localDirs = new String[] {"/a", "/b"};
+    ExecutorShuffleInfo config = new ExecutorShuffleInfo(localDirs, 16, "sort");
     ByteBuffer registerMessage = new RegisterExecutor("app0", "exec1", config).toByteBuffer();
     handler.receive(client, registerMessage, callback);
     verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);
+    verify(mergedShuffleManager, times(1)).registerExecutor("app0", localDirs);
 
     verify(callback, times(1)).onSuccess(any(ByteBuffer.class));
     verify(callback, never()).onFailure(any(Throwable.class));
@@ -222,4 +230,32 @@ public class ExternalBlockHandlerSuite {
     verify(callback, never()).onSuccess(any(ByteBuffer.class));
     verify(callback, never()).onFailure(any(Throwable.class));
   }
+
+  @Test
+  public void testFinalizeShuffleMerge() throws IOException {
+    RpcResponseCallback callback = mock(RpcResponseCallback.class);
+
+    FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0);
+    RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2);
+    MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap},
+      new int[]{3}, new long[]{30});
+    when(mergedShuffleManager.finalizeShuffleMerge(req)).thenReturn(statuses);
+
+    ByteBuffer reqBuf = req.toByteBuffer();
+    handler.receive(client, reqBuf, callback);
+    verify(mergedShuffleManager, times(1)).finalizeShuffleMerge(req);
+    ArgumentCaptor<ByteBuffer> response = ArgumentCaptor.forClass(ByteBuffer.class);
+    verify(callback, times(1)).onSuccess(response.capture());
+    verify(callback, never()).onFailure(any());
+
+    MergeStatuses mergeStatuses =
+      (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response.getValue());
+    assertEquals(mergeStatuses, statuses);
+
+    Timer finalizeShuffleMergeLatencyMillis = (Timer) ((ExternalBlockHandler) handler)
+        .getAllMetrics()
+        .getMetrics()
+        .get("finalizeShuffleMergeLatencyMillis");
+    assertEquals(1, finalizeShuffleMergeLatencyMillis.getCount());
+  }
 }
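
The MergeStatuses built in this test is worth unpacking: its three arrays are parallel, one entry per merged reduce partition. A commented restatement of the same construction (illustrative only, mirroring the test above):

    RoaringBitmap mapsMerged = RoaringBitmap.bitmapOf(0, 1, 2);  // map ids merged into this partition
    MergeStatuses statuses = new MergeStatuses(
      0,                                   // shuffle id
      new RoaringBitmap[] { mapsMerged },  // per-partition bitmaps of merged map ids
      new int[] { 3 },                     // reduce id of each merged partition
      new long[] { 30 });                  // size in bytes of each merged shuffle file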
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
new file mode 100644
index 0000000..ebcdba7
--- /dev/null
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import com.google.common.collect.Maps;
+import io.netty.buffer.Unpooled;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+import static org.mockito.AdditionalMatchers.*;
+import static org.mockito.Mockito.*;
+
+import org.apache.spark.network.buffer.ManagedBuffer;
+import org.apache.spark.network.buffer.NettyManagedBuffer;
+import org.apache.spark.network.buffer.NioManagedBuffer;
+import org.apache.spark.network.client.RpcResponseCallback;
+import org.apache.spark.network.client.TransportClient;
+import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
+import org.apache.spark.network.shuffle.protocol.PushBlockStream;
+
+
+public class OneForOneBlockPusherSuite {
+
+  @Test
+  public void testPushOne() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("shuffle_0_0_0", new NioManagedBuffer(ByteBuffer.wrap(new 
byte[1])));
+    String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+
+    BlockFetchingListener listener = pushBlocks(
+      blocks,
+      blockIds,
+      Arrays.asList(new PushBlockStream("app-id", "shuffle_0_0_0", 0)));
+
+    verify(listener).onBlockFetchSuccess(eq("shuffle_0_0_0"), any());
+  }
+
+  @Test
+  public void testPushThree() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
+    blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[23])));
+    blocks.put("b2", new NettyManagedBuffer(Unpooled.wrappedBuffer(new 
byte[23])));
+    String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+
+    BlockFetchingListener listener = pushBlocks(
+      blocks,
+      blockIds,
+      Arrays.asList(new PushBlockStream("app-id", "b0", 0),
+        new PushBlockStream("app-id", "b1", 1),
+        new PushBlockStream("app-id", "b2", 2)));
+
+    for (int i = 0; i < 3; i++) {
+      verify(listener, times(1)).onBlockFetchSuccess(eq("b" + i), any());
+    }
+  }
+
+  @Test
+  public void testServerFailures() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
+    blocks.put("b1", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
+    blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
+    String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+
+    BlockFetchingListener listener = pushBlocks(
+      blocks,
+      blockIds,
+      Arrays.asList(new PushBlockStream("app-id", "b0", 0),
+        new PushBlockStream("app-id", "b1", 1),
+        new PushBlockStream("app-id", "b2", 2)));
+
+    verify(listener, times(1)).onBlockFetchSuccess(eq("b0"), any());
+    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
+    verify(listener, times(1)).onBlockFetchFailure(eq("b2"), any());
+  }
+
+  @Test
+  public void testHandlingRetriableFailures() {
+    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
+    blocks.put("b0", new NioManagedBuffer(ByteBuffer.wrap(new byte[12])));
+    blocks.put("b1", null);
+    blocks.put("b2", new NioManagedBuffer(ByteBuffer.wrap(new byte[0])));
+    String[] blockIds = blocks.keySet().toArray(new String[blocks.size()]);
+
+    BlockFetchingListener listener = pushBlocks(
+      blocks,
+      blockIds,
+      Arrays.asList(new PushBlockStream("app-id", "b0", 0),
+        new PushBlockStream("app-id", "b1", 1),
+        new PushBlockStream("app-id", "b2", 2)));
+
+    verify(listener, times(1)).onBlockFetchSuccess(eq("b0"), any());
+    verify(listener, times(0)).onBlockFetchSuccess(not(eq("b0")), any());
+    verify(listener, times(0)).onBlockFetchFailure(eq("b0"), any());
+    verify(listener, times(1)).onBlockFetchFailure(eq("b1"), any());
+    verify(listener, times(2)).onBlockFetchFailure(eq("b2"), any());
+  }
+
+  /**
+   * Begins a push on the given set of blocks by mocking the response from the server side.
+   * If a block is an empty byte buffer, a server-side retriable exception will be thrown.
+   * If a block is null, a non-retriable exception will be thrown.
+   */
+  private static BlockFetchingListener pushBlocks(
+      LinkedHashMap<String, ManagedBuffer> blocks,
+      String[] blockIds,
+      Iterable<BlockTransferMessage> expectMessages) {
+    TransportClient client = mock(TransportClient.class);
+    BlockFetchingListener listener = mock(BlockFetchingListener.class);
+    OneForOneBlockPusher pusher =
+      new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks);
+
+    Iterator<Map.Entry<String, ManagedBuffer>> blockIterator = 
blocks.entrySet().iterator();
+    Iterator<BlockTransferMessage> msgIterator = expectMessages.iterator();
+    doAnswer(invocation -> {
+      ByteBuffer header = ((ManagedBuffer) 
invocation.getArguments()[0]).nioByteBuffer();
+      BlockTransferMessage message = 
BlockTransferMessage.Decoder.fromByteBuffer(header);
+      RpcResponseCallback callback = (RpcResponseCallback) 
invocation.getArguments()[2];
+      Map.Entry<String, ManagedBuffer> entry = blockIterator.next();
+      ManagedBuffer block = entry.getValue();
+      if (block != null && block.nioByteBuffer().capacity() > 0) {
+        callback.onSuccess(header);
+      } else if (block != null) {
+        callback.onFailure(new RuntimeException("Failed " + entry.getKey()
+          + 
ErrorHandler.BlockPushErrorHandler.BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX));
+      } else {
+        callback.onFailure(new RuntimeException("Quick fail " + entry.getKey()
+          + ErrorHandler.BlockPushErrorHandler.TOO_LATE_MESSAGE_SUFFIX));
+      }
+      assertEquals(msgIterator.next(), message);
+      return null;
+    }).when(client).uploadStream(any(ManagedBuffer.class), any(), 
any(RpcResponseCallback.class));
+
+    pusher.start();
+    return listener;
+  }
+}
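
For context, the suite drives the pusher exactly the way a caller would, minus the mocked transport. A minimal sketch, assuming an already-established TransportClient and a real BlockFetchingListener (both elided here):

    LinkedHashMap<String, ManagedBuffer> blocks = Maps.newLinkedHashMap();
    blocks.put("shuffle_0_5_0", new NioManagedBuffer(ByteBuffer.wrap(new byte[64])));
    String[] blockIds = blocks.keySet().toArray(new String[0]);
    OneForOneBlockPusher pusher =
      new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks);
    pusher.start();  // issues one uploadStream(...) per block, each tagged with a PushBlockStream header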
diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala
index d681c13..ea4d252 100644
--- a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala
@@ -61,7 +61,8 @@ class ExternalShuffleServiceMetricsSuite extends SparkFunSuite {
         "registeredExecutorsSize",
         "registerExecutorRequestLatencyMillis",
         "shuffle-server.usedDirectMemory",
-        "shuffle-server.usedHeapMemory")
+        "shuffle-server.usedHeapMemory",
+        "finalizeShuffleMergeLatencyMillis")
     )
   }
 }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
index 63ac1af..9239d89 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala
@@ -40,7 +40,7 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
     val allMetrics = Set(
       "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
       "blockTransferRateBytes", "registeredExecutorsSize", 
"numActiveConnections",
-      "numCaughtExceptions")
+      "numCaughtExceptions", "finalizeShuffleMergeLatencyMillis")
 
     metrics.getMetrics.keySet().asScala should be (allMetrics)
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index 46e5965..a6a302a 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -405,6 +405,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
       "openBlockRequestLatencyMillis",
       "registeredExecutorsSize",
       "registerExecutorRequestLatencyMillis",
+      "finalizeShuffleMergeLatencyMillis",
       "shuffle-server.usedDirectMemory",
       "shuffle-server.usedHeapMemory"
     ))
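
Outside of tests, the new timer can be read off the handler's metric set the same way ExternalBlockHandlerSuite does above (a hedged sketch; blockHandler stands in for an ExternalBlockHandler instance):

    Timer latency = (Timer) blockHandler.getAllMetrics().getMetrics()
      .get("finalizeShuffleMergeLatencyMillis");
    long finalized = latency.getCount();  // number of finalize-merge requests timed so far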


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
