steveloughran commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r824930882



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+
+/**
+ * Clean up a job's temporary directory through parallel delete,
+ * base _temporary delete and, as a fallback, rename to trash.
+ * Returns: the outcome of the overall operation and any move to trash.
+ * The result is detailed purely for the benefit of tests, which need
+ * to make assertions about error handling and fallbacks.
+ */
+public class CleanupJobStage extends
+    AbstractJobCommitStage<
+        CleanupJobStage.Arguments,
+        CleanupJobStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CleanupJobStage.class);
+
+  /**
+   * Count of deleted directories.
+   */
+  private final AtomicInteger deleteDirCount = new AtomicInteger();
+
+  /**
+   * Count of delete failures.
+   */
+  private final AtomicInteger deleteFailureCount = new AtomicInteger();
+
+  /**
+   * Last delete exception; non null if deleteFailureCount is not zero.
+   */
+  private IOException lastDeleteException = null;
+
+  /**
+   * Stage name as passed in from arguments.
+   */
+  private String stageName = OP_STAGE_JOB_CLEANUP;
+
+  public CleanupJobStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CLEANUP, true);
+  }
+
+  /**
+   * Statistic name is extracted from the arguments.
+   * @param arguments args to the invocation.
+   * @return stage name.
+   */
+  @Override
+  protected String getStageStatisticName(Arguments arguments) {
+    return arguments.statisticName;
+  }
+
+  /**
+   * Clean up the job attempt directory tree.
+   * @param args arguments built up.
+   * @return the result.
+   * @throws IOException failure was raised and exceptions weren't suppressed.
+   */
+  @Override
+  protected Result executeStage(
+      final Arguments args)
+      throws IOException {
+    stageName = getStageName(args);
+    // this is $dest/_temporary
+    final Path baseDir = requireNonNull(getStageConfig().getOutputTempSubDir());
+    LOG.debug("{}: Cleanup of directory {} with {}", getName(), baseDir, args);
+    if (!args.enabled) {
+      LOG.info("{}: Cleanup of {} disabled", getName(), baseDir);
+      return new Result(Outcome.DISABLED, baseDir,
+          0, null, null);
+    }
+    // shortcut of a single existence check before anything else
+    if (getFileStatusOrNull(baseDir) == null) {
+      return new Result(Outcome.NOTHING_TO_CLEAN_UP,
+          baseDir,
+          0, null, null);
+    }
+
+    // move to trash?
+    // this will be set if delete fails.
+    boolean moveToTrash = args.moveToTrash;
+
+    // delete
+    final boolean attemptDelete = !moveToTrash;
+
+    Outcome outcome = null;
+    IOException exception = null;
+
+    if (attemptDelete) {
+      // to delete.
+      LOG.info("{}: Deleting job directory {}", getName(), baseDir);
+
+      if (args.deleteTaskAttemptDirsInParallel) {
+        // Attempt to do a parallel delete of task attempt dirs;
+        // don't overreact if a delete fails, but stop trying
+        // to delete the others, and fall back to deleting the
+        // job dir.
+        Path taskSubDir
+            = getStageConfig().getJobAttemptTaskSubDir();
+        try (DurationInfo info = new DurationInfo(LOG,
+            "parallel deletion of task attempts in %s",
+            taskSubDir)) {
+          RemoteIterator<FileStatus> dirs =
+              RemoteIterators.filteringRemoteIterator(
+                  listStatusIterator(taskSubDir),
+                  FileStatus::isDirectory);
+          TaskPool.foreach(dirs)
+              .executeWith(getIOProcessors())
+              .stopOnFailure()
+              .suppressExceptions(false)
+              .run(this::rmTaskAttemptDir);
+          getIOStatistics().aggregate((retrieveIOStatistics(dirs)));
+
+          if (getLastDeleteException() != null) {
+            // one of the task attempts failed.
+            throw getLastDeleteException();
+          }
+          // success: record this as the outcome.
+          outcome = Outcome.PARALLEL_DELETE;
+        } catch (FileNotFoundException ex) {
+          // not a problem if there's no dir to list.
+          LOG.debug("{}: Task attempt dir {} not found", getName(), taskSubDir);
+          outcome = Outcome.DELETED;
+        } catch (IOException ex) {
+          // failure. Log and continue
+          LOG.info("{}: Exception while listing/deleting task attempts under {}; continuing",
+              getName(),
+              taskSubDir, ex);
+          // not overreacting here as the base delete will still get executed
+          outcome = Outcome.DELETED;
+        }
+      }
+      // Now the top-level deletion; exception gets saved
+      exception = deleteOneDir(baseDir);
+      if (exception != null) {
+        // failure, report and continue
+        LOG.warn("{}: Deleting job directory {} failed: moving to trash", getName(), baseDir);
+        moveToTrash = true;

Review comment:
       Good q. It's about handling timeouts of directory deletion in Azure, which 
we've seen when trying to delete the whole of _temporary and OAuth...things were 
timing out. Here we delete each TA subdir in parallel, so the risk is much smaller: 
a single task attempt would have to contain too many dirs, whereas today it's 
(# of dirs in a task attempt) * (number of task attempts).
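
       To illustrate the idea, here is a simplified, hypothetical sketch using plain 
FileSystem calls and an ExecutorService rather than the committer's TaskPool-based 
code; the class and method names are illustrative only. Each task-attempt directory 
becomes its own, much smaller delete request, so no single store call has to cover 
the whole _temporary tree.

    // Hypothetical sketch only; not the committer's actual implementation.
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class ParallelCleanupSketch {

      /**
       * Delete every task-attempt directory under the given dir as its own
       * parallel operation, so no single store request spans the whole tree.
       */
      public static void deleteTaskAttemptDirsInParallel(
          FileSystem fs, Path taskSubDir, int threads)
          throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
          List<Future<Boolean>> deletes = new ArrayList<>();
          for (FileStatus st : fs.listStatus(taskSubDir)) {
            if (st.isDirectory()) {
              Path dir = st.getPath();
              // each submitted delete covers only one task attempt's dirs,
              // which is the "much smaller" unit of work described above.
              deletes.add(pool.submit(() -> fs.delete(dir, true)));
            }
          }
          for (Future<Boolean> d : deletes) {
            try {
              d.get();
            } catch (ExecutionException e) {
              // log-and-continue: the caller still deletes the base
              // _temporary dir (and can fall back to moving it to trash).
            }
          }
        } finally {
          pool.shutdown();
        }
      }
    }

       With this split the worst case for any single delete is the number of 
directories in one task attempt, rather than that multiplied by the number of 
task attempts.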
   
   




