[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r922587713 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -656,6 +781,236 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } } + /** + * Close the DB during shutdown + */ + @Override + public void close() { +if (db != null) { + try { +db.close(); + } catch (IOException e) { +logger.error("Exception closing leveldb with registered app paths info and " ++ "shuffle partition info", e); + } +} + } + + /** + * Write the application attempt's local path information to the DB + */ + private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { +if (db != null) { + AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); + try { +byte[] key = getDbAppAttemptPathsKey(appAttemptId); +String valueStr = mapper.writeValueAsString(appPathsInfo); +byte[] value = valueStr.getBytes(StandardCharsets.UTF_8); +db.put(key, value); + } catch (Exception e) { +logger.error("Error saving registered app paths info for {}", appAttemptId, e); + } +} + } + + /** + * Write the finalized shuffle merge partition information into the DB + */ + private void writeAppAttemptShuffleMergeInfoToDB( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) { +if (db != null) { + // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles + try{ +byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId); +db.put(dbKey, new byte[0]); + } catch (Exception e) { +logger.error("Error saving active app shuffle partition {}", appAttemptShuffleMergeId, e); + } +} + } + + /** + * Parse the DB key with the prefix and the expected return value type + */ + private T parseDbKey(String key, String prefix, Class valueType) throws IOException { +String json = key.substring(prefix.length() + 1); +return mapper.readValue(json, valueType); + } + + /** + * Generate 
AppAttemptId from the DB key + */ + private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException { +return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class); + } + + /** + * Generate AppAttemptShuffleMergeId from the DB key + */ + private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey( + String key) throws IOException { +return parseDbKey( +key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, AppAttemptShuffleMergeId.class); + } + + /** + * Generate the DB key with the key object and the specified string prefix + */ + private byte[] getDbKey(Object key, String prefix) throws IOException { +// We add a common prefix on all the keys so we can find them in the DB +String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key); +return keyJsonString.getBytes(StandardCharsets.UTF_8); + } + + /** + * Generate the DB key from AppAttemptShuffleMergeId object + */ + private byte[] getDbAppAttemptShufflePartitionKey( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException { +return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX); + } + + /** + * Generate the DB key from AppAttemptId object + */ + private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException { +return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX); + } + + /** + * Reload the DB to recover the meta data stored in the hashmap for merged shuffles. + * The application attempts local paths information will be firstly reloaded, and then + * the finalized shuffle merges will be updated. + * This method will also try deleting dangling key/values in DB, which includes: + * 1) Outdated application attempt local paths information as of some DB deletion failures + * 2) The deletion of finalized shuffle merges are triggered asynchronously, there can be cases + * that deletions miss the execution during restart. 
These finalized shuffle merges should have + * no relevant application attempts local paths information registered in the DB and the hashmap. + */ + @VisibleForTesting + void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException { +logger.info("Reload applications merged shuffle information from DB"); +List dbKeysToBeRemoved = new ArrayList<>(); +dbKeysToBeRemoved.addAll(reloadActiveAppAttemptsPathInfo(db)); +dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db)); +// Clean up invalid data stored in DB +submitCleanupTask(() -> +dbKeysToBeRemoved.forEach( +(key) -> { + try { +if (logger.isDebugEnabled()) { +
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r918226879 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -343,15 +397,44 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +removeAppShuffleInfoFromDB(appShuffleInfo); + } + + /** + * Remove the application attempt local paths information from the DB. + */ + private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) throws Exception{ Review Comment: Can you add a comment to both `removeAppAttemptPathInfoFromDB` and `writeNewAppAttemptPathInfoToDBAndRemoveOutdated` that they are expected to be invoked with the `appsShuffleInfo` lock held for `appId` ? (Also, group them together in the source file) ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -656,6 +768,232 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } } + /** + * Remove the former application attempt local paths information from the DB and insert the + * local paths information from the newer application attempt. If the deletion fails, the + * insertion will also be skipped. This ensures that there will always be a single application + * attempt local path information in the DB. + */ + private void writeNewAppAttemptPathInfoToDBAndRemoveOutdated( + String appId, + int newAttemptId, + AppShuffleInfo appShuffleInfo, + AppPathsInfo appPathsInfo) { +try{ + if (appShuffleInfo != null) { +removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId); + } + writeAppPathsInfoToDb(appId, newAttemptId, appPathsInfo); Review Comment: Thoughts on making the exception handling local to the DB invocation (since we don't need to handle cross DB invocation failures for now) ? We are already doing this for `removeAppShufflePartitionInfoFromDB`, `writeAppAttemptShuffleMergeInfoToDB`, etc. 
Given this, let us make the Exception handling local to `removeAppAttemptPathInfoFromDB` (so it does not throw an `Exception`) ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -632,6 +736,12 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) { originalAppShuffleInfo.set(appShuffleInfo); + AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs, + mergeDir, executorInfo.subDirsPerLocalDir); + // Clean up the outdated App Attempt local path info in the DB and + // put the newly registered local path info from newer attempt into the DB. + writeNewAppAttemptPathInfoToDBAndRemoveOutdated( Review Comment: nit: Do we need this method ? (the comment is helpful, just the method) We can simply do ``` if (null != appShuffleInfo) removeAppAttemptPathInfoFromDB() writeAppPathsInfoToDb() ``` ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -343,15 +397,44 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +removeAppShuffleInfoFromDB(appShuffleInfo); + } + + /** + * Remove the application attempt local paths information from the DB. + */ + private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) throws Exception{ +AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId); +if (db != null) { + db.delete(getDbAppAttemptPathsKey(appAttemptId)); Review Comment: Given the possibility of stale entries from previous failed deletes hanging around, can we scan for all entries for an application id and delete them here ? Note that this will require changes to how we encode db key for `AppAttemptId` - we cannot use json for it, since we want to do a prefix scan. 
Something like: ``` private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException { return (APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appAttemptId.appId + DB_KEY_DELIMITER + appAttemptId.attemptId).getBytes(StandardCharsets.UTF_8); } private byte[] getDbAppAttemptPathsKeyPrefix(String appId) throws IOException { return (APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appAttemptId.appId + DB_KEY_DELIMITER).getBytes(StandardCharsets.UTF_8); } private AppAttemptId parseDbAppAttemptPathsKey(String value) throws IOException { String[] parts = key.split(DB_KEY_DELIMITER); if
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r918184780 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -317,22 +353,24 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId); if (null != appShuffleInfo) { - mergedShuffleCleaner.execute( -() -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs)); + submitCleanupTask( +() -> closeAndDeletePartitions(appShuffleInfo, cleanupLocalDirs, true)); } +removeAppAttemptPathInfoFromDB( +new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId)); } - /** * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo. * If cleanupLocalDirs is true, the merged shuffle files will also be deleted. * The cleanup will be executed in a separate thread. */ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter") @VisibleForTesting - void closeAndDeletePartitionFilesIfNeeded( + void closeAndDeletePartitions( AppShuffleInfo appShuffleInfo, - boolean cleanupLocalDirs) { + boolean cleanupLocalDirs, + boolean removeFromDb) { Review Comment: This is not exposed api, so let us remove the flag for now, and add it when required. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r895292260 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -317,22 +353,24 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) { logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs); AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId); if (null != appShuffleInfo) { - mergedShuffleCleaner.execute( -() -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs)); + submitCleanupTask( +() -> closeAndDeletePartitions(appShuffleInfo, cleanupLocalDirs, true)); } +removeAppAttemptPathInfoFromDB( +new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId)); Review Comment: We are relying on DB being consistent with `appsShuffleInfo` map when reloading. Given this, we should move the removal of the DB entry to be atomic w.r.t `appsShuffleInfo.remove`. Something like: ``` AtomicReference ref = new AtomicReference<>(null); appsShuffleInfo.compute( appId, (id, info) -> { if (null != info) { removeAppAttemptPathInfoFromDB(new AppAttemptId(info.appId, info.attemptId)); ref.set(info); } // always remove return null; } ); AppShuffleInfo appShuffleInfo = ref.get ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r895282920 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -350,15 +415,27 @@ void closeAndDeletePartitionFilesIfNeeded( * up older shuffleMergeId partitions. The cleanup will be executed in a separate thread. */ @VisibleForTesting - void closeAndDeletePartitionFiles(Map partitions) { + void closeAndDeleteOutdatedPartitions(Map partitions) { partitions .forEach((partitionId, partitionInfo) -> { synchronized (partitionInfo) { partitionInfo.closeAllFilesAndDeleteIfNeeded(true); + removeAppShufflePartitionInfoFromDB(partitionInfo.appAttemptShuffleMergeId); Review Comment: Same as with comment in `removeAppShuffleInfoFromDB`. Does this need to be within sync block ? Also, we would have a large number of partitionInfo for a given shuffle merge id - we are repeatedly trying to delete the same shuffle merge id here. Assuming we don't need the delete to be in sync block, we can pull all shuffle merge ids in `partitions` and delete them. For example: ``` Set mergeIdsToRemove = new HashSet<>(); partitions.forEach ... { mergeIdsToRemove.add(partitionInfo.appAttemptShuffleMergeId); synchronized (partitionInfo) { partitionInfo.closeAllFilesAndDeleteIfNeeded(true) } } mergeIdsToRemove.forEach(this::removeAppShufflePartitionInfoFromDB); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r895282269 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -342,6 +380,33 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +if (removeFromDb){ + removeAppShuffleInfoFromDB(appShuffleInfo); +} + } + + private void removeAppAttemptPathInfoFromDB(AppAttemptId appAttemptId) { +if (db != null) { + try { +db.delete(getDbAppAttemptPathsKey(appAttemptId)); + } catch (Exception e) { +logger.error("Error deleting {} from application paths info in DB", appAttemptId, e); + } +} + } + + private void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) { +if (db != null) { + appShuffleInfo.shuffles +.forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions + .forEach((shuffleMergeId, partitionInfo) -> { +synchronized (partitionInfo) { + removeAppShufflePartitionInfoFromDB( +new AppAttemptShuffleMergeId( + appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId)); +} Review Comment: QQ: Why does this have to be within the `synchronized` block ? Currently, deletes to `appAttemptShufflePartition` are happening within the sync block - while addition is not. Any particular reason for delete to be within synch lock ? 
## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -655,6 +742,206 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { } } + + @Override + public void close() { +if (db != null) { + try { +db.close(); + } catch (IOException e) { +logger.error("Exception closing leveldb with registered app paths info and " ++ "shuffle partition info", e); + } +} + } + + private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) { +if (db != null) { + try { +byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, attemptId)); +String valueStr = mapper.writeValueAsString(appPathsInfo); +byte[] value = valueStr.getBytes(StandardCharsets.UTF_8); +db.put(key, value); + } catch (Exception e) { +logger.error("Error saving registered app paths info", e); + } +} + } + + private void writeAppAttemptShuffleMergeInfoToDB( + String appId, + int appAttemptId, + int shuffleId, + int shuffleMergeId) { +if (db != null) { + // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles + try{ +byte[] dbKey = getDbAppAttemptShufflePartitionKey( +new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, shuffleMergeId)); +db.put(dbKey, new byte[0]); + } catch (Exception e) { +logger.error("Error saving active app shuffle partition", e); + } +} + + } + + private T parseDbKey(String key, String prefix, Class valueType) { +try { + String json = key.substring(prefix.length() + 1); + return mapper.readValue(json, valueType); +} catch (Exception exception) { + logger.error("Exception while parsing the DB key {}", key); + return null; +} + } + + private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId appAttemptId) { +try { + return mapper.readValue(value, AppPathsInfo.class); +} catch (Exception exception) { + logger.error("Exception while parsing the DB value for {}", appAttemptId); + return null; +} + } + + private AppAttemptId 
parseDbAppAttemptPathsKey(String key) { +return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class); + } + + private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey( + String key, + String prefix) { +return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class); + } + + private byte[] getDbKey(Object key, String prefix) { +// We add a common prefix on all the keys so we can find them in the DB +try { + String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key); + return keyJsonString.getBytes(StandardCharsets.UTF_8); +} catch (Exception exception) { + logger.error("Exception while generating the DB key {}", key); + return null; +} + } + + private byte[] getDbAppAttemptShufflePartitionKey( + AppAttemptShuffleMergeId appAttemptShuffleMergeId) { +return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX); + } + + private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId){ +return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX); + } + + @VisibleForTesting +
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r893022070 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } finally { partition.closeAllFilesAndDeleteIfNeeded(false); } + cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId); Review Comment: If we had a test for this, curious why it was not surfacing this issue as a test failure. Perhaps something needs to be augmented in the test ? Or some other bug ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r893021304 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -992,6 +1233,45 @@ AppShufflePartitionInfo getPartitionInfo() { } } + /** + * Simply encodes an application attempt ID. + */ + public static class AppAttemptId { Review Comment: Yes, `equals` would be required. I was suggesting to make sure they are all following similar pattern, and go from cheaper to more expensive checks. Also, to have `hashCode` if we are overriding `equals` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r886252782 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -536,9 +645,20 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } } // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results - // sent to the driver will be empty. This cam happen when the service didn't receive any + // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. + if (db != null) { Review Comment: There are 4 cases here: 1. When `mergePartitionsInfo == null`. 1. ESS never received a push for this mergeId, but we want to prevent future pushes, and so add a marker entry. 2. In this case, makes sense to add the entry to level db as well. 2. When `mergePartitionsInfo != null`, we have three cases: 1. The first condition, in the `if`, results in exception - so that does not hit this case. 2. The second is when `msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId`. 1. We are scheduling a cleanup in this case - so all keys are going to get deleted. 3. The happy path - we do want the entry to be added. On further thought, 2.2 is the issue above. Should we be doing `cleanUpAppShufflePartitionInfoInDB` in `closeAndDeletePartitionFiles` ? This is removing the finalization marker from the level db - which will continue to exist in the map. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
mridulm commented on code in PR #35906: URL: https://github.com/apache/spark/pull/35906#discussion_r883744010 ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +cleanUpAppShuffleInfoInDB(appShuffleInfo); Review Comment: This method is called async to application completion - and can be delayed. In case there is a NM shutdown, some of these won't be cleaned up. (`close` will close the db, and so all subsequent deletes will fail) Do we want to delete the app attempt paths immediately, and do the shuffle deletes async (along with path deletes like here) ? On reload time, if shuffle info is present for missing attempts paths, we can remove those from the db. Thoughts ? ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -209,9 +246,16 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo( appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId)); File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); + // Make sure unuseful non-finalized merged data/index/meta files get cleaned up + // during service restart + if (dataFile.exists()) dataFile.delete(); + if (indexFile.exists()) indexFile.delete(); + if (metaFile.exists()) metaFile.delete(); Review Comment: We are immediately opening all these files with append = false, why do we need to delete them ? 
Note: if we are removing the `delete()` here, do add a comment in `MergeShuffleFile`, etc at `FileOutputStream` creation that it must have append = false ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded( if (cleanupLocalDirs) { deleteExecutorDirs(appShuffleInfo); } +cleanUpAppShuffleInfoInDB(appShuffleInfo); + } + + private void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) { +if (db != null) { + try { +db.delete( + getDbAppAttemptPathsKey( +new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId))); +appShuffleInfo.shuffles + .forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions +.forEach((shuffleMergeId, partitionInfo) -> { + synchronized (partitionInfo) { +cleanUpAppShufflePartitionInfoInDB( + new AppAttemptShuffleMergeId( +appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId)); + } +})); + } catch (Exception e) { +logger.error("Error deleting {}_{} from application paths info db", + appShuffleInfo.appId, appShuffleInfo.attemptId, e); Review Comment: This failure could be for either application paths info or for shuffle partitions. Update message ? ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -709,9 +948,9 @@ public ByteBuffer getCompletionResponse() { */ private void writeBuf(ByteBuffer buf) throws IOException { Review Comment: revert changes to this method ? 
## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -565,8 +650,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { sizes.add(partition.getLastChunkOffset()); logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results " + "added for partition {} data size {} index size {} meta size {}", - msg.appId, msg.appAttemptId, msg.shuffleId, - msg.shuffleMergeId, partition.reduceId, partition.getLastChunkOffset(), + msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId, + partition.reduceId, partition.getLastChunkOffset(), Review Comment: revert ? ## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ## @@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } finally { partition.closeAllFilesAndDeleteIfNeeded(false); } + cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId); Review Comment: Why are we doing this ? We just inserted it in the `compute` above ? We should have a test which validates that after successful finalization, the db contains the partition info. ##