[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-07-15 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r922587713


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -656,6 +781,236 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info and "
+          + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
+    if (db != null) {
+      AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info for {}", appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try {
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition {}", appAttemptShuffleMergeId, e);
+      }
+    }
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+      key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // We add a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Generate the DB key from an AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from an AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the metadata stored in the hashmaps for merged shuffles.
+   * The application attempts' local path information is reloaded first, and then the
+   * finalized shuffle merges are updated.
+   * This method also tries to delete dangling key/values in the DB, which include:
+   * 1) Outdated application attempt local path information left behind by earlier DB
+   * deletion failures.
+   * 2) Finalized shuffle merges whose asynchronously triggered deletion missed execution
+   * during a restart. Such finalized shuffle merges should have no corresponding
+   * application attempt local path information registered in the DB or the hashmap.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    dbKeysToBeRemoved.addAll(reloadActiveAppAttemptsPathInfo(db));
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    // Clean up invalid data stored in DB
+    submitCleanupTask(() ->
+      dbKeysToBeRemoved.forEach(
+        (key) -> {
+          try {
+            if (logger.isDebugEnabled()) {

[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-07-11 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r918226879


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -343,15 +397,44 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    removeAppShuffleInfoFromDB(appShuffleInfo);
+  }
+
+  /**
+   * Remove the application attempt local paths information from the DB.
+   */
+  private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) throws Exception {

Review Comment:
   Can you add a comment to both `removeAppAttemptPathInfoFromDB` and
   `writeNewAppAttemptPathInfoToDBAndRemoveOutdated` noting that they are expected
   to be invoked with the `appsShuffleInfo` lock held for `appId`?
   (Also, group them together in the source file.)



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -656,6 +768,232 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     }
   }
 
+  /**
+   * Remove the former application attempt's local path information from the DB and insert
+   * the local path information from the newer application attempt. If the deletion fails,
+   * the insertion is also skipped. This ensures that there is always at most one
+   * application attempt's local path information in the DB.
+   */
+  private void writeNewAppAttemptPathInfoToDBAndRemoveOutdated(
+      String appId,
+      int newAttemptId,
+      AppShuffleInfo appShuffleInfo,
+      AppPathsInfo appPathsInfo) {
+    try {
+      if (appShuffleInfo != null) {
+        removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
+      }
+      writeAppPathsInfoToDb(appId, newAttemptId, appPathsInfo);

Review Comment:
   Thoughts on making the exception handling local to the DB invocation (since we
   don't need to handle cross-DB invocation failures for now)? We are already
   doing this for `removeAppShufflePartitionInfoFromDB`,
   `writeAppAttemptShuffleMergeInfoToDB`, etc.
   
   Given this, let us make the exception handling local to
   `removeAppAttemptPathInfoFromDB` (so it does not throw an `Exception`).



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -632,6 +736,12 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
       appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
         if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) {
           originalAppShuffleInfo.set(appShuffleInfo);
+          AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs,
+            mergeDir, executorInfo.subDirsPerLocalDir);
+          // Clean up the outdated app attempt local path info in the DB and
+          // put the newly registered local path info from the newer attempt into the DB.
+          writeNewAppAttemptPathInfoToDBAndRemoveOutdated(

Review Comment:
   nit: Do we need this method? (The comment is helpful; it is just the method
   that seems unnecessary.)
   We can simply do:
   ```
   if (null != appShuffleInfo) removeAppAttemptPathInfoFromDB(...);
   writeAppPathsInfoToDb(...);
   ```
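   For illustration, inlined inside the `registerExecutor` compute lambda shown
   above, this might look like the following (a sketch using names from the
   diff, not the actual patch; the `AppShuffleInfo` constructor shape is assumed):
   ```
   appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
     if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) {
       originalAppShuffleInfo.set(appShuffleInfo);
       AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs,
         mergeDir, executorInfo.subDirsPerLocalDir);
       // Drop the outdated attempt's path info before persisting the new one;
       // both helpers handle their own DB exceptions.
       if (null != appShuffleInfo) {
         removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
       }
       writeAppPathsInfoToDb(appId, attemptId, appPathsInfo);
       appShuffleInfo = new AppShuffleInfo(appId, attemptId, appPathsInfo);
     }
     return appShuffleInfo;
   });
   ```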



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -343,15 +397,44 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    removeAppShuffleInfoFromDB(appShuffleInfo);
+  }
+
+  /**
+   * Remove the application attempt local paths information from the DB.
+   */
+  private void removeAppAttemptPathInfoFromDB(String appId, int attemptId) throws Exception {
+    AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+    if (db != null) {
+      db.delete(getDbAppAttemptPathsKey(appAttemptId));

Review Comment:
   Given the possibility of stale entries from previous failed deletes hanging
   around, can we scan for all entries for an application id and delete them here?
   
   Note that this will require changes to how we encode the db key for
   `AppAttemptId` - we cannot use json for it, since we want to do a prefix scan.
   Something like:
   ```
   private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
     return (APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appAttemptId.appId
       + DB_KEY_DELIMITER + appAttemptId.attemptId).getBytes(StandardCharsets.UTF_8);
   }
   
   private byte[] getDbAppAttemptPathsKeyPrefix(String appId) throws IOException {
     return (APP_ATTEMPT_PATH_KEY_PREFIX + DB_KEY_DELIMITER + appId
       + DB_KEY_DELIMITER).getBytes(StandardCharsets.UTF_8);
   }
   
   private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException {
     String[] parts = key.split(DB_KEY_DELIMITER);
     if
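   
   For illustration, a minimal sketch of the prefix-scan delete proposed above
   (the archived message is truncated here). It assumes the org.iq80.leveldb
   `DB`/`DBIterator` API used elsewhere in the shuffle service; the method name
   `removeAllAttemptPathInfoFromDB` is invented:
   ```
   // Sketch: delete every DB entry whose key starts with the per-app prefix,
   // which also sweeps up stale entries from previously failed deletes.
   private void removeAllAttemptPathInfoFromDB(String appId) {
     if (db == null) {
       return;
     }
     try (DBIterator itr = db.iterator()) {
       String prefix =
         new String(getDbAppAttemptPathsKeyPrefix(appId), StandardCharsets.UTF_8);
       itr.seek(prefix.getBytes(StandardCharsets.UTF_8));
       while (itr.hasNext()) {
         byte[] key = itr.next().getKey();
         if (!new String(key, StandardCharsets.UTF_8).startsWith(prefix)) {
           break; // iterated past the prefix range
         }
         db.delete(key);
       }
     } catch (Exception e) {
       logger.error("Error deleting app attempt paths info for {}", appId, e);
     }
   }
   ```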

[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-07-11 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r918184780


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -317,22 +353,24 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
     AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
     if (null != appShuffleInfo) {
-      mergedShuffleCleaner.execute(
-        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs));
+      submitCleanupTask(
+        () -> closeAndDeletePartitions(appShuffleInfo, cleanupLocalDirs, true));
     }
+    removeAppAttemptPathInfoFromDB(
+      new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId));
   }
 
-
   /**
    * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
    * If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
    * The cleanup will be executed in a separate thread.
    */
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @VisibleForTesting
-  void closeAndDeletePartitionFilesIfNeeded(
+  void closeAndDeletePartitions(
       AppShuffleInfo appShuffleInfo,
-      boolean cleanupLocalDirs) {
+      boolean cleanupLocalDirs,
+      boolean removeFromDb) {

Review Comment:
   This is not an exposed API, so let us remove the flag for now and add it when
   required.






[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-12 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r895292260


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -317,22 +353,24 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
     AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
     if (null != appShuffleInfo) {
-      mergedShuffleCleaner.execute(
-        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs));
+      submitCleanupTask(
+        () -> closeAndDeletePartitions(appShuffleInfo, cleanupLocalDirs, true));
     }
+    removeAppAttemptPathInfoFromDB(
+      new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId));

Review Comment:
   We are relying on the DB being consistent with the `appsShuffleInfo` map when
   reloading.
   Given this, we should move the removal of the DB entry to be atomic w.r.t.
   `appsShuffleInfo.remove`.
   
   Something like:
   ```
   AtomicReference<AppShuffleInfo> ref = new AtomicReference<>(null);
   appsShuffleInfo.compute(appId, (id, info) -> {
     if (null != info) {
       removeAppAttemptPathInfoFromDB(new AppAttemptId(info.appId, info.attemptId));
       ref.set(info);
     }
     // always remove
     return null;
   });
   AppShuffleInfo appShuffleInfo = ref.get();
   ```
   






[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-12 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r895282920


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -350,15 +415,27 @@ void closeAndDeletePartitionFilesIfNeeded(
    * up older shuffleMergeId partitions. The cleanup will be executed in a separate thread.
    */
   @VisibleForTesting
-  void closeAndDeletePartitionFiles(Map<Integer, AppShufflePartitionInfo> partitions) {
+  void closeAndDeleteOutdatedPartitions(Map<Integer, AppShufflePartitionInfo> partitions) {
     partitions
       .forEach((partitionId, partitionInfo) -> {
         synchronized (partitionInfo) {
           partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
+          removeAppShufflePartitionInfoFromDB(partitionInfo.appAttemptShuffleMergeId);

Review Comment:
   Same as the comment in `removeAppShuffleInfoFromDB`. Does this need to be
   within the sync block?
   
   Also, we would have a large number of partitionInfo entries for a given shuffle
   merge id - we are repeatedly trying to delete the same shuffle merge id here.
   
   Assuming we don't need the delete to be in the sync block, we can pull all
   shuffle merge ids in `partitions` and delete them.
   
   For example:
   
   ```
   Set<AppAttemptShuffleMergeId> mergeIdsToRemove = new HashSet<>();
   
   partitions.forEach ... {
     mergeIdsToRemove.add(partitionInfo.appAttemptShuffleMergeId);
     synchronized (partitionInfo) {
       partitionInfo.closeAllFilesAndDeleteIfNeeded(true);
     }
   }
   
   mergeIdsToRemove.forEach(this::removeAppShufflePartitionInfoFromDB);
   ```
   
   
   






[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-12 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r895282269


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -342,6 +380,33 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    if (removeFromDb) {
+      removeAppShuffleInfoFromDB(appShuffleInfo);
+    }
+  }
+
+  private void removeAppAttemptPathInfoFromDB(AppAttemptId appAttemptId) {
+    if (db != null) {
+      try {
+        db.delete(getDbAppAttemptPathsKey(appAttemptId));
+      } catch (Exception e) {
+        logger.error("Error deleting {} from application paths info in DB", appAttemptId, e);
+      }
+    }
+  }
+
+  private void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      appShuffleInfo.shuffles
+        .forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions
+          .forEach((shuffleMergeId, partitionInfo) -> {
+            synchronized (partitionInfo) {
+              removeAppShufflePartitionInfoFromDB(
+                new AppAttemptShuffleMergeId(
+                  appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId));
+            }

Review Comment:
   QQ: Why does this have to be within the `synchronized` block?
   
   Currently, deletes to `appAttemptShufflePartition` happen within the sync
   block - while additions do not.
   Any particular reason for the delete to be within the sync lock?



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -655,6 +742,206 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     }
   }
 
+
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info and "
+          + "shuffle partition info", e);
+      }
+    }
+  }
+
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(new AppAttemptId(appId, attemptId));
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info", e);
+      }
+    }
+  }
+
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int shuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try {
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(
+          new AppAttemptShuffleMergeId(appId, appAttemptId, shuffleId, shuffleMergeId));
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition", e);
+      }
+    }
+  }
+
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) {
+    try {
+      String json = key.substring(prefix.length() + 1);
+      return mapper.readValue(json, valueType);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB key {}", key);
+      return null;
+    }
+  }
+
+  private AppPathsInfo parseDBAppAttemptPathsValue(byte[] value, AppAttemptId appAttemptId) {
+    try {
+      return mapper.readValue(value, AppPathsInfo.class);
+    } catch (Exception exception) {
+      logger.error("Exception while parsing the DB value for {}", appAttemptId);
+      return null;
+    }
+  }
+
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key,
+      String prefix) {
+    return parseDbKey(key, prefix, AppAttemptShuffleMergeId.class);
+  }
+
+  private byte[] getDbKey(Object key, String prefix) {
+    // We add a common prefix on all the keys so we can find them in the DB
+    try {
+      String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+      return keyJsonString.getBytes(StandardCharsets.UTF_8);
+    } catch (Exception exception) {
+      logger.error("Exception while generating the DB key {}", key);
+      return null;
+    }
+  }
+
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  @VisibleForTesting
+  

[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-08 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r893022070


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       } finally {
         partition.closeAllFilesAndDeleteIfNeeded(false);
       }
+      cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId);

Review Comment:
   If we had a test for this, I am curious why it was not surfacing this issue as
   a test failure. Perhaps something needs to be augmented in the test? Or is
   there some other bug?






[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-06-08 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r893021304


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -992,6 +1233,45 @@ AppShufflePartitionInfo getPartitionInfo() {
     }
   }
 
+  /**
+   * Simply encodes an application attempt ID.
+   */
+  public static class AppAttemptId {

Review Comment:
   Yes, `equals` would be required.
   I was suggesting to make sure they all follow a similar pattern, and go from
   cheaper to more expensive checks.
   Also, have `hashCode` if we are overriding `equals`.
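   For illustration, the pattern being suggested, sketched on `AppAttemptId`
   (field names assumed from the surrounding diff; uses `java.util.Objects`):
   ```
   @Override
   public boolean equals(Object o) {
     if (this == o) return true;                  // cheapest check first
     if (!(o instanceof AppAttemptId)) return false;
     AppAttemptId other = (AppAttemptId) o;
     // compare the cheap int field before the String field
     return attemptId == other.attemptId && Objects.equals(appId, other.appId);
   }
   
   @Override
   public int hashCode() {
     // keep hashCode consistent with the overridden equals
     return Objects.hash(appId, attemptId);
   }
   ```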






[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-05-31 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r886252782


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -536,9 +645,20 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         }
       }
       // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
-      // sent to the driver will be empty. This cam happen when the service didn't receive any
+      // sent to the driver will be empty. This can happen when the service didn't receive any
       // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
       // shuffle.
+      if (db != null) {

Review Comment:
   There are 4 cases here:
   
   1. When `mergePartitionsInfo == null`:
      1. ESS never received a push for this mergeId, but we want to prevent future pushes, and so add a marker entry.
      2. In this case, it makes sense to add the entry to level db as well.
   2. When `mergePartitionsInfo != null`, we have three cases:
      1. The first condition, in the `if`, results in an exception - so that does not hit this case.
      2. The second is when `msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId`:
         1. We are scheduling a cleanup in this case - so all keys are going to get deleted.
      3. The happy path - we do want the entry to be added.
   
   On further thought, 2.2 is the issue above.
   Should we be doing `cleanUpAppShufflePartitionInfoInDB` in
   `closeAndDeletePartitionFiles`?
   This is removing the finalization marker from the level db - which will
   continue to exist in the map.





[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

2022-05-27 Thread GitBox


mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r883744010


##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);

Review Comment:
   This method is called async to application completion - and can be delayed.
   In case there is an NM shutdown, some of these won't be cleaned up (`close`
   will close the db, and so all subsequent deletes will fail).
   
   Do we want to delete the app attempt paths immediately, and do the shuffle
   deletes async (along with the path deletes like here)? See the sketch below.
   At reload time, if shuffle info is present for missing attempt paths, we can
   remove those entries from the db.
   
   Thoughts?
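   For illustration, a sketch of the reload-time cleanup suggested above, using
   the org.iq80.leveldb iterator; the method `removeOrphanedShuffleMarkers` and
   its arguments are invented, while the parse helper comes from the diff above:
   ```
   // Sketch: at reload, drop finalized-shuffle markers whose owning app
   // attempt has no registered path info left.
   private void removeOrphanedShuffleMarkers(
       DB db, Set<AppAttemptId> attemptsWithPaths) throws IOException {
     String prefix = APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX;
     try (DBIterator itr = db.iterator()) {
       itr.seek(prefix.getBytes(StandardCharsets.UTF_8));
       while (itr.hasNext()) {
         byte[] key = itr.next().getKey();
         String keyStr = new String(key, StandardCharsets.UTF_8);
         if (!keyStr.startsWith(prefix)) {
           break; // past the prefix range
         }
         AppAttemptShuffleMergeId mergeId =
           parseDbAppAttemptShufflePartitionKey(keyStr, prefix);
         if (mergeId != null && !attemptsWithPaths.contains(
             new AppAttemptId(mergeId.appId, mergeId.attemptId))) {
           db.delete(key); // marker with no surviving attempt path info
         }
       }
     }
   }
   ```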



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -209,9 +246,16 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
         appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
+      // Make sure useless non-finalized merged data/index/meta files get cleaned up
+      // during service restart
+      if (dataFile.exists()) dataFile.delete();
+      if (indexFile.exists()) indexFile.delete();
+      if (metaFile.exists()) metaFile.delete();

Review Comment:
   We are immediately opening all these files with append = false, so why do we
   need to delete them?
   Note: if we are removing the `delete()` here, do add a comment in
   `MergeShuffleFile`, etc., at `FileOutputStream` creation that it must have
   append = false.
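   For illustration (not part of the patch): with append = false, the
   `FileOutputStream` constructor truncates any existing file on open, which is
   what would make the explicit `delete()` redundant:
   ```
   // append = false truncates a stale file left over from before the restart;
   // dataFile is a placeholder for the merged data/index/meta files above.
   FileOutputStream out = new FileOutputStream(dataFile, /* append = */ false);
   ```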



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);
+  }
+
+  private void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      try {
+        db.delete(
+          getDbAppAttemptPathsKey(
+            new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId)));
+        appShuffleInfo.shuffles
+          .forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions
+            .forEach((shuffleMergeId, partitionInfo) -> {
+              synchronized (partitionInfo) {
+                cleanUpAppShufflePartitionInfoInDB(
+                  new AppAttemptShuffleMergeId(
+                    appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId));
+              }
+            }));
+      } catch (Exception e) {
+        logger.error("Error deleting {}_{} from application paths info db",
+          appShuffleInfo.appId, appShuffleInfo.attemptId, e);

Review Comment:
   This failure could be for either application paths info or for shuffle
   partitions. Update the message?



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -709,9 +948,9 @@ public ByteBuffer getCompletionResponse() {
      */
     private void writeBuf(ByteBuffer buf) throws IOException {

Review Comment:
   Revert the changes to this method?



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -565,8 +650,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
           sizes.add(partition.getLastChunkOffset());
           logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results " +
               "added for partition {} data size {} index size {} meta size {}",
-            msg.appId, msg.appAttemptId, msg.shuffleId,
-            msg.shuffleMergeId, partition.reduceId, partition.getLastChunkOffset(),
+            msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId,
+            partition.reduceId, partition.getLastChunkOffset(),

Review Comment:
   Revert?



##
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##
@@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
       } finally {
         partition.closeAllFilesAndDeleteIfNeeded(false);
       }
+      cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId);

Review Comment:
   Why are we doing this? We just inserted it in the `compute` above.
   We should have a test which validates that after successful finalization,
   the db contains the partition info.
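   For illustration, a hypothetical shape for such a test (the setup helper and
   fields are invented; the scan uses the org.iq80.leveldb iterator):
   ```
   @Test
   public void testFinalizationMarkerPersistedInDb() throws Exception {
     pushBlocksAndFinalizeShuffleMerge(); // invented setup helper
     boolean found = false;
     try (DBIterator itr = db.iterator()) {
       itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX
         .getBytes(StandardCharsets.UTF_8));
       while (itr.hasNext()) {
         String key = new String(itr.next().getKey(), StandardCharsets.UTF_8);
         if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
           break;
         }
         found = true; // a finalized-merge marker survived in the DB
       }
     }
     assertTrue("Finalization marker should be persisted in the DB", found);
   }
   ```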



##