otterc commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r927276943


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1245,11 +1696,24 @@ int getNumIOExceptions() {
   /**
    * Wraps all the information related to the merge directory of an 
application.
    */
-  private static class AppPathsInfo {
+  public static class AppPathsInfo {
 
+    @JsonFormat(shape = JsonFormat.Shape.ARRAY)
+    @JsonProperty("activeLocalDirs")
     private final String[] activeLocalDirs;
+    @JsonProperty("subDirsPerLocalDir")
     private final int subDirsPerLocalDir;
 
+    @JsonCreator
+    public AppPathsInfo(
+      @JsonFormat(shape = JsonFormat.Shape.ARRAY)
+      @JsonProperty("activeLocalDirs") String[] activeLocalDirs,
+      @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir
+      ) {
+      this.activeLocalDirs = activeLocalDirs;

Review Comment:
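   Nit: indentation. The constructor parameters should use a 4-space continuation indent (as elsewhere in this file), and `) {` can go at the end of the last parameter line instead of on its own line.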



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -886,12 +887,13 @@ public void 
testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws
   public void testPushBlockFromPreviousAttemptIsRejected()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
-      void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
-        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+      void closeAndDeletePartitionsIfNeeded(
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionsIfNeeded(
+            appShuffleInfo, cleanupLocalDirs);

Review Comment:
   Nit: can this fit on one line?



##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala:
##########
@@ -44,14 +48,160 @@ object ShuffleTestAccessor {
     Option(resolver.executors.get(id))
   }
 
+  def getAppPathsInfo(
+      appId: String,
+      mergeManager: RemoteBlockPushResolver): Option[AppPathsInfo] = {
+    Option(mergeManager.appsShuffleInfo.get(appId)).flatMap(v => 
Option(v.getAppPathsInfo))
+  }
+
+  def getAppsShuffleInfo(
+    mergeManager: RemoteBlockPushResolver
+  ): ConcurrentMap[String, RemoteBlockPushResolver.AppShuffleInfo] = {
+    mergeManager.appsShuffleInfo
+  }
+
   def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
     resolver.registeredExecutorFile
   }
 
+  def recoveryFile(mergeManager: RemoteBlockPushResolver): File = {
+    mergeManager.recoveryFile
+  }
+
   def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
     resolver.db
   }
 
+  def mergeManagerLevelDB(mergeManager: RemoteBlockPushResolver): DB = {
+    mergeManager.db
+  }
+
+  def createMergeShuffleFileManagerForTestWithSynchronizedCleanup(
+      transportConf: TransportConf,
+      file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def submitCleanupTask(task: Runnable): Unit = {
+        task.run()
+      }
+    }
+  }
+
+  def createMergeShuffleFileManagerForTestWithNoOpAppShuffleInfoDBCleanup(

Review Comment:
   Nit: This is a long name. Please remove `ForTest` and try to shorten it.



##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala:
##########
@@ -44,14 +48,160 @@ object ShuffleTestAccessor {
     Option(resolver.executors.get(id))
   }
 
+  def getAppPathsInfo(
+      appId: String,
+      mergeManager: RemoteBlockPushResolver): Option[AppPathsInfo] = {
+    Option(mergeManager.appsShuffleInfo.get(appId)).flatMap(v => 
Option(v.getAppPathsInfo))
+  }
+
+  def getAppsShuffleInfo(
+    mergeManager: RemoteBlockPushResolver
+  ): ConcurrentMap[String, RemoteBlockPushResolver.AppShuffleInfo] = {
+    mergeManager.appsShuffleInfo
+  }
+
   def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
     resolver.registeredExecutorFile
   }
 
+  def recoveryFile(mergeManager: RemoteBlockPushResolver): File = {
+    mergeManager.recoveryFile
+  }
+
   def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
     resolver.db
   }
 
+  def mergeManagerLevelDB(mergeManager: RemoteBlockPushResolver): DB = {
+    mergeManager.db
+  }
+
+  def createMergeShuffleFileManagerForTestWithSynchronizedCleanup(
+      transportConf: TransportConf,
+      file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def submitCleanupTask(task: Runnable): Unit = {
+        task.run()
+      }
+    }
+  }
+
+  def createMergeShuffleFileManagerForTestWithNoOpAppShuffleInfoDBCleanup(
+    transportConf: TransportConf,
+    file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def removeAppShuffleInfoFromDB(
+          appShuffleInfo: RemoteBlockPushResolver.AppShuffleInfo): Unit = {
+        // NoOp
+      }
+      override private[shuffle] def submitCleanupTask(task: Runnable): Unit = {
+        task.run()
+      }
+    }
+  }
+
+  def createMergeShuffleFileManagerForTestWithNoDBCleanup(

Review Comment:
   Nit: same here. This name is also long; please remove `ForTest` and try to shorten it.



##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala:
##########
@@ -380,6 +550,472 @@ class YarnShuffleServiceSuite extends SparkFunSuite with 
Matchers {
     }
   }
 
+  test("Consistency in AppPathInfo between in-memory hashmap and the DB") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+    s1.initializeApplication(app2Attempt1Data)
+    val app2Attempt2Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt2Data = makeAppInfo("user", app2Attempt2Id)
+    s1.initializeApplication(app2Attempt2Data)
+    val app3IdNoAttemptId = ApplicationId.newInstance(0, 3)
+    val app3NoAttemptIdData = makeAppInfo("user", app3IdNoAttemptId)
+    s1.initializeApplication(app3NoAttemptIdData)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+    val mergedShuffleInfo3NoAttemptId =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath),
+      4, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID)
+
+    val localDirs1 = Array(new File(tempDir, 
"foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt2 = Array(new File(tempDir, 
"bippy2/merge_manager_2").getAbsolutePath)
+    val localDirs3NoAttempt = Array(new File(tempDir, 
"foo/merge_manager").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+    val appPathsInfo2Attempt2 = new AppPathsInfo(localDirs2Attempt2, 5)
+    val appPathsInfo3NoAttempt = new AppPathsInfo(localDirs3NoAttempt, 4)
+
+    val mergeManager1 = 
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = 
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+    ShuffleTestAccessor.reloadAppShuffleInfo(
+      mergeManager1, mergeManager1DB).size() equals 0
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 1
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+
+    mergeManager1.registerExecutor(app2Attempt1Id.toString, 
mergedShuffleInfo2Attempt1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+
+    mergeManager1.registerExecutor(app3IdNoAttemptId.toString, 
mergedShuffleInfo3NoAttemptId)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 3
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 3
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+
+    mergeManager1.registerExecutor(app2Attempt2Id.toString, 
mergedShuffleInfo2Attempt2)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 3
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt2Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt2)
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 3
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt2Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt2)
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+
+    mergeManager1.applicationRemoved(app2Attempt2Id.toString, true)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    assert(!appShuffleInfo.containsKey(app2Attempt2Id.toString))
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    assert(!appShuffleInfoAfterReload.containsKey(app2Attempt2Id.toString))
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+
+    s1.stop()
+  }
+
+  test("Finalized merged shuffle are written into DB and cleaned up after 
application stopped") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+    s1.initializeApplication(app2Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+    val localDirs1 = Array(new File(tempDir, 
"foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+    val mergeManager1 = 
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = 
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+    ShuffleTestAccessor.reloadAppShuffleInfo(
+      mergeManager1, mergeManager1DB).size() equals 0
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    mergeManager1.registerExecutor(app2Attempt1Id.toString, 
mergedShuffleInfo2Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+    val partitionId2 = new AppAttemptShuffleMergeId(app2Attempt1Id.toString, 
1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(!appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+    
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.isEmpty)
+
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    
assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+    mergeManager1.applicationRemoved(app1Id.toString, true)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    assert(!appShuffleInfo.containsKey(app1Id.toString))
+    
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 1
+    assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+    
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+    s1.stop()
+  }
+
+  test("Dangling finalized merged partition info in DB will be removed during 
restart") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1._conf = yarnConfig
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+    val transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(yarnConfig))
+    val testShuffleMergeManager =
+      
ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoOpAppShuffleInfoDBCleanup(

Review Comment:
   This setup seems to be repeated in multiple tests. Can it be moved into a helper method?



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -983,12 +985,13 @@ public void 
testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
-      void closeAndDeletePartitionFilesIfNeeded(
+      void closeAndDeletePartitionsIfNeeded(
           AppShuffleInfo appShuffleInfo,
           boolean cleanupLocalDirs) {
-        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, 
cleanupLocalDirs);
+        super.closeAndDeletePartitionsIfNeeded(
+            appShuffleInfo, cleanupLocalDirs);

Review Comment:
   Nit: this can fit on one line.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1305,7 +1791,8 @@ public ConcurrentMap<Integer, 
AppShuffleMergePartitionsInfo> getShuffles() {
      * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
      *      org.apache.spark.storage.BlockId, scala.Option)]]
      */
-    private String getFilePath(String filename) {
+    @VisibleForTesting
+    String getFilePath(String filename) {
       // TODO: [SPARK-33236] Change the message when this service is able to 
handle NM restart

Review Comment:
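   Please remove this TODO (SPARK-33236). With the recovery DB added in this PR, the service is now able to handle NM restart.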



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +787,236 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info for {}", 
appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = 
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition {}", 
appAttemptShuffleMergeId, e);
+      }
+    }
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) 
throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+        key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, 
AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // We add a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the meta data stored in the hashmap for merged 
shuffles.
+   * The application attempts local paths information will be firstly 
reloaded, and then
+   * the finalized shuffle merges will be updated.
+   * This method will also try deleting dangling key/values in DB, which 
includes:
+   * 1) Outdated application attempt local paths information as of some DB 
deletion failures
+   * 2) The deletion of finalized shuffle merges are triggered asynchronously, 
there can be cases
+   * that deletions miss the execution during restart. These finalized shuffle 
merges should have
+   * no relevant application attempts local paths information registered in 
the DB and the hashmap.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    dbKeysToBeRemoved.addAll(reloadActiveAppAttemptsPathInfo(db));
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    removeOutdatedKeyValuesInDB(dbKeysToBeRemoved);
+  }
+
+  /**
+   * Reload application attempts local paths information.
+   */
+  @VisibleForTesting
+  List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), 
AppPathsInfo.class);
+        appsShuffleInfo.compute(appAttemptId.appId,
+            (appId, existingAppShuffleInfo) -> {
+              if (existingAppShuffleInfo == null ||
+                  existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
+                if (existingAppShuffleInfo != null) {
+                  AppAttemptId existingAppAttemptId = new AppAttemptId(
+                      existingAppShuffleInfo.appId, 
existingAppShuffleInfo.attemptId);
+                  try {
+                    // Add the former outdated DB key to deletion list
+                    
dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId));
+                  } catch (IOException e) {
+                    logger.error("Failed to get the DB key for {}", 
existingAppAttemptId, e);
+                  }
+                }
+                return new AppShuffleInfo(
+                    appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
+              } else {
+                // Add the current DB key to deletion list as it is outdated
+                dbKeysToBeRemoved.add(entry.getKey());
+                return existingAppShuffleInfo;
+              }
+            });
+      }
+    }
+    return dbKeysToBeRemoved;
+  }
+
+  /**
+   * Reload the finalized shuffle merges.
+   */
+  private List<byte[]> reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) 
throws IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptShuffleMergeId partitionId = 
parseDbAppAttemptShufflePartitionKey(key);
+        logger.info("Reloading finalized shuffle info for partitionId {}", 
partitionId);

Review Comment:
   Should this be logged at info level? Also, the equivalent log is missing when reloading appAttemptPathInfo.



##########
common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java:
##########
@@ -230,11 +241,16 @@ protected void serviceInit(Configuration externalConf) 
throws Exception {
       // when it comes back
       if (_recoveryPath != null) {
         registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+        mergeManagerFile = 
initRecoveryDb(SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME);
       }
 
       TransportConf transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(_conf));
-      MergedShuffleFileManager shuffleMergeManager = 
newMergedShuffleFileManagerInstance(
-        transportConf);
+      // Create new MergedShuffleFileManager if shuffleMergeManager is null.
+      // As in UT, a customized MergedShuffleFileManager will be created 
through

Review Comment:
   Nit: Please rephrase this, e.g. `This is because in the unit test ...`.



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -1388,16 +1442,16 @@ private static class TestMergeShuffleFile extends 
MergeShuffleFile {
     private File file;
     private FileChannel channel;
 
-    private TestMergeShuffleFile(File file) throws IOException {
-      super(null, null);
+    private TestMergeShuffleFile(File file, boolean append) throws IOException 
{

Review Comment:
   We are not creating the file with `append = true` anymore, right? If so, this would always be `false`, so why is this parameter needed?



##########
common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java:
##########
@@ -1412,9 +1466,17 @@ void close() throws IOException {
     }
 
     void restore() throws IOException {
+      restore(-1);
+    }
+
+    void restore(long pos) throws IOException {
       FileOutputStream fos = new FileOutputStream(file, true);
       channel = fos.getChannel();
       activeDos = new DataOutputStream(fos);
+      if (pos != -1) {
+        channel.position(pos);
+      }

Review Comment:
   Is `restore(long pos)` ever called with `pos != -1`?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1245,11 +1696,24 @@ int getNumIOExceptions() {
   /**
    * Wraps all the information related to the merge directory of an 
application.
    */
-  private static class AppPathsInfo {
+  public static class AppPathsInfo {

Review Comment:
   Is this made public for testing? If so, can you mention that in the comment?



##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala:
##########
@@ -44,14 +48,160 @@ object ShuffleTestAccessor {
     Option(resolver.executors.get(id))
   }
 
+  def getAppPathsInfo(
+      appId: String,
+      mergeManager: RemoteBlockPushResolver): Option[AppPathsInfo] = {
+    Option(mergeManager.appsShuffleInfo.get(appId)).flatMap(v => 
Option(v.getAppPathsInfo))
+  }
+
+  def getAppsShuffleInfo(
+    mergeManager: RemoteBlockPushResolver
+  ): ConcurrentMap[String, RemoteBlockPushResolver.AppShuffleInfo] = {
+    mergeManager.appsShuffleInfo
+  }
+
   def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
     resolver.registeredExecutorFile
   }
 
+  def recoveryFile(mergeManager: RemoteBlockPushResolver): File = {
+    mergeManager.recoveryFile
+  }
+
   def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
     resolver.db
   }
 
+  def mergeManagerLevelDB(mergeManager: RemoteBlockPushResolver): DB = {
+    mergeManager.db
+  }
+
+  def createMergeShuffleFileManagerForTestWithSynchronizedCleanup(

Review Comment:
   Nit: I think `ForTest` in the method name is unnecessary. You can add a comment to the method describing it instead of putting that in the method name.



##########
resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala:
##########
@@ -380,6 +550,472 @@ class YarnShuffleServiceSuite extends SparkFunSuite with 
Matchers {
     }
   }
 
+  test("Consistency in AppPathInfo between in-memory hashmap and the DB") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+    s1.initializeApplication(app2Attempt1Data)
+    val app2Attempt2Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt2Data = makeAppInfo("user", app2Attempt2Id)
+    s1.initializeApplication(app2Attempt2Data)
+    val app3IdNoAttemptId = ApplicationId.newInstance(0, 3)
+    val app3NoAttemptIdData = makeAppInfo("user", app3IdNoAttemptId)
+    s1.initializeApplication(app3NoAttemptIdData)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+    val mergedShuffleInfo3NoAttemptId =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath),
+      4, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID)
+
+    val localDirs1 = Array(new File(tempDir, 
"foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt2 = Array(new File(tempDir, 
"bippy2/merge_manager_2").getAbsolutePath)
+    val localDirs3NoAttempt = Array(new File(tempDir, 
"foo/merge_manager").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+    val appPathsInfo2Attempt2 = new AppPathsInfo(localDirs2Attempt2, 5)
+    val appPathsInfo3NoAttempt = new AppPathsInfo(localDirs3NoAttempt, 4)
+
+    val mergeManager1 = 
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = 
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+    ShuffleTestAccessor.reloadAppShuffleInfo(
+      mergeManager1, mergeManager1DB).size() equals 0
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 1
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+
+    mergeManager1.registerExecutor(app2Attempt1Id.toString, 
mergedShuffleInfo2Attempt1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+
+    mergeManager1.registerExecutor(app3IdNoAttemptId.toString, 
mergedShuffleInfo3NoAttemptId)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 3
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 3
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+
+    mergeManager1.registerExecutor(app2Attempt2Id.toString, 
mergedShuffleInfo2Attempt2)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 3
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt2Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt2)
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 3
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt2Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt2)
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+
+    mergeManager1.applicationRemoved(app2Attempt2Id.toString, true)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    assert(!appShuffleInfo.containsKey(app2Attempt2Id.toString))
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    assert(!appShuffleInfoAfterReload.containsKey(app2Attempt2Id.toString))
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be 
(appPathsInfo3NoAttempt)
+
+    s1.stop()
+  }
+
+  test("Finalized merged shuffle are written into DB and cleaned up after 
application stopped") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+    s1.initializeApplication(app2Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+    val localDirs1 = Array(new File(tempDir, 
"foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+    val mergeManager1 = 
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = 
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() equals 0
+    ShuffleTestAccessor.reloadAppShuffleInfo(
+      mergeManager1, mergeManager1DB).size() equals 0
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    mergeManager1.registerExecutor(app2Attempt1Id.toString, 
mergedShuffleInfo2Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+    val partitionId2 = new AppAttemptShuffleMergeId(app2Attempt1Id.toString, 
1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(!appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be 
(appPathsInfo2Attempt1)
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+    
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.isEmpty)
+
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    
assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+    mergeManager1.applicationRemoved(app1Id.toString, true)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    assert(!appShuffleInfo.containsKey(app1Id.toString))
+    
assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 1
+    assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+    
assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+    s1.stop()
+  }
+
+  test("Dangling finalized merged partition info in DB will be removed during 
restart") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1._conf = yarnConfig
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+    val transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(yarnConfig))
+    val testShuffleMergeManager =
+      
ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoOpAppShuffleInfoDBCleanup(
+        transportConf,
+        
s1.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+    s1.setShuffleMergeManager(testShuffleMergeManager)
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Id)
+    s1.initializeApplication(app2Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+    val localDirs1 = Array(new File(tempDir, 
"foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt2 = Array(new File(tempDir, 
"bippy2/merge_manager_2").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+    val mergeManager1 = 
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = 
ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    mergeManager1.registerExecutor(app2Id.toString, mergedShuffleInfo2Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+    val partitionId2 = new AppAttemptShuffleMergeId(app2Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    appShuffleInfo.get(
+      app2Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(!appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    
assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    
assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    // The applicationRemove will not clean up the finalized merged shuffle 
partition in DB
+    // as of the NoOp mergedShuffleFileManager removeAppShuffleInfoFromDB 
method
+    mergeManager1.applicationRemoved(app1Id.toString, true)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    assert(!appShuffleInfo.containsKey(app1Id.toString))
+    assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+    // Clear the AppsShuffleInfo hashmap and reload the hashmap from DB
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() equals 1
+    assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+    
assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    // Register application app1Id again and reload the DB again
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be 
(appPathsInfo1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.isEmpty)
+    assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    // The merged partition information for App1 should be empty as they have 
been removed from DB
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+    
assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    s1.stop()
+  }
+
+  test("Dangling application attempt local path information in DB will be 
removed during restart") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1._conf = yarnConfig
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+    val transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(yarnConfig))
+    val testShuffleMergeManager =
+      ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoDBCleanup(
+        transportConf,
+        
s1.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+    s1.setShuffleMergeManager(testShuffleMergeManager)
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 2)
+    val app1Attempt1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+
+    val mergedShuffleInfo1Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo1Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+
+    val localDirs1Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs1Attempt2 = Array(new File(tempDir, 
"bippy2/merge_manager_2").getAbsolutePath)
+    val appPathsInfo1Attempt1 = new AppPathsInfo(localDirs1Attempt1, 5)
+    val appPathsInfo1Attempt2 = new AppPathsInfo(localDirs1Attempt2, 5)
+
+    val mergeManager1 = 
s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+    // Register Attempt 2
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt2)
+    val partitionId2 = new AppAttemptShuffleMergeId(app1Id.toString, 2, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() equals 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+    // now we pretend the shuffle service goes down, since the DB deletion are 
NoOp,
+    // it should have multiple app attempt local paths info and finalized 
merge info
+    s1.stop()
+    // Yarn Shuffle service comes back up without custom mergeManager
+    s2 = new YarnShuffleService
+    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s2.init(yarnConfig)
+    s2.mergeManagerFile should be (mergeMgrFile)
+
+    val mergeManager2 = 
s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager2)
+    appShuffleInfo.size() equals 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+    s2.stop()
+  }
+
+  test("Cleanup for former attempts local path info should be triggered in 
applicationRemoved") {
+    s1 = new YarnShuffleService
+    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    s1._conf = yarnConfig
+    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
+    val transportConf = new TransportConf("shuffle", new 
HadoopConfigProvider(yarnConfig))
+    val testShuffleMergeManager =
+      ShuffleTestAccessor.createMergeShuffleFileManagerForTestWithNoDBCleanup(
+        transportConf,
+        
s1.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+    s1.setShuffleMergeManager(testShuffleMergeManager)
+    s1.init(yarnConfig)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Attempt1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+
+    val mergedShuffleInfo1Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo1Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, 
"bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+
+    val localDirs1Attempt1 = Array(new File(tempDir, 
"bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs1Attempt2 = Array(new File(tempDir, 
"bippy2/merge_manager_2").getAbsolutePath)
+    val appPathsInfo1Attempt1 = new AppPathsInfo(localDirs1Attempt1, 5)

Review Comment:
   Is `appPathsInfo1Attempt1` used anywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to