mridulm commented on code in PR #37922: URL: https://github.com/apache/spark/pull/37922#discussion_r990756966
########## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ########## @@ -1410,26 +1431,27 @@ public String toString() { * required for the shuffles of indeterminate stages. */ public static class AppShuffleMergePartitionsInfo { Review Comment: Revert changes to this class ? (assuming the changes I sketched above are fine) ########## common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java: ########## @@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) { } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) { // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return // empty MergeStatuses but cleanup the older shuffleMergeId files. + Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions = + mergePartitionsInfo.shuffleMergePartitions; submitCleanupTask(() -> - closeAndDeleteOutdatedPartitions( - appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions)); + closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions)); } else { // This block covers: // 1. finalization of determinate stage // 2. finalization of indeterminate stage if the shuffleMergeId related to it is the one // for which the message is received. shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions); } + } else { + mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true); } + mergePartitionsInfo.setFinalized(true); // Update the DB for the finalized shuffle writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId); // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results // sent to the driver will be empty. This can happen when the service didn't receive any // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the // shuffle. - return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true); + return mergePartitionsInfo; Review Comment: revert changes to this method ? (assuming the changes I sketched above are fine) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org