otterc commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r903254647


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -993,6 +1311,42 @@ AppShufflePartitionInfo getPartitionInfo() {
     }
   }
 
+  /**
+   * Simply encodes an application attempt ID.

Review Comment:
   Nit: Remove `Simply`



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1021,12 +1375,55 @@ public boolean isFinalized() {
     }
   }
 
+  /**
+   * Simply encodes an application attempt shuffle merge ID.

Review Comment:
   Nit: Remove `Simply`



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -632,6 +737,14 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
           appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
             if (appShuffleInfo == null || attemptId > 
appShuffleInfo.attemptId) {
               originalAppShuffleInfo.set(appShuffleInfo);
+              AppPathsInfo appPathsInfo = new AppPathsInfo(appId, 
executorInfo.localDirs,
+                  mergeDir, executorInfo.subDirsPerLocalDir);
+              // Clean up the outdated App Attempt local path info in the DB 
and
+              // put the newly registered local path info from newer attempt 
into the DB.
+              if (appShuffleInfo != null) {
+                removeAppAttemptPathInfoFromDB(new AppAttemptId(appId, 
appShuffleInfo.attemptId));
+              }
+              writeAppPathsInfoToDb(appId, attemptId, appPathsInfo);

Review Comment:
   > If we have to guarantee the success of DB removal, but it fails, should we 
fail the new app attempt Executor registration here? I feel the later one is a 
little bit overkill. WDYT?
   
   I don't think it is overkill. We can have one method that does something 
like this:
   ```
   writeAppsPathInfoAndDeleteOlder(olderAttempt, newAttempt) {
     try {
       deleteOldAttempt
       writeNewAttempt
     } catch (IOException) {
       ....
     }
   }
   ```
   If delete fails, write will never be executed.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +771,206 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, 
attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = 
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) 
throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+        key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, 
AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  /**
+   * Generate the DB key from AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the meta data stored in the hashmap for merged 
shuffles.
+   * The application attempts local paths information will be firstly 
reloaded, and then
+   * the finalized shuffle merges will be updated.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  /**
+   * Reload application attempts local paths information.
+   */
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        AppPathsInfo appPathsInfo = mapper.readValue(e.getValue(), 
AppPathsInfo.class);
+        appsShuffleInfo.put(appAttemptId.appId, new AppShuffleInfo(
+            appAttemptId.appId, appAttemptId.attemptId, appPathsInfo));
+      }
+    }
+  }
+
+  /**
+   * Reload the finalized shuffle merges.
+   * Since the deletion of finalized shuffle merges are triggered 
asynchronously, there can be
+   * cases that deletions miss the execution during restart. Those dangling 
finalized shuffle merge
+   * keys in the DB will be cleaned up during this reload, if there are no 
relevant application
+   * attempts local paths information registered in the hashmap.
+   */
+  private void reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws 
IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptShuffleMergeId partitionId = 
parseDbAppAttemptShufflePartitionKey(key);
+        logger.info("Reloading finalized shuffle info for partitionId {}", 
partitionId);
+        AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
+        if (appShuffleInfo != null && appShuffleInfo.attemptId == 
partitionId.attemptId) {
+          appShuffleInfo.shuffles.compute(partitionId.shuffleId,
+              (shuffleId, existingMergePartitionInfo) -> {
+                if (existingMergePartitionInfo == null ||
+                    existingMergePartitionInfo.shuffleMergeId < 
partitionId.shuffleMergeId) {
+                  if (existingMergePartitionInfo != null) {
+                    dbKeysToBeRemoved.add(getDbAppAttemptShufflePartitionKey(
+                        new AppAttemptShuffleMergeId(
+                            appShuffleInfo.appId, appShuffleInfo.attemptId, 
shuffleId,
+                            existingMergePartitionInfo.shuffleMergeId)));
+                  }
+                  return new 
AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
+                } else {
+                  dbKeysToBeRemoved.add(e.getKey());
+                  return existingMergePartitionInfo;
+                }
+              });
+        } else {
+          logger.info("Adding dangling key {} in DB to clean up list", key);
+          dbKeysToBeRemoved.add(e.getKey());
+        }
+      }
+    }
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->
+        dbKeysToBeRemoved.forEach(
+            (key) -> {
+              try {
+                db.delete(key);
+              } catch (Exception e) {
+                logger.error("Error deleting data in DB", e);

Review Comment:
   This doesn't look like it has been updated.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +771,207 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info for {}", 
appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = 
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition {}", 
appAttemptShuffleMergeId, e);
+      }
+    }
+
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) 
throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+        key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, 
AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // We add a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the meta data stored in the hashmap for merged 
shuffles.
+   * The application attempts local paths information will be firstly 
reloaded, and then
+   * the finalized shuffle merges will be updated.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  /**
+   * Reload application attempts local paths information.
+   */
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), 
AppPathsInfo.class);
+        appsShuffleInfo.put(appAttemptId.appId, new AppShuffleInfo(
+            appAttemptId.appId, appAttemptId.attemptId, appPathsInfo));
+      }
+    }
+  }
+
+  /**
+   * Reload the finalized shuffle merges.
+   * Since the deletion of finalized shuffle merges are triggered 
asynchronously, there can be
+   * cases that deletions miss the execution during restart. Those dangling 
finalized shuffle merge
+   * keys in the DB will be cleaned up during this reload, if there are no 
relevant application
+   * attempts local paths information registered in the hashmap.
+   */
+  private void reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws 
IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptShuffleMergeId partitionId = 
parseDbAppAttemptShufflePartitionKey(key);
+        logger.info("Reloading finalized shuffle info for partitionId {}", 
partitionId);
+        AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
+        if (appShuffleInfo != null && appShuffleInfo.attemptId == 
partitionId.attemptId) {
+          appShuffleInfo.shuffles.compute(partitionId.shuffleId,
+              (shuffleId, existingMergePartitionInfo) -> {
+                if (existingMergePartitionInfo == null ||
+                    existingMergePartitionInfo.shuffleMergeId < 
partitionId.shuffleMergeId) {
+                  if (existingMergePartitionInfo != null) {
+                    AppAttemptShuffleMergeId appAttemptShuffleMergeId =
+                        new AppAttemptShuffleMergeId(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+                            shuffleId, 
existingMergePartitionInfo.shuffleMergeId);
+                    try{
+                      dbKeysToBeRemoved.add(
+                          
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
+                    } catch (Exception e) {
+                      logger.error("Error getting the DB key for {}", 
appAttemptShuffleMergeId, e);
+                    }
+                  }
+                  return new 
AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
+                } else {
+                  dbKeysToBeRemoved.add(entry.getKey());
+                  return existingMergePartitionInfo;
+                }
+              });
+        } else {
+          logger.info("Adding dangling key {} in DB to clean up list", key);

Review Comment:
   Should this be at info level?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -583,6 +686,7 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), 
Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
+    writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   In a previous version, this line, which writes to the database, was just 
before line 645, where we save in the state that a shuffle is finalized. I think 
you moved it because of a comment. I think it is better to have it there because 
that is consistent with the other DB operations: the info is persisted to the DB 
first and then saved in the state. I don't think anything is wrong; this is just 
for consistency.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1051,18 +1448,17 @@ public static class AppShufflePartitionInfo {
     private boolean indexMetaUpdateFailed;
 
     AppShufflePartitionInfo(
-        String appId,
-        int shuffleId,
-        int shuffleMergeId,
+        AppAttemptShuffleMergeId appAttemptShuffleMergeId,
         int reduceId,
         File dataFile,
         MergeShuffleFile indexFile,
         MergeShuffleFile metaFile) throws IOException {
-      Preconditions.checkArgument(appId != null, "app id is null");
-      this.appId = appId;
-      this.shuffleId = shuffleId;
-      this.shuffleMergeId = shuffleMergeId;
+      this.appAttemptShuffleMergeId = appAttemptShuffleMergeId;
       this.reduceId = reduceId;
+      // Create FileOutputStream with append mode set to false by default.
+      // This will make sure later write will start from the beginning of the 
file.

Review Comment:
   Nit: `This ensures that the file is always overwritten and not appended to 
even after the service is restarted`.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -656,6 +771,207 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo 
appPathsInfo) {
+    if (db != null) {
+      AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info for {}", 
appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = 
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition {}", 
appAttemptShuffleMergeId, e);
+      }
+    }
+
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) 
throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws 
IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+        key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, 
AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // We add a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + 
mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws 
IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the meta data stored in the hashmap for merged 
shuffles.
+   * The application attempts local paths information will be firstly 
reloaded, and then
+   * the finalized shuffle merges will be updated.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    reloadActiveAppAttemptsPathInfo(db);
+    reloadFinalizedAppAttemptsShuffleMergeInfo(db);
+  }
+
+  /**
+   * Reload application attempts local paths information.
+   */
+  private void reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), 
AppPathsInfo.class);
+        appsShuffleInfo.put(appAttemptId.appId, new AppShuffleInfo(
+            appAttemptId.appId, appAttemptId.attemptId, appPathsInfo));
+      }
+    }
+  }
+
+  /**
+   * Reload the finalized shuffle merges.
+   * Since the deletion of finalized shuffle merges are triggered 
asynchronously, there can be
+   * cases that deletions miss the execution during restart. Those dangling 
finalized shuffle merge
+   * keys in the DB will be cleaned up during this reload, if there are no 
relevant application
+   * attempts local paths information registered in the hashmap.
+   */
+  private void reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws 
IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      
itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> entry = itr.next();
+        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptShuffleMergeId partitionId = 
parseDbAppAttemptShufflePartitionKey(key);
+        logger.info("Reloading finalized shuffle info for partitionId {}", 
partitionId);
+        AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
+        if (appShuffleInfo != null && appShuffleInfo.attemptId == 
partitionId.attemptId) {
+          appShuffleInfo.shuffles.compute(partitionId.shuffleId,
+              (shuffleId, existingMergePartitionInfo) -> {
+                if (existingMergePartitionInfo == null ||
+                    existingMergePartitionInfo.shuffleMergeId < 
partitionId.shuffleMergeId) {
+                  if (existingMergePartitionInfo != null) {
+                    AppAttemptShuffleMergeId appAttemptShuffleMergeId =
+                        new AppAttemptShuffleMergeId(appShuffleInfo.appId, 
appShuffleInfo.attemptId,
+                            shuffleId, 
existingMergePartitionInfo.shuffleMergeId);
+                    try{
+                      dbKeysToBeRemoved.add(
+                          
getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
+                    } catch (Exception e) {
+                      logger.error("Error getting the DB key for {}", 
appAttemptShuffleMergeId, e);
+                    }
+                  }
+                  return new 
AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
+                } else {
+                  dbKeysToBeRemoved.add(entry.getKey());
+                  return existingMergePartitionInfo;
+                }
+              });
+        } else {
+          logger.info("Adding dangling key {} in DB to clean up list", key);

Review Comment:
   In this method, there are other places where we add keys to be removed. Why 
don't we add the log there too?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to