steveloughran commented on a change in pull request #2971:
URL: https://github.com/apache/hadoop/pull/2971#discussion_r824992486



##########
File path: hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/stages/CleanupJobStage.java
##########
@@ -0,0 +1,561 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output.committer.manifest.stages;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.mapreduce.lib.output.committer.manifest.impl.ManifestStoreOperations;
+import org.apache.hadoop.util.DurationInfo;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.functional.TaskPool;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_FAILURES_IGNORED_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED;
+import static org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.FILEOUTPUTCOMMITTER_CLEANUP_SKIPPED_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_MOVE_TO_TRASH_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterConstants.OPT_CLEANUP_PARALLEL_ATTEMPT_DIRS_DEFAULT;
+import static org.apache.hadoop.mapreduce.lib.output.committer.manifest.ManifestCommitterStatisticNames.OP_STAGE_JOB_CLEANUP;
+
+/**
+ * Clean up a job's temporary directory through parallel delete,
+ * base _temporary delete and, as a fallback, rename to trash.
+ * Returns: the outcome of the overall operation and any move to trash.
+ * The result is detailed purely for the benefit of tests, which need
+ * to make assertions about error handling and fallbacks.
+ */
+public class CleanupJobStage extends
+    AbstractJobCommitStage<
+        CleanupJobStage.Arguments,
+        CleanupJobStage.Result> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CleanupJobStage.class);
+
+  /**
+   * Count of deleted directories.
+   */
+  private final AtomicInteger deleteDirCount = new AtomicInteger();
+
+  /**
+   * Count of delete failures.
+   */
+  private final AtomicInteger deleteFailureCount = new AtomicInteger();
+
+  /**
+   * Last delete exception; non-null if deleteFailureCount is not zero.
+   */
+  private IOException lastDeleteException = null;
+
+  /**
+   * Stage name as passed in from arguments.
+   */
+  private String stageName = OP_STAGE_JOB_CLEANUP;
+
+  public CleanupJobStage(final StageConfig stageConfig) {
+    super(false, stageConfig, OP_STAGE_JOB_CLEANUP, true);
+  }
+
+  /**
+   * Statistic name is extracted from the arguments.
+   * @param arguments args to the invocation.
+   * @return stage name.
+   */
+  @Override
+  protected String getStageStatisticName(Arguments arguments) {
+    return arguments.statisticName;
+  }
+
+  /**
+   * Clean up the job attempt directory tree.
+   * @param args arguments built up.
+   * @return the result.
+   * @throws IOException failure was raised and exceptions weren't suppressed.
+   */
+  @Override
+  protected Result executeStage(
+      final Arguments args)
+      throws IOException {
+    stageName = getStageName(args);
+    // this is $dest/_temporary
+    final Path baseDir = requireNonNull(getStageConfig().getOutputTempSubDir());
+    LOG.debug("{}: Cleanup of directory {} with {}", getName(), baseDir, args);
+    if (!args.enabled) {
+      LOG.info("{}: Cleanup of {} disabled", getName(), baseDir);
+      return new Result(Outcome.DISABLED, baseDir,
+          0, null, null);
+    }
+    // shortcut of a single existence check before anything else
+    if (getFileStatusOrNull(baseDir) == null) {
+      return new Result(Outcome.NOTHING_TO_CLEAN_UP,
+          baseDir,
+          0, null, null);
+    }
+
+    // move to trash?
+    // this will be set if delete fails.
+    boolean moveToTrash = args.moveToTrash;

Review comment:
   # new API in hadoop-common

   The `hadoop-mapreduce-client-core` JAR is only used in
   `AbfsManifestStoreOperations` and the new committer factory.

   I understand your concerns about making the mapreduce JAR a dependency, but
   the JAR is optional: you don't need it for FS operations. Maven tags it as
   provided so it isn't published transitively, and the only imports are in a
   new committer factory and the store operations.
   
   If you want the new committer, yes, you need the JAR. But that's always been
   the case (and why Spark, ParquetOutputFormat etc. all import it).
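
   To show how small the coupling is, here's a rough sketch of the opt-in.
   `mapreduce.outputcommitter.factory.scheme.*` is the existing scheme-based
   factory binding; the factory class name is the one this PR adds.

   ```java
   import org.apache.hadoop.conf.Configuration;

   /** Sketch only: bind the manifest committer factory for abfs:// paths. */
   public final class BindManifestCommitter {
     public static void bind(Configuration conf) {
       // scheme-based lookup: any job writing to abfs:// gets this factory
       conf.set("mapreduce.outputcommitter.factory.scheme.abfs",
           "org.apache.hadoop.mapreduce.lib.output.committer.manifest"
               + ".ManifestCommitterFactory");
     }
   }
   ```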
   
   Putting it into hadoop-common becomes a commitment to maintain it (it will
   be used by someone) and adds the need for better testing and a strict spec.
   
   I'd rather put that off until there's a proper Rename operation which works
   well with stores as well as HDFS, always raises an exception on failure, and
   comes in on an interface both FileSystem and FileContext implement. Being
   builder-based, etags would be an optional builder param for stores which
   care.
   
   But we'd need to get rename() right there, and that's a nightmare.
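
   To make that concrete, this is the sort of thing I have in mind. Purely a
   sketch: none of these names exist anywhere.

   ```java
   import java.io.IOException;
   import java.util.concurrent.CompletableFuture;

   import org.apache.hadoop.fs.Path;

   /**
    * Hypothetical rename interface: builder-based, raises on failure,
    * implementable by both FileSystem and FileContext.
    */
   public interface RenamesFiles {

     /** Begin a rename of source to dest. */
     RenameBuilder createRename(Path source, Path dest);

     interface RenameBuilder {

       /** Optional source etag, for stores which care. */
       RenameBuilder withSourceEtag(String etag);

       /** Execute; always raises an exception on failure. */
       CompletableFuture<Void> build() throws IOException;
     }
   }
   ```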
   
   # cleanup
   
   `FileSystem.delete()` doesn't use trash, so the `moveToTrash` flag
   _is_ needed.
   
   |  **FS trash** | **moveToTrash** | **outcome** |
   |-------------|---------------|---------|
   |   `true`    |  `false`      | files deleted |
   |   `true`    |  `true`      | files to trash |
   |   `false`    |  `false`      | files deleted |
   |   `false`    |  `true`      | files deleted |
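
   That is, the flag only swaps `Trash.moveToAppropriateTrash()` in for
   `delete()` when trash is actually usable. A rough sketch of today's
   behaviour (class and method names are mine, not the committer's):

   ```java
   import java.io.IOException;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.FileSystem;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.fs.Trash;

   /** Sketch only; names are made up. */
   final class CleanupSketch {

     /** Move to trash if asked and trash is enabled, else recursive delete. */
     static boolean cleanupDir(FileSystem fs, Path dir, Configuration conf,
         boolean moveToTrash) throws IOException {
       // moveToAppropriateTrash() returns false when trash is disabled,
       // in which case we fall through to a plain delete.
       if (moveToTrash && Trash.moveToAppropriateTrash(fs, dir, conf)) {
         return true;
       }
       return fs.delete(dir, true);
     }
   }
   ```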
   
   Now, I do agree things are overly complex. I was trying to be resilient to
   timeouts on dir deletes, but as I think that will be rare, how about I
   change the code:

   * no attempt to downgrade from delete to move to trash if delete raises an
     exception
   * if moveToTrash is true, then fs trash *must* be enabled.
   
   This gives:
   
   |  **FS trash** | **moveToTrash** | **outcome** |
   |-------------|---------------|---------|
   |   `true`    |  `false`      | files deleted |
   |   `true`    |  `true`      | files to trash |
   |   `false`    |  `false`      | files deleted |
   |   `false`    |  `true`      | error |
   
   And all the recovery and tests are gone.
   
   What I could do here is reject that misconfiguration at job setup, rather
   than waiting for cleanup.
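
   Something like this at job setup (the config key is real; the check itself
   is the new bit, and the names are made up):

   ```java
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.fs.PathIOException;

   import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;

   /** Sketch of the proposed fail-fast check. */
   final class SetupValidationSketch {

     static void requireTrashIfMoveToTrash(Configuration conf, Path outputDir,
         boolean moveToTrash) throws PathIOException {
       // fs.trash.interval == 0 means trash is disabled
       if (moveToTrash && conf.getLong(FS_TRASH_INTERVAL_KEY, 0) <= 0) {
         throw new PathIOException(outputDir.toString(),
             "Cleanup is set to move to trash but " + FS_TRASH_INTERVAL_KEY
                 + " is 0: trash is disabled");
       }
     }
   }
   ```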
   
   do you agree?
   
   # my next steps
   
   * cut recovery from delete failures in cleanup
   * log the committer at info in the factory
   * whatever else Yetus complains about




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


