zhouyejoe commented on a change in pull request #33078: URL: https://github.com/apache/spark/pull/33078#discussion_r659243498
########## File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java ########## @@ -417,24 +476,75 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOExc bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds), Longs.toArray(sizes)); } - partitions.remove(appShuffleId); - logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId); + appShuffleInfo.partitions.remove(msg.shuffleId); + logger.info("Finalized shuffle {} from Application {}_{}.", + msg.shuffleId, msg.appId, msg.attemptId); return mergeStatuses; } @Override public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) { if (logger.isDebugEnabled()) { logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} " - + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs), - executorInfo.subDirsPerLocalDir); + + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs), + executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager); + } + String shuffleManagerMeta = executorInfo.shuffleManager; + if (shuffleManagerMeta.contains(":")) { + String mergeDirInfo = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(":") + 1); + try { + ObjectMapper mapper = new ObjectMapper(); + MergeDirectoryMeta mergeDirectoryMeta = + mapper.readValue(mergeDirInfo, MergeDirectoryMeta.class); + if (mergeDirectoryMeta.attemptId == ATTEMPT_ID_UNDEFINED) { + // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo. + // Only the first ExecutorRegister message can register the merge dirs + appsShuffleInfo.computeIfAbsent(appId, id -> + new AppShuffleInfo( + mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, + mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir) + )); + } else { + // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo. 
+ // The first ExecutorRegister message from the same application attempt will register + // the merge dirs in External Shuffle Service. Any later ExecutorRegister message + // from the same application attempt will not override the merge dirs. But it can + // be overridden by ExecutorRegister message from newer application attempt, + // and former attempts' shuffle partitions information will also be cleaned up. + AtomicReference<AppShuffleInfo> originalAppShuffleInfo = new AtomicReference<>(); + appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> { + if (appShuffleInfo == null || (appShuffleInfo != null + && mergeDirectoryMeta.attemptId > appShuffleInfo.attemptId)) { + originalAppShuffleInfo.set(appShuffleInfo); + appShuffleInfo = + new AppShuffleInfo( + mergeDirectoryMeta.attemptId, + new AppPathsInfo(appId, executorInfo.localDirs, + mergeDirectoryMeta.mergeDir, executorInfo.subDirsPerLocalDir)); + } + return appShuffleInfo; + }); + if (originalAppShuffleInfo.get() != null) { + AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get(); + logger.warn("Remove shuffle info for {}_{} as new application attempt registered", + appId, appShuffleInfo.attemptId); + cleanupShufflePartitionInfo(appShuffleInfo); + } + } + } catch (JsonProcessingException e ) { + logger.warn("Failed to get the merge directory information from ExecutorShuffleInfo: ", e); + } + } else { + logger.warn("ExecutorShuffleInfo does not have the expected merge directory information"); } - appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, executorInfo.localDirs, - executorInfo.subDirsPerLocalDir)); } - private static String generateFileName(AppShuffleId appShuffleId, int reduceId) { - return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appShuffleId.appId, - appShuffleId.shuffleId, reduceId); + private static String generateFileName( + String appId, + int shuffleId, + int reduceId) { + return String.format( + "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX,appId, 
shuffleId, reduceId); Review comment: Updated here. I tried searching the code base but could not find this issue in other places. Can you be more specific about which lines? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org