This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1907f0a  [SPARK-35546][SHUFFLE] Enable push-based shuffle when 
multiple app attempts are enabled and manage concurrent access to the state in 
a better way
1907f0a is described below

commit 1907f0ac572eb209ee7c8f9b364f1bdd8d615dcd
Author: Ye Zhou <yez...@linkedin.com>
AuthorDate: Tue Jul 20 00:03:30 2021 -0500

    [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts 
are enabled and manage concurrent access to the state in a better way
    
    ### What changes were proposed in this pull request?
    This is one of the patches for SPIP SPARK-30602 which is needed for 
push-based shuffle.
    
    ### Summary of the change:
    When an executor registers with the shuffle service, it encodes the merged 
shuffle dir it created, together with the application attemptId, as a JSON 
string in the ShuffleManagerMeta. The shuffle service then decodes the JSON 
string to get the correct merged shuffle dir and the attemptId. If the 
registration comes from a newer attempt, the merged shuffle information is 
updated to store the information from the newer attempt.
    
    This PR also refactored the management of the merged shuffle information to 
avoid concurrency issues.
    ### Why are the changes needed?
    Refer to the SPIP in SPARK-30602.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added unit tests.
    The reference PR with the consolidated changes covering the complete 
implementation is also provided in SPARK-30602.
    We have already verified the functionality and the improved performance as 
documented in the SPIP doc.
    
    Closes #33078 from zhouyejoe/SPARK-35546.
    
    Authored-by: Ye Zhou <yez...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit c77acf0bbc25341de2636649fdd76f9bb4bdf4ed)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/network/util/TransportConf.java   |   7 +
 .../network/shuffle/ExternalBlockStoreClient.java  |   7 +-
 .../network/shuffle/OneForOneBlockPusher.java      |   8 +-
 .../network/shuffle/RemoteBlockPushResolver.java   | 546 ++++++++++++---------
 .../shuffle/protocol/ExecutorShuffleInfo.java      |   7 +-
 .../shuffle/protocol/FinalizeShuffleMerge.java     |  13 +-
 .../network/shuffle/protocol/PushBlockStream.java  |  21 +-
 .../network/shuffle/ExternalBlockHandlerSuite.java |   2 +-
 .../network/shuffle/OneForOneBlockPusherSuite.java |  22 +-
 .../shuffle/RemoteBlockPushResolverSuite.java      | 466 +++++++++++++-----
 .../main/scala/org/apache/spark/SparkContext.scala |   1 +
 .../org/apache/spark/internal/config/package.scala |  10 +
 .../org/apache/spark/storage/BlockManager.scala    |   9 +-
 .../apache/spark/storage/DiskBlockManager.scala    |  34 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  23 +-
 .../spark/storage/DiskBlockManagerSuite.scala      |  23 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala   |   2 +-
 17 files changed, 810 insertions(+), 391 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index f051042..8e7ecf5 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -419,4 +419,11 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return 
conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
   }
+
+  /**
+   * The application attemptID assigned from Hadoop YARN.
+   */
+  public int appAttemptId() {
+    return conf.getInt("spark.app.attempt.id", -1);
+  }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index f44140b1..63bf787 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -141,8 +141,8 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
       RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
           (inputBlockId, inputListener) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockPusher(client, appId, inputBlockId, 
inputListener, buffersWithId)
-              .start();
+            new OneForOneBlockPusher(client, appId, conf.appAttemptId(), 
inputBlockId,
+              inputListener, buffersWithId).start();
           };
       int maxRetries = conf.maxIORetries();
       if (maxRetries > 0) {
@@ -168,7 +168,8 @@ public class ExternalBlockStoreClient extends 
BlockStoreClient {
     checkInit();
     try {
       TransportClient client = clientFactory.createClient(host, port);
-      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, 
shuffleId).toByteBuffer();
+      ByteBuffer finalizeShuffleMerge =
+        new FinalizeShuffleMerge(appId, conf.appAttemptId(), 
shuffleId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
index 6ee95ef..b8b32e2 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -45,6 +45,7 @@ public class OneForOneBlockPusher {
 
   private final TransportClient client;
   private final String appId;
+  private final int appAttemptId;
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final Map<String, ManagedBuffer> buffers;
@@ -52,11 +53,13 @@ public class OneForOneBlockPusher {
   public OneForOneBlockPusher(
       TransportClient client,
       String appId,
+      int appAttemptId,
       String[] blockIds,
       BlockFetchingListener listener,
       Map<String, ManagedBuffer> buffers) {
     this.client = client;
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.blockIds = blockIds;
     this.listener = listener;
     this.buffers = buffers;
@@ -123,8 +126,9 @@ public class OneForOneBlockPusher {
         throw new IllegalArgumentException(
           "Unexpected shuffle push block id format: " + blockIds[i]);
       }
-      ByteBuffer header = new PushBlockStream(appId, 
Integer.parseInt(blockIdParts[1]),
-        Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , 
i).toByteBuffer();
+      ByteBuffer header =
+        new PushBlockStream(appId, appAttemptId, 
Integer.parseInt(blockIdParts[1]),
+          Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) 
, i).toByteBuffer();
       client.uploadStream(new NioManagedBuffer(header), 
buffers.get(blockIds[i]),
         new BlockPushCallback(i, blockIds[i]));
     }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 47d2547..f88cfee 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -28,27 +28,26 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.Weigher;
-import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,14 +72,22 @@ import org.apache.spark.network.util.TransportConf;
 public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
   private static final Logger logger = 
LoggerFactory.getLogger(RemoteBlockPushResolver.class);
-  @VisibleForTesting
-  static final String MERGE_MANAGER_DIR = "merge_manager";
+
   public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
+  public static final String SHUFFLE_META_DELIMITER = ":";
+  public static final String MERGE_DIR_KEY = "mergeDir";
+  public static final String ATTEMPT_ID_KEY = "attemptId";
+  private static final int UNDEFINED_ATTEMPT_ID = -1;
 
-  private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
-  private final ConcurrentMap<AppShuffleId, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+  /**
+   * A concurrent hashmap where the key is the applicationId, and the value 
includes
+   * all the merged shuffle information for this application. AppShuffleInfo 
stores
+   * the application attemptId, merged shuffle local directories and the 
metadata
+   * for actively being merged shuffle partitions.
+   */
+  private final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo;
 
-  private final Executor directoryCleaner;
+  private final Executor mergedShuffleCleaner;
   private final TransportConf conf;
   private final int minChunkSize;
   private final int ioExceptionsThresholdDuringMerge;
@@ -92,9 +99,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   public RemoteBlockPushResolver(TransportConf conf) {
     this.conf = conf;
-    this.partitions = Maps.newConcurrentMap();
-    this.appsPathInfo = Maps.newConcurrentMap();
-    this.directoryCleaner = Executors.newSingleThreadExecutor(
+    this.appsShuffleInfo = new ConcurrentHashMap<>();
+    this.mergedShuffleCleaner = Executors.newSingleThreadExecutor(
       // Add `spark` prefix because it will run in NM in Yarn mode.
       
NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
@@ -112,34 +118,59 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
   }
 
+  @VisibleForTesting
+  protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
+    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+    AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(appId);
+    Preconditions.checkArgument(appShuffleInfo != null,
+      "application " + appId + " is not registered or NM was restarted.");
+    return appShuffleInfo;
+  }
+
   /**
-   * Given the appShuffleId and reduceId that uniquely identifies a given 
shuffle partition of an
-   * application, retrieves the associated metadata. If not present and the 
corresponding merged
-   * shuffle does not exist, initializes the metadata.
+   * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies 
a given shuffle
+   * partition of an application, retrieves the associated metadata. If not 
present and the
+   * corresponding merged shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppShuffleInfo appShuffleInfo,
+      int shuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
-      // If this partition is already finalized then the partitions map will 
not contain
-      // the appShuffleId but the data file would exist. In that case the 
block is considered late.
+    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, 
reduceId);
+    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.partitions;
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      partitions.compute(shuffleId, (id, map) -> {
+        if (map == null) {
+          // If this partition is already finalized then the partitions map 
will not contain the
+          // shuffleId but the data file would exist. In that case the block 
is considered late.
+          if (dataFile.exists()) {
+            return null;
+          }
+          return new ConcurrentHashMap<>();
+        } else {
+          return map;
+        }
+      });
+    if (shufflePartitions == null) {
       return null;
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
+
     return shufflePartitions.computeIfAbsent(reduceId, key -> {
       // It only gets here when the key is not present in the map. This could 
either
       // be the first time the merge manager receives a pushed block for a 
given application
       // shuffle partition, or after the merged shuffle file is finalized. We 
handle these
       // two cases accordingly by checking if the file already exists.
-      File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
-      File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
+      File indexFile =
+        appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
+      File metaFile =
+        appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId);
       try {
         if (dataFile.exists()) {
           return null;
         } else {
-          return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, 
indexFile, metaFile);
+          return newAppShufflePartitionInfo(
+            appShuffleInfo.appId, shuffleId, reduceId, dataFile, indexFile, 
metaFile);
         }
       } catch (IOException e) {
         logger.error(
@@ -148,26 +179,28 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
             indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
         throw new RuntimeException(
           String.format("Cannot initialize merged shuffle partition for appId 
%s shuffleId %s "
-          + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, 
reduceId), e);
+            + "reduceId %s", appShuffleInfo.appId, shuffleId, reduceId), e);
       }
     });
   }
 
   @VisibleForTesting
   AppShufflePartitionInfo newAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      String appId,
+      int shuffleId,
       int reduceId,
       File dataFile,
       File indexFile,
       File metaFile) throws IOException {
-    return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile,
+    return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile,
       new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
   }
 
   @Override
   public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int 
reduceId) {
-    AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
-    File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    File indexFile =
+      appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
     if (!indexFile.exists()) {
       throw new RuntimeException(String.format(
         "Merged shuffle index file %s not found", indexFile.getPath()));
@@ -175,7 +208,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
-    File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
+    File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, 
reduceId);
     if (!metaFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle meta file %s 
not found",
         metaFile.getPath()));
@@ -190,13 +223,14 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   @Override
   public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int 
reduceId, int chunkId) {
-    AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, 
reduceId);
     if (!dataFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle data file %s 
not found",
         dataFile.getPath()));
     }
-    File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+    File indexFile =
+      appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
     try {
       // If we get here, the merged shuffle file should have been properly 
finalized. Thus we can
       // use the file length to determine the size of the merged shuffle block.
@@ -210,76 +244,51 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
   }
 
-  /**
-   * The logic here is consistent with
-   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
-   *      org.apache.spark.storage.BlockId, scala.Option)]]
-   */
-  private File getFile(String appId, String filename) {
-    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
-    logger.debug("Get merged file {}", targetFile.getAbsolutePath());
-    return targetFile;
-  }
-
-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, 
reduceId));
-    return getFile(appShuffleId.appId, fileName);
-  }
-
-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String indexName = String.format("%s.index", 
generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
-  }
-
-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int 
reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, 
reduceId));
-    return getFile(appShuffleId.appId, metaName);
-  }
-
   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = 
Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
-      "application " + appId
-      + " active local dirs list has not been updated by any executor 
registration");
-    return activeLocalDirs;
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    return appShuffleInfo.appPathsInfo.activeLocalDirs;
   }
 
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, 
cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
-    AppPathsInfo appPathsInfo = 
Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> 
iterator =
-      partitions.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = 
iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
-        iterator.remove();
-        for (AppShufflePartitionInfo partitionInfo : 
entry.getValue().values()) {
+    AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+    if (null != appShuffleInfo) {
+      mergedShuffleCleaner.execute(
+        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs));
+    }
+  }
+
+
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+   * If cleanupLocalDirs is true, the merged shuffle files will also be 
deleted.
+   * The cleanup will be executed in a separate thread.
+   */
+  @VisibleForTesting
+  void closeAndDeletePartitionFilesIfNeeded(
+      AppShuffleInfo appShuffleInfo,
+      boolean cleanupLocalDirs) {
+    for (Map<Integer, AppShufflePartitionInfo> partitionMap : 
appShuffleInfo.partitions.values()) {
+      for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
+        synchronized (partitionInfo) {
           partitionInfo.closeAllFiles();
         }
       }
     }
     if (cleanupLocalDirs) {
-      Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs)
-        .map(dir -> Paths.get(dir)).toArray(Path[]::new);
-      directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
+      deleteExecutorDirs(appShuffleInfo);
     }
   }
 
   /**
-   * Serially delete local dirs, executed in a separate thread.
+   * Serially delete local dirs.
    */
   @VisibleForTesting
-  void deleteExecutorDirs(Path[] dirs) {
+  void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
+    Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs)
+      .map(dir -> Paths.get(dir)).toArray(Path[]::new);
     for (Path localDir : dirs) {
       try {
         if (Files.exists(localDir)) {
@@ -294,10 +303,22 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    final String streamId = String.format("%s_%d_%d_%d",
+      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, 
msg.mapIndex,
+      msg.reduceId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      // If this Block belongs to a former application attempt, it is 
considered late,
+      // as only the blocks from the current application attempt will be merged
+      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      throw new IllegalArgumentException(
+        String.format("The attempt id %s in this PushBlockStream message does 
not match "
+          + "with the current attempt id %s stored in shuffle service for 
application %s",
+          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
     // Retrieve merged shuffle file metadata
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
     AppShufflePartitionInfo partitionInfoBeforeCheck =
-      getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
+      getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId, 
msg.reduceId);
     // Here partitionInfo will be null in 2 cases:
     // 1) The request is received for a block that has already been merged, 
this is possible due
     // to the retry logic.
@@ -338,11 +359,9 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != 
null
       && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null
         : partitionInfoBeforeCheck;
-    final String streamId = String.format("%s_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, 
msg.mapIndex,
-      msg.reduceId);
     if (partitionInfo != null) {
-      return new PushBlockStreamCallback(this, streamId, partitionInfo, 
msg.mapIndex);
+      return new PushBlockStreamCallback(
+        this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
     } else {
       // For a duplicate block or a block which is late, respond back with a 
callback that handles
       // them differently.
@@ -377,24 +396,31 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
   }
 
-  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws 
IOException {
-    logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions = 
partitions.get(appShuffleId);
+    logger.info("Finalizing shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.appAttemptId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      // If this Block belongs to a former application attempt, it is 
considered late,
+      // as only the blocks from the current application attempt will be merged
+      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      throw new IllegalArgumentException(
+        String.format("The attempt id %s in this FinalizeShuffleMerge message 
does not match "
+          + "with the current attempt id %s stored in shuffle service for 
application %s",
+          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      appShuffleInfo.partitions.remove(msg.shuffleId);
     MergeStatuses mergeStatuses;
     if (shufflePartitions == null || shufflePartitions.isEmpty()) {
       mergeStatuses =
         new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new 
long[0]);
     } else {
-      Collection<AppShufflePartitionInfo> partitionsToFinalize = 
shufflePartitions.values();
-      List<RoaringBitmap> bitmaps = new 
ArrayList<>(partitionsToFinalize.size());
-      List<Integer> reduceIds = new ArrayList<>(partitionsToFinalize.size());
-      List<Long> sizes = new ArrayList<>(partitionsToFinalize.size());
-      Iterator<AppShufflePartitionInfo> partitionsIter = 
partitionsToFinalize.iterator();
-      while (partitionsIter.hasNext()) {
-        AppShufflePartitionInfo partition = partitionsIter.next();
+      List<RoaringBitmap> bitmaps = new ArrayList<>(shufflePartitions.size());
+      List<Integer> reduceIds = new ArrayList<>(shufflePartitions.size());
+      List<Long> sizes = new ArrayList<>(shufflePartitions.size());
+      for (AppShufflePartitionInfo partition: shufflePartitions.values()) {
         synchronized (partition) {
           try {
             // This can throw IOException which will marks this shuffle 
partition as not merged.
@@ -403,13 +429,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} 
{}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} 
{}", msg.appId,
+              msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so 
that any new stream
-            // for the same reduce partition will see that the data file 
exists.
-            partitionsIter.remove();
           }
         }
       }
@@ -417,8 +440,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, 
msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.appAttemptId);
     return mergeStatuses;
   }
 
@@ -426,15 +449,68 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) 
{
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} 
local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, 
Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(SHUFFLE_META_DELIMITER)) {
+      String mergeDirInfo =
+        
shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(SHUFFLE_META_DELIMITER) 
+ 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        TypeReference<Map<String, String>> typeRef
+          = new TypeReference<Map<String, String>>(){};
+        Map<String, String> metaMap = mapper.readValue(mergeDirInfo, typeRef);
+        String mergeDir = metaMap.get(MERGE_DIR_KEY);
+        int attemptId = Integer.valueOf(
+          metaMap.getOrDefault(ATTEMPT_ID_KEY, 
String.valueOf(UNDEFINED_ATTEMPT_ID)));
+        if (mergeDir == null) {
+          throw new IllegalArgumentException(
+            String.format("Failed to get the merge directory information from 
the " +
+              "shuffleManagerMeta %s in executor registration message", 
shuffleManagerMeta));
+        }
+        if (attemptId == UNDEFINED_ATTEMPT_ID) {
+          // When attemptId is -1, there is no attemptId stored in the 
ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge 
dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, UNDEFINED_ATTEMPT_ID,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the 
ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application 
attempt will register
+          // the merge dirs in External Shuffle Service. Any later 
ExecutorRegister message
+          // from the same application attempt will not override the merge 
dirs. But it can
+          // be overridden by ExecutorRegister message from newer application 
attempt,
+          // and former attempts' shuffle partitions information will also be 
cleaned up.
+          AtomicReference<AppShuffleInfo> originalAppShuffleInfo = new 
AtomicReference<>();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || attemptId > 
appShuffleInfo.attemptId) {
+              originalAppShuffleInfo.set(appShuffleInfo);
+              appShuffleInfo =
+                new AppShuffleInfo(
+                  appId, attemptId,
+                  new AppPathsInfo(appId, executorInfo.localDirs,
+                    mergeDir, executorInfo.subDirsPerLocalDir));
+            }
+            return appShuffleInfo;
+          });
+          if (originalAppShuffleInfo.get() != null) {
+            AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get();
+            logger.warn("Cleanup shuffle info and merged shuffle files for 
{}_{} as new " +
+                "application attempt registered", appId, 
appShuffleInfo.attemptId);
+            mergedShuffleCleaner.execute(
+              () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
true));
+          }
+        }
+      } catch (JsonProcessingException e) {
+        logger.warn("Failed to get the merge directory information from 
ExecutorShuffleInfo: ", e);
+      }
+    } else {
+      logger.warn("ExecutorShuffleInfo does not have the expected merge 
directory information");
     }
-    appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, 
executorInfo.localDirs,
-      executorInfo.subDirsPerLocalDir));
-  }
-  private static String generateFileName(AppShuffleId appShuffleId, int 
reduceId) {
-    return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, 
appShuffleId.appId,
-      appShuffleId.shuffleId, reduceId);
   }
 
   /**
@@ -443,6 +519,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
   static class PushBlockStreamCallback implements StreamCallbackWithID {
 
     private final RemoteBlockPushResolver mergeManager;
+    private final AppShuffleInfo appShuffleInfo;
     private final String streamId;
     private final int mapIndex;
     private final AppShufflePartitionInfo partitionInfo;
@@ -457,12 +534,17 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 
     private PushBlockStreamCallback(
         RemoteBlockPushResolver mergeManager,
+        AppShuffleInfo appShuffleInfo,
         String streamId,
         AppShufflePartitionInfo partitionInfo,
         int mapIndex) {
-      this.mergeManager = Preconditions.checkNotNull(mergeManager);
+      Preconditions.checkArgument(mergeManager != null);
+      this.mergeManager = mergeManager;
+      Preconditions.checkArgument(appShuffleInfo != null);
+      this.appShuffleInfo = appShuffleInfo;
       this.streamId = streamId;
-      this.partitionInfo = Preconditions.checkNotNull(partitionInfo);
+      Preconditions.checkArgument(partitionInfo != null);
+      this.partitionInfo = partitionInfo;
       this.mapIndex = mapIndex;
       abortIfNecessary();
     }
@@ -482,7 +564,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       while (buf.hasRemaining()) {
         long updatedPos = partitionInfo.getDataFilePos() + length;
         logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos 
{}",
-          partitionInfo.appShuffleId.appId, 
partitionInfo.appShuffleId.shuffleId,
+          partitionInfo.appId, partitionInfo.shuffleId,
           partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
         length += partitionInfo.dataChannel.write(buf, updatedPos);
       }
@@ -567,7 +649,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       // memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
         Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-          mergeManager.partitions.get(partitionInfo.appShuffleId);
+          appShuffleInfo.partitions.get(partitionInfo.shuffleId);
         // If the partitionInfo corresponding to (appId, shuffleId, reduceId) 
is no longer present
         // then it means that the shuffle merge has already been finalized. We 
should thus ignore
         // the data and just drain the remaining bytes of this message. This 
check should be
@@ -587,7 +669,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           }
           abortIfNecessary();
           logger.trace("{} shuffleId {} reduceId {} onData writable",
-            partitionInfo.appShuffleId.appId, 
partitionInfo.appShuffleId.shuffleId,
+            partitionInfo.appId, partitionInfo.shuffleId,
             partitionInfo.reduceId);
           if (partitionInfo.getCurrentMapIndex() < 0) {
             partitionInfo.setCurrentMapIndex(mapIndex);
@@ -609,7 +691,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
           }
         } else {
           logger.trace("{} shuffleId {} reduceId {} onData deferred",
-            partitionInfo.appShuffleId.appId, 
partitionInfo.appShuffleId.shuffleId,
+            partitionInfo.appId, partitionInfo.shuffleId,
             partitionInfo.reduceId);
           // If we cannot write to disk, we buffer the current block chunk in 
memory so it could
           // potentially be written to disk later. We take our best effort 
without guarantee
@@ -644,10 +726,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     public void onComplete(String streamId) throws IOException {
       synchronized (partitionInfo) {
         logger.trace("{} shuffleId {} reduceId {} onComplete invoked",
-          partitionInfo.appShuffleId.appId, 
partitionInfo.appShuffleId.shuffleId,
+          partitionInfo.appId, partitionInfo.shuffleId,
           partitionInfo.reduceId);
         Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-          mergeManager.partitions.get(partitionInfo.appShuffleId);
+          appShuffleInfo.partitions.get(partitionInfo.shuffleId);
         // When this request initially got to the server, the shuffle merge 
finalize request
         // was not received yet. By the time we finish reading this message, 
the shuffle merge
         // however is already finalized. We should thus respond RpcFailure to 
the client.
@@ -724,10 +806,10 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       if (isWriting) {
         synchronized (partitionInfo) {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-            mergeManager.partitions.get(partitionInfo.appShuffleId);
+            appShuffleInfo.partitions.get(partitionInfo.shuffleId);
           if (shufflePartitions != null && 
shufflePartitions.containsKey(partitionInfo.reduceId)) {
             logger.debug("{} shuffleId {} reduceId {} encountered failure",
-              partitionInfo.appShuffleId.appId, 
partitionInfo.appShuffleId.shuffleId,
+              partitionInfo.appId, partitionInfo.shuffleId,
               partitionInfo.reduceId);
             partitionInfo.setCurrentMapIndex(-1);
           }
@@ -742,63 +824,25 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
   }
 
-  /**
-   * ID that uniquely identifies a shuffle for an application. This is used as 
a key in
-   * {@link #partitions}.
-   */
-  public static class AppShuffleId {
-    public final String appId;
-    public final int shuffleId;
-
-    AppShuffleId(String appId, int shuffleId) {
-      this.appId = appId;
-      this.shuffleId = shuffleId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      AppShuffleId that = (AppShuffleId) o;
-      return shuffleId == that.shuffleId && Objects.equal(appId, that.appId);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(appId, shuffleId);
-    }
-
-    @Override
-    public String toString() {
-      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-        .append("appId", appId)
-        .append("shuffleId", shuffleId)
-        .toString();
-    }
-  }
-
   /** Metadata tracked for an actively merged shuffle partition */
   public static class AppShufflePartitionInfo {
 
-    private final AppShuffleId appShuffleId;
+    private final String appId;
+    private final int shuffleId;
     private final int reduceId;
     // The merged shuffle data file channel
-    public FileChannel dataChannel;
+    public final FileChannel dataChannel;
+    // The index file for a particular merged shuffle contains the chunk 
offsets.
+    private final MergeShuffleFile indexFile;
+    // The meta file for a particular merged shuffle contains all the map 
indices that belong to
+    // every chunk. The entry per chunk is a serialized bitmap.
+    private final MergeShuffleFile metaFile;
     // Location offset of the last successfully merged block for this shuffle 
partition
     private long dataFilePos;
     // Track the map index whose block is being merged for this shuffle 
partition
     private int currentMapIndex;
     // Bitmap tracking which mapper's blocks have been merged for this shuffle 
partition
     private RoaringBitmap mapTracker;
-    // The index file for a particular merged shuffle contains the chunk 
offsets.
-    private MergeShuffleFile indexFile;
-    // The meta file for a particular merged shuffle contains all the map 
indices that belong to
-    // every chunk. The entry per chunk is a serialized bitmap.
-    private MergeShuffleFile metaFile;
     // The offset for the last chunk tracked in the index file for this 
shuffle partition
     private long lastChunkOffset;
     private int lastMergedMapIndex = -1;
@@ -808,12 +852,15 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     private boolean indexMetaUpdateFailed;
 
     AppShufflePartitionInfo(
-        AppShuffleId appShuffleId,
+        String appId,
+        int shuffleId,
         int reduceId,
         File dataFile,
         MergeShuffleFile indexFile,
         MergeShuffleFile metaFile) throws IOException {
-      this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app 
shuffle id");
+      Preconditions.checkArgument(appId != null, "app id is null");
+      this.appId = appId;
+      this.shuffleId = shuffleId;
       this.reduceId = reduceId;
       this.dataChannel = new FileOutputStream(dataFile).getChannel();
       this.indexFile = indexFile;
@@ -831,8 +878,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
 
     public void setDataFilePos(long dataFilePos) {
-      logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", 
appShuffleId.appId,
-        appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos);
+      logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", 
appId,
+        shuffleId, reduceId, this.dataFilePos, dataFilePos);
       this.dataFilePos = dataFilePos;
     }
 
@@ -842,7 +889,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
 
     void setCurrentMapIndex(int mapIndex) {
       logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current 
mapIndex {}",
-        appShuffleId.appId, appShuffleId.shuffleId, reduceId, currentMapIndex, 
mapIndex);
+        appId, shuffleId, reduceId, currentMapIndex, mapIndex);
       this.currentMapIndex = mapIndex;
     }
 
@@ -851,8 +898,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
 
     void blockMerged(int mapIndex) {
-      logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", 
appShuffleId.appId,
-        appShuffleId.shuffleId, reduceId, mapIndex);
+      logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", 
appId,
+        shuffleId, reduceId, mapIndex);
       mapTracker.add(mapIndex);
       chunkTracker.add(mapIndex);
       lastMergedMapIndex = mapIndex;
@@ -871,7 +918,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
       try {
         logger.trace("{} shuffleId {} reduceId {} index current {} updated {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, 
this.lastChunkOffset, chunkOffset);
+          appId, shuffleId, reduceId, this.lastChunkOffset, chunkOffset);
         if (indexMetaUpdateFailed) {
           indexFile.getChannel().position(indexFile.getPos());
         }
@@ -885,8 +932,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
         this.lastChunkOffset = chunkOffset;
         indexMetaUpdateFailed = false;
       } catch (IOException ioe) {
-        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", 
appShuffleId.appId,
-          appShuffleId.shuffleId, reduceId);
+        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", 
appId,
+          shuffleId, reduceId);
         indexMetaUpdateFailed = true;
         // Any exception here is propagated to the caller and the caller can 
decide whether to
         // abort or not.
@@ -900,7 +947,7 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
       }
       chunkTracker.add(mapIndex);
       logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to 
meta file",
-        appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
+        appId, shuffleId, reduceId, mapIndex);
       if (indexMetaUpdateFailed) {
         metaFile.getChannel().position(metaFile.getPos());
       }
@@ -934,35 +981,25 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
 
     void closeAllFiles() {
-      if (dataChannel != null) {
-        try {
+      try {
+        if (dataChannel.isOpen()) {
           dataChannel.close();
-        } catch (IOException ioe) {
-          logger.warn("Error closing data channel for {} shuffleId {} reduceId 
{}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId);
-        } finally {
-          dataChannel = null;
         }
+      } catch (IOException ioe) {
+        logger.warn("Error closing data channel for {} shuffleId {} reduceId 
{}",
+          appId, shuffleId, reduceId, ioe);
       }
-      if (metaFile != null) {
-        try {
-          metaFile.close();
-        } catch (IOException ioe) {
-          logger.warn("Error closing meta file for {} shuffleId {} reduceId 
{}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId);
-        } finally {
-          metaFile = null;
-        }
+      try {
+        metaFile.close();
+      } catch (IOException ioe) {
+        logger.warn("Error closing meta file for {} shuffleId {} reduceId {}",
+          appId, shuffleId, reduceId, ioe);
       }
-      if (indexFile != null) {
-        try {
-          indexFile.close();
-        } catch (IOException ioe) {
-          logger.warn("Error closing index file for {} shuffleId {} reduceId 
{}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId);
-        } finally {
-          indexFile = null;
-        }
+      try {
+        indexFile.close();
+      } catch (IOException ioe) {
+        logger.warn("Error closing index file for {} shuffleId {} reduceId {}",
+          appId, shuffleId, reduceId, ioe);
       }
     }
 
@@ -1003,14 +1040,16 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     private AppPathsInfo(
         String appId,
         String[] localDirs,
+        String mergeDirectory,
         int subDirsPerLocalDir) {
       activeLocalDirs = Arrays.stream(localDirs)
         .map(localDir ->
           // Merge directory is created at the same level as block-manager 
directory. The list of
-          // local directories that we get from executorShuffleInfo are paths 
of each
-          // block-manager directory. To find out the merge directory 
location, we first find the
-          // parent dir and then append the "merger_manager" directory to it.
-          
Paths.get(localDir).getParent().resolve(MERGE_MANAGER_DIR).toFile().getPath())
+          // local directories that we get from ExecutorShuffleInfo are paths 
of each
+          // block-manager directory. The mergeDirectory is the merge 
directory name that we get
+          // from ExecutorShuffleInfo. To find out the merge directory 
location, we first find the
+          // parent dir of the block-manager directory and then append merge 
directory name to it.
+          
Paths.get(localDir).getParent().resolve(mergeDirectory).toFile().getPath())
         .toArray(String[]::new);
       this.subDirsPerLocalDir = subDirsPerLocalDir;
       if (logger.isInfoEnabled()) {
@@ -1020,10 +1059,76 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application 
attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, 
AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+        String appId,
+        int attemptId,
+        AppPathsInfo appPathsInfo) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.appPathsInfo = appPathsInfo;
+      partitions = new ConcurrentHashMap<>();
+    }
+
+    @VisibleForTesting
+    public ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> 
getPartitions() {
+      return partitions;
+    }
+
+    /**
+     * The logic here is consistent with
+     * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+     *      org.apache.spark.storage.BlockId, scala.Option)]]
+     */
+    private File getFile(String filename) {
+      // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart
+      File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+        appPathsInfo.subDirsPerLocalDir, filename);
+      logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+      return targetFile;
+    }
+
+    private String generateFileName(
+        String appId,
+        int shuffleId,
+        int reduceId) {
+      return String.format(
+        "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, 
reduceId);
+    }
+
+    public File getMergedShuffleDataFile(
+        int shuffleId,
+        int reduceId) {
+      String fileName = String.format("%s.data", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(fileName);
+    }
+
+    public File getMergedShuffleIndexFile(
+        int shuffleId,
+        int reduceId) {
+      String indexName = String.format("%s.index", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(indexName);
+    }
+
+    public File getMergedShuffleMetaFile(
+        int shuffleId,
+        int reduceId) {
+      String metaName = String.format("%s.meta", generateFileName(appId, 
shuffleId, reduceId));
+      return getFile(metaName);
+    }
+  }
+
   @VisibleForTesting
   static class MergeShuffleFile {
-    private FileChannel channel;
-    private DataOutputStream dos;
+    private final FileChannel channel;
+    private final DataOutputStream dos;
     private long pos;
 
     @VisibleForTesting
@@ -1044,11 +1149,8 @@ public class RemoteBlockPushResolver implements 
MergedShuffleFileManager {
     }
 
     void close() throws IOException {
-      try {
+      if (channel.isOpen()) {
         dos.close();
-      } finally {
-        dos = null;
-        channel = null;
       }
     }
 
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index b4e7bc4..f123ccb 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -35,7 +35,12 @@ public class ExecutorShuffleInfo implements Encodable {
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager) that the executor is using. */
+  /**
+   * Shuffle manager (SortShuffleManager) that the executor is using.
+   * If this string contains a colon, it will also include the meta 
information
+   * for push based shuffle in JSON format. Example of the string with 
a colon would be:
+   * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
+   */
   public final String shuffleManager;
 
   @JsonCreator
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
index 31efbb7..f6ab78b 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
@@ -32,12 +32,15 @@ import org.apache.spark.network.protocol.Encoders;
  */
 public class FinalizeShuffleMerge extends BlockTransferMessage {
   public final String appId;
+  public final int appAttemptId;
   public final int shuffleId;
 
   public FinalizeShuffleMerge(
       String appId,
+      int appAttemptId,
       int shuffleId) {
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.shuffleId = shuffleId;
   }
 
@@ -48,13 +51,14 @@ public class FinalizeShuffleMerge extends 
BlockTransferMessage {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(appId, shuffleId);
+    return Objects.hashCode(appId, appAttemptId, shuffleId);
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
       .append("appId", appId)
+      .append("attemptId", appAttemptId)
       .append("shuffleId", shuffleId)
       .toString();
   }
@@ -64,6 +68,7 @@ public class FinalizeShuffleMerge extends 
BlockTransferMessage {
     if (other != null && other instanceof FinalizeShuffleMerge) {
       FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
       return Objects.equal(appId, o.appId)
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId;
     }
     return false;
@@ -71,18 +76,20 @@ public class FinalizeShuffleMerge extends 
BlockTransferMessage {
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId) + 4;
+    return Encoders.Strings.encodedLength(appId) + 4 + 4;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
+    buf.writeInt(appAttemptId);
     buf.writeInt(shuffleId);
   }
 
   public static FinalizeShuffleMerge decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
     int shuffleId = buf.readInt();
-    return new FinalizeShuffleMerge(appId, shuffleId);
+    return new FinalizeShuffleMerge(appId, attemptId, shuffleId);
   }
 }
diff --git 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
index 559f88f..d5e1cf2 100644
--- 
a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
+++ 
b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle.protocol;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
+
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -34,6 +35,7 @@ import org.apache.spark.network.protocol.Encoders;
  */
 public class PushBlockStream extends BlockTransferMessage {
   public final String appId;
+  public final int appAttemptId;
   public final int shuffleId;
   public final int mapIndex;
   public final int reduceId;
@@ -41,8 +43,15 @@ public class PushBlockStream extends BlockTransferMessage {
   // blocks to be pushed.
   public final int index;
 
-  public PushBlockStream(String appId, int shuffleId, int mapIndex, int 
reduceId, int index) {
+  public PushBlockStream(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int mapIndex,
+      int reduceId,
+      int index) {
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.shuffleId = shuffleId;
     this.mapIndex = mapIndex;
     this.reduceId = reduceId;
@@ -56,13 +65,14 @@ public class PushBlockStream extends BlockTransferMessage {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(appId, shuffleId, mapIndex , reduceId, index);
+    return Objects.hashCode(appId, appAttemptId, shuffleId, mapIndex , 
reduceId, index);
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
       .append("appId", appId)
+      .append("attemptId", appAttemptId)
       .append("shuffleId", shuffleId)
       .append("mapIndex", mapIndex)
       .append("reduceId", reduceId)
@@ -75,6 +85,7 @@ public class PushBlockStream extends BlockTransferMessage {
     if (other != null && other instanceof PushBlockStream) {
       PushBlockStream o = (PushBlockStream) other;
       return Objects.equal(appId, o.appId)
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId
         && mapIndex == o.mapIndex
         && reduceId == o.reduceId
@@ -85,12 +96,13 @@ public class PushBlockStream extends BlockTransferMessage {
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId) + 16;
+    return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4 + 4;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
+    buf.writeInt(appAttemptId);
     buf.writeInt(shuffleId);
     buf.writeInt(mapIndex);
     buf.writeInt(reduceId);
@@ -99,10 +111,11 @@ public class PushBlockStream extends BlockTransferMessage {
 
   public static PushBlockStream decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
     int shuffleId = buf.readInt();
     int mapIdx = buf.readInt();
     int reduceId = buf.readInt();
     int index = buf.readInt();
-    return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index);
+    return new PushBlockStream(appId, attemptId, shuffleId, mapIdx, reduceId, 
index);
   }
 }
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index dc41e95..00756b1 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -243,7 +243,7 @@ public class ExternalBlockHandlerSuite {
   public void testFinalizeShuffleMerge() throws IOException {
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
 
-    FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0);
+    FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 1, 0);
     RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2);
     MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap},
       new int[]{3}, new long[]{30});
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
index 46a0f6c..e41198f 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -51,7 +51,7 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0)));
+      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));
 
     verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
   }
@@ -67,9 +67,9 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
-        new PushBlockStream("app-id", 0, 1, 0, 1),
-        new PushBlockStream("app-id", 0, 2, 0, 2)));
+      Arrays.asList(new PushBlockStream("app-id",0,  0, 0, 0, 0),
+        new PushBlockStream("app-id", 0, 0, 1, 0, 1),
+        new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), 
any());
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), 
any());
@@ -87,9 +87,9 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
-        new PushBlockStream("app-id", 0, 1, 0, 1),
-        new PushBlockStream("app-id", 0, 2, 0, 2)));
+      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
+        new PushBlockStream("app-id", 0, 0, 1, 0, 1),
+        new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), 
any());
     verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), 
any());
@@ -107,9 +107,9 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
-        new PushBlockStream("app-id", 0, 1, 0, 1),
-        new PushBlockStream("app-id", 0, 2, 0, 2)));
+      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
+        new PushBlockStream("app-id", 0, 0, 1, 0, 1),
+        new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), 
any());
     verify(listener, 
times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
@@ -130,7 +130,7 @@ public class OneForOneBlockPusherSuite {
     TransportClient client = mock(TransportClient.class);
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
     OneForOneBlockPusher pusher =
-      new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks);
+      new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, 
blocks);
 
     Iterator<Map.Entry<String, ManagedBuffer>> blockIterator = 
blocks.entrySet().iterator();
     Iterator<BlockTransferMessage> msgIterator = expectMessages.iterator();
diff --git 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 565d433..2a73aa5 100644
--- 
a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ 
b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -22,11 +22,13 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -61,6 +63,17 @@ public class RemoteBlockPushResolverSuite {
 
   private static final Logger log = 
LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
   private final String TEST_APP = "testApp";
+  private final String MERGE_DIRECTORY = "merge_manager";
+  private final int NO_ATTEMPT_ID = -1;
+  private final int ATTEMPT_ID_1 = 1;
+  private final int ATTEMPT_ID_2 = 2;
+  private final String MERGE_DIRECTORY_META = "shuffleManager:{\"mergeDir\": 
\"merge_manager\"}";
+  private final String MERGE_DIRECTORY_META_1 =
+    "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}";
+  private final String MERGE_DIRECTORY_META_2 =
+    "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}";
+  private final String INVALID_MERGE_DIRECTORY_META =
+          "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", 
\"attemptId\": \"2\"}";
   private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401";
 
   private TransportConf conf;
@@ -74,7 +87,7 @@ public class RemoteBlockPushResolverSuite {
       ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", 
"4"));
     conf = new TransportConf("shuffle", provider);
     pushResolver = new RemoteBlockPushResolver(conf);
-    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), 
MERGE_DIRECTORY_META);
   }
 
   @After
@@ -106,9 +119,9 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4])),
       new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[5]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 5}, new 
int[][]{{0}, {1}});
@@ -122,9 +135,9 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 2, 0, ByteBuffer.wrap(new byte[5])),
       new PushBlock(0, 3, 0, ByteBuffer.wrap(new byte[3]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {13});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, meta, new int[]{5, 5, 3}, new int[][]{{0, 
1}, {2}, {3}});
@@ -138,9 +151,9 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])),
       new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0, 1}, new long[] {5, 8});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, meta, new int[]{5}, new int[][]{{0, 1}});
@@ -149,10 +162,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     // stream 1 now completes
@@ -161,7 +176,7 @@ public class RemoteBlockPushResolverSuite {
     // stream 2 has more data and then completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new 
int[][]{{0}, {1}});
   }
@@ -169,10 +184,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
@@ -181,7 +198,7 @@ public class RemoteBlockPushResolverSuite {
     stream1.onComplete(stream1.getID());
     // stream 2 now completes completes
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new 
int[][]{{0}, {1}});
   }
@@ -189,17 +206,19 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws 
IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     // This should be ignored
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
   }
@@ -207,10 +226,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws 
IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     // This should be ignored
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
@@ -219,7 +240,7 @@ public class RemoteBlockPushResolverSuite {
     stream1.onComplete(stream1.getID());
     // stream 2 now completes completes
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
   }
@@ -227,10 +248,11 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testFailureAfterData() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
   }
@@ -238,12 +260,13 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testFailureAfterMultipleDataBlocks() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
   }
@@ -251,39 +274,39 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testFailureAfterComplete() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onComplete(stream.getID());
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
   }
 
-  @Test (expected = RuntimeException.class)
-  public void testTooLateArrival() throws IOException {
+  @Test(expected = RuntimeException.class)
+  public void testBlockReceivedAfterMergeFinalize() throws IOException {
     ByteBuffer[] blocks = new ByteBuffer[]{
       ByteBuffer.wrap(new byte[4]),
       ByteBuffer.wrap(new byte[5])
     };
     StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     for (ByteBuffer block : blocks) {
       stream.onData(stream.getID(), block);
     }
     stream.onComplete(stream.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
     try {
       stream1.onComplete(stream1.getID());
     } catch (RuntimeException re) {
       assertEquals(
-        "Block shufflePush_0_1_0 received after merged shuffle is finalized",
-          re.getMessage());
+        "Block shufflePush_0_1_0 received after merged shuffle is finalized", 
re.getMessage());
       MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
       validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new 
int[][]{{0}});
       throw re;
@@ -292,28 +315,31 @@ public class RemoteBlockPushResolverSuite {
 
   @Test
   public void testIncompleteStreamsAreOverwritten() throws IOException {
-    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), 
MERGE_DIRECTORY_META);
     byte[] expectedBytes = new byte[4];
     ThreadLocalRandom.current().nextBytes(expectedBytes);
 
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     byte[] data = new byte[10];
     ThreadLocalRandom.current().nextBytes(data);
     stream1.onData(stream1.getID(), ByteBuffer.wrap(data));
     // There is a failure
     stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     ByteBuffer nextBuf= ByteBuffer.wrap(expectedBytes, 0, 2);
     stream2.onData(stream2.getID(), nextBuf);
     stream2.onComplete(stream2.getID());
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
2, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     nextBuf =  ByteBuffer.wrap(expectedBytes, 2, 2);
     stream3.onData(stream3.getID(), nextBuf);
     stream3.onComplete(stream3.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{1, 
2}});
     FileSegmentManagedBuffer mb =
@@ -321,13 +347,15 @@ public class RemoteBlockPushResolverSuite {
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testCollision() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
     // Since stream2 didn't get any opportunity it will throw couldn't find 
opportunity error
@@ -341,17 +369,20 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() 
throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // There is a failure with stream2
     stream2.onFailure(stream2.getID(), new RuntimeException("forced error"));
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
2, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     // This should be deferred
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
     // Since this stream didn't get any opportunity it will throw couldn't 
find opportunity error
@@ -368,7 +399,7 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
 
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4}, new int[][] 
{{0}});
     if (failedEx != null) {
@@ -376,28 +407,83 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testUpdateLocalDirsOnlyOnce() throws IOException {
     String testApp = "updateLocalDirsOnlyOnceTest";
     Path[] activeLocalDirs = createLocalDirs(1);
-    registerExecutor(testApp, prepareLocalDirs(activeLocalDirs));
+    registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, 
MERGE_DIRECTORY),
+      MERGE_DIRECTORY_META);
     assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
     assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
       activeLocalDirs[0].toFile().getPath()));
-    // Any later executor register from the same application should not change 
the active local
-    // dirs list
+    // Any later executor register from the same application attempt should 
not change the active
+    // local dirs list
     Path[] updatedLocalDirs = localDirs;
-    registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs));
+    registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs, 
MERGE_DIRECTORY),
+      MERGE_DIRECTORY_META);
     assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
     assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
       activeLocalDirs[0].toFile().getPath()));
     removeApplication(testApp);
     try {
       pushResolver.getMergedBlockDirs(testApp);
-    } catch (Throwable e) {
-      assertTrue(e.getMessage()
-        .startsWith("application " + testApp + " is not registered or NM was 
restarted."));
-      Throwables.propagate(e);
+    } catch (IllegalArgumentException e) {
+      assertEquals("application " + testApp + " is not registered or NM was restarted.",
+        e.getMessage());
+      throw e;
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExecutorRegisterWithInvalidJsonForPushShuffle() throws 
IOException {
+    String testApp = "executorRegisterWithInvalidShuffleManagerMeta";
+    Path[] activeLocalDirs = createLocalDirs(1);
+    try {
+      registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, 
MERGE_DIRECTORY),
+        INVALID_MERGE_DIRECTORY_META);
+    } catch (IllegalArgumentException re) {
+      assertEquals(
+        "Failed to get the merge directory information from the 
shuffleManagerMeta " +
+          "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", 
\"attemptId\": \"2\"} in " +
+          "executor registration message", re.getMessage());
+      throw re;
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExecutorRegistrationFromTwoAppAttempts() throws IOException {
+    String testApp = "testExecutorRegistrationFromTwoAppAttempts";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    assertEquals(1, pushResolver.getMergedBlockDirs(testApp).length);
+    assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
+      attempt1LocalDirs[0].toFile().getPath()));
+    // Any later executor register from the same application attempt should 
not change the active
+    // local dirs list
+    Path[] attempt1UpdatedLocalDirs = localDirs;
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1UpdatedLocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    assertEquals(1, pushResolver.getMergedBlockDirs(testApp).length);
+    assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
+      attempt1LocalDirs[0].toFile().getPath()));
+    // But a new attempt from the same application can change the active local 
dirs list
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    assertEquals(2, pushResolver.getMergedBlockDirs(testApp).length);
+    assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
+      attempt2LocalDirs[0].toFile().getPath()));
+    removeApplication(testApp);
+    try {
+      pushResolver.getMergedBlockDirs(testApp);
+    } catch (IllegalArgumentException e) {
+      assertEquals("application " + testApp + " is not registered or NM was restarted.",
+        e.getMessage());
+      throw e;
     }
   }
 
@@ -407,17 +493,18 @@ public class RemoteBlockPushResolverSuite {
     Semaphore deleted = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
-      void deleteExecutorDirs(Path[] dirs) {
-        super.deleteExecutorDirs(dirs);
+      void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
+        super.deleteExecutorDirs(appShuffleInfo);
         deleted.release();
       }
     };
+
     Path[] activeDirs = createLocalDirs(1);
-    registerExecutor(testApp, prepareLocalDirs(activeDirs));
+    registerExecutor(testApp, prepareLocalDirs(activeDirs, MERGE_DIRECTORY), 
MERGE_DIRECTORY_META);
     PushBlock[] pushBlocks = new PushBlock[] {
       new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4]))};
-    pushBlockHelper(testApp, pushBlocks);
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 0));
+    pushBlockHelper(testApp, NO_ATTEMPT_ID, pushBlocks);
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 0);
     validateChunks(testApp, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
     String[] mergeDirs = pushResolver.getMergedBlockDirs(testApp);
@@ -435,7 +522,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback1.getPartitionInfo();
@@ -443,7 +530,7 @@ public class RemoteBlockPushResolverSuite {
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) 
partitionInfo.getIndexFile();
     testIndexFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because number of 
IOExceptions are less than
     // the threshold but the update to index file will be unsuccessful.
@@ -452,12 +539,12 @@ public class RemoteBlockPushResolverSuite {
     // Restore the index stream so it can write successfully again.
     testIndexFile.restore();
     StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
     callback3.onComplete(callback3.getID());
     assertEquals("index position", 24, testIndexFile.getPos());
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] 
{{0}, {1, 2}});
@@ -468,7 +555,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback1.getPartitionInfo();
@@ -476,7 +563,7 @@ public class RemoteBlockPushResolverSuite {
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) 
partitionInfo.getIndexFile();
     testIndexFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because number of 
IOExceptions are less than
     // the threshold but the update to index file will be unsuccessful.
@@ -486,7 +573,7 @@ public class RemoteBlockPushResolverSuite {
     // Restore the index stream so it can write successfully again.
     testIndexFile.restore();
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     assertEquals("index position", 24, testIndexFile.getPos());
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
@@ -498,7 +585,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(false, true);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback1.getPartitionInfo();
@@ -507,7 +594,7 @@ public class RemoteBlockPushResolverSuite {
     long metaPosBeforeClose = testMetaFile.getPos();
     testMetaFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because number of 
IOExceptions are less than
     // the threshold but the update to index and meta file will be 
unsuccessful.
@@ -517,13 +604,13 @@ public class RemoteBlockPushResolverSuite {
     // Restore the meta stream so it can write successfully again.
     testMetaFile.restore();
     StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
     callback3.onComplete(callback3.getID());
     assertEquals("index position", 24, partitionInfo.getIndexFile().getPos());
     assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] 
{{0}, {1, 2}});
@@ -534,7 +621,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(false, true);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback1.getPartitionInfo();
@@ -543,7 +630,7 @@ public class RemoteBlockPushResolverSuite {
     long metaPosBeforeClose = testMetaFile.getPos();
     testMetaFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because number of 
IOExceptions are less than
     // the threshold but the update to index and meta file will be 
unsuccessful.
@@ -554,7 +641,7 @@ public class RemoteBlockPushResolverSuite {
     // Restore the meta stream so it can write successfully again.
     testMetaFile.restore();
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     assertEquals("index position", 24, indexFile.getPos());
     assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
@@ -562,11 +649,11 @@ public class RemoteBlockPushResolverSuite {
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] 
{{0}, {1}});
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testIOExceptionsExceededThreshold() throws IOException {
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
     callback.onComplete(callback.getID());
@@ -575,7 +662,7 @@ public class RemoteBlockPushResolverSuite {
     for (int i = 1; i < 5; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       try {
         callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2]));
       } catch (IOException ioe) {
@@ -588,7 +675,7 @@ public class RemoteBlockPushResolverSuite {
     try {
       RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0));
       callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1]));
     } catch (Throwable t) {
       assertEquals("IOExceptions exceeded the threshold when merging 
shufflePush_0_5_0",
@@ -597,12 +684,12 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws 
IOException {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
     callback.onComplete(callback.getID());
@@ -611,7 +698,7 @@ public class RemoteBlockPushResolverSuite {
     for (int i = 1; i < 5; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
       // This will complete without any exceptions but the exception count is 
increased.
       callback1.onComplete(callback1.getID());
@@ -622,7 +709,7 @@ public class RemoteBlockPushResolverSuite {
     try {
       RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0));
       callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4]));
       callback2.onComplete(callback2.getID());
     } catch (Throwable t) {
@@ -632,7 +719,7 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testRequestForAbortedShufflePartitionThrowsException() {
     try {
       testIOExceptionsDuringMetaUpdateIncreasesExceptionCount();
@@ -641,7 +728,7 @@ public class RemoteBlockPushResolverSuite {
     }
     try {
       pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 10, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 10, 0, 0));
     } catch (Throwable t) {
       assertEquals("IOExceptions exceeded the threshold when merging 
shufflePush_0_10_0",
         t.getMessage());
@@ -649,19 +736,19 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testPendingBlockIsAbortedImmediately() throws IOException {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) 
partitionInfo.getIndexFile();
     testIndexFile.close();
     for (int i = 1; i < 6; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       try {
         callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
         // This will complete without any exceptions but the exception count 
is increased.
@@ -682,19 +769,19 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() 
throws IOException {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) 
partitionInfo.getIndexFile();
     testIndexFile.close();
     for (int i = 1; i < 5; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       try {
         callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
         // This will complete without any exceptions but the exception count 
is increased.
@@ -706,7 +793,7 @@ public class RemoteBlockPushResolverSuite {
     assertEquals(4, partitionInfo.getNumIOExceptions());
     RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This is deferred
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
@@ -738,10 +825,10 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])),
       new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) 
pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2]));
     callback.onComplete(callback.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = 
callback.getPartitionInfo();
@@ -749,7 +836,7 @@ public class RemoteBlockPushResolverSuite {
     // Close the index file so truncate throws IOException
     testIndexFile.close();
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {1}, new long[] {8});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1);
     validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new 
int[][]{{0},{1}});
@@ -758,46 +845,53 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
     // On failure on stream1 gets invoked again and should cause no 
interference
     stream1.onFailure(stream1.getID(), new RuntimeException("2nd forced 
error"));
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
3, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 3, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
     stream2.onComplete(stream2.getID());
     stream3.onComplete(stream3.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {9, 2}, new int[][] 
{{1},{3}});
     removeApplication(TEST_APP);
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() 
throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     StreamCallbackWithID stream1Duplicate =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
     stream1Duplicate.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
 
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
     // Should not change the current map id of the reduce partition
     stream1Duplicate.onFailure(stream2.getID(), new RuntimeException("forced 
error"));
 
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 
2, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
     RuntimeException failedEx = null;
@@ -812,7 +906,7 @@ public class RemoteBlockPushResolverSuite {
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 
NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 
0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {11}, new int[][] {{0, 
1}});
     removeApplication(TEST_APP);
@@ -821,20 +915,165 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testPushBlockFromPreviousAttemptIsRejected()
+      throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "testPushBlockFromPreviousAttemptIsRejected";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+      pushResolver.validateAndGetAppShuffleInfo(testApp);
+    Map<Integer, Map<Integer, 
RemoteBlockPushResolver.AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.getPartitions();
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> 
partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : 
partitionMap.values()) {
+        assertTrue(partitionInfo.getDataChannel().isOpen());
+        assertTrue(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertTrue(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 2, 0, 1, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream2.onData(stream2.getID(), block);
+    }
+    stream2.onComplete(stream2.getID());
+    closed.acquire();
+    // Check if all the file channels created for the first attempt are safely 
closed.
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> 
partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : 
partitionMap.values()) {
+        assertFalse(partitionInfo.getDataChannel().isOpen());
+        assertFalse(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertFalse(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    try {
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, 1, 0, 1, 0, 0));
+    } catch (IllegalArgumentException re) {
+      assertEquals(
+        "The attempt id 1 in this PushBlockStream message does not match " +
+          "with the current attempt id 2 stored in shuffle service for 
application " +
+          testApp, re.getMessage());
+      throw re;
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
+    throws IOException, InterruptedException {
+    String testApp = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    try {
+      pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 
ATTEMPT_ID_1, 0));
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+        String.format("The attempt id %s in this FinalizeShuffleMerge message 
does not " +
+          "match with the current attempt id %s stored in shuffle service for 
application %s",
+          ATTEMPT_ID_1, ATTEMPT_ID_2, testApp), e.getMessage());
+      throw e;
+    }
+  }
+
+  @Test(expected = ClosedChannelException.class)
+  public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
+    throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5]),
+      ByteBuffer.wrap(new byte[6]),
+      ByteBuffer.wrap(new byte[7])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    // The onData callback should be called 4 times here before the onComplete 
callback. But a
+    // register executor message arrives in shuffle service after the 2nd 
onData callback. The 3rd
+    // onData callback should throw ClosedChannelException as its 
channels are closed.
+    stream1.onData(stream1.getID(), blocks[0]);
+    stream1.onData(stream1.getID(), blocks[1]);
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + 
ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    closed.acquire();
+    // Should throw ClosedChannelException here.
+    stream1.onData(stream1.getID(), blocks[3]);
+  }
+
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) 
throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
-      AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId 
appShuffleId, int reduceId,
-        File dataFile, File indexFile, File metaFile) throws IOException {
+      AppShufflePartitionInfo newAppShufflePartitionInfo(String appId, int 
shuffleId,
+          int reduceId, File dataFile, File indexFile, File metaFile) throws 
IOException {
         MergeShuffleFile mergedIndexFile = useTestIndexFile ? new 
TestMergeShuffleFile(indexFile)
           : new MergeShuffleFile(indexFile);
         MergeShuffleFile mergedMetaFile = useTestMetaFile ? new 
TestMergeShuffleFile(metaFile) :
           new MergeShuffleFile(metaFile);
-        return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, 
mergedIndexFile,
+        return new AppShufflePartitionInfo(appId, shuffleId, reduceId, 
dataFile, mergedIndexFile,
           mergedMetaFile);
       }
     };
-    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), 
MERGE_DIRECTORY_META);
   }
 
   private Path[] createLocalDirs(int numLocalDirs) throws IOException {
@@ -846,16 +1085,15 @@ public class RemoteBlockPushResolverSuite {
     return localDirs;
   }
 
-  private void registerExecutor(String appId, String[] localDirs) throws 
IOException {
-    ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, 
"mergedShuffle");
+  private void registerExecutor(String appId, String[] localDirs, String 
shuffleManagerMeta) {
+    ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, 
shuffleManagerMeta);
     pushResolver.registerExecutor(appId, shuffleInfo);
   }
 
-  private String[] prepareLocalDirs(Path[] localDirs) throws IOException {
+  private String[] prepareLocalDirs(Path[] localDirs, String mergeDir) throws 
IOException {
     String[] blockMgrDirs = new String[localDirs.length];
     for (int i = 0; i< localDirs.length; i++) {
-      Files.createDirectories(localDirs[i].resolve(
-        RemoteBlockPushResolver.MERGE_MANAGER_DIR + File.separator + "00"));
+      Files.createDirectories(localDirs[i].resolve(mergeDir + File.separator + 
"00"));
       blockMgrDirs[i] = localDirs[i].toFile().getPath() + File.separator + 
BLOCK_MANAGER_DIR;
     }
     return blockMgrDirs;
@@ -898,10 +1136,12 @@ public class RemoteBlockPushResolverSuite {
 
   private void pushBlockHelper(
       String appId,
+      int attemptId,
       PushBlock[] blocks) throws IOException {
     for (int i = 0; i < blocks.length; i++) {
       StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(appId, blocks[i].shuffleId, blocks[i].mapIndex, 
blocks[i].reduceId, 0));
+        new PushBlockStream(
+          appId, attemptId, blocks[i].shuffleId, blocks[i].mapIndex, 
blocks[i].reduceId, 0));
       stream.onData(stream.getID(), blocks[i].buffer);
       stream.onComplete(stream.getID());
     }
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ef47252..d11fa55 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,6 +583,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
+    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, 
attemptId))
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala 
b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3ef964f..39c526c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2244,4 +2244,14 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault(Nil)
+
+  private[spark] val APP_ATTEMPT_ID =
+    ConfigBuilder("spark.app.attempt.id")
+      .internal()
+      .doc("The application attempt Id assigned from Hadoop YARN. " +
+        "When the application runs in cluster mode on YARN, there can be " +
+        "multiple attempts before failing the application")
+      .version("3.2.0")
+      .stringConf
+      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 98d0949..43c7baf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,10 +535,17 @@ private[spark] class BlockManager(
 
   private def registerWithExternalShuffleServer(): Unit = {
     logInfo("Registering executor with local external shuffle service.")
+    val shuffleManagerMeta =
+      if (Utils.isPushBasedShuffleEnabled(conf)) {
+        s"${shuffleManager.getClass.getName}:" +
+          s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}"
+      } else {
+        shuffleManager.getClass.getName
+      }
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirsString,
       diskBlockManager.subDirsPerLocalDir,
-      shuffleManager.getClass.getName)
+      shuffleManagerMeta)
 
     val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
     val SLEEP_TIME_SECS = 5
diff --git 
a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala 
b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d49f43f..d92f686 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,11 +21,18 @@ import java.io.{File, IOException}
 import java.nio.file.Files
 import java.util.UUID
 
+import scala.collection.mutable.HashMap
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.ExecutorDiskUtils
-import org.apache.spark.storage.DiskBlockManager.MERGE_MANAGER_DIR
+import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY
+import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY
+import org.apache.spark.storage.DiskBlockManager.MERGE_DIRECTORY
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
@@ -57,6 +64,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, var 
deleteFilesOnStop: Bo
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new 
Array[File](subDirsPerLocalDir))
 
+  // Get merge directory name, append attemptId if there is any
+  private val mergeDirName =
+    s"$MERGE_DIRECTORY${conf.get(config.APP_ATTEMPT_ID).map(id => 
s"_$id").getOrElse("")}"
+
   // Create merge directories
   createLocalDirsForMergedShuffleBlocks()
 
@@ -200,12 +211,12 @@ private[spark] class DiskBlockManager(conf: SparkConf, 
var deleteFilesOnStop: Bo
       // Will create the merge_manager directory only if it doesn't exist 
under the local dir.
       Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
         try {
-          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          val mergeDir = new File(rootDir, mergeDirName)
           if (!mergeDir.exists()) {
             // This executor does not find merge_manager directory, it will 
try to create
             // the merge_manager directory and the sub directories.
             logDebug(s"Try to create $mergeDir and its sub dirs since the " +
-              s"$MERGE_MANAGER_DIR dir does not exist")
+              s"$mergeDirName dir does not exist")
             for (dirNum <- 0 until subDirsPerLocalDir) {
               val subDir = new File(mergeDir, "%02x".format(dirNum))
               if (!subDir.exists()) {
@@ -219,7 +230,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
         } catch {
           case e: IOException =>
             logError(
-              s"Failed to create $MERGE_MANAGER_DIR dir in $rootDir. Ignoring this directory.", e)
+              s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e)
         }
       }
     }
@@ -264,6 +275,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
     }
   }
 
+  def getMergeDirectoryAndAttemptIDJsonString(): String = {
+    val mergedMetaMap: HashMap[String, String] = new HashMap[String, String]()
+    mergedMetaMap.put(MERGE_DIR_KEY, mergeDirName)
+    conf.get(config.APP_ATTEMPT_ID).foreach(
+      attemptId => mergedMetaMap.put(ATTEMPT_ID_KEY, attemptId))
+    val mapper = new ObjectMapper()
+    mapper.registerModule(DefaultScalaModule)
+    val jsonString = mapper.writeValueAsString(mergedMetaMap)
+    jsonString
+  }
+
   private def addShutdownHook(): AnyRef = {
     logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
@@ -303,5 +325,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
 }
 
 private[spark] object DiskBlockManager {
-  private[spark] val MERGE_MANAGER_DIR = "merge_manager"
+  val MERGE_DIRECTORY = "merge_manager"
+  val MERGE_DIR_KEY = "mergeDir"
+  val ATTEMPT_ID_KEY = "attemptId"
 }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b1df7bd..89aa299 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2592,32 +2592,13 @@ private[spark] object Utils extends Logging {
 
   /**
    * Push based shuffle can only be enabled when the application is submitted
-   * to run in YARN mode, with external shuffle service enabled and
-   * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 1.
-   * TODO: Remove the requirement on spark.yarn.maxAttempts after SPARK-35546
-   * Support push based shuffle with multiple app attempts
+   * to run in YARN mode, with external shuffle service enabled
    */
   def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
     conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
       (conf.get(IS_TESTING).getOrElse(false) ||
         (conf.get(SHUFFLE_SERVICE_ENABLED) &&
-          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" &&
-          getYarnMaxAttempts(conf) == 1))
-  }
-
-  /**
-   * Returns the maximum number of attempts to register the AM in YARN mode.
-   * TODO: Remove this method after SPARK-35546 Support push based shuffle
-   * with multiple app attempts
-   */
-  def getYarnMaxAttempts(conf: SparkConf): Int = {
-    val sparkMaxAttempts = conf.getOption("spark.yarn.maxAttempts").map(_.toInt)
-    val yarnMaxAttempts = getSparkOrYarnConfig(conf, YarnConfiguration.RM_AM_MAX_ATTEMPTS,
-      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS.toString).toInt
-    sparkMaxAttempts match {
-      case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
-      case None => yarnMaxAttempts
-    }
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"))
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 6397c96..0443c40 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -20,7 +20,10 @@ package org.apache.spark.storage
 import java.io.{File, FileWriter}
 import java.nio.file.{Files, Paths}
 import java.nio.file.attribute.PosixFilePermissions
+import java.util.HashMap
 
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.commons.io.FileUtils
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -91,11 +94,11 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   }
 
   test("should still create merge directories if one already exists under a local dir") {
-    val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR)
+    val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_DIRECTORY)
     if (!mergeDir0.exists()) {
       Files.createDirectories(mergeDir0.toPath)
     }
-    val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_MANAGER_DIR)
+    val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_DIRECTORY)
     if (mergeDir1.exists()) {
       Utils.deleteRecursively(mergeDir1)
     }
@@ -104,7 +107,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     testConf.set(config.Tests.IS_TESTING, true)
     diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
     assert(Utils.getConfiguredLocalDirs(testConf).map(
-      rootDir => new File(rootDir, DiskBlockManager.MERGE_MANAGER_DIR))
+      rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY))
       .filter(mergeDir => mergeDir.exists()).length === 2)
     // mergeDir0 will be skipped as it already exists
     assert(mergeDir0.list().length === 0)
@@ -124,6 +127,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     FileUtils.deleteQuietly(testDir)
   }
 
+  test("Encode merged directory name and attemptId in shuffleManager field") {
+    testConf.set(config.APP_ATTEMPT_ID, "1");
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    val mergedShuffleMeta = diskBlockManager.getMergeDirectoryAndAttemptIDJsonString();
+    val mapper: ObjectMapper = new ObjectMapper
+    val typeRef: TypeReference[HashMap[String, String]] =
+      new TypeReference[HashMap[String, String]]() {}
+    val metaMap: HashMap[String, String] = mapper.readValue(mergedShuffleMeta, typeRef)
+    val mergeDir = metaMap.get(DiskBlockManager.MERGE_DIR_KEY)
+    assert(mergeDir.equals(DiskBlockManager.MERGE_DIRECTORY + "_1"))
+    val attemptId = metaMap.get(DiskBlockManager.ATTEMPT_ID_KEY)
+    assert(attemptId.equals("1"))
+  }
+
   def writeToFile(file: File, numBytes: Int): Unit = {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index dba7e39..095dbef 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1450,7 +1450,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     conf.set("spark.yarn.maxAttempts", "1")
     assert(Utils.isPushBasedShuffleEnabled(conf) === true)
     conf.set("spark.yarn.maxAttempts", "2")
-    assert(Utils.isPushBasedShuffleEnabled(conf) === false)
+    assert(Utils.isPushBasedShuffleEnabled(conf) === true)
   }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to