otterc commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r927806886
########## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ########## @@ -656,6 +787,236 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } } + /** + * Close the DB during shutdown + */ + @Override + public void close() { + if (db != null) { + try { + db.close(); + } catch (IOException e) { + logger.error("Exception closing leveldb with registered app paths info and " + + "shuffle partition info", e); + } + } + } + + /** + * Write the application attempt's local path information to the DB + */ + private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { + if (db != null) { + AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); + try { + byte[] key = getDbAppAttemptPathsKey(appAttemptId); + String valueStr = mapper.writeValueAsString(appPathsInfo); + byte[] value = valueStr.getBytes(StandardCharsets.UTF_8); + db.put(key, value); + } catch (Exception e) { + logger.error("Error saving registered app paths info for {}", appAttemptId, e); + } + } + } + + /** + * Write the finalized shuffle merge partition information into the DB + */ + private void writeAppAttemptShuffleMergeInfoToDB( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) { + if (db != null) { + // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles + try{ + byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId); + db.put(dbKey, new byte[0]); + } catch (Exception e) { + logger.error("Error saving active app shuffle partition {}", appAttemptShuffleMergeId, e); + } + } + } + + /** + * Parse the DB key with the prefix and the expected return value type + */ + private <T> T parseDbKey(String key, String prefix, Class<T> valueType) throws IOException { + String json = key.substring(prefix.length() + 1); + return mapper.readValue(json, valueType); + } + + /** + * Generate AppAttemptId from the DB key + */ + private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException { + return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class); + } + + /** + * Generate AppAttemptShuffleMergeId from the DB key + */ + private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey( + String key) throws IOException { + return parseDbKey( + key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, AppAttemptShuffleMergeId.class); + } + + /** + * Generate the DB key with the key object and the specified string prefix + */ + private byte[] getDbKey(Object key, String prefix) throws IOException { + // We add a common prefix on all the keys so we can find them in the DB + String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key); + return keyJsonString.getBytes(StandardCharsets.UTF_8); + } + + /** + * Generate the DB key from AppAttemptShuffleMergeId object + */ + private byte[] getDbAppAttemptShufflePartitionKey( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException { + return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX); + } + + /** + * Generate the DB key from AppAttemptId object + */ + private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException { + return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX); + } + + /** + * Reload the DB to recover the meta data stored in the hashmap for merged shuffles. + * The application attempts local paths information will be firstly reloaded, and then + * the finalized shuffle merges will be updated. + * This method will also try deleting dangling key/values in DB, which includes: + * 1) Outdated application attempt local paths information as of some DB deletion failures + * 2) The deletion of finalized shuffle merges are triggered asynchronously, there can be cases + * that deletions miss the execution during restart. These finalized shuffle merges should have + * no relevant application attempts local paths information registered in the DB and the hashmap. + */ + @VisibleForTesting + void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException { + logger.info("Reload applications merged shuffle information from DB"); + List<byte[]> dbKeysToBeRemoved = new ArrayList<>(); + dbKeysToBeRemoved.addAll(reloadActiveAppAttemptsPathInfo(db)); + dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db)); + removeOutdatedKeyValuesInDB(dbKeysToBeRemoved); + } + + /** + * Reload application attempts local paths information. + */ + @VisibleForTesting + List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException { + List<byte[]> dbKeysToBeRemoved = new ArrayList<>(); + if (db != null) { + DBIterator itr = db.iterator(); + itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry<byte[], byte[]> entry = itr.next(); + String key = new String(entry.getKey(), StandardCharsets.UTF_8); + if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) { + break; + } + AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key); + AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class); + appsShuffleInfo.compute(appAttemptId.appId, + (appId, existingAppShuffleInfo) -> { + if (existingAppShuffleInfo == null || + existingAppShuffleInfo.attemptId < appAttemptId.attemptId) { + if (existingAppShuffleInfo != null) { + AppAttemptId existingAppAttemptId = new AppAttemptId( + existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId); + try { + // Add the former outdated DB key to deletion list + dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId)); + } catch (IOException e) { + logger.error("Failed to get the DB key for {}", existingAppAttemptId, e); + } + } + return new AppShuffleInfo( + appAttemptId.appId, appAttemptId.attemptId, appPathsInfo); + } else { + // Add the current DB key to deletion list as it is outdated + dbKeysToBeRemoved.add(entry.getKey()); + return existingAppShuffleInfo; + } + }); + } + } + return dbKeysToBeRemoved; + } + + /** + * Reload the finalized shuffle merges. + */ + private List<byte[]> reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException { + List<byte[]> dbKeysToBeRemoved = new ArrayList<>(); + if (db != null) { + DBIterator itr = db.iterator(); + itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8)); + while (itr.hasNext()) { + Map.Entry<byte[], byte[]> entry = itr.next(); + String key = new String(entry.getKey(), StandardCharsets.UTF_8); + if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) { + break; + } + AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key); + logger.info("Reloading finalized shuffle info for partitionId {}", partitionId); + AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId); + if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) { + appShuffleInfo.shuffles.compute(partitionId.shuffleId, + (shuffleId, existingMergePartitionInfo) -> { + if (existingMergePartitionInfo == null || + existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) { + if (existingMergePartitionInfo != null) { + AppAttemptShuffleMergeId appAttemptShuffleMergeId = + new AppAttemptShuffleMergeId(appShuffleInfo.appId, appShuffleInfo.attemptId, + shuffleId, existingMergePartitionInfo.shuffleMergeId); + try{ + dbKeysToBeRemoved.add( + getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId)); + } catch (Exception e) { + logger.error("Error getting the DB key for {}", appAttemptShuffleMergeId, e); Review Comment: Is this covered by any of the UT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org