[ https://issues.apache.org/jira/browse/MAPREDUCE-7474?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17837694#comment-17837694 ]

ASF GitHub Bot commented on MAPREDUCE-7474:
-------------------------------------------

anmolanmol1234 commented on code in PR #6716:
URL: https://github.com/apache/hadoop/pull/6716#discussion_r1567132937


##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/ManifestCommitterConstants.java:
##########
@@ -143,6 +145,20 @@ public final class ManifestCommitterConstants {
    */
   public static final boolean OPT_CLEANUP_PARALLEL_DELETE_DIRS_DEFAULT = true;
 
+  /**
+   * Should parallel cleanup try to delete teh base first?

Review Comment:
   typo: "teh" should be "the"



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java:
##########
@@ -142,64 +154,93 @@ protected Result executeStage(
     }
 
     Outcome outcome = null;
-    IOException exception;
+    IOException exception = null;
+    boolean baseDirDeleted = false;
 
 
     // to delete.
     LOG.info("{}: Deleting job directory {}", getName(), baseDir);
 
     if (args.deleteTaskAttemptDirsInParallel) {
-      // Attempt to do a parallel delete of task attempt dirs;
-      // don't overreact if a delete fails, but stop trying
-      // to delete the others, and fall back to deleting the
-      // job dir.
-      Path taskSubDir
-          = getStageConfig().getJobAttemptTaskSubDir();
-      try (DurationInfo info = new DurationInfo(LOG,
-          "parallel deletion of task attempts in %s",
-          taskSubDir)) {
-        RemoteIterator<FileStatus> dirs =
-            RemoteIterators.filteringRemoteIterator(
-                listStatusIterator(taskSubDir),
-                FileStatus::isDirectory);
-        TaskPool.foreach(dirs)
-            .executeWith(getIOProcessors())
-            .stopOnFailure()
-            .suppressExceptions(false)
-            .run(this::rmTaskAttemptDir);
-        getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
-
-        if (getLastDeleteException() != null) {
-          // one of the task attempts failed.
-          throw getLastDeleteException();
+
+      // parallel delete of task attempt dirs.
+
+      if (args.parallelDeleteAttemptBaseDeleteFirst) {
+        // attempt to delete the base dir first.
+        // This can reduce ABFS delete load but may time out
+        // (which the fallback to parallel delete will handle).
+        // on GCS it is slow.
+        try (DurationInfo info = new DurationInfo(LOG, true,
+            "Initial delete of %s", baseDir)) {
+          exception = deleteOneDir(baseDir);
+          if (exception == null) {
+            // success: record this as the outcome, which
+            // will skip the parallel delete.
+            outcome = Outcome.DELETED;
+            baseDirDeleted = true;
+          } else {
+            // failure: log and continue
+            LOG.warn("{}: Exception on initial attempt at deleting base dir {}\n"
+                    + "attempting parallel delete",
+                getName(), baseDir, exception);
+          }
+        }
+      }
+      if (!baseDirDeleted) {

Review Comment:
   Do we need to change the state of this variable here? If not, execution
   will fall through to the next if block.
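
To make the control flow in question concrete, here is a minimal sketch using stand-in types. `CleanupFlowSketch`, its `cleanup` method, and the step labels are hypothetical; only the variable name `baseDirDeleted` and the "returns an exception or null" call shape of `deleteOneDir` are taken from the hunk above. In this model, a failed base delete leaves `baseDirDeleted` false, so the parallel branch runs as the fallback; only a successful base delete skips it.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Illustrative stand-in for the cleanup control flow; not Hadoop code. */
final class CleanupFlowSketch {

  /** Runs the modelled flow and returns the steps that executed, in order. */
  static List<String> cleanup(boolean baseDeleteFirst, boolean baseDeleteFails) {
    List<String> steps = new ArrayList<>();
    boolean baseDirDeleted = false;

    if (baseDeleteFirst) {
      steps.add("base-delete");
      IOException exception = deleteOneDir(baseDeleteFails);
      if (exception == null) {
        // success: the parallel delete below is skipped
        baseDirDeleted = true;
      } else {
        // failure: log and continue; the flag stays false
        steps.add("warn");
      }
    }
    if (!baseDirDeleted) {
      // either no base delete was attempted, or it failed
      steps.add("parallel-delete");
    }
    return steps;
  }

  /** Hypothetical stub: returns the failure, or null on success. */
  static IOException deleteOneDir(boolean fail) {
    return fail ? new IOException("simulated timeout") : null;
  }

  public static void main(String[] args) {
    System.out.println(cleanup(true, false));
    System.out.println(cleanup(true, true));
    System.out.println(cleanup(false, false));
  }
}
```

Under these assumptions the flag only needs to change on success; leaving it false on failure is what routes execution into the fallback.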



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java:
##########
@@ -142,64 +154,93 @@ protected Result executeStage(
     }
 
     Outcome outcome = null;
-    IOException exception;
+    IOException exception = null;
+    boolean baseDirDeleted = false;
 
 
     // to delete.
     LOG.info("{}: Deleting job directory {}", getName(), baseDir);
 
     if (args.deleteTaskAttemptDirsInParallel) {
-      // Attempt to do a parallel delete of task attempt dirs;
-      // don't overreact if a delete fails, but stop trying
-      // to delete the others, and fall back to deleting the
-      // job dir.
-      Path taskSubDir
-          = getStageConfig().getJobAttemptTaskSubDir();
-      try (DurationInfo info = new DurationInfo(LOG,
-          "parallel deletion of task attempts in %s",
-          taskSubDir)) {
-        RemoteIterator<FileStatus> dirs =
-            RemoteIterators.filteringRemoteIterator(
-                listStatusIterator(taskSubDir),
-                FileStatus::isDirectory);
-        TaskPool.foreach(dirs)
-            .executeWith(getIOProcessors())
-            .stopOnFailure()
-            .suppressExceptions(false)
-            .run(this::rmTaskAttemptDir);
-        getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
-
-        if (getLastDeleteException() != null) {
-          // one of the task attempts failed.
-          throw getLastDeleteException();
+
+      // parallel delete of task attempt dirs.
+
+      if (args.parallelDeleteAttemptBaseDeleteFirst) {
+        // attempt to delete the base dir first.
+        // This can reduce ABFS delete load but may time out
+        // (which the fallback to parallel delete will handle).
+        // on GCS it is slow.
+        try (DurationInfo info = new DurationInfo(LOG, true,
+            "Initial delete of %s", baseDir)) {
+          exception = deleteOneDir(baseDir);
+          if (exception == null) {
+            // success: record this as the outcome, which
+            // will skip the parallel delete.
+            outcome = Outcome.DELETED;
+            baseDirDeleted = true;
+          } else {
+            // failure: log and continue
+            LOG.warn("{}: Exception on initial attempt at deleting base dir {}\n"
+                    + "attempting parallel delete",
+                getName(), baseDir, exception);
+          }
+        }
+      }
+      if (!baseDirDeleted) {
+        // no base delete attempted or it failed.
+        // Attempt to do a parallel delete of task attempt dirs;
+        // don't overreact if a delete fails, but stop trying
+        // to delete the others, and fall back to deleting the
+        // job dir.
+        Path taskSubDir
+            = getStageConfig().getJobAttemptTaskSubDir();
+        try (DurationInfo info = new DurationInfo(LOG, true,
+            "parallel deletion of task attempts in %s",
+            taskSubDir)) {
+          RemoteIterator<FileStatus> dirs =
+              RemoteIterators.filteringRemoteIterator(
+                  listStatusIterator(taskSubDir),
+                  FileStatus::isDirectory);
+          TaskPool.foreach(dirs)
+              .executeWith(getIOProcessors())
+              .stopOnFailure()
+              .suppressExceptions(false)
+              .run(this::rmTaskAttemptDir);
+          getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
+
+          if (getLastDeleteException() != null) {
+            // one of the task attempts failed.
+            throw getLastDeleteException();

Review Comment:
   What is the recovery mechanism here? If one of the task attempt deletes
   fails with an exception, do we intend to retry that delete operation or
   return a failure?
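
The comment in the hunk says to stop deleting the other directories on a failure and fall back to deleting the job dir. The code that catches the rethrown exception lies outside the quoted lines, so the following is only a sketch of that described behaviour, assuming there is no per-directory retry. `ParallelDeleteFallbackSketch`, `Deleter`, and the `Outcome` values are hypothetical names, and a plain `ExecutorService` stands in for `TaskPool`.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Illustrative model of stop-on-failure parallel delete; not Hadoop code. */
final class ParallelDeleteFallbackSketch {

  /** Hypothetical deleter: returns the failure, or null on success. */
  interface Deleter {
    IOException delete(String path);
  }

  /** Hypothetical outcome names, used by this sketch only. */
  enum Outcome { PARALLEL_DELETE, FALLBACK_JOB_DIR_DELETE, FAILURE }

  static Outcome cleanup(List<String> taskAttemptDirs, String jobDir,
      Deleter deleter) {
    AtomicReference<IOException> lastFailure = new AtomicReference<>();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (String dir : taskAttemptDirs) {
      pool.execute(() -> {
        if (lastFailure.get() != null) {
          return; // an earlier delete failed: skip the remaining directories
        }
        IOException e = deleter.delete(dir);
        if (e != null) {
          lastFailure.set(e);
        }
      });
    }
    pool.shutdown();
    try {
      if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
        throw new IllegalStateException("deletes did not finish in time");
      }
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ie);
    }
    // No per-directory retry: the job dir delete covers whatever is left.
    if (deleter.delete(jobDir) != null) {
      return Outcome.FAILURE;
    }
    return lastFailure.get() == null
        ? Outcome.PARALLEL_DELETE
        : Outcome.FALLBACK_JOB_DIR_DELETE;
  }

  public static void main(String[] args) {
    Deleter flaky = path -> path.endsWith("attempt_2")
        ? new IOException("simulated failure on " + path) : null;
    System.out.println(cleanup(
        Arrays.asList("job/tasks/attempt_1", "job/tasks/attempt_2"),
        "job", flaky));
  }
}
```

In this model a failed task attempt delete is neither retried nor reported as a stage failure by itself; the stage only fails if the final job dir delete also fails.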
   



##########
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/AbstractJobOrTaskStage.java:
##########
@@ -582,19 +607,64 @@ protected final Path directoryMustExist(
    * Save a task manifest or summary. This will be done by
    * writing to a temp path and then renaming.
    * If the destination path exists: Delete it.
+   * This will retry so that a rename failure from abfs load or IO errors
+   * will not fail the task.
    * @param manifestData the manifest/success file
    * @param tempPath temp path for the initial save
    * @param finalPath final path for rename.
-   * @throws IOException failure to load/parse
+   * @throws IOException failure to rename after retries.
    */
   @SuppressWarnings("unchecked")
   protected final <T extends AbstractManifestData> void save(T manifestData,
       final Path tempPath,
       final Path finalPath) throws IOException {
-    LOG.trace("{}: save('{}, {}, {}')", getName(), manifestData, tempPath, finalPath);
-    trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
-        operations.save(manifestData, tempPath, true));
-    renameFile(tempPath, finalPath);
+
+    int retryCount = 0;
+    RetryPolicy retryPolicy = retryUpToMaximumCountWithProportionalSleep(
+        getStageConfig().getManifestSaveAttempts(),
+        SAVE_SLEEP_INTERVAL,
+        TimeUnit.MILLISECONDS);
+    boolean success = false;
+    while (!success) {
+      try {
+        LOG.info("{}: save manifest to {} then rename as {}'); retry count={}",
+            getName(), tempPath, finalPath, retryCount);
+
+        trackDurationOfInvocation(getIOStatistics(), OP_SAVE_TASK_MANIFEST, () ->
+            operations.save(manifestData, tempPath, true));

Review Comment:
   Also, if the rename failed on the first attempt but succeeded in the
   backend, will the save operation on tempPath fail with an error? If so,
   how do we recover from that?
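
One way to reason about this scenario is a small in-memory model of the "repeat save, delete, rename" sequence named in the issue description. Everything here is hypothetical (`SaveRetrySketch`, `FakeStore`, and the sleep formula are stand-ins, not Hadoop's `RetryPolicy`), and it assumes that the `true` passed to `operations.save(...)` is an overwrite flag. The fake rename completes and then reports failure once, which models a rename that succeeded in the backend but failed from the client's point of view.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Illustrative model of retrying save + delete + rename; not Hadoop code. */
final class SaveRetrySketch {

  /** Tiny in-memory store standing in for a filesystem. */
  static final class FakeStore {
    final Map<String, String> files = new HashMap<>();
    private int renameFailuresLeft;

    FakeStore(int renameFailuresLeft) {
      this.renameFailuresLeft = renameFailuresLeft;
    }

    /** Save with overwrite semantics (assumed). */
    void save(String path, String data) {
      files.put(path, data);
    }

    void delete(String path) {
      files.remove(path);
    }

    /** Performs the rename, then may report a failure anyway. */
    void rename(String src, String dest) throws IOException {
      String data = files.remove(src);
      if (data == null) {
        throw new IOException("source not found: " + src);
      }
      files.put(dest, data);
      if (renameFailuresLeft > 0) {
        renameFailuresLeft--;
        throw new IOException("simulated error after rename completed");
      }
    }
  }

  /** Returns the number of attempts used; throws once attempts run out. */
  static int saveWithRetries(FakeStore store, String data, String tempPath,
      String finalPath, int maxAttempts, long sleepMillis) {
    if (maxAttempts < 1) {
      throw new IllegalArgumentException("maxAttempts must be >= 1");
    }
    IOException last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        store.save(tempPath, data);   // rewrites the temp file every time
        store.delete(finalPath);      // clears output of an earlier attempt
        store.rename(tempPath, finalPath);
        return attempt;
      } catch (IOException e) {
        last = e;
        if (attempt < maxAttempts) {
          sleep(sleepMillis * attempt); // sleep grows with the attempt count
        }
      }
    }
    throw new UncheckedIOException(
        "save failed after " + maxAttempts + " attempts", last);
  }

  private static void sleep(long millis) {
    try {
      Thread.sleep(millis);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(ie);
    }
  }

  public static void main(String[] args) {
    FakeStore store = new FakeStore(1);
    int attempts = saveWithRetries(store, "manifest", "temp", "final", 3, 10);
    System.out.println("attempts=" + attempts + " files=" + store.files);
  }
}
```

In this model the second save does not fail: the temp path is simply written again, the destination left behind by the first attempt is deleted, and the rename is repeated. Whether the real store behaves the same way depends on its overwrite and delete semantics.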





> [ABFS] Improve commit resilience and performance in Manifest Committer
> ----------------------------------------------------------------------
>
>                 Key: MAPREDUCE-7474
>                 URL: https://issues.apache.org/jira/browse/MAPREDUCE-7474
>             Project: Hadoop Map/Reduce
>          Issue Type: Bug
>          Components: client
>    Affects Versions: 3.4.0, 3.3.6
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> * The manifest committer is not resilient to rename failures on task commit
>   without HADOOP-18012 rename recovery enabled.
> * A large burst of delete calls was noted: are they needed?
>
> Relates to HADOOP-19093, but takes a more minimal approach, with the goal of
> changing the manifest committer only.
>
> Initial proposed changes:
> * Retry recovery on task commit rename, always (repeat save, delete, rename).
> * Audit delete use and see if it can be pruned.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: mapreduce-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: mapreduce-issues-h...@hadoop.apache.org
