mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r883744010


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);

Review Comment:
   This method is called async to application completion - and can be delayed.
   In case there is a NM shutdown, some of these wont be cleaned up. (`close` 
will close the db, and so all subsequent deletes will fail)
   
   Do we want to delete the app attempt paths immediately, and do the shuffle 
deletes async (along with path deletes like here) ?
   On reload time, if shuffle info is present for missing attempts paths, we 
can remove those from the db.
   
   Thoughts ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -209,9 +246,16 @@ private AppShufflePartitionInfo 
getOrCreateAppShufflePartitionInfo(
         appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, 
shuffleMergeId, reduceId));
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, 
reduceId);
+      // Make sure unuseful non-finalized merged data/index/meta files get 
cleaned up
+      // during service restart
+      if (dataFile.exists()) dataFile.delete();
+      if (indexFile.exists()) indexFile.delete();
+      if (metaFile.exists()) metaFile.delete();

Review Comment:
   We are immediately opening all these files with append = false, why do we 
need to delete them ?
   Note: if we are removing the `delete()` here, do add a comment in 
`MergeShuffleFile`, etc at `FileOutputStream` creation that it must have append 
= false



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);
+  }
+
+  private void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      try {
+        db.delete(
+          getDbAppAttemptPathsKey(
+            new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId)));
+        appShuffleInfo.shuffles
+          .forEach((shuffleId, shuffleInfo) -> 
shuffleInfo.shuffleMergePartitions
+            .forEach((shuffleMergeId, partitionInfo) -> {
+              synchronized (partitionInfo) {
+                cleanUpAppShufflePartitionInfoInDB(
+                  new AppAttemptShuffleMergeId(
+                    appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, 
shuffleMergeId));
+              }
+            }));
+      } catch (Exception e) {
+        logger.error("Error deleting {}_{} from application paths info db",
+          appShuffleInfo.appId, appShuffleInfo.attemptId, e);

Review Comment:
   This failure could be for either application paths info or for shuffle 
partitions. Update message ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -709,9 +948,9 @@ public ByteBuffer getCompletionResponse() {
      */
     private void writeBuf(ByteBuffer buf) throws IOException {

Review Comment:
   revert changes to this method ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -565,8 +650,8 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
               sizes.add(partition.getLastChunkOffset());
               logger.debug("{} attempt {} shuffle {} shuffleMerge {}: 
finalization results " +
                   "added for partition {} data size {} index size {} meta size 
{}",
-                  msg.appId, msg.appAttemptId, msg.shuffleId,
-                  msg.shuffleMergeId, partition.reduceId, 
partition.getLastChunkOffset(),
+                  msg.appId, msg.appAttemptId, msg.shuffleId, 
msg.shuffleMergeId,
+                  partition.reduceId, partition.getLastChunkOffset(),

Review Comment:
   revert ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -576,6 +661,7 @@ public MergeStatuses 
finalizeShuffleMerge(FinalizeShuffleMerge msg) {
           } finally {
             partition.closeAllFilesAndDeleteIfNeeded(false);
           }
+          
cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId);

Review Comment:
   Why are we doing this ? We just inserted it in the `compute` above ?
   We should have a test which validates that after successful finalization, 
the db contains the partition info.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -614,12 +700,12 @@ public void registerExecutor(String appId, 
ExecutorShuffleInfo executorInfo) {
         if (attemptId == UNDEFINED_ATTEMPT_ID) {
           // When attemptId is -1, there is no attemptId stored in the 
ExecutorShuffleInfo.
           // Only the first ExecutorRegister message can register the merge 
dirs
-          appsShuffleInfo.computeIfAbsent(appId, id ->
-            new AppShuffleInfo(
-              appId, UNDEFINED_ATTEMPT_ID,
-              new AppPathsInfo(appId, executorInfo.localDirs,
-                mergeDir, executorInfo.subDirsPerLocalDir)
-            ));
+          appsShuffleInfo.computeIfAbsent(appId, id -> {
+            AppPathsInfo appPathsInfo = new AppPathsInfo(appId, attemptId, 
executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir);
+            writeAppPathsInfoToDb(appId, attemptId, appPathsInfo);
+            return new AppShuffleInfo(appId, UNDEFINED_ATTEMPT_ID, 
appPathsInfo);

Review Comment:
   nit: either use `UNDEFINED_ATTEMPT_ID` or `attemptId` in both cases.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -928,7 +1169,7 @@ public void onComplete(String streamId) throws IOException 
{
               throw ioe;
             }
           }
-          long updatedPos = partitionInfo.getDataFilePos() + length;
+          long updatedPos = partitionInfo.dataFilePos + length;
           boolean indexUpdated = false;

Review Comment:
   revert



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -992,6 +1233,45 @@ AppShufflePartitionInfo getPartitionInfo() {
     }
   }
 
+  /**
+   * Simply encodes an application attempt ID.
+   */
+  public static class AppAttemptId {

Review Comment:
   For the various json beans we have `AppAttemptId`, 
`AppAttemptShuffleMergeId`, `AppPathsInfo` - can we make sure the `equals`, 
`toString` and `hashCode` follow similar patterns ?
   
   For example, `AppAttemptId`/`AppPathsInfo` is allowing for subclasses to be 
passed in, while `AppAttemptShuffleMergeId` check if the arg is specifically of 
the same class.
   
   Also, in `equals`, go from cheapest check to more expensive ones 
progressively.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to