http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
new file mode 100644
index 0000000..b3bcca1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitterFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+/**
+ * Dynamically create the output committer based on subclass type and settings.
+ */
+public abstract class AbstractS3ACommitterFactory
+    extends PathOutputCommitterFactory {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      AbstractS3ACommitterFactory.class);
+
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    FileSystem fs = getDestinationFileSystem(outputPath, context);
+    PathOutputCommitter outputCommitter;
+    if (fs instanceof S3AFileSystem) {
+      outputCommitter = createTaskCommitter((S3AFileSystem)fs,
+          outputPath, context);
+    } else {
+      throw new PathCommitException(outputPath,
+          "Filesystem not supported by this committer");
+    }
+    LOG.info("Using Commmitter {} for {}",
+        outputCommitter,
+        outputPath);
+    return outputCommitter;
+  }
+
+  /**
+   * Get the destination filesystem, returning null if there is none.
+   * Code using this must explicitly or implicitly look for a null value
+   * in the response.
+   * @param outputPath output path
+   * @param context job/task context
+   * @return the destination filesystem, if it can be determined
+   * @throws IOException if the FS cannot be instantiated
+   */
+  protected FileSystem getDestinationFileSystem(Path outputPath,
+      JobContext context)
+      throws IOException {
+    return outputPath != null ?
+          FileSystem.get(outputPath.toUri(), context.getConfiguration())
+          : null;
+  }
+
+  /**
+   * Implementation point: create a task committer for a specific filesystem.
+   * @param fileSystem destination FS.
+   * @param outputPath final output path for work
+   * @param context task context
+   * @return a committer
+   * @throws IOException any problem, including the FS not supporting
+   * the desired committer
+   */
+  public abstract PathOutputCommitter createTaskCommitter(
+      S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException;
+}
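
As a sketch of the intended extension point: a concrete factory only needs
to implement createTaskCommitter(). The committer class below,
DemoCommitter, is hypothetical and not part of this patch.

    public class DemoCommitterFactory extends AbstractS3ACommitterFactory {
      @Override
      public PathOutputCommitter createTaskCommitter(
          S3AFileSystem fileSystem,
          Path outputPath,
          TaskAttemptContext context) throws IOException {
        // bind the (hypothetical) committer to the destination FS and task
        return new DemoCommitter(fileSystem, outputPath, context);
      }
    }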

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
new file mode 100644
index 0000000..03cfcba
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME_PATTERN;
+
+/**
+ * Constants for working with committers.
+ */
+@SuppressWarnings("unused")
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class CommitConstants {
+
+  private CommitConstants() {
+  }
+
+  /**
+   * Path for "magic" writes: path and {@link #PENDING_SUFFIX} files:
+   * {@value}.
+   */
+  public static final String MAGIC = "__magic";
+
+  /**
+   * Marker of the start of a directory tree for calculating
+   * the final path names: {@value}.
+   */
+  public static final String BASE = "__base";
+
+  /**
+   * Suffix applied to pending commit metadata: {@value}.
+   */
+  public static final String PENDING_SUFFIX = ".pending";
+
+  /**
+   * Suffix applied to multiple pending commit metadata: {@value}.
+   */
+  public static final String PENDINGSET_SUFFIX = ".pendingset";
+
+  /**
+   * Prefix for all Magic committer options.
+   * Value: {@value}.
+   */
+  public static final String MAGIC_COMMITTER_PREFIX
+      = "fs.s3a.committer.magic";
+
+  /**
+   * Flag to indicate whether support for the Magic committer is enabled
+   * in the filesystem.
+   * Value: {@value}.
+   */
+  public static final String MAGIC_COMMITTER_ENABLED
+      = MAGIC_COMMITTER_PREFIX + ".enabled";
+
+  /**
+   * Flag to indicate whether a stream is a magic output stream;
+   * returned in {@code StreamCapabilities}.
+   * Value: {@value}.
+   */
+  public static final String STREAM_CAPABILITY_MAGIC_OUTPUT
+      = "s3a:magic.output.stream";
+
+  /**
+   * Flag to indicate that a store supports magic committers;
+   * returned in {@code StreamCapabilities}.
+   * Value: {@value}.
+   */
+  public static final String STORE_CAPABILITY_MAGIC_COMMITTER
+      = "s3a:magic.committer";
+
+  /**
+   * Is the committer enabled by default? No.
+   */
+  public static final boolean DEFAULT_MAGIC_COMMITTER_ENABLED = false;
+
+  /**
+   * This is the "Pending" directory of the {@code FileOutputCommitter};
+   * data written here is, in that algorithm, renamed into place.
+   * Value: {@value}.
+   */
+  public static final String TEMPORARY = "_temporary";
+
+  /**
+   * Temp data which is not auto-committed: {@value}.
+   * Uses a different name from the normal {@link #TEMPORARY} directory
+   * to make clear it is different.
+   */
+  public static final String TEMP_DATA = "__temp-data";
+
+
+  /**
+   * Flag to trigger creation of a marker file on job completion.
+   */
+  public static final String CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER
+      = "mapreduce.fileoutputcommitter.marksuccessfuljobs";
+
+  /**
+   * Marker file to create on success: {@value}.
+   */
+  public static final String _SUCCESS = "_SUCCESS";
+
+  /** Default job marker option: {@value}. */
+  public static final boolean DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER = true;
+
+  /**
+   * Key to set for the S3A schema to use the specific committer.
+   */
+  public static final String S3A_COMMITTER_FACTORY_KEY = String.format(
+      COMMITTER_FACTORY_SCHEME_PATTERN, "s3a");
+
+  /**
+   * S3 Committer factory: {@value}.
+   * This uses the value of {@link #FS_S3A_COMMITTER_NAME}
+   * to choose the final committer.
+   */
+  public static final String S3A_COMMITTER_FACTORY =
+      S3ACommitterFactory.CLASSNAME;
+
+  /**
+   * Option to identify the S3A committer:
+   * {@value}.
+   */
+  public static final String FS_S3A_COMMITTER_NAME =
+      "fs.s3a.committer.name";
+
+  /**
+   * Option for {@link #FS_S3A_COMMITTER_NAME}:
+   * classic/file output committer: {@value}.
+   */
+  public static final String COMMITTER_NAME_FILE = "file";
+
+  /**
+   * Option for {@link #FS_S3A_COMMITTER_NAME}:
+   * magic output committer: {@value}.
+   */
+  public static final String COMMITTER_NAME_MAGIC = "magic";
+
+  /**
+   * Option for {@link #FS_S3A_COMMITTER_NAME}:
+   * directory output committer: {@value}.
+   */
+  public static final String COMMITTER_NAME_DIRECTORY = "directory";
+
+  /**
+   * Option for {@link #FS_S3A_COMMITTER_NAME}:
+   * partition output committer: {@value}.
+   */
+  public static final String COMMITTER_NAME_PARTITIONED = "partitioned";
+
+  /**
+   * Option for final files to have unique names through job attempt info,
+   * falling back to a new UUID if there is no job attempt information to use.
+   * {@value}.
+   * When writing data with the "append" conflict option, this guarantees
+   * that new data will not overwrite any existing data.
+   */
+  public static final String FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES =
+      "fs.s3a.committer.staging.unique-filenames";
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES}:
+   * {@value}.
+   */
+  public static final boolean DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES =
+      true;
+
+  /**
+   * Staging committer conflict resolution policy: {@value}.
+   * Supported: fail, append, replace.
+   */
+  public static final String FS_S3A_COMMITTER_STAGING_CONFLICT_MODE =
+      "fs.s3a.committer.staging.conflict-mode";
+
+  /** Conflict mode: {@value}. */
+  public static final String CONFLICT_MODE_FAIL = "fail";
+
+  /** Conflict mode: {@value}. */
+  public static final String CONFLICT_MODE_APPEND = "append";
+
+  /** Conflict mode: {@value}. */
+  public static final String CONFLICT_MODE_REPLACE = "replace";
+
+  /** Default conflict mode: {@value}. */
+  public static final String DEFAULT_CONFLICT_MODE = CONFLICT_MODE_FAIL;
+
+  /**
+   * Number of threads in committers for parallel operations on files
+   * (upload, commit, abort, delete...): {@value}.
+   */
+  public static final String FS_S3A_COMMITTER_THREADS =
+      "fs.s3a.committer.threads";
+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_THREADS}: {@value}.
+   */
+  public static final int DEFAULT_COMMITTER_THREADS = 8;
+
+  /**
+   * Path in the cluster filesystem for temporary data: {@value}.
+   * This is for HDFS, not the local filesystem.
+   * It is only for the summary data of each file, not the actual
+   * data being committed.
+   */
+  public static final String FS_S3A_COMMITTER_STAGING_TMP_PATH =
+      "fs.s3a.committer.staging.tmp.path";
+
+
+  /**
+   * Should the staging committers abort all pending uploads to the destination
+   * directory? Default: true.
+   *
+   * Disable this only if more than one partitioned committer is
+   * writing to the same destination tree simultaneously; otherwise
+   * the first job to complete will cancel all outstanding uploads from the
+   * others. However, disabling it may lead to leaked uploads from failed
+   * tasks. If disabled, configure the bucket lifecycle to remove uploads
+   * after a time period, and/or set up a workflow to explicitly delete
+   * entries. Otherwise there is a risk that uncommitted uploads may run up
+   * bills.
+   */
+  public static final String FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS =
+      "fs.s3a.committer.staging.abort.pending.uploads";
+
+}
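
To illustrate how these constants fit together, a minimal sketch of a job
configuration selecting the magic committer (the Job instance "job" is
assumed):

    Configuration conf = job.getConfiguration();
    // route s3a:// destinations through the S3A committer factory
    conf.set(CommitConstants.S3A_COMMITTER_FACTORY_KEY,
        CommitConstants.S3A_COMMITTER_FACTORY);
    // then pick the committer implementation by name
    conf.set(CommitConstants.FS_S3A_COMMITTER_NAME,
        CommitConstants.COMMITTER_NAME_MAGIC);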

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
new file mode 100644
index 0000000..9381ef1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitOperations.java
@@ -0,0 +1,596 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.amazonaws.services.s3.model.MultipartUpload;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * The implementation of the various actions a committer needs.
+ * This doesn't implement the protocol/binding to a specific execution engine,
+ * just the operations needed to build one.
+ *
+ * When invoking FS operations, it assumes that the underlying FS is
+ * handling retries and exception translation: it does not attempt to
+ * duplicate that work.
+ *
+ */
+public class CommitOperations {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      CommitOperations.class);
+
+  /**
+   * Destination filesystem.
+   */
+  private final S3AFileSystem fs;
+
+  /** Statistics. */
+  private final S3AInstrumentation.CommitterStatistics statistics;
+
+  /**
+   * Write operations for the destination fs.
+   */
+  private final WriteOperationHelper writeOperations;
+
+  /**
+   * Filter to find all {@code .pendingset} files.
+   */
+  public static final PathFilter PENDINGSET_FILTER =
+      path -> path.toString().endsWith(CommitConstants.PENDINGSET_SUFFIX);
+
+  /**
+   * Filter to find all {@code .pending} files.
+   */
+  public static final PathFilter PENDING_FILTER =
+      path -> path.toString().endsWith(CommitConstants.PENDING_SUFFIX);
+
+  /**
+   * Instantiate.
+   * @param fs FS to bind to
+   */
+  public CommitOperations(S3AFileSystem fs) {
+    Preconditions.checkArgument(fs != null, "null fs");
+    this.fs = fs;
+    statistics = fs.newCommitterStatistics();
+    writeOperations = fs.createWriteOperationHelper();
+  }
+
+  /**
+   * Convert an ordered list of strings to a list of index etag parts.
+   * @param tagIds list of tags
+   * @return same list, now in numbered tuples
+   */
+  public static List<PartETag> toPartEtags(List<String> tagIds) {
+    return IntStream.range(0, tagIds.size())
+        .mapToObj(i -> new PartETag(i + 1, tagIds.get(i)))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public String toString() {
+    return "CommitOperations{" + fs.getUri() + '}';
+  }
+
+  /** @return statistics. */
+  protected S3AInstrumentation.CommitterStatistics getStatistics() {
+    return statistics;
+  }
+
+  /**
+   * Commit the operation, throwing an exception on any failure.
+   * @param commit commit to execute
+   * @throws IOException on a failure
+   */
+  public void commitOrFail(SinglePendingCommit commit) throws IOException {
+    commit(commit, commit.getFilename()).maybeRethrow();
+  }
+
+  /**
+   * Commit a single pending commit; exceptions are caught
+   * and converted to an outcome.
+   * @param commit entry to commit
+   * @param origin origin path/string for outcome text
+   * @return the outcome
+   */
+  public MaybeIOE commit(SinglePendingCommit commit, String origin) {
+    LOG.debug("Committing single commit {}", commit);
+    MaybeIOE outcome;
+    String destKey = "unknown destination";
+    try {
+      commit.validate();
+      destKey = commit.getDestinationKey();
+      long l = innerCommit(commit);
+      LOG.debug("Successful commit of file length {}", l);
+      outcome = MaybeIOE.NONE;
+      statistics.commitCompleted(commit.getLength());
+    } catch (IOException e) {
+      String msg = String.format("Failed to commit upload against %s: %s",
+          destKey, e);
+      LOG.warn(msg, e);
+      outcome = new MaybeIOE(e);
+      statistics.commitFailed();
+    } catch (Exception e) {
+      String msg = String.format("Failed to commit upload against %s," +
+          " described in %s: %s", destKey, origin, e);
+      LOG.warn(msg, e);
+      outcome = new MaybeIOE(new PathCommitException(origin, msg, e));
+      statistics.commitFailed();
+    }
+    return outcome;
+  }
+
+  /**
+   * Inner commit operation.
+   * @param commit entry to commit
+   * @return bytes committed.
+   * @throws IOException failure
+   */
+  private long innerCommit(SinglePendingCommit commit) throws IOException {
+    // finalize the commit
+    writeOperations.completeMPUwithRetries(
+        commit.getDestinationKey(),
+        commit.getUploadId(),
+        toPartEtags(commit.getEtags()),
+        commit.getLength(),
+        new AtomicInteger(0));
+    return commit.getLength();
+  }
+
+  /**
+   * Locate all files with the pending suffix under a directory.
+   * @param pendingDir directory
+   * @param recursive recursive listing?
+   * @return the list of all located entries
+   * @throws IOException if there is a problem listing the path.
+   */
+  public List<LocatedFileStatus> locateAllSinglePendingCommits(
+      Path pendingDir,
+      boolean recursive) throws IOException {
+    return listAndFilter(fs, pendingDir, recursive, PENDING_FILTER);
+  }
+
+  /**
+   * Load all single pending commits in the directory.
+   * All load failures are logged and then added to the list of files
+   * which would not load.
+   * @param pendingDir directory containing commits
+   * @param recursive do a recursive scan?
+   * @return tuple of loaded entries and those pending files which would
+   * not load/validate.
+   * @throws IOException on a failure to list the files.
+   */
+  public Pair<PendingSet,
+      List<Pair<LocatedFileStatus, IOException>>>
+      loadSinglePendingCommits(Path pendingDir, boolean recursive)
+      throws IOException {
+
+    List<LocatedFileStatus> statusList = locateAllSinglePendingCommits(
+        pendingDir, recursive);
+    PendingSet commits = new PendingSet(
+        statusList.size());
+    List<Pair<LocatedFileStatus, IOException>> failures = new ArrayList<>(1);
+    for (LocatedFileStatus status : statusList) {
+      try {
+        commits.add(SinglePendingCommit.load(fs, status.getPath()));
+      } catch (IOException e) {
+        LOG.warn("Failed to load commit file {}", status.getPath(), e);
+        failures.add(Pair.of(status, e));
+      }
+    }
+    return Pair.of(commits, failures);
+  }
+
+  /**
+   * Convert any exception to an IOE, if needed.
+   * @param key key to use in a path exception
+   * @param ex exception
+   * @return an IOE, either the passed in value or a new one wrapping the other
+   * exception.
+   */
+  public IOException makeIOE(String key, Exception ex) {
+    return ex instanceof IOException
+           ? (IOException) ex
+           : new PathCommitException(key, ex.toString(), ex);
+  }
+
+  /**
+   * Abort the multipart commit supplied. This is the lower level operation
+   * which doesn't generate an outcome, instead raising an exception.
+   * @param commit pending commit to abort
+   * @throws FileNotFoundException if the abort ID is unknown
+   * @throws IOException on any failure
+   */
+  public void abortSingleCommit(SinglePendingCommit commit)
+      throws IOException {
+    String destKey = commit.getDestinationKey();
+    String origin = commit.getFilename() != null
+                    ? (" defined in " + commit.getFilename())
+                    : "";
+    String uploadId = commit.getUploadId();
+    LOG.info("Aborting commit to object {}{}", destKey, origin);
+    abortMultipartCommit(destKey, uploadId);
+  }
+
+  /**
+   * Create an {@code AbortMultipartUpload} request and POST it to S3,
+   * incrementing statistics afterwards.
+   * @param destKey destination key
+   * @param uploadId upload to cancel
+   * @throws FileNotFoundException if the abort ID is unknown
+   * @throws IOException on any failure
+   */
+  public void abortMultipartCommit(String destKey, String uploadId)
+      throws IOException {
+    try {
+      writeOperations.abortMultipartCommit(destKey, uploadId);
+    } finally {
+      statistics.commitAborted();
+    }
+  }
+
+  /**
+   * Enumerate all pending files in a dir/tree and abort them.
+   * @param pendingDir directory of pending operations
+   * @param recursive recurse?
+   * @return the outcome of all the abort operations
+   * @throws IOException if there is a problem listing the path.
+   */
+  public MaybeIOE abortAllSinglePendingCommits(Path pendingDir,
+      boolean recursive)
+      throws IOException {
+    Preconditions.checkArgument(pendingDir != null, "null pendingDir");
+    LOG.debug("Aborting all pending commit filess under {}"
+            + " (recursive={}", pendingDir, recursive);
+    RemoteIterator<LocatedFileStatus> pendingFiles;
+    try {
+      pendingFiles = ls(pendingDir, recursive);
+    } catch (FileNotFoundException fnfe) {
+      LOG.info("No directory to abort {}", pendingDir);
+      return MaybeIOE.NONE;
+    }
+    MaybeIOE outcome = MaybeIOE.NONE;
+    if (!pendingFiles.hasNext()) {
+      LOG.debug("No files to abort under {}", pendingDir);
+    }
+    while (pendingFiles.hasNext()) {
+      Path pendingFile = pendingFiles.next().getPath();
+      if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) {
+        try {
+          abortSingleCommit(SinglePendingCommit.load(fs, pendingFile));
+        } catch (FileNotFoundException e) {
+          LOG.debug("listed file already deleted: {}", pendingFile);
+        } catch (IOException | IllegalArgumentException e) {
+          // record only the first failure; outcome starts as NONE, never null
+          if (!outcome.hasException()) {
+            outcome = new MaybeIOE(makeIOE(pendingFile.toString(), e));
+          }
+        } finally {
+          // quietly try to delete the pending file
+          S3AUtils.deleteQuietly(fs, pendingFile, false);
+        }
+      }
+    }
+    return outcome;
+  }
+
+  /**
+   * List files.
+   * @param path path
+   * @param recursive recursive listing?
+   * @return iterator
+   * @throws IOException failure
+   */
+  protected RemoteIterator<LocatedFileStatus> ls(Path path, boolean recursive)
+      throws IOException {
+    return fs.listFiles(path, recursive);
+  }
+
+  /**
+   * List all pending uploads to the destination FS under a path.
+   * @param dest destination path
+   * @return A list of the pending uploads to any directory under that path.
+   * @throws IOException IO failure
+   */
+  public List<MultipartUpload> listPendingUploadsUnderPath(Path dest)
+      throws IOException {
+    return fs.listMultipartUploads(fs.pathToKey(dest));
+  }
+
+  /**
+   * Abort all pending uploads to the destination FS under a path.
+   * @param dest destination path
+   * @return a count of the number of uploads aborted.
+   * @throws IOException IO failure
+   */
+  public int abortPendingUploadsUnderPath(Path dest) throws IOException {
+    return writeOperations.abortMultipartUploadsUnderPath(fs.pathToKey(dest));
+  }
+
+  /**
+   * Delete any existing {@code _SUCCESS} file.
+   * @param outputPath output directory
+   * @throws IOException IO problem
+   */
+  public void deleteSuccessMarker(Path outputPath) throws IOException {
+    fs.delete(new Path(outputPath, _SUCCESS), false);
+  }
+
+  /**
+   * Save the success data to the {@code _SUCCESS} file.
+   * @param outputPath output directory
+   * @param successData success data to save.
+   * @param addMetrics should the FS metrics be added?
+   * @throws IOException IO problem
+   */
+  public void createSuccessMarker(Path outputPath,
+      SuccessData successData,
+      boolean addMetrics)
+      throws IOException {
+    Preconditions.checkArgument(outputPath != null, "null outputPath");
+
+    if (addMetrics) {
+      addFileSystemStatistics(successData.getMetrics());
+    }
+    // add any diagnostics
+    Configuration conf = fs.getConf();
+    successData.addDiagnostic(S3_METADATA_STORE_IMPL,
+        conf.getTrimmed(S3_METADATA_STORE_IMPL, ""));
+    successData.addDiagnostic(METADATASTORE_AUTHORITATIVE,
+        conf.getTrimmed(METADATASTORE_AUTHORITATIVE, "false"));
+    successData.addDiagnostic(MAGIC_COMMITTER_ENABLED,
+        conf.getTrimmed(MAGIC_COMMITTER_ENABLED, "false"));
+
+    // now write
+    Path markerPath = new Path(outputPath, _SUCCESS);
+    LOG.debug("Touching success marker for job {}: {}", markerPath,
+        successData);
+    successData.save(fs, markerPath, true);
+  }
+
+  /**
+   * Revert a pending commit by deleting the destination.
+   * @param commit pending commit
+   * @throws IOException failure
+   */
+  public void revertCommit(SinglePendingCommit commit) throws IOException {
+    LOG.warn("Revert {}", commit);
+    try {
+      writeOperations.revertCommit(commit.getDestinationKey());
+    } finally {
+      statistics.commitReverted();
+    }
+  }
+
+  /**
+   * Upload all the data in the local file, returning the information
+   * needed to commit the work.
+   * @param localFile local file (must be a file, not a directory)
+   * @param destPath destination path
+   * @param partition partition/subdir; only recorded in the commit's text
+   * @param uploadPartSize size of upload
+   * @return a pending upload entry
+   * @throws IOException failure
+   */
+  public SinglePendingCommit uploadFileToPendingCommit(File localFile,
+      Path destPath,
+      String partition,
+      long uploadPartSize)
+      throws IOException {
+
+    LOG.debug("Initiating multipart upload from {} to {}",
+        localFile, destPath);
+    Preconditions.checkArgument(destPath != null);
+    if (!localFile.isFile()) {
+      throw new FileNotFoundException("Not a file: " + localFile);
+    }
+    String destURI = destPath.toString();
+    String destKey = fs.pathToKey(destPath);
+    String uploadId = null;
+
+    boolean threw = true;
+    try {
+      statistics.commitCreated();
+      uploadId = writeOperations.initiateMultiPartUpload(destKey);
+      long length = localFile.length();
+
+      SinglePendingCommit commitData = new SinglePendingCommit();
+      commitData.setDestinationKey(destKey);
+      commitData.setBucket(fs.getBucket());
+      commitData.touch(System.currentTimeMillis());
+      commitData.setUploadId(uploadId);
+      commitData.setUri(destURI);
+      commitData.setText(partition != null ? "partition: " + partition : "");
+      commitData.setLength(length);
+
+      long offset = 0;
+      long numParts = (length / uploadPartSize +
+          ((length % uploadPartSize) > 0 ? 1 : 0));
+      // always write one part, even if it is just an empty one
+      if (numParts == 0) {
+        numParts = 1;
+      }
+
+      List<PartETag> parts = new ArrayList<>((int) numParts);
+
+      LOG.debug("File size is {}, number of parts to upload = {}",
+          length, numParts);
+      for (int partNumber = 1; partNumber <= numParts; partNumber += 1) {
+        long size = Math.min(length - offset, uploadPartSize);
+        UploadPartRequest part;
+        part = writeOperations.newUploadPartRequest(
+            destKey,
+            uploadId,
+            partNumber,
+            (int) size,
+            null,
+            localFile,
+            offset);
+        part.setLastPart(partNumber == numParts);
+        UploadPartResult partResult = writeOperations.uploadPart(part);
+        offset += uploadPartSize;
+        parts.add(partResult.getPartETag());
+      }
+
+      commitData.bindCommitData(parts);
+      statistics.commitUploaded(length);
+      threw = false;
+      return commitData;
+    } finally {
+      if (threw && uploadId != null) {
+        statistics.commitAborted();
+        try {
+          abortMultipartCommit(destKey, uploadId);
+        } catch (IOException e) {
+          LOG.error("Failed to abort upload {} to {}", uploadId, destKey, e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Add the filesystem statistics to the map; overwriting anything
+   * with the same name.
+   * @param dest destination map
+   */
+  public void addFileSystemStatistics(Map<String, Long> dest) {
+    dest.putAll(fs.getInstrumentation().toMap());
+  }
+
+  /**
+   * Note that a task has completed.
+   * @param success success flag
+   */
+  public void taskCompleted(boolean success) {
+    statistics.taskCompleted(success);
+  }
+
+  /**
+   * Note that a job has completed.
+   * @param success success flag
+   */
+  public void jobCompleted(boolean success) {
+    statistics.jobCompleted(success);
+  }
+
+  /**
+   * A holder for a possible IOException; the call {@link #maybeRethrow()}
+   * will throw any exception passed into the constructor, and be a no-op
+   * if none was.
+   *
+   * Why isn't a Java 8 optional used here? The main benefit would be that
+   * {@link #maybeRethrow()} could be done as a map(), but because Java doesn't
+   * allow checked exceptions in a map, the following code is invalid
+   * <pre>
+   *   exception.map((e) -> {throw e;})
+   * </pre>
+   * As a result, the code to work with exceptions would be almost as
+   * convoluted as the original.
+   */
+  public static class MaybeIOE {
+    private final IOException exception;
+
+    public static final MaybeIOE NONE = new MaybeIOE(null);
+
+    /**
+     * Construct with an exception.
+     * @param exception exception
+     */
+    public MaybeIOE(IOException exception) {
+      this.exception = exception;
+    }
+
+    /**
+     * Get any exception.
+     * @return the exception.
+     */
+    public IOException getException() {
+      return exception;
+    }
+
+    /**
+     * Is there an exception in this class?
+     * @return true if there is an exception
+     */
+    public boolean hasException() {
+      return exception != null;
+    }
+
+    /**
+     * Rethrow any exception.
+     * @throws IOException the exception field, if non-null.
+     */
+    public void maybeRethrow() throws IOException {
+      if (exception != null) {
+        throw exception;
+      }
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("MaybeIOE{");
+      sb.append(hasException() ? exception : "");
+      sb.append('}');
+      return sb.toString();
+    }
+
+    /**
+     * Get an instance based on the exception: either a value
+     * or a reference to {@link #NONE}.
+     * @param ex exception
+     * @return an instance.
+     */
+    public static MaybeIOE of(IOException ex) {
+      return ex != null ? new MaybeIOE(ex) : NONE;
+    }
+  }
+
+
+}
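
A rough usage sketch of this class, loading then committing every pending
commit under a directory. It assumes PendingSet exposes its entries via a
getCommits() accessor; the load failures in the right of the pair are
ignored here for brevity.

    CommitOperations ops = new CommitOperations(s3aFileSystem);
    Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded =
        ops.loadSinglePendingCommits(pendingDir, true);
    for (SinglePendingCommit commit : loaded.getLeft().getCommits()) {
      // completes the multipart upload, rethrowing any failure
      ops.commitOrFail(commit);
    }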

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java
new file mode 100644
index 0000000..9c684c7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtils.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.ValidationFailure.verify;
+
+/**
+ * Static utility methods related to S3A commitment processing, both
+ * staging and magic.
+ *
+ * <b>Do not use in any codepath intended to be used from the S3AFS
+ * except in the committers themselves.</b>
+ */
+public final class CommitUtils {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CommitUtils.class);
+
+  private CommitUtils() {
+  }
+
+  /**
+   * Verify that the path is a magic one.
+   * @param fs filesystem
+   * @param path path
+   * @throws PathCommitException if the path isn't a magic commit path
+   */
+  public static void verifyIsMagicCommitPath(S3AFileSystem fs,
+      Path path) throws PathCommitException {
+    verifyIsMagicCommitFS(fs);
+    if (!fs.isMagicCommitPath(path)) {
+      throw new PathCommitException(path, E_BAD_PATH);
+    }
+  }
+
+  /**
+   * Verify that an S3A FS instance is a magic commit FS.
+   * @param fs filesystem
+   * @throws PathCommitException if the FS isn't a magic commit FS.
+   */
+  public static void verifyIsMagicCommitFS(S3AFileSystem fs)
+      throws PathCommitException {
+    if (!fs.isMagicCommitEnabled()) {
+      // dump out details to console for support diagnostics
+      String fsUri = fs.getUri().toString();
+      LOG.error("{}: {}:\n{}", E_NORMAL_FS, fsUri, fs);
+      // then fail
+      throw new PathCommitException(fsUri, E_NORMAL_FS);
+    }
+  }
+
+  /**
+   * Verify that an FS is an S3A FS.
+   * @param fs filesystem
+   * @param path path to use in the exception
+   * @return the typecast FS.
+   * @throws PathCommitException if the FS is not an S3A FS.
+   */
+  public static S3AFileSystem verifyIsS3AFS(FileSystem fs, Path path)
+      throws PathCommitException {
+    if (!(fs instanceof S3AFileSystem)) {
+      throw new PathCommitException(path, E_WRONG_FS);
+    }
+    return (S3AFileSystem) fs;
+  }
+
+  /**
+   * Get the S3A FS of a path.
+   * @param path path to examine
+   * @param conf config
+   * @param magicCommitRequired is magic commit support required in the FS?
+   * @return the filesystem
+   * @throws PathCommitException if the output path isn't on an S3A FS
+   * instance, or, if {@code magicCommitRequired} is set, the FS doesn't
+   * support these commits.
+   * @throws IOException failure to instantiate the FS
+   */
+  public static S3AFileSystem getS3AFileSystem(Path path,
+      Configuration conf,
+      boolean magicCommitRequired)
+      throws PathCommitException, IOException {
+    S3AFileSystem s3AFS = verifyIsS3AFS(path.getFileSystem(conf), path);
+    if (magicCommitRequired) {
+      verifyIsMagicCommitFS(s3AFS);
+    }
+    return s3AFS;
+  }
+
+  /**
+   * Verify that all instances in a collection are of the given class.
+   * @param it iterable to check
+   * @param classname classname to require
+   * @throws ValidationFailure on a failure
+   */
+  public static void validateCollectionClass(Iterable it, Class classname)
+      throws ValidationFailure {
+    for (Object o : it) {
+      verify(o.getClass().equals(classname),
+          "Collection element is not a %s: %s", classname, o.getClass());
+    }
+  }
+
+
+}
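
A sketch of the expected call pattern when a committer binds to its
destination (outputPath and jobConf are assumed to come from job setup):

    // resolve the destination; fails fast if the FS is not S3A or,
    // with the final flag set, if magic commit support is disabled
    S3AFileSystem fs = CommitUtils.getS3AFileSystem(outputPath, jobConf, true);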

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java
new file mode 100644
index 0000000..c6c0da8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitUtilsWithMR.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.TEMP_DATA;
+
+/**
+ * These are commit utility methods which import classes from
+ * hadoop-mapreduce, and so only work when that module is on the
+ * classpath.
+ *
+ * <b>Do not use in any codepath intended to be used from the S3AFS
+ * except in the committers themselves.</b>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class CommitUtilsWithMR {
+
+  private CommitUtilsWithMR() {
+  }
+
+  /**
+   * Get the location of magic job attempts.
+   * @param out the base output directory.
+   * @return the location of magic job attempts.
+   */
+  public static Path getMagicJobAttemptsPath(Path out) {
+    return new Path(out, MAGIC);
+  }
+
+  /**
+   * Get the Application Attempt ID for this job.
+   * @param context the context to look in
+   * @return the Application Attempt ID for a given job.
+   */
+  public static int getAppAttemptId(JobContext context) {
+    return context.getConfiguration().getInt(
+        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+  }
+
+  /**
+   * Compute the "magic" path for a job attempt.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @param dest the final output directory
+   * @return the path to store job attempt data.
+   */
+  public static Path getMagicJobAttemptPath(int appAttemptId, Path dest) {
+    return new Path(getMagicJobAttemptsPath(dest),
+        formatAppAttemptDir(appAttemptId));
+  }
+
+  /**
+   * Format the application attempt directory.
+   * @param attemptId attempt ID
+   * @return the directory name for the application attempt
+   */
+  public static String formatAppAttemptDir(int attemptId) {
+    return String.format("app-attempt-%04d", attemptId);
+  }
+
+  /**
+   * Compute the path where the output of magic task attempts are stored.
+   * @param context the context of the job with magic tasks.
+   * @param dest destination of work
+   * @return the path where the output of magic task attempts are stored.
+   */
+  public static Path getMagicTaskAttemptsPath(JobContext context, Path dest) {
+    return new Path(getMagicJobAttemptPath(
+        getAppAttemptId(context), dest), "tasks");
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed.
+   * This path is marked as a base path for relocations, so subdirectory
+   * information is preserved.
+   * @param context the context of the task attempt.
+   * @param dest The output path to commit work into
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path getMagicTaskAttemptPath(TaskAttemptContext context,
+      Path dest) {
+    return new Path(getBaseMagicTaskAttemptPath(context, dest), BASE);
+  }
+
+  /**
+   * Get the base Magic attempt path, without any annotations to mark relative
+   * references.
+   * @param context task context.
+   * @param dest The output path to commit work into
+   * @return the path under which all attempts go
+   */
+  public static Path getBaseMagicTaskAttemptPath(TaskAttemptContext context,
+      Path dest) {
+    return new Path(getMagicTaskAttemptsPath(context, dest),
+          String.valueOf(context.getTaskAttemptID()));
+  }
+
+  /**
+   * Compute a path for temporary data associated with a job.
+   * This data is <i>not magic</i>.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @param out output directory of job
+   * @return the path to store temporary job attempt data.
+   */
+  public static Path getTempJobAttemptPath(int appAttemptId, Path out) {
+    return new Path(new Path(out, TEMP_DATA),
+        formatAppAttemptDir(appAttemptId));
+  }
+
+  /**
+   * Compute the path where the output of a given task attempt will be placed.
+   * @param context task context
+   * @param out output directory of job
+   * @return the path to store temporary job attempt data.
+   */
+  public static Path getTempTaskAttemptPath(TaskAttemptContext context,
+      Path out) {
+    return new Path(getTempJobAttemptPath(getAppAttemptId(context), out),
+        String.valueOf(context.getTaskAttemptID()));
+  }
+
+  /**
+   * Get a string value of a job ID; returns meaningful text if there is no ID.
+   * @param context job context
+   * @return a string for logs
+   */
+  public static String jobIdString(JobContext context) {
+    JobID jobID = context.getJobID();
+    return jobID != null ? jobID.toString() : "(no job ID)";
+  }
+
+  /**
+   * Get a job name; returns meaningful text if there is no name.
+   * @param context job context
+   * @return a string for logs
+   */
+  public static String jobName(JobContext context) {
+    String name = context.getJobName();
+    return (name != null && !name.isEmpty()) ? name : "(anonymous)";
+  }
+
+  /**
+   * Get a configuration option, with any value in the job configuration
+   * taking priority over that in the filesystem.
+   * This allows for per-job override of FS parameters.
+   *
+   * Order is: job context, filesystem config, default value
+   *
+   * @param context job/task context
+   * @param fsConf filesystem configuration. Get this from the FS to guarantee
+   * per-bucket parameter propagation
+   * @param key key to look for
+   * @param defVal default value
+   * @return the configuration option.
+   */
+  public static String getConfigurationOption(
+      JobContext context,
+      Configuration fsConf,
+      String key,
+      String defVal) {
+    return context.getConfiguration().getTrimmed(key,
+        fsConf.getTrimmed(key, defVal));
+  }
+}
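
To show the layout these helpers produce, a hypothetical task attempt
writing under s3a://bucket/out resolves to something like the following
(attempt IDs depend on the job):

    Path dest = new Path("s3a://bucket/out");
    Path taskDir = CommitUtilsWithMR.getMagicTaskAttemptPath(context, dest);
    // e.g. s3a://bucket/out/__magic/app-attempt-0000/tasks/<attempt-id>/__base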

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java
new file mode 100644
index 0000000..c44a90b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/Duration.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+/**
+ * Little duration counter.
+ */
+public class Duration {
+
+  private final long started;
+  private long finished;
+
+  public Duration() {
+    started = time();
+    finished = started;
+  }
+
+  protected long time() {
+    return System.currentTimeMillis();
+  }
+
+  public void finished() {
+    finished = time();
+  }
+
+  public String getDurationString() {
+    return humanTime(value());
+  }
+
+  public static String humanTime(long time) {
+    long seconds = (time / 1000);
+    long minutes = (seconds / 60);
+    return String.format("%d:%02d.%03ds", minutes, seconds % 60, time % 1000);
+  }
+
+  @Override
+  public String toString() {
+    return getDurationString();
+  }
+
+  public long value() {
+    return finished - started;
+  }
+}
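
Typical use of this counter, as a sketch (doWork() stands in for the
operation being timed):

    Duration d = new Duration();
    doWork();
    d.finished();
    LOG.info("Operation took {}", d);   // e.g. "0:02.350s"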

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java
new file mode 100644
index 0000000..c6617f8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/DurationInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.slf4j.Logger;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A duration with logging of final state at info in the {@code close()} call.
+ * This allows it to be used in a try-with-resources clause, and have the
+ * duration automatically logged.
+ */
+@InterfaceAudience.Private
+public class DurationInfo extends Duration
+    implements AutoCloseable {
+  private final String text;
+
+  private final Logger log;
+
+  /**
+   * Create the duration text from a {@code String.format()} code call.
+   * @param log log to write to
+   * @param format format string
+   * @param args list of arguments
+   */
+  public DurationInfo(Logger log, String format, Object... args) {
+    this.text = String.format(format, args);
+    this.log = log;
+    log.info("Starting: {}", text);
+  }
+
+  @Override
+  public String toString() {
+    return text + ": duration " + super.toString();
+  }
+
+  @Override
+  public void close() {
+    finished();
+    log.info(this.toString());
+  }
+}
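
A sketch of the try-with-resources pattern this class is designed for; the
commit call and the jobId variable are assumed examples:

    try (DurationInfo d = new DurationInfo(LOG, "Committing job %s", jobId)) {
      committer.commitJob(jobContext);
    }
    // close() then logs e.g. "Committing job job_001: duration 0:03.210s"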

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
new file mode 100644
index 0000000..2821fce
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
+
+/**
+ * These are internal constants not intended for public use.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public final class InternalCommitterConstants {
+
+  private InternalCommitterConstants() {
+  }
+
+  /**
+   * The name of the staging committer base implementation; only used for
+   * testing.
+   */
+  public static final String COMMITTER_NAME_STAGING = "staging";
+
+  /**
+   * A unique identifier to use for this work: {@value}.
+   */
+  public static final String FS_S3A_COMMITTER_STAGING_UUID =
+      "fs.s3a.committer.staging.uuid";
+
+  /**
+   * Staging committer factory: {@value}.
+   */
+  public static final String STAGING_COMMITTER_FACTORY =
+      StagingCommitterFactory.CLASSNAME;
+
+  /**
+   * Directory committer factory: {@value}.
+   */
+  public static final String DIRECTORY_COMMITTER_FACTORY =
+      DirectoryStagingCommitterFactory.CLASSNAME;
+
+  /**
+   * Partitioned committer factory: {@value}.
+   */
+  public static final String PARTITION_COMMITTER_FACTORY =
+      PartitionedStagingCommitterFactory.CLASSNAME;
+
+  /**
+   * Magic committer factory: {@value}.
+   */
+  public static final String MAGIC_COMMITTER_FACTORY =
+      MagicS3GuardCommitterFactory.CLASSNAME;
+
+  /**
+   * Error text when the destination path exists and the committer
+   * must abort the job/task: {@value}.
+   */
+  public static final String E_DEST_EXISTS =
+      "Destination path exists and committer conflict resolution mode is "
+          + "\"fail\"";
+
+  /** Error message for bad path: {@value}. */
+  public static final String E_BAD_PATH
+      = "Path does not represent a magic-commit path";
+
+  /** Error message if filesystem isn't magic: {@value}. */
+  public static final String E_NORMAL_FS
+      = "Filesystem does not have support for 'magic' committer enabled"
+      + " in configuration option " + MAGIC_COMMITTER_ENABLED;
+
+  /** Error message if the dest FS isn't S3A: {@value}. */
+  public static final String E_WRONG_FS
+      = "Output path is not on an S3A Filesystem";
+
+  /** Error message for a path without a magic element in the list: {@value}. */
+  public static final String E_NO_MAGIC_PATH_ELEMENT
+      = "No " + MAGIC + " element in path";
+}
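
For test code only (as the class warns), a sketch of wiring the staging
committer factory and a staging UUID directly through these constants:

    conf.set(CommitConstants.S3A_COMMITTER_FACTORY_KEY,
        InternalCommitterConstants.STAGING_COMMITTER_FACTORY);
    conf.set(InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
        java.util.UUID.randomUUID().toString());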

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java
new file mode 100644
index 0000000..849a06d
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/LocalTempDir.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+
+/**
+ * A class which manages access to a temporary directory store, using the
+ * directories listed in {@link Constants#BUFFER_DIR}.
+ */
+final class LocalTempDir {
+
+  private LocalTempDir() {
+  }
+
+  private static LocalDirAllocator directoryAllocator;
+
+  private static synchronized LocalDirAllocator getAllocator(
+      Configuration conf, String key) {
+    if (directoryAllocator == null) {
+      String bufferDir = conf.get(key) != null
+          ? key : Constants.HADOOP_TMP_DIR;
+      directoryAllocator = new LocalDirAllocator(bufferDir);
+    }
+    return directoryAllocator;
+  }
+
+  /**
+   * Create a temp file.
+   * @param conf configuration to use when creating the allocator
+   * @param prefix filename prefix
+   * @param size file size, or -1 if not known
+   * @return the temp file. The file has been created.
+   * @throws IOException IO failure
+   */
+  public static File tempFile(Configuration conf, String prefix, long size)
+      throws IOException {
+    return getAllocator(conf, BUFFER_DIR).createTmpFileForWrite(
+        prefix, size, conf);
+  }
+
+  /**
+   * Get a temporary path.
+   * @param conf configuration to use when creating the allocator
+   * @param prefix filename prefix
+   * @param size file size, or -1 if not known
+   * @return the temp path.
+   * @throws IOException IO failure
+   */
+  public static Path tempPath(Configuration conf, String prefix, long size)
+      throws IOException {
+    return getAllocator(conf, BUFFER_DIR)
+        .getLocalPathForWrite(prefix, size, conf);
+  }
+
+}
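
A minimal usage sketch may help here. Since LocalTempDir is package-private,
the caller below is imagined in the same package; the class name and file
prefix are hypothetical, not part of this patch:

    package org.apache.hadoop.fs.s3a.commit;

    import java.io.File;
    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    /** Hypothetical caller, for illustration only. */
    class LocalTempDirUsageSketch {

      static File stagingFile(Configuration conf) throws IOException {
        // allocate a file under one of the fs.s3a.buffer.dir directories;
        // -1 indicates that the eventual file size is not yet known
        return LocalTempDir.tempFile(conf, "staging-", -1);
      }
    }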

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
new file mode 100644
index 0000000..a07b5c9
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitIntegration.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
+
+import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
+
+/**
+ * Adds the code needed for S3A to support magic committers.
+ * It's pulled out to keep the S3A filesystem class slightly less complex.
+ * This class can be instantiated even when magic commit is disabled;
+ * in this case:
+ * <ol>
+ *   <li>{@link #isMagicCommitPath(Path)} will always return false.</li>
+ *   <li>{@link #createTracker(Path, String)} will always return an instance
+ *   of {@link PutTracker}.</li>
+ * </ol>
+ *
+ * <p><b>Important</b>: this class must not directly or indirectly import a
+ * class which uses any datatype in hadoop-mapreduce.</p>
+ */
+public class MagicCommitIntegration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MagicCommitIntegration.class);
+  private final S3AFileSystem owner;
+  private final boolean magicCommitEnabled;
+
+  /**
+   * Instantiate.
+   * @param owner owner class
+   * @param magicCommitEnabled is magic commit enabled.
+   */
+  public MagicCommitIntegration(S3AFileSystem owner,
+      boolean magicCommitEnabled) {
+    this.owner = owner;
+    this.magicCommitEnabled = magicCommitEnabled;
+  }
+
+  /**
+   * Given an (elements, key) pair, return the key of the final destination of
+   * the PUT, that is: where the final output is expected to go.
+   * @param elements path split to elements
+   * @param key key
+   * @return the key for the final PUT; if this is not a magic commit path,
+   * the key is returned unchanged.
+   */
+  public String keyOfFinalDestination(List<String> elements, String key) {
+    if (isMagicCommitPath(elements)) {
+      return elementsToKey(finalDestination(elements));
+    } else {
+      return key;
+    }
+  }
+
+  /**
+   * Given a path and a key to that same path, create a tracker for it.
+   * This specific tracker will be chosen based on whether or not
+   * the path is a magic one.
+   * @param path path of nominal write
+   * @param key key of path of nominal write
+   * @return the tracker for this operation.
+   */
+  public PutTracker createTracker(Path path, String key) {
+    final List<String> elements = splitPathToElements(path);
+    PutTracker tracker;
+
+    if (isMagicFile(elements)) {
+      // path is of a magic file
+      if (isMagicCommitPath(elements)) {
+        final String destKey = keyOfFinalDestination(elements, key);
+        String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
+        owner.getInstrumentation()
+            .incrementCounter(Statistic.COMMITTER_MAGIC_FILES_CREATED, 1);
+        tracker = new MagicCommitTracker(path,
+            owner.getBucket(),
+            key,
+            destKey,
+            pendingsetPath,
+            owner.createWriteOperationHelper());
+        LOG.debug("Created {}", tracker);
+      } else {
+        LOG.warn("File being created has a \"magic\" path, but the filesystem"
+            + " has magic file support disabled: {}", path);
+        // downgrade to standard multipart tracking
+        tracker = new PutTracker(key);
+      }
+    } else {
+      // standard multipart tracking
+      tracker = new PutTracker(key);
+    }
+    return tracker;
+  }
+
+  /**
+   * Calculate the final destination of a set of path elements.
+   *
+   * @param elements original path elements (do not edit after this call)
+   * @return a list of elements, possibly empty
+   */
+  private List<String> finalDestination(List<String> elements) {
+    return magicCommitEnabled ?
+        MagicCommitPaths.finalDestination(elements)
+        : elements;
+  }
+
+  /**
+   * Is magic commit enabled?
+   * @return true if magic commit is turned on.
+   */
+  public boolean isMagicCommitEnabled() {
+    return magicCommitEnabled;
+  }
+
+  /**
+   * Predicate: is a path a magic commit path?
+   * @param path path to examine
+   * @return true if the path is or is under a magic directory
+   */
+  public boolean isMagicCommitPath(Path path) {
+    return isMagicCommitPath(splitPathToElements(path));
+  }
+
+  /**
+   * Is this path a magic commit path in this filesystem?
+   * True if magic commit is enabled, the path is magic
+   * and the path is not actually a commit metadata file.
+   * @param elements element list
+   * @return true if a write to this path is to be upgraded to a magic write
+   */
+  private boolean isMagicCommitPath(List<String> elements) {
+    return magicCommitEnabled && isMagicFile(elements);
+  }
+
+  /**
+   * Is the file a magic file? This predicate doesn't check
+   * whether the filesystem actually has magic commit enabled.
+   * @param elements path elements
+   * @return true if the path is one a magic file write expects.
+   */
+  private boolean isMagicFile(List<String> elements) {
+    return isMagicPath(elements) &&
+        !isCommitMetadataFile(elements);
+  }
+
+  /**
+   * Is this file one of the commit metadata files?
+   * @param elements path element list
+   * @return true if this file is one of the commit metadata files.
+   */
+  private boolean isCommitMetadataFile(List<String> elements) {
+    String last = elements.get(elements.size() - 1);
+    return last.endsWith(CommitConstants.PENDING_SUFFIX)
+        || last.endsWith(CommitConstants.PENDINGSET_SUFFIX);
+  }
+
+}
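
To illustrate the tracker selection in createTracker(), here is a minimal
sketch, assuming an initialized S3AFileSystem with magic commit enabled;
the helper class is hypothetical, not part of this patch:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;
    import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
    import org.apache.hadoop.fs.s3a.commit.PutTracker;

    /** Hypothetical helper, for illustration only. */
    class TrackerSelectionSketch {

      static PutTracker trackerFor(S3AFileSystem fs, Path path, String key) {
        MagicCommitIntegration integration =
            new MagicCommitIntegration(fs, true);
        // a file under a __magic directory gets a MagicCommitTracker, which
        // defers manifesting the data; all other paths get the no-op tracker
        return integration.createTracker(path, key);
      }
    }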

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
new file mode 100644
index 0000000..745b5b2
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/MagicCommitPaths.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.StringUtils;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.BASE;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_MAGIC_PATH_ELEMENT;
+
+/**
+ * Operations on (magic) paths.
+ */
+public final class MagicCommitPaths {
+
+  private MagicCommitPaths() {
+  }
+
+  /**
+   * Take an absolute path, split it into a list of elements.
+   * If empty, the path is the root path.
+   * @param path input path
+   * @return a possibly empty list of elements.
+   * @throws IllegalArgumentException if the path is invalid: relative, empty...
+   */
+  public static List<String> splitPathToElements(Path path) {
+    checkArgument(path.isAbsolute(), "path is relative");
+    String uriPath = path.toUri().getPath();
+    checkArgument(!uriPath.isEmpty(), "empty path");
+    if ("/".equals(uriPath)) {
+      // special case: empty list
+      return new ArrayList<>(0);
+    }
+    List<String> elements = new ArrayList<>();
+    int len = uriPath.length();
+    int firstElementChar = 1;
+    int endOfElement = uriPath.indexOf('/', firstElementChar);
+    while (endOfElement > 0) {
+      elements.add(uriPath.substring(firstElementChar, endOfElement));
+      firstElementChar = endOfElement + 1;
+      endOfElement = firstElementChar == len ? -1
+          : uriPath.indexOf('/', firstElementChar);
+    }
+    // expect a possible child element here
+    if (firstElementChar != len) {
+      elements.add(uriPath.substring(firstElementChar));
+    }
+    return elements;
+  }
+
+  /**
+   * Is a path in the magic tree?
+   * @param elements element list
+   * @return true if a path is considered magic
+   */
+  public static boolean isMagicPath(List<String> elements) {
+    return elements.contains(MAGIC);
+  }
+
+  /**
+   * Does the list of magic elements contain a base path marker?
+   * @param elements element list, already stripped down to the elements
+   * under the magic path.
+   * @return true if a path has a base directory
+   */
+  public static boolean containsBasePath(List<String> elements) {
+    return elements.contains(BASE);
+  }
+
+  /**
+   * Get the index of the magic path element.
+   * @param elements full path element list
+   * @return the index.
+   * @throws IllegalArgumentException if there is no magic element
+   */
+  public static int magicElementIndex(List<String> elements) {
+    int index = elements.indexOf(MAGIC);
+    checkArgument(index >= 0, E_NO_MAGIC_PATH_ELEMENT);
+    return index;
+  }
+
+  /**
+   * Get the parent path elements of the magic path.
+   * The list may be immutable or may be a view of the underlying list.
+   * Both the parameter list and the returned list MUST NOT be modified.
+   * @param elements full path element list
+   * @return the parent elements; may be empty
+   */
+  public static List<String> magicPathParents(List<String> elements) {
+    return elements.subList(0, magicElementIndex(elements));
+  }
+
+  /**
+   * Get the child path elements under the magic path.
+   * The list may be immutable or may be a view of the underlying list.
+   * Both the parameter list and the returned list MUST NOT be modified.
+   * @param elements full path element list
+   * @return the child elements; may be empty
+   */
+  public static List<String> magicPathChildren(List<String> elements) {
+    int index = magicElementIndex(elements);
+    int len = elements.size();
+    if (index == len - 1) {
+      // the magic element is the last one: there are no children
+      return Collections.emptyList();
+    } else {
+      return elements.subList(index + 1, len);
+    }
+  }
+
+  /**
+   * Get any child path elements under any {@code __base} path,
+   * or an empty list if there is either: no {@code __base} path element,
+   * or no child entries under it.
+   * The list may be immutable or may be a view of the underlying list.
+   * Both the parameter list and the returned list MUST NOT be modified.
+   * @param elements full path element list
+   * @return the child elements; may be empty
+   */
+  public static List<String> basePathChildren(List<String> elements) {
+    int index = elements.indexOf(BASE);
+    if (index < 0) {
+      return Collections.emptyList();
+    }
+    int len = elements.size();
+    if (index == len - 1) {
+      // the base element is the last one: there are no children
+      return Collections.emptyList();
+    } else {
+      return elements.subList(index + 1, len);
+    }
+  }
+
+  /**
+   * Take a list of elements and create an S3 key by joining them
+   * with "/" between each one.
+   * @param elements path elements
+   * @return a path which can be used in the AWS API
+   */
+  public static String elementsToKey(List<String> elements) {
+    return StringUtils.join("/", elements);
+  }
+
+  /**
+   * Get the filename of a path: the last element.
+   * @param elements element list.
+   * @return the filename; the last element.
+   */
+  public static String filename(List<String> elements) {
+    return lastElement(elements);
+  }
+
+  /**
+   * Last element of a (non-empty) list.
+   * @param strings strings in
+   * @return the last one.
+   */
+  public static String lastElement(List<String> strings) {
+    checkArgument(!strings.isEmpty(), "empty list");
+    return strings.get(strings.size() - 1);
+  }
+
+  /**
+   * Get the magic subdirectory of a destination directory.
+   * @param destDir the destination directory
+   * @return a new path.
+   */
+  public static Path magicSubdir(Path destDir) {
+    return new Path(destDir, MAGIC);
+  }
+
+  /**
+   * Calculates the final destination of a file.
+   * This is the parent of any {@code __magic} element, and the filename
+   * of the path. That is: all intermediate child path elements are discarded.
+   * Why so? Paths under the magic path include job attempt and task attempt
+   * subdirectories, which need to be skipped.
+   *
+   * If there is a {@code __base} directory in the children, then it becomes
+   * a base for unflattened paths, that is: all its children are pulled into
+   * the final destination.
+   * @param elements element list.
+   * @return the path
+   */
+  public static List<String> finalDestination(List<String> elements) {
+    if (isMagicPath(elements)) {
+      List<String> destDir = magicPathParents(elements);
+      List<String> children = magicPathChildren(elements);
+      checkArgument(!children.isEmpty(), "No path found under " +
+          MAGIC);
+      ArrayList<String> dest = new ArrayList<>(destDir);
+      if (containsBasePath(children)) {
+        // there's a base marker in the path
+        List<String> baseChildren = basePathChildren(children);
+        checkArgument(!baseChildren.isEmpty(),
+            "No path found under " + BASE);
+        dest.addAll(baseChildren);
+      } else {
+        dest.add(filename(children));
+      }
+      return dest;
+    } else {
+      return elements;
+    }
+  }
+
+}
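
A worked example of the flattening rules in finalDestination() may help;
the bucket and path below are hypothetical:

    import java.util.List;

    import org.apache.hadoop.fs.Path;

    import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;

    /** Hypothetical demo, for illustration only. */
    class FinalDestinationSketch {

      public static void main(String[] args) {
        // a task attempt writing under __magic, with a __base marker
        Path p = new Path("s3a://bucket/dest/__magic"
            + "/job_0001/task_0001/__base/year=2017/part-0000");
        List<String> elements = splitPathToElements(p);
        // job/task attempt elements between __magic and __base are dropped;
        // everything under __base is retained under the parent of __magic
        String key = elementsToKey(finalDestination(elements));
        // prints "dest/year=2017/part-0000"
        System.out.println(key);
      }
    }

Without the __base marker, only the filename ("part-0000") would be
appended to the parent of the __magic directory.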

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
new file mode 100644
index 0000000..4607548
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PathCommitException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+
+/**
+ * Path exception to use for various commit issues.
+ */
+public class PathCommitException extends PathIOException {
+  public PathCommitException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public PathCommitException(String path, String error) {
+    super(path, error);
+  }
+
+  public PathCommitException(Path path, String error) {
+    super(path != null ? path.toString() : "", error);
+  }
+
+  public PathCommitException(String path, String error, Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
new file mode 100644
index 0000000..bbffef3
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/PutTracker.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.services.s3.model.PartETag;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Multipart put tracker.
+ * The base class does nothing except declare that any multipart upload
+ * (MPU) must complete in the {@code close()} operation.
+ */
+@InterfaceAudience.Private
+public class PutTracker {
+
+  /** The destination. */
+  private final String destKey;
+
+  /**
+   * Instantiate.
+   * @param destKey destination key
+   */
+  public PutTracker(String destKey) {
+    this.destKey = destKey;
+  }
+
+  /**
+   * Startup event.
+   * @return true if the multipart upload should start immediately.
+   * @throws IOException any IO problem.
+   */
+  public boolean initialize() throws IOException {
+    return false;
+  }
+
+  /**
+   * Predicate: is the output visible immediately after the stream is
+   * closed? The base implementation returns true: visibility is not
+   * delayed, as any upload completes in {@code close()}.
+   * @return true if the output is visible once the stream is closed.
+   */
+  public boolean outputImmediatelyVisible() {
+    return true;
+  }
+
+  /**
+   * Callback when the upload is about to complete.
+   * @param uploadId Upload ID
+   * @param parts list of parts
+   * @param bytesWritten bytes written
+   * @return true if the stream is to complete the multipart upload itself;
+   * false if this tracker has taken over responsibility for completing it.
+   * @throws IOException I/O problem or validation failure.
+   */
+  public boolean aboutToComplete(String uploadId,
+      List<PartETag> parts,
+      long bytesWritten)
+      throws IOException {
+    return true;
+  }
+
+  /**
+   * Get the destination key. The default implementation returns the
+   * key passed in: there is no adjustment of the destination.
+   * @return the destination to use in PUT requests.
+   */
+  public String getDestKey() {
+    return destKey;
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "DefaultPutTracker{");
+    sb.append("destKey='").append(destKey).append('\'');
+    sb.append('}');
+    return sb.toString();
+  }
+}
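
As a sketch of this extension point, here is a hypothetical subclass (not
part of this patch) which logs the upload rather than deferring its
completion:

    import java.io.IOException;
    import java.util.List;

    import com.amazonaws.services.s3.model.PartETag;

    import org.apache.hadoop.fs.s3a.commit.PutTracker;

    /** Hypothetical subclass, for illustration only. */
    class LoggingPutTracker extends PutTracker {

      LoggingPutTracker(String destKey) {
        super(destKey);
      }

      @Override
      public boolean aboutToComplete(String uploadId, List<PartETag> parts,
          long bytesWritten) throws IOException {
        System.out.printf("upload %s: %d parts, %d bytes to %s%n",
            uploadId, parts.size(), bytesWritten, getDestKey());
        // true: the stream should complete the multipart upload itself
        return true;
      }
    }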

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
new file mode 100644
index 0000000..6b170f9
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/S3ACommitterFactory.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitterFactory;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/**
+ * The S3A committer factory which chooses the committer based on the
+ * option chosen on a per-bucket basis from the property
+ * {@link CommitConstants#FS_S3A_COMMITTER_NAME}.
+ *
+ * This should be used by declaring the property value {@link #CLASSNAME}
+ * as the committer factory for the job, then setting the filesystem property
+ * {@link CommitConstants#FS_S3A_COMMITTER_NAME} to one of
+ * <ul>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_FILE}: File committer.</li>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_DIRECTORY}:
+ *   Staging directory committer.</li>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_PARTITIONED}:
+ *   Staging partitioned committer.</li>
+ *   <li>{@link CommitConstants#COMMITTER_NAME_MAGIC}:
+ *   the "Magic" committer</li>
+ *   <li>{@link InternalCommitterConstants#COMMITTER_NAME_STAGING}:
+ *   the "staging" committer, which isn't intended for use outside tests.</li>
+ * </ul>
+ * There are no checks to verify that the filesystem is compatible with
+ * the committer.
+ */
+public class S3ACommitterFactory extends AbstractS3ACommitterFactory {
+
+  /**
+   * Name of this class: {@value}.
+   */
+  public static final String CLASSNAME
+      = "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory";
+
+  /**
+   * Create a task committer.
+   * @param fileSystem destination FS.
+   * @param outputPath final output path for work
+   * @param context job context
+   * @return a committer
+   * @throws IOException instantiation failure
+   */
+  @Override
+  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    AbstractS3ACommitterFactory factory = chooseCommitterFactory(fileSystem,
+        outputPath,
+        context.getConfiguration());
+    return factory != null ?
+      factory.createTaskCommitter(fileSystem, outputPath, context)
+      : createFileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Choose a committer from the FS and task configurations. The task
+   * configuration takes priority, allowing execution engines to change
+   * the committer dynamically on a query-by-query basis.
+   * @param fileSystem FS
+   * @param outputPath destination path
+   * @param taskConf configuration from the task
+   * @return an S3A committer factory if chosen, or null for the classic
+   * file committer
+   * @throws PathCommitException on a failure to identify the committer
+   */
+  private AbstractS3ACommitterFactory chooseCommitterFactory(
+      S3AFileSystem fileSystem,
+      Path outputPath,
+      Configuration taskConf) throws PathCommitException {
+    AbstractS3ACommitterFactory factory;
+
+    // the FS conf will have had its per-bucket values resolved, unlike
+    // job/task configurations.
+    Configuration fsConf = fileSystem.getConf();
+
+    String name = fsConf.getTrimmed(FS_S3A_COMMITTER_NAME,
+        COMMITTER_NAME_FILE);
+    name = taskConf.getTrimmed(FS_S3A_COMMITTER_NAME, name);
+    switch (name) {
+    case COMMITTER_NAME_FILE:
+      factory = null;
+      break;
+    case COMMITTER_NAME_DIRECTORY:
+      factory = new DirectoryStagingCommitterFactory();
+      break;
+    case COMMITTER_NAME_PARTITIONED:
+      factory = new PartitionedStagingCommitterFactory();
+      break;
+    case COMMITTER_NAME_MAGIC:
+      factory = new MagicS3GuardCommitterFactory();
+      break;
+    case InternalCommitterConstants.COMMITTER_NAME_STAGING:
+      factory = new StagingCommitterFactory();
+      break;
+    default:
+      throw new PathCommitException(outputPath,
+          "Unknown committer: \"" + name + "\"");
+    }
+    return factory;
+  }
+}
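
A minimal configuration sketch for wiring this factory into a job; the
binding key for the s3a scheme is assumed from PathOutputCommitterFactory
and is not shown in this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory;

    import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;

    /** Hypothetical setup helper, for illustration only. */
    class CommitterBindingSketch {

      static Configuration bind(Configuration conf) {
        // route committer creation for s3a:// output to the S3A factory
        // (binding key assumed, per PathOutputCommitterFactory)
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
            S3ACommitterFactory.CLASSNAME);
        // the job/task value overrides any per-bucket filesystem setting
        conf.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_PARTITIONED);
        return conf;
      }
    }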

