zhouyejoe commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r896286443


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info 
and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Stores {@code appPathsInfo}, serialized as UTF-8 JSON, under the app-attempt-paths key
+   * of the given application attempt. Does nothing when no DB is configured; any failure
+   * is logged rather than thrown.
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
+    if (db == null) {
+      return;
+    }
+    try {
+      byte[] dbKey = getDbAppAttemptPathsKey(new AppAttemptId(appId, attemptId));
+      String json = mapper.writeValueAsString(appPathsInfo);
+      db.put(dbKey, json.getBytes(StandardCharsets.UTF_8));
+    } catch (Exception e) {
+      logger.error("Error saving registered app paths info", e);
+    }
+  }
+
+  /**
+   * Writes the {@link AppAttemptShuffleMergeId} of a finalized shuffle into the DB. The
+   * stored value is empty; the key alone carries the information. Does nothing when no
+   * DB is configured; any failure is logged rather than thrown.
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db == null) {
+      return;
+    }
+    try {
+      AppAttemptShuffleMergeId mergeId =
+          new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, shuffleMergeId);
+      db.put(getDbAppAttemptShufflePartitionKey(mergeId), new byte[0]);
+    } catch (Exception e) {
+      logger.error("Error saving active app shuffle partition", e);
+    }
+  }
+
+  /**
+   * Deserializes the JSON part of a DB key, i.e. everything after {@code prefix} and the
+   * single delimiter character that follows it.
+   *
+   * @param key the full DB key as a string
+   * @param prefix the key prefix to strip (the delimiter is assumed to be one character)
+   * @param valueType the class to deserialize the JSON part into
+   * @return the parsed object, or {@code null} if the key could not be parsed
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      // Pass the exception as the trailing argument so the root cause is logged, not lost.
+      logger.error("Exception while parsing the DB key {}", key, exception);
+      return null;
+    }
+  }
+
+  /**
+   * Deserializes a stored JSON value into an {@link AppPathsInfo}.
+   *
+   * @param value the raw DB value
+   * @param appAttemptId the owning app attempt, used only for error reporting
+   * @return the parsed info, or {@code null} if the value could not be parsed
+   */
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      // Pass the exception as the trailing argument so the root cause is logged, not lost.
+      logger.error("Exception while parsing the DB value for {}", appAttemptId, exception);
+      return null;
+    }
+  }
+
+  /**
+   * Parses an app-attempt-paths DB key back into its {@link AppAttemptId}, or returns
+   * {@code null} if parsing fails.
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    String keyPrefix = APP_ATTEMPT_PATH_KEY_PREFIX;
+    return parseDbKey(key, keyPrefix, AppAttemptId.class);
+  }
+
+  /**
+   * Parses a DB key that starts with {@code prefix} into an
+   * {@link AppAttemptShuffleMergeId}, or returns {@code null} if parsing fails.
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    Class<AppAttemptShuffleMergeId> targetType = AppAttemptShuffleMergeId.class;
+    return parseDbKey(key, prefix, targetType);
+  }
+
+  /**
+   * Builds a DB key of the form {@code prefix + DB_KEY_DELIMITER + json(key)}. The shared
+   * prefix lets related entries be found by seeking to it in the DB.
+   *
+   * @param key the object to serialize as JSON into the key
+   * @param prefix the prefix identifying the kind of entry
+   * @return the UTF-8 encoded key, or {@code null} if {@code key} could not be serialized
+   */
+  private byte[] getDbKey(Object key, String prefix) {
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      // Pass the exception as the trailing argument so the root cause is logged, not lost.
+      logger.error("Exception while generating the DB key {}", key, exception);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, 
APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reloads active app attempts' path info and then finalized shuffle merge info from
+   * {@code db}. It then submits a cleanup task that deletes every key either reload step
+   * reported for removal. Errors deleting individual keys are logged and skipped.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> staleKeys = reloadActiveAppAttemptsPathInfo(db);
+    staleKeys.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Keys flagged as invalid during reload are deleted in a separate cleanup task.
+    submitCleanupTask(() -> {
+      for (byte[] staleKey : staleKeys) {
+        try {
+          db.delete(staleKey);
+        } catch (Exception e) {
+          logger.error("Error deleting data in DB", e);
+        }
+      }
+    });
+  }
+
+  private List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      DBIterator itr = db.iterator();
+      itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+      while (itr.hasNext()) {
+        Map.Entry<byte[], byte[]> e = itr.next();
+        String key = new String(e.getKey(), StandardCharsets.UTF_8);
+        if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+          break;
+        }
+        AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+        if (appAttemptId == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        AppPathsInfo appPathsInfo = parseDBAppAttemptPathsValue(e.getValue(), 
appAttemptId);
+        if (appPathsInfo == null) {
+          dbKeysToBeRemoved.add(e.getKey());
+          break;
+        }
+        appsShuffleInfo.compute(appAttemptId.appId,
+            (appId, existingAppShuffleInfo) -> {
+              if (existingAppShuffleInfo == null ||
+                  existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {

Review Comment:
   Updated as suggested



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to