mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1057911122


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteAllMergedShuffle =

Review Comment:
   nit: rename `deleteAllMergedShuffle` to `deleteCurrentMergedShuffle`.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteAllMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);

Review Comment:
   This is `mergePartitionsInfo` itself - can we replace `currentAppAttemptShuffleMergeId` with it? (Or rename `mergePartitionsInfo` to `currentAppAttemptShuffleMergeId` if we are going for clarity?)
   



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteAllMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);

Review Comment:
   Note on the DB updates: we don't need to update the DB if the shuffle was finalized and the merge id is not changing, i.e. `mergePartitionsInfo.isFinalized() && shuffleMergeId == mergePartitionsInfo.shuffleMergeId`.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId,
+                mergePartitionsInfo.getReduceIds());
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+            mergePartitionsInfo.setReduceIds(new int[0]);
+          });
+        }
+      } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle 
merged data for " +
+                "application %s shuffleId %s shuffleMergeId %s, but current 
shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, msg.shuffleMergeId, 
mergePartitionsInfo.shuffleMergeId));
+      } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+        // cleanup request for newer shuffle - remove the outdated data we 
have.
+        submitCleanupTask(() -> {
+          closeAndDeleteOutdatedPartitions(
+              currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+          writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   Can you elaborate on what the concurrency issue is?
   
   The expectation is to keep the DB consistent with `appShuffleInfo.shuffles`, so that during recovery the shuffle service has a consistent view. The files on disk can be cleaned up lazily - subsequent requests won't be able to reach them, since they are no longer accessible via the metadata.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId,
+                mergePartitionsInfo.getReduceIds());

Review Comment:
   Resolving this conversation; please reopen if anything else is missing.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
       });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, 
int[] reduceIds) {
+    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+    AppShuffleInfo appShuffleInfo = 
validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+    int shuffleId = appAttemptShuffleMergeId.shuffleId;
+    int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+    for (int reduceId : reduceIds) {
+      try {
+        File dataFile =
+            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, 
reduceId);
+        dataFile.delete();
+      } catch (Exception e) {

Review Comment:
   This is a best-effort deferred task, which can conflict with concurrent cleanup (app termination, for example), so some failures are expected.
   On busy clusters, a warn message will quickly overwhelm the logs.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,67 @@ public void applicationRemoved(String appId, boolean 
cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message 
does not match "
+                  + "with the current attempt id %s stored in shuffle service 
for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, 
mergePartitionsInfo) -> {
+      if (mergePartitionsInfo == null) {
+        if (msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE) {
+          return null;
+        } else {
+          writeAppAttemptShuffleMergeInfoToDB(new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId));
+          return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+        }
+      }
+      boolean deleteAllMergedShuffle =
+          msg.shuffleMergeId == DELETE_ALL_MERGED_SHUFFLE ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      int shuffleMergeId = msg.shuffleMergeId != DELETE_ALL_MERGED_SHUFFLE ?
+          msg.shuffleMergeId : mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, 
mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new 
AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, shuffleMergeId);
+      if(deleteAllMergedShuffle) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, 
mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
+                mergePartitionsInfo.getReduceIds());
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+            mergePartitionsInfo.setReduceIds(new int[0]);

Review Comment:
   `setReduceIds` is not required, since `mergePartitionsInfo` is no longer accessible.
   
   Also, same as below: `writeAppAttemptShuffleMergeInfoToDB` should be called outside the cleanup task.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -702,7 +722,8 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
                 "finalizing shuffle partition {}", msg.appId, 
msg.appAttemptId, msg.shuffleId,
                 msg.shuffleMergeId, partition.reduceId);
           } finally {
-            partition.closeAllFilesAndDeleteIfNeeded(false);
+            Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+            partition.closeAllFilesAndDeleteIfNeeded(deleteFile);

Review Comment:
   Please revert this for the time being, and revisit it as a follow-up if required.
   There are a number of edge cases that need to be handled in this important PR, and I don't want to add to them with optional changes of minor utility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to