mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990756966


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,26 +1431,27 @@ public String toString() {
    * required for the shuffles of indeterminate stages.
    */
   public static class AppShuffleMergePartitionsInfo {

Review Comment:
   Revert changes to this class ? (assuming the changes I sketched above are 
fine)



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId 
then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+              mergePartitionsInfo.shuffleMergePartitions;
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, 
shuffleMergePartitions));
         } else {
           // This block covers:
           //  1. finalization of determinate stage
           //  2. finalization of indeterminate stage if the shuffleMergeId 
related to it is the one
           //  for which the message is received.
           
shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
         }
+      } else {
+        mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, 
true);
       }
+      mergePartitionsInfo.setFinalized(true);
       // Update the DB for the finalized shuffle
       writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
       // Even when the mergePartitionsInfo is null, we mark the shuffle as 
finalized but the results
       // sent to the driver will be empty. This can happen when the service 
didn't receive any
       // blocks for the shuffle yet and the driver didn't wait for enough time 
to finalize the
       // shuffle.
-      return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+      return mergePartitionsInfo;

Review Comment:
   revert changes to this method ? (assuming the changes I sketched above are 
fine)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to