http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
new file mode 100644
index 0000000..b446f22
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/PartitionedStagingCommitterFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+/**
+ * Factory which creates {@link PartitionedStagingCommitter} instances.
+ */
+public class PartitionedStagingCommitterFactory
+    extends AbstractS3ACommitterFactory {
+
+  /**
+   * Fully qualified name of this class: {@value}.
+   */
+  public static final String CLASSNAME
+      = "org.apache.hadoop.fs.s3a.commit.staging"
+      + ".PartitionedStagingCommitterFactory";
+
+  /**
+   * Create a partitioned staging committer for a task attempt.
+   * The filesystem argument is not used here: the committer is built
+   * from the output path and the task context alone.
+   * @param fileSystem destination filesystem (unused)
+   * @param outputPath final output path
+   * @param context task attempt context
+   * @return a new {@link PartitionedStagingCommitter}
+   * @throws IOException any failure raised while constructing the committer
+   */
+  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    final PathOutputCommitter committer =
+        new PartitionedStagingCommitter(outputPath, context);
+    return committer;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
new file mode 100644
index 0000000..a4d39d7
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/Paths.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsDirectoryException;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static 
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+
+/**
+ * Path operations for the staging committers.
+ */
+public final class Paths {
+
+  private Paths() {
+  }
+
+  /**
+   * Insert the UUID into a path if it is not there already.
+   * If the final path element (everything after the last "/") contains
+   * a ".", the UUID is inserted before the first such "." with a "-"
+   * prefix; otherwise it is appended with a "-" prefix.
+   * The check for an existing UUID covers the whole path string, not just
+   * the final element.
+   *
+   * Examples:
+   * <pre>
+   *   /example/part-0000  ==&gt; /example/part-0000-0ab34
+   *   /example/part-0001.gz.csv  ==&gt; /example/part-0001-0ab34.gz.csv
+   *   /example/part-0002-0abc3.gz.csv  ==&gt; /example/part-0002-0abc3.gz.csv
+   *   /example0abc3/part-0002.gz.csv  ==&gt; /example0abc3/part-0002.gz.csv
+   * </pre>
+   *
+   *
+   * @param pathStr path as a string; must not have a trailing "/".
+   * @param uuid UUID to append; must not be empty
+   * @return new path.
+   */
+  public static String addUUID(String pathStr, String uuid) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(pathStr), "empty path");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(uuid), "empty uuid");
+    // In some cases, Spark will add the UUID to the filename itself.
+    if (pathStr.contains(uuid)) {
+      return pathStr;
+    }
+
+    int dot; // location of the first '.' in the file name
+    int lastSlash = pathStr.lastIndexOf('/');
+    if (lastSlash >= 0) {
+      Preconditions.checkState(lastSlash + 1 < pathStr.length(),
+          "Bad path: " + pathStr);
+      dot = pathStr.indexOf('.', lastSlash);
+    } else {
+      dot = pathStr.indexOf('.');
+    }
+
+    if (dot >= 0) {
+      return pathStr.substring(0, dot) + "-" + uuid + pathStr.substring(dot);
+    } else {
+      return pathStr + "-" + uuid;
+    }
+  }
+
+  /**
+   * Get the parent path of a string path: everything up to but excluding
+   * the last "/" in the path.
+   * @param pathStr path as a string
+   * @return the parent or null if there is no parent.
+   */
+  public static String getParent(String pathStr) {
+    int lastSlash = pathStr.lastIndexOf('/');
+    if (lastSlash >= 0) {
+      return pathStr.substring(0, lastSlash);
+    }
+    return null;
+  }
+
+  /**
+   * Using {@code URI#relativize()}, build the relative path from the
+   * base path to the full path.
+   * If {@code fullPath} is not a child of {@code basePath} the outcome
+   * is undefined.
+   * @param basePath base path
+   * @param fullPath full path under the base path.
+   * @return the relative path
+   */
+  public static String getRelativePath(Path basePath,
+      Path fullPath) {
+    return basePath.toUri().relativize(fullPath.toUri()).getPath();
+  }
+
+  /**
+   * Varargs constructor of paths. Not very efficient.
+   * @param parent parent path
+   * @param child child entries. "" elements are skipped.
+   * @return the full child path.
+   */
+  public static Path path(Path parent, String... child) {
+    Path p = parent;
+    for (String c : child) {
+      if (!c.isEmpty()) {
+        p = new Path(p, c);
+      }
+    }
+    return p;
+  }
+
+  /**
+   * A cache of temporary folders, keyed by task attempt ID.
+   * The cache is built without any size or expiry limit, so entries
+   * only go away when explicitly invalidated: there's a risk here that
+   * the cache gets too big.
+   */
+  private static final Cache<TaskAttemptID, Path> TEMP_FOLDERS = CacheBuilder
+      .newBuilder().build();
+
+  /**
+   * Get the task attempt temporary directory in the local filesystem.
+   * The path is cached against the attempt ID; later calls with the same
+   * attempt ID return the cached path.
+   * @param conf configuration
+   * @param uuid some UUID, such as a job UUID
+   * @param attemptID attempt ID
+   * @return a local task attempt directory.
+   * @throws IOException IO problem, including any IOException raised
+   * while the path was being created.
+   */
+  public static Path getLocalTaskAttemptTempDir(final Configuration conf,
+      final String uuid,
+      final TaskAttemptID attemptID)
+      throws IOException {
+    try {
+      final LocalDirAllocator allocator =
+          new LocalDirAllocator(Constants.BUFFER_DIR);
+      return TEMP_FOLDERS.get(attemptID,
+          () -> FileSystem.getLocal(conf).makeQualified(
+              allocator.getLocalPathForWrite(uuid, conf)));
+    } catch (ExecutionException | UncheckedExecutionException e) {
+      // the cache wraps whatever the loader raised: unwrap it so that
+      // callers see the original failure rather than a generic wrapper
+      Throwable cause = e.getCause();
+      if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      }
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      }
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Remove all information held about task attempts.
+   * @param attemptID attempt ID.
+   */
+  public static void clearTempFolderInfo(final TaskAttemptID attemptID) {
+    TEMP_FOLDERS.invalidate(attemptID);
+  }
+
+  /**
+   * Reset the temp folder cache; useful in tests.
+   */
+  @VisibleForTesting
+  public static void resetTempFolderCache() {
+    TEMP_FOLDERS.invalidateAll();
+  }
+
+  /**
+   * Try to come up with a good temp directory for different filesystems.
+   * The configured staging path is used if set; otherwise the fallback
+   * is the JVM temp dir for a "file" filesystem and a fixed filesystem
+   * temp path for any other scheme.
+   * @param fs filesystem
+   * @param conf configuration
+   * @return a qualified path under which temporary work can go.
+   */
+  public static Path tempDirForStaging(FileSystem fs,
+      Configuration conf) {
+    String fallbackPath = fs.getScheme().equals("file")
+        ? System.getProperty(JAVA_IO_TMPDIR)
+        : FILESYSTEM_TEMP_PATH;
+
+    return fs.makeQualified(new Path(conf.getTrimmed(
+        FS_S3A_COMMITTER_STAGING_TMP_PATH, fallbackPath)));
+  }
+
+  /**
+   * Get the Application Attempt ID for this job.
+   * @param conf the config to look in
+   * @return the Application Attempt ID for a given job; 0 if unset.
+   */
+  private static int getAppAttemptId(Configuration conf) {
+    return conf.getInt(
+        MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+  }
+
+  /**
+   * Build a qualified temporary path for the multipart upload commit
+   * information in the cluster filesystem.
+   * Path is built by
+   * {@link #getMultipartUploadCommitsDirectory(FileSystem,
+   * Configuration, String)}
+   * @param conf configuration defining default FS.
+   * @param uuid uuid of job
+   * @return a path which can be used for temporary work
+   * @throws IOException on an IO failure.
+   */
+  public static Path getMultipartUploadCommitsDirectory(Configuration conf,
+      String uuid) throws IOException {
+    return getMultipartUploadCommitsDirectory(
+        FileSystem.get(conf), conf, uuid);
+  }
+
+  /**
+   * Build a qualified temporary path for the multipart upload commit
+   * information in the supplied filesystem
+   * (which is expected to be the cluster FS).
+   * Currently {@code $tempDir/$user/$uuid/staging-uploads} where
+   * {@code tempDir} is from
+   * {@link #tempDirForStaging(FileSystem, Configuration)}.
+   * @param fs target FS
+   * @param conf configuration
+   * @param uuid uuid of job
+   * @return a path which can be used for temporary work
+   * @throws IOException on an IO failure.
+   */
+  @VisibleForTesting
+  static Path getMultipartUploadCommitsDirectory(FileSystem fs,
+      Configuration conf, String uuid) throws IOException {
+    return path(
+        tempDirForStaging(fs, conf),
+        UserGroupInformation.getCurrentUser().getShortUserName(),
+        uuid,
+        STAGING_UPLOADS);
+  }
+
+  /**
+   * Returns the partition of a relative file path, or null if the path is a
+   * file name with no relative directory.
+   *
+   * @param relative a relative file path
+   * @return the partition of the relative file path
+   */
+  protected static String getPartition(String relative) {
+    return getParent(relative);
+  }
+
+  /**
+   * Get the set of partitions from the list of files being staged.
+   * This is all immediate parents of those files. If a file is in the root
+   * dir, the partition is declared to be
+   * {@link StagingCommitterConstants#TABLE_ROOT}.
+   * @param attemptPath path for the attempt
+   * @param taskOutput list of output files.
+   * @return list of partitions.
+   * @throws IOException IO failure, including
+   * {@link PathIsDirectoryException} if an entry is not a file.
+   */
+  public static Set<String> getPartitions(Path attemptPath,
+      List<? extends FileStatus> taskOutput)
+      throws IOException {
+    // get a list of partition directories
+    Set<String> partitions = Sets.newLinkedHashSet();
+    for (FileStatus fileStatus : taskOutput) {
+      // sanity check the output paths
+      Path outputFile = fileStatus.getPath();
+      if (!fileStatus.isFile()) {
+        throw new PathIsDirectoryException(outputFile.toString());
+      }
+      String partition = getPartition(
+          getRelativePath(attemptPath, outputFile));
+      partitions.add(partition != null ? partition : TABLE_ROOT);
+    }
+
+    return partitions;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
new file mode 100644
index 0000000..922d1ad
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -0,0 +1,851 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.DurationInfo;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.fs.s3a.commit.Tasks;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+
+import static com.google.common.base.Preconditions.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Invoker.*;
+import static 
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
+
+/**
+ * Committer based on the contributed work of the
+ * <a href="https://github.com/rdblue/s3committer">Netflix multipart
+ * committers.</a>
+ * <ol>
+ *   <li>
+ *   The working directory of each task is actually under a temporary
+ *   path in the local filesystem; jobs write directly into it.
+ *   </li>
+ *   <li>
+ *     Task Commit: list all files under the task working dir, upload
+ *     each of them but do not commit the final operation.
+ *     Persist the information for each pending commit into the cluster
+ *     for enumeration and commit by the job committer.
+ *   </li>
+ *   <li>Task Abort: recursive delete of task working dir.</li>
+ *   <li>Job Commit: list all pending PUTs to commit; commit them.</li>
+ *   <li>
+ *     Job Abort: list all pending PUTs to commit; abort them.
+ *     Delete all task attempt directories.
+ *   </li>
+ * </ol>
+ *
+ * This is the base class of the Partitioned and Directory committers.
+ * It does not do any conflict resolution, and is made non-abstract
+ * primarily for test purposes. It is not expected to be used in production.
+ */
+public class StagingCommitter extends AbstractS3ACommitter {
+
+  /** Class-wide logger. */
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StagingCommitter.class);
+
+  /** Name: {@value}. */
+  public static final String NAME = "staging";
+
+  /** Output path as resolved when the constructor ran. */
+  private final Path constructorOutputPath;
+
+  /** Multipart upload part size, read from the configuration. */
+  private final long uploadPartSize;
+
+  /** UUID of the upload, as resolved by {@code getUploadUUID()}. */
+  private final String uuid;
+
+  /** Whether the UUID is to be added to the final keys of files. */
+  private final boolean uniqueFilenames;
+
+  /** The committer which this committer wraps. */
+  private final FileOutputCommitter wrappedCommitter;
+
+  /** Conflict resolution mode of this committer. */
+  private ConflictResolution conflictResolution;
+
+  /** S3 key of the final output path. */
+  private String s3KeyPrefix;
+
+  /** The directory in the cluster FS for commits to go to. */
+  private Path commitsDirectory;
+
+  /**
+   * Committer for a single task attempt.
+   * @param outputPath final output path
+   * @param context task context
+   * @throws IOException on a failure
+   */
+  public StagingCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    this.constructorOutputPath = checkNotNull(getOutputPath(), "output path");
+    Configuration conf = getConf();
+    this.uploadPartSize = conf.getLongBytes(
+        MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+    this.uuid = getUploadUUID(conf, context.getJobID());
+    this.uniqueFilenames = conf.getBoolean(
+        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
+    setWorkPath(buildWorkPath(context, uuid));
+    this.wrappedCommitter = createWrappedCommitter(context, conf);
+    setOutputPath(constructorOutputPath);
+    Path finalOutputPath = getOutputPath();
+    Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null");
+    S3AFileSystem fs = getS3AFileSystem(finalOutputPath,
+        context.getConfiguration(), false);
+    s3KeyPrefix = fs.pathToKey(finalOutputPath);
+    LOG.debug("{}: final output path is {}", getRole(), finalOutputPath);
+    // forces evaluation and caching of the resolution mode.
+    ConflictResolution mode = getConflictResolutionMode(getJobContext(),
+        fs.getConf());
+    LOG.debug("Conflict resolution mode: {}", mode);
+  }
+
+  /**
+   * Get the name of this committer.
+   * @return the value of {@link #NAME}.
+   */
+  @Override
+  public String getName() {
+    return StagingCommitter.NAME;
+  }
+
+  /**
+   * Create the wrapped committer.
+   * This includes customizing its options, and setting up the destination
+   * directory.
+   * @param context job/task context.
+   * @param conf config
+   * @return the inner committer
+   * @throws IOException on a failure
+   */
+  protected FileOutputCommitter createWrappedCommitter(JobContext context,
+      Configuration conf) throws IOException {
+
+    // explicitly choose commit algorithm
+    initFileOutputCommitterOptions(context);
+    commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, uuid);
+    return new FileOutputCommitter(commitsDirectory, context);
+  }
+
+  /**
+   * Init the context config with everything needed for the file output
+   * committer. In particular, this code currently only works with
+   * commit algorithm 1.
+   * @param context context to configure.
+   */
+  protected void initFileOutputCommitterOptions(JobContext context) {
+    context.getConfiguration()
+        .setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 1);
+  }
+
+  /**
+   * Build a string from the superclass's string value, the conflict
+   * resolution mode and, if it is set, the wrapped committer.
+   * @return a string describing this committer.
+   */
+  @Override
+  public String toString() {
+    final StringBuilder text = new StringBuilder("StagingCommitter{")
+        .append(super.toString())
+        .append(", conflictResolution=")
+        .append(conflictResolution);
+    if (wrappedCommitter != null) {
+      text.append(", wrappedCommitter=").append(wrappedCommitter);
+    }
+    return text.append('}').toString();
+  }
+
+  /**
+   * Get the UUID of an upload; may be the job ID.
+   * Spark will use a fake app ID based on the current minute and job ID 0.
+   * To avoid collisions, the key policy is:
+   * <ol>
+   *   <li>Value of
+   *   {@link InternalCommitterConstants#FS_S3A_COMMITTER_STAGING_UUID}.</li>
+   *   <li>Value of {@code "spark.sql.sources.writeJobUUID"}.</li>
+   *   <li>Value of {@code "spark.app.id"}.</li>
+   *   <li>JobId passed in.</li>
+   * </ol>
+   * The staging UUID is set in {@link #setupJob(JobContext)} and so will
+   * be valid in all sequences where the job has been set up for the
+   * configuration passed in.
+   * @param conf job/task configuration
+   * @param jobId Job ID
+   * @return an ID for use in paths.
+   */
+  public static String getUploadUUID(Configuration conf, String jobId) {
+    // resolve from the lowest priority source upwards: every lookup
+    // falls back to the value found so far
+    final String sparkAppId = conf.getTrimmed(SPARK_APP_ID, jobId);
+    final String sparkWriteUuid =
+        conf.getTrimmed(SPARK_WRITE_UUID, sparkAppId);
+    return conf.getTrimmed(
+        InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID,
+        sparkWriteUuid);
+  }
+
+  /**
+   * Get the UUID of a Job, using the string value of the job ID
+   * as the fallback.
+   * @param conf job/task configuration
+   * @param jobId Job ID
+   * @return an ID for use in paths.
+   */
+  public static String getUploadUUID(Configuration conf, JobID jobId) {
+    final String jobIdText = jobId.toString();
+    return getUploadUUID(conf, jobIdText);
+  }
+
+  /**
+   * Get the work path for a task.
+   * @param context job/task context
+   * @param uuid UUID
+   * @return a path or null if the context is not of a task
+   * @throws IOException failure to build the path
+   */
+  private static Path buildWorkPath(JobContext context, String uuid)
+      throws IOException {
+    if (!(context instanceof TaskAttemptContext)) {
+      // not a task attempt: there is no work path
+      return null;
+    }
+    return taskAttemptWorkingPath((TaskAttemptContext) context, uuid);
+  }
+
+  /**
+   * Query whether this committer generates unique filenames.
+   * @return true if unique filenames are used.
+   */
+  public Boolean useUniqueFilenames() {
+    return Boolean.valueOf(uniqueFilenames);
+  }
+
+  /**
+   * Get the filesystem for the job attempt: the filesystem of the
+   * job attempt path, created with the configuration of the context.
+   * @param context the context of the job.  This is used to get the
+   * application attempt ID.
+   * @return the FS to store job attempt data.
+   * @throws IOException failure to create the FS.
+   */
+  public FileSystem getJobAttemptFileSystem(JobContext context)
+      throws IOException {
+    return getJobAttemptPath(context)
+        .getFileSystem(context.getConfiguration());
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed.
+   * @param context the context of the job.  This is used to get the
+   * application attempt ID.
+   * @param out the output path to place these in.
+   * @return the path to store job attempt data.
+   */
+  public static Path getJobAttemptPath(JobContext context, Path out) {
+    final int appAttemptId = getAppAttemptId(context);
+    return getJobAttemptPath(appAttemptId, out);
+  }
+
+  /**
+   * Compute the path where the output of a given job attempt will be placed.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path to store job attempt data.
+   */
+  private static Path getJobAttemptPath(int appAttemptId, Path out) {
+    return new Path(getPendingJobAttemptsPath(out),
+        String.valueOf(appAttemptId));
+  }
+
+  @Override
+  protected Path getJobAttemptPath(int appAttemptId) {
+    return new Path(getPendingJobAttemptsPath(commitsDirectory),
+        String.valueOf(appAttemptId));
+  }
+
+  /**
+   * Compute the path where the output of pending task attempts are stored.
+   * @param context the context of the job with pending tasks.
+   * @return the path where the output of pending task attempts are stored.
+   */
+  private static Path getPendingTaskAttemptsPath(JobContext context, Path out) 
{
+    return new Path(getJobAttemptPath(context, out), TEMPORARY);
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until
+   * that task is committed: a child of the pending task attempts path,
+   * named after the task attempt ID.
+   *
+   * @param context the context of the task attempt.
+   * @param out The output path to put things in.
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
+    final Path pendingTaskAttempts =
+        getPendingTaskAttemptsPath(context, out);
+    final String attemptName = String.valueOf(context.getTaskAttemptID());
+    return new Path(pendingTaskAttempts, attemptName);
+  }
+
+  /**
+   * Get the location of pending job attempts.
+   * @param out the base output directory; must not be null.
+   * @return the location of pending job attempts.
+   */
+  private static Path getPendingJobAttemptsPath(Path out) {
+    return new Path(checkNotNull(out, "Null 'out' path"), TEMPORARY);
+  }
+
+  /**
+   * Compute the path where the output of a committed task is stored until
+   * the entire job is committed.
+   * The application attempt ID is taken from the context.
+   * @param context the context of the task attempt
+   * @return the path where the output of a committed task is stored until
+   * the entire job is committed.
+   */
+  public Path getCommittedTaskPath(TaskAttemptContext context) {
+    final int appAttemptId = getAppAttemptId(context);
+    return getCommittedTaskPath(appAttemptId, context);
+  }
+
+  /**
+   * Validate the task attempt context: the context, its task attempt ID,
+   * and the task and job IDs within that must all be non-null.
+   * @param context task context
+   */
+  private static void validateContext(TaskAttemptContext context) {
+    checkNotNull(context, "null context");
+    checkNotNull(context.getTaskAttemptID(), "null task attempt ID");
+    checkNotNull(context.getTaskAttemptID().getTaskID(), "null task ID");
+    checkNotNull(context.getTaskAttemptID().getJobID(), "null job ID");
+  }
+
+  /**
+   * Compute the path where the output of a committed task is stored until the
+   * entire job is committed for a specific application attempt.
+   * @param appAttemptId the ID of the application attempt to use
+   * @param context the context of any task.
+   * @return the path where the output of a committed task is stored.
+   */
+  protected Path getCommittedTaskPath(int appAttemptId,
+      TaskAttemptContext context) {
+    validateContext(context);
+    return new Path(getJobAttemptPath(appAttemptId),
+        String.valueOf(context.getTaskAttemptID().getTaskID()));
+  }
+
+  /**
+   * Not implemented in this committer.
+   * @param context task context
+   * @return nothing: the call always fails.
+   * @throws UnsupportedOperationException always.
+   */
+  @Override
+  public Path getTempTaskAttemptPath(TaskAttemptContext context) {
+    final String message = "Unimplemented";
+    throw new UnsupportedOperationException(message);
+  }
+
+  /**
+   * Lists the output of a task under the task attempt path. Subclasses can
+   * override this method to change how output files are identified.
+   * <p>
+   * This implementation lists the files that are direct children of the output
+   * path and filters hidden files (file names starting with '.' or '_').
+   * NOTE(review): the {@code true} argument passed to
+   * {@code listAndFilter()} looks like a recursive-listing flag, which
+   * would contradict "direct children"; confirm against that method.
+   * <p>
+   * The task attempt path is provided by
+   * {@link #getTaskAttemptPath(TaskAttemptContext)}
+   *
+   * @param context this task's {@link TaskAttemptContext}
+   * @return the output files produced by this task in the task attempt path
+   * @throws IOException on a failure
+   */
+  protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context)
+      throws IOException {
+
+    // get files on the local FS in the attempt path
+    Path attemptPath = getTaskAttemptPath(context);
+    // Preconditions templates only substitute %s, not the {} of SLF4J
+    Preconditions.checkNotNull(attemptPath,
+        "No attemptPath path in %s", this);
+
+    LOG.debug("Scanning {} for files to commit", attemptPath);
+
+    return listAndFilter(getTaskAttemptFilesystem(context),
+        attemptPath, true, HIDDEN_FILE_FILTER);
+  }
+
+  /**
+   * Returns the final S3 key for a relative path. Subclasses can override this
+   * method to upload files to a different S3 location.
+   * <p>
+   * This implementation joins the key prefix of the output path and the
+   * relative path with a "/".
+   * If {@link CommitConstants#FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES} is
+   * set, then the upload UUID is first added to the relative path.
+   *
+   * @param relative the path of a file relative to the task attempt path
+   * @param context the JobContext or TaskAttemptContext for this job
+   * @return the S3 key where the file will be uploaded
+   */
+  protected String getFinalKey(String relative, JobContext context) {
+    if (!uniqueFilenames) {
+      return getS3KeyPrefix(context) + "/" + relative;
+    }
+    return getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, uuid);
+  }
+
+  /**
+   * Returns the final S3 location for a relative path as a Hadoop
+   * {@link Path}.
+   * This is a final method that calls {@link #getFinalKey(String, JobContext)}
+   * to determine the final location.
+   *
+   * @param relative the path of a file relative to the task attempt path
+   * @param context the JobContext or TaskAttemptContext for this job
+   * @return the S3 Path where the file will be uploaded
+   * @throws IOException on a failure
+   */
+  protected final Path getFinalPath(String relative, JobContext context)
+      throws IOException {
+    return getDestS3AFS()
+        .keyToQualifiedPath(getFinalKey(relative, context));
+  }
+
+  /**
+   * Return the local work path as the destination for writing work.
+   * @param context the context of the task attempt.
+   * @return a path in the local filesystem.
+   */
+  @Override
+  public Path getBaseTaskAttemptPath(TaskAttemptContext context) {
+    // the work path is on the local FS and holds the files
+    // that will be uploaded
+    final Path localWorkPath = getWorkPath();
+    return localWorkPath;
+  }
+
+  /**
+   * For a job attempt path, the staging committer returns that of the
+   * wrapped committer.
+   * @param context the context of the job.
+   * @return a path in HDFS.
+   */
+  @Override
+  public Path getJobAttemptPath(JobContext context) {
+    final Path wrappedPath = wrappedCommitter.getJobAttemptPath(context);
+    return wrappedPath;
+  }
+
+  /**
+   * Set up the job: save the upload UUID into the configuration of the
+   * context, then call the same method on the wrapped committer.
+   * @param context job context
+   * @throws IOException IO failure.
+   */
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    LOG.debug("{}, Setting up job {}", getRole(), jobIdString(context));
+    final Configuration jobConf = context.getConfiguration();
+    jobConf.set(
+        InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID, uuid);
+    wrappedCommitter.setupJob(context);
+  }
+
+  /**
+   * Get the list of pending uploads for this job attempt.
+   * @param context job context
+   * @return a list of pending uploads.
+   * @throws IOException Any IO failure
+   */
+  @Override
+  protected List<SinglePendingCommit> listPendingUploadsToCommit(
+      JobContext context)
+      throws IOException {
+    return listPendingUploads(context, false);
+  }
+
+  /**
+   * Get the list of pending uploads for this job attempt, swallowing
+   * exceptions.
+   * @param context job context
+   * @return a list of pending uploads. If an exception was swallowed,
+   * then this may not match the actual set of pending operations
+   * @throws IOException shouldn't be raised, but retained for the compiler
+   */
+  protected List<SinglePendingCommit> listPendingUploadsToAbort(
+      JobContext context) throws IOException {
+    return listPendingUploads(context, true);
+  }
+
+  /**
+   * Get the list of pending uploads for this job attempt.
+   * @param context job context
+   * @param suppressExceptions should exceptions be swallowed?
+   * @return a list of pending uploads. If exceptions are being swallowed,
+   * then this may not match the actual set of pending operations
+   * @throws IOException Any IO failure which wasn't swallowed.
+   */
+  protected List<SinglePendingCommit> listPendingUploads(
+      JobContext context, boolean suppressExceptions) throws IOException {
+    try {
+      Path wrappedJobAttemptPath = wrappedCommitter.getJobAttemptPath(context);
+      final FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(
+          context.getConfiguration());
+      return loadPendingsetFiles(context, suppressExceptions, attemptFS,
+          listAndFilter(attemptFS,
+              wrappedJobAttemptPath, false,
+              HIDDEN_FILE_FILTER));
+    } catch (IOException e) {
+      // unable to work with endpoint, if suppressing errors decide our actions
+      maybeIgnore(suppressExceptions, "Listing pending uploads", e);
+    }
+    // reached iff an IOE was caught and swallowed
+    return new ArrayList<>(0);
+  }
+
+  @Override
+  public void cleanupStagingDirs() {
+    Path workPath = getWorkPath();
+    if (workPath != null) {
+      LOG.debug("Cleaning up work path {}", workPath);
+      ignoreIOExceptions(LOG, "cleaning up", workPath.toString(),
+          () -> deleteQuietly(workPath.getFileSystem(getConf()),
+              workPath, true));
+    }
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  protected void cleanup(JobContext context,
+      boolean suppressExceptions)
+      throws IOException {
+    maybeIgnore(suppressExceptions, "Cleanup wrapped committer",
+        () -> wrappedCommitter.cleanupJob(context));
+    maybeIgnore(suppressExceptions, "Delete destination paths",
+        () -> deleteDestinationPaths(context));
+    super.cleanup(context, suppressExceptions);
+  }
+
+  @Override
+  protected void abortPendingUploadsInCleanup(boolean suppressExceptions)
+      throws IOException {
+    if (getConf()
+        .getBoolean(FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS, true)) {
+      super.abortPendingUploadsInCleanup(suppressExceptions);
+    } else {
+      // the option defaults to true: the abort is only skipped on request
+      LOG.info("Not cleaning up pending uploads to {} as {} is false",
+          getOutputPath(), FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS);
+    }
+  }
+
+  @Override
+  protected void abortJobInternal(JobContext context,
+      boolean suppressExceptions) throws IOException {
+    String r = getRole();
+    boolean failed = false;
+    try (DurationInfo d = new DurationInfo(LOG,
+        "%s: aborting job in state %s ", r, jobIdString(context))) {
+      List<SinglePendingCommit> pending = listPendingUploadsToAbort(context);
+      abortPendingUploads(context, pending, suppressExceptions);
+    } catch (FileNotFoundException e) {
+      // nothing to list
+      LOG.debug("No job directory to read uploads from");
+    } catch (IOException e) {
+      failed = true;
+      maybeIgnore(suppressExceptions, "aborting job", e);
+    } finally {
+      super.abortJobInternal(context, failed || suppressExceptions);
+    }
+  }
+
+  /**
+   * Delete the working paths of a job. Does not attempt to clean up
+   * the work of the wrapped committer.
+   * <ol>
+   *   <li>The job attempt path</li>
+   *   <li>$dest/__temporary</li>
+   *   <li>the local working directory for staged files</li>
+   * </ol>
+   * @param context job context
+   * @throws IOException IO failure
+   */
+  protected void deleteDestinationPaths(JobContext context) throws IOException {
+    Path attemptPath = getJobAttemptPath(context);
+    ignoreIOExceptions(LOG,
+        "Deleting Job attempt Path", attemptPath.toString(),
+        () -> deleteWithWarning(
+            getJobAttemptFileSystem(context),
+            attemptPath,
+            true));
+
+    // delete the __temporary directory. This will cause problems
+    // if there is >1 task targeting the same dest dir
+    deleteWithWarning(getDestFS(),
+        new Path(getOutputPath(), TEMPORARY),
+        true);
+    // and the working path
+    deleteTaskWorkingPathQuietly(context);
+  }
+
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    Path taskAttemptPath = getTaskAttemptPath(context);
+    try (DurationInfo d = new DurationInfo(LOG,
+        "%s: setup task attempt path %s ", getRole(), taskAttemptPath)) {
+      // create the task attempt directory in that path's filesystem
+      taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath);
+      wrappedCommitter.setupTask(context);
+    }
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context)
+      throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "%s: needsTaskCommit() Task %s",
+        getRole(), context.getTaskAttemptID())) {
+      // check for files on the local FS in the attempt path
+      Path attemptPath = getTaskAttemptPath(context);
+      FileSystem fs = getTaskAttemptFilesystem(context);
+
+      // This could be made more efficient with a probe "hasChildren(Path)"
+      // which returns true if there is at least one entry under a given path.
+      FileStatus[] stats = fs.listStatus(attemptPath);
+      LOG.debug("{} files to commit under {}", stats.length, attemptPath);
+      return stats.length > 0;
+    } catch (FileNotFoundException e) {
+      // the listing found no attempt directory, so there is nothing to commit.
+      // TODO: the exception is rethrown; should this return false instead?
+      LOG.info("No files to commit");
+      throw e;
+    }
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "%s: commit task %s", getRole(), context.getTaskAttemptID())) {
+      int count = commitTaskInternal(context, getTaskOutput(context));
+      LOG.info("{}: upload file count: {}", getRole(), count);
+    } catch (IOException e) {
+      LOG.error("{}: commit of task {} failed",
+          getRole(), context.getTaskAttemptID(), e);
+      getCommitOperations().taskCompleted(false);
+      throw e;
+    }
+    getCommitOperations().taskCompleted(true);
+  }
+
+  /**
+   * Commit the task by uploading all created files and then
+   * writing a pending entry for them.
+   * @param context task context
+   * @param taskOutput list of files from the output
+   * @return number of pending commits created for the uploaded files.
+   * @throws IOException IO Failures.
+   */
+  protected int commitTaskInternal(final TaskAttemptContext context,
+      List<? extends FileStatus> taskOutput)
+      throws IOException {
+    LOG.debug("{}: commitTaskInternal", getRole());
+    Configuration conf = context.getConfiguration();
+
+    final Path attemptPath = getTaskAttemptPath(context);
+    FileSystem attemptFS = getTaskAttemptFilesystem(context);
+    LOG.debug("{}: attempt path is {}", getRole(), attemptPath);
+
+    // the pending set file is saved to the task attempt path of the
+    // wrapped committer, in that path's filesystem.
+    Path commitsAttemptPath = wrappedCommitter.getTaskAttemptPath(context);
+    FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf);
+
+    // keep track of unfinished commits in case one fails. if something fails,
+    // we will try to abort the ones that had already succeeded.
+    int commitCount = taskOutput.size();
+    final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
+    LOG.info("{}: uploading from staging directory to S3", getRole());
+    LOG.info("{}: Saving pending data information to {}",
+        getRole(), commitsAttemptPath);
+    if (taskOutput.isEmpty()) {
+      // there is nothing to write. needsTaskCommit() should have caught
+      // this, so warn that there is some kind of problem in the protocol.
+      LOG.warn("{}: No files to commit", getRole());
+    } else {
+      boolean threw = true;
+      // before the uploads, report some progress
+      context.progress();
+
+      PendingSet pendingCommits = new PendingSet(commitCount);
+      try {
+        Tasks.foreach(taskOutput)
+            .stopOnFailure()
+            .executeWith(buildThreadPool(context))
+            .run(stat -> {
+              Path path = stat.getPath();
+              File localFile = new File(path.toUri().getPath());
+              String relative = Paths.getRelativePath(attemptPath, path);
+              String partition = Paths.getPartition(relative);
+              String key = getFinalKey(relative, context);
+              Path destPath = getDestS3AFS().keyToQualifiedPath(key);
+              SinglePendingCommit commit = getCommitOperations()
+                  .uploadFileToPendingCommit(
+                      localFile,
+                      destPath,
+                      partition,
+                      uploadPartSize);
+              LOG.debug("{}: adding pending commit {}", getRole(), commit);
+              commits.add(commit);
+            });
+
+        for (SinglePendingCommit commit : commits) {
+          pendingCommits.add(commit);
+        }
+
+        // save the data
+        // although overwrite=false, there's still a risk of > 1 entry being
+        // committed if the FS doesn't have create-no-overwrite consistency.
+
+        LOG.debug("Saving {} pending commit(s)) to file {}",
+            pendingCommits.size(),
+            commitsAttemptPath);
+        pendingCommits.save(commitsFS, commitsAttemptPath, false);
+        threw = false;
+
+      } finally {
+        if (threw) {
+          LOG.error(
+              "{}: Exception during commit process, aborting {} commit(s)",
+              getRole(), commits.size());
+          Tasks.foreach(commits)
+              .suppressExceptions()
+              .run(commit -> getCommitOperations().abortSingleCommit(commit));
+          deleteTaskAttemptPathQuietly(context);
+        }
+      }
+      // purge the attempt information; only reached if nothing was thrown.
+      Paths.clearTempFolderInfo(context.getTaskAttemptID());
+    }
+
+    LOG.debug("Committing wrapped task");
+    wrappedCommitter.commitTask(context);
+
+    LOG.debug("Cleaning up attempt dir {}", attemptPath);
+    attemptFS.delete(attemptPath, true);
+    return commits.size();
+  }
+
+  /**
+   * Abort the task.
+   * The API specifies that the task has not yet been committed, so there are
+   * no uploads that need to be cancelled.
+   * Accordingly just delete files on the local FS, and call abortTask in
+   * the wrapped committer.
+   * <b>Important: this may be called in the AM after a container failure.</b>
+   * When that occurs and the failed container was on a different host in the
+   * cluster, the local files will not be deleted.
+   * @param context task context
+   * @throws IOException any failure
+   */
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    // the API specifies that the task has not yet been committed, so there are
+    // no uploads that need to be cancelled. just delete files on the local FS.
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Abort task %s", context.getTaskAttemptID())) {
+      deleteTaskAttemptPathQuietly(context);
+      deleteTaskWorkingPathQuietly(context);
+      wrappedCommitter.abortTask(context);
+    } catch (IOException e) {
+      LOG.error("{}: exception when aborting task {}",
+          getRole(), context.getTaskAttemptID(), e);
+      throw e;
+    }
+  }
+
+  /**
+   * Get the work path for a task.
+   * @param context task attempt context
+   * @param uuid UUID passed to {@code Paths.getLocalTaskAttemptTempDir()}
+   * @return a path
+   * @throws IOException failure to build the path
+   */
+  private static Path taskAttemptWorkingPath(TaskAttemptContext context,
+      String uuid) throws IOException {
+    return getTaskAttemptPath(context,
+        Paths.getLocalTaskAttemptTempDir(
+            context.getConfiguration(),
+            uuid,
+            context.getTaskAttemptID()));
+  }
+
+  /**
+   * Delete the working path of a task; no-op if there is none, that
+   * is: this is a job.
+   * @param context job/task context
+   */
+  protected void deleteTaskWorkingPathQuietly(JobContext context) {
+    ignoreIOExceptions(LOG, "Delete working path", "",
+        () -> {
+          Path path = buildWorkPath(context, getUUID());
+          if (path != null) {
+            deleteQuietly(path.getFileSystem(getConf()), path, true);
+          }
+        });
+  }
+
+  /**
+   * Get the key of the destination "directory" of the job/task.
+   * @param context job context
+   * @return key to write to
+   */
+  private String getS3KeyPrefix(JobContext context) {
+    return s3KeyPrefix;
+  }
+
+  /**
+   * A UUID for this upload, as calculated with
+   * {@link #getUploadUUID(Configuration, String)}.
+   * @return the UUID for files
+   */
+  protected String getUUID() {
+    return uuid;
+  }
+
+  /**
+   * Returns the {@link ConflictResolution} mode for this commit.
+   *
+   * @param context the JobContext for this commit
+   * @param fsConf filesystem config
+   * @return the ConflictResolution mode
+   */
+  public final ConflictResolution getConflictResolutionMode(
+      JobContext context,
+      Configuration fsConf) {
+    if (conflictResolution == null) {
+      this.conflictResolution = ConflictResolution.valueOf(
+          getConfictModeOption(context, fsConf));
+    }
+    return conflictResolution;
+  }
+
+  /**
+   * Get the conflict mode option string.
+   * @param context context with the config
+   * @param fsConf filesystem config
+   * @return the trimmed configuration option, upper case.
+   */
+  public static String getConfictModeOption(JobContext context,
+      Configuration fsConf) {
+    return getConfigurationOption(context,
+        fsConf,
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
+        DEFAULT_CONFLICT_MODE).toUpperCase(Locale.ENGLISH);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
new file mode 100644
index 0000000..c5fb967
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Internal staging committer constants.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class StagingCommitterConstants {
+
+  private StagingCommitterConstants() {
+  }
+
+  /**
+   * The temporary path for staging data, if not explicitly set.
+   * By using an unqualified path, this will be qualified to be relative
+   * to the user's home directory, so protected from access by others.
+   */
+  public static final String FILESYSTEM_TEMP_PATH = "tmp/staging";
+
+  /** Name of the root partition: {@value}. */
+  public static final String TABLE_ROOT = "table_root";
+
+  /**
+   * Filename used under {@code ~/${UUID}} for the staging files.
+   */
+  public static final String STAGING_UPLOADS = "staging-uploads";
+
+  // Spark configuration keys
+
+  /**
+   * The UUID for jobs: {@value}.
+   */
+  public static final String SPARK_WRITE_UUID =
+      "spark.sql.sources.writeJobUUID";
+
+  /**
+   * The App ID for jobs: {@value}.
+   */
+
+  public static final String SPARK_APP_ID = "spark.app.id";
+
+  public static final String JAVA_IO_TMPDIR = "java.io.tmpdir";
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
new file mode 100644
index 0000000..292b16b
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+/**
+ * Factory for the staging committer.
+ * This is for internal test use, rather than the public directory and
+ * partitioned committers.
+ */
+public class StagingCommitterFactory
+    extends AbstractS3ACommitterFactory {
+
+  /**
+   * Name of this class: {@value}.
+   */
+  public static final String CLASSNAME
+      = "org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterFactory";
+
+  public PathOutputCommitter createTaskCommitter(S3AFileSystem fileSystem,
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return new StagingCommitter(outputPath, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java
new file mode 100644
index 0000000..174fbb0
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/package-info.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * The staging committers.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
index 458eb83..13384cf 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
@@ -67,8 +68,12 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.Invoker;
+import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.fs.s3a.S3ARetryPolicy;
+import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -211,6 +216,28 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   private RetryPolicy dataAccessRetryPolicy;
   private S3AInstrumentation.S3GuardInstrumentation instrumentation;
 
+  /** Owner FS: only valid if configured with an owner FS. */
+  private S3AFileSystem owner;
+
+  /** Invoker for IO. Until configured properly, use try-once. */
+  private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
+      Invoker.NO_OP
+  );
+
+  /** Data access can have its own policies. */
+  private Invoker dataAccess;
+
+  /**
+   * Total limit on the number of throttle events after which
+   * we stop warning in the log. Keeps the noise down.
+   */
+  private static final int THROTTLE_EVENT_LOG_LIMIT = 100;
+
+  /**
+   * Count of the total number of throttle events; used to crank back logging.
+   */
+  private AtomicInteger throttleEventCount = new AtomicInteger(0);
+
   /**
    * A utility function to create DynamoDB instance.
    * @param conf the file system configuration
@@ -232,28 +259,34 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   }
 
   @Override
+  @Retries.OnceRaw
   public void initialize(FileSystem fs) throws IOException {
     Preconditions.checkArgument(fs instanceof S3AFileSystem,
         "DynamoDBMetadataStore only supports S3A filesystem.");
-    final S3AFileSystem s3afs = (S3AFileSystem) fs;
-    instrumentation = s3afs.getInstrumentation().getS3GuardInstrumentation();
-    final String bucket = s3afs.getBucket();
-    String confRegion = s3afs.getConf().getTrimmed(S3GUARD_DDB_REGION_KEY);
+    owner = (S3AFileSystem) fs;
+    instrumentation = owner.getInstrumentation().getS3GuardInstrumentation();
+    final String bucket = owner.getBucket();
+    conf = owner.getConf();
+    String confRegion = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
     if (!StringUtils.isEmpty(confRegion)) {
       region = confRegion;
       LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
           region);
     } else {
-      region = s3afs.getBucketLocation();
+      region = owner.getBucketLocation();
       LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
     }
-    username = s3afs.getUsername();
-    conf = s3afs.getConf();
+    username = owner.getUsername();
     dynamoDB = createDynamoDB(conf, region);
 
     // use the bucket as the DynamoDB table name if not specified in config
     tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
-    setMaxRetries(conf);
+    initDataAccessRetries(conf);
+
+    // set up a full retry policy
+    invoker = new Invoker(new S3ARetryPolicy(conf),
+        this::retryEvent
+    );
 
     initTable();
 
@@ -283,6 +316,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
    * @throws IllegalArgumentException if the configuration is incomplete
    */
   @Override
+  @Retries.OnceRaw
   public void initialize(Configuration config) throws IOException {
     conf = config;
     // use the bucket as the DynamoDB table name if not specified in config
@@ -295,7 +329,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
     dynamoDB = createDynamoDB(conf, region);
 
     username = UserGroupInformation.getCurrentUser().getShortUserName();
-    setMaxRetries(conf);
+    initDataAccessRetries(conf);
 
     initTable();
   }
@@ -304,22 +338,25 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
    * Set retry policy. This is driven by the value of
    * {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff
    * between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds.
-   * @param config
+   * @param config configuration for data access
    */
-  private void setMaxRetries(Configuration config) {
+  private void initDataAccessRetries(Configuration config) {
     int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES,
         S3GUARD_DDB_MAX_RETRIES_DEFAULT);
     dataAccessRetryPolicy = RetryPolicies
         .exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC,
             TimeUnit.MILLISECONDS);
+    dataAccess = new Invoker(dataAccessRetryPolicy, this::retryEvent);
   }
 
   @Override
+  @Retries.RetryTranslated
   public void delete(Path path) throws IOException {
     innerDelete(path, true);
   }
 
   @Override
+  @Retries.RetryTranslated
   public void forgetMetadata(Path path) throws IOException {
     innerDelete(path, false);
   }
@@ -332,9 +369,10 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
    * @param tombstone flag to create a tombstone marker
    * @throws IOException I/O error.
    */
-  private void innerDelete(Path path, boolean tombstone)
+  @Retries.RetryTranslated
+  private void innerDelete(final Path path, boolean tombstone)
       throws IOException {
-    path = checkPath(path);
+    checkPath(path);
     LOG.debug("Deleting from table {} in region {}: {}",
         tableName, region, path);
 
@@ -343,23 +381,25 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
       LOG.debug("Skip deleting root directory as it does not exist in table");
       return;
     }
-
-    try {
-      if (tombstone) {
-        Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
-            PathMetadata.tombstone(path));
-        table.putItem(item);
-      } else {
-        table.deleteItem(pathToKey(path));
-      }
-    } catch (AmazonClientException e) {
-      throw translateException("delete", path, e);
+    // whether a failed delete operation may be repeated follows
+    // the idempotency policy of S3A itself
+    boolean idempotent = S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT;
+    if (tombstone) {
+      Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
+          PathMetadata.tombstone(path));
+      invoker.retry("Put tombstone", path.toString(), idempotent,
+          () -> table.putItem(item));
+    } else {
+      PrimaryKey key = pathToKey(path);
+      invoker.retry("Delete key", path.toString(), idempotent,
+          () -> table.deleteItem(key));
     }
   }
 
   @Override
+  @Retries.RetryTranslated
   public void deleteSubtree(Path path) throws IOException {
-    path = checkPath(path);
+    checkPath(path);
     LOG.debug("Deleting subtree from table {} in region {}: {}",
         tableName, region, path);
 
@@ -375,6 +415,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
     }
   }
 
+  @Retries.OnceRaw
   private Item getConsistentItem(PrimaryKey key) {
     final GetItemSpec spec = new GetItemSpec()
         .withPrimaryKey(key)
@@ -383,52 +424,65 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   }
 
   @Override
+  @Retries.OnceTranslated
   public PathMetadata get(Path path) throws IOException {
     return get(path, false);
   }
 
   @Override
+  @Retries.OnceTranslated
   public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
       throws IOException {
-    path = checkPath(path);
+    checkPath(path);
     LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
+    return Invoker.once("get", path.toString(),
+        () -> innerGet(path, wantEmptyDirectoryFlag));
+  }
 
-    try {
-      final PathMetadata meta;
-      if (path.isRoot()) {
-        // Root does not persist in the table
-        meta = new PathMetadata(makeDirStatus(username, path));
-      } else {
-        final Item item = getConsistentItem(pathToKey(path));
-        meta = itemToPathMetadata(item, username);
-        LOG.debug("Get from table {} in region {} returning for {}: {}",
-            tableName, region, path, meta);
-      }
+  /**
+   * Inner get operation, as invoked in the retry logic.
+   * @param path the path to get
+   * @param wantEmptyDirectoryFlag Set to true to give a hint to the
+   *   MetadataStore that it should try to compute the empty directory flag.
+   * @return metadata for {@code path}, {@code null} if not found
+   * @throws IOException IO problem
+   * @throws AmazonClientException dynamo DB level problem
+   */
+  @Retries.OnceRaw
+  private PathMetadata innerGet(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException {
+    final PathMetadata meta;
+    if (path.isRoot()) {
+      // Root does not persist in the table
+      meta = new PathMetadata(makeDirStatus(username, path));
+    } else {
+      final Item item = getConsistentItem(pathToKey(path));
+      meta = itemToPathMetadata(item, username);
+      LOG.debug("Get from table {} in region {} returning for {}: {}",
+          tableName, region, path, meta);
+    }
 
-      if (wantEmptyDirectoryFlag && meta != null) {
-        final FileStatus status = meta.getFileStatus();
-        // for directory, we query its direct children to determine isEmpty bit
-        if (status.isDirectory()) {
-          final QuerySpec spec = new QuerySpec()
-              .withHashKey(pathToParentKeyAttribute(path))
-              .withConsistentRead(true)
-              .withFilterExpression(IS_DELETED + " = :false")
-              .withValueMap(deleteTrackingValueMap);
-          final ItemCollection<QueryOutcome> items = table.query(spec);
-          boolean hasChildren = items.iterator().hasNext();
-          // When this class has support for authoritative
-          // (fully-cached) directory listings, we may also be able to answer
-          // TRUE here.  Until then, we don't know if we have full listing or
-          // not, thus the UNKNOWN here:
-          meta.setIsEmptyDirectory(
-              hasChildren ? Tristate.FALSE : Tristate.UNKNOWN);
-        }
+    if (wantEmptyDirectoryFlag && meta != null) {
+      final FileStatus status = meta.getFileStatus();
+      // for directory, we query its direct children to determine isEmpty bit
+      if (status.isDirectory()) {
+        final QuerySpec spec = new QuerySpec()
+            .withHashKey(pathToParentKeyAttribute(path))
+            .withConsistentRead(true)
+            .withFilterExpression(IS_DELETED + " = :false")
+            .withValueMap(deleteTrackingValueMap);
+        final ItemCollection<QueryOutcome> items = table.query(spec);
+        boolean hasChildren = items.iterator().hasNext();
+        // When this class has support for authoritative
+        // (fully-cached) directory listings, we may also be able to answer
+        // TRUE here.  Until then, we don't know if we have full listing or
+        // not, thus the UNKNOWN here:
+        meta.setIsEmptyDirectory(
+            hasChildren ? Tristate.FALSE : Tristate.UNKNOWN);
       }
-
-      return meta;
-    } catch (AmazonClientException e) {
-      throw translateException("get", path, e);
     }
+
+    return meta;
   }
 
   /**
@@ -445,35 +499,38 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   }
 
   @Override
-  public DirListingMetadata listChildren(Path path) throws IOException {
-    path = checkPath(path);
+  @Retries.OnceTranslated
+  public DirListingMetadata listChildren(final Path path) throws IOException {
+    checkPath(path);
     LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
 
     // find the children in the table
-    try {
-      final QuerySpec spec = new QuerySpec()
-          .withHashKey(pathToParentKeyAttribute(path))
-          .withConsistentRead(true); // strictly consistent read
-      final ItemCollection<QueryOutcome> items = table.query(spec);
-
-      final List<PathMetadata> metas = new ArrayList<>();
-      for (Item item : items) {
-        PathMetadata meta = itemToPathMetadata(item, username);
-        metas.add(meta);
-      }
-      LOG.trace("Listing table {} in region {} for {} returning {}",
-          tableName, region, path, metas);
+    return Invoker.once("listChildren", path.toString(),
+        () -> {
+          final QuerySpec spec = new QuerySpec()
+              .withHashKey(pathToParentKeyAttribute(path))
+              .withConsistentRead(true); // strictly consistent read
+          final ItemCollection<QueryOutcome> items = table.query(spec);
 
-      return (metas.isEmpty() && get(path) == null)
-          ? null
-          : new DirListingMetadata(path, metas, false);
-    } catch (AmazonClientException e) {
-      // failure, including the path not being present
-      throw translateException("listChildren", path, e);
-    }
+          final List<PathMetadata> metas = new ArrayList<>();
+          for (Item item : items) {
+            PathMetadata meta = itemToPathMetadata(item, username);
+            metas.add(meta);
+          }
+          LOG.trace("Listing table {} in region {} for {} returning {}",
+              tableName, region, path, metas);
+
+          return (metas.isEmpty() && get(path) == null)
+              ? null
+              : new DirListingMetadata(path, metas, false);
+        });
   }
 
-  // build the list of all parent entries.
+  /**
+   * Build the list of all parent entries.
+   * @param pathsToCreate paths to create
+   * @return the full ancestry paths
+   */
   Collection<PathMetadata> completeAncestry(
       Collection<PathMetadata> pathsToCreate) {
     // Key on path to allow fast lookup
@@ -499,6 +556,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
   }
 
   @Override
+  @Retries.OnceTranslated
   public void move(Collection<Path> pathsToDelete,
       Collection<PathMetadata> pathsToCreate) throws IOException {
     if (pathsToDelete == null && pathsToCreate == null) {
@@ -527,21 +585,20 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
       }
     }
 
-    try {
-      processBatchWriteRequest(null, pathMetadataToItem(newItems));
-    } catch (AmazonClientException e) {
-      throw translateException("move", (String) null, e);
-    }
+    Invoker.once("move", tableName,
+        () -> processBatchWriteRequest(null, pathMetadataToItem(newItems)));
   }
 
   /**
    * Helper method to issue a batch write request to DynamoDB.
    *
-   * Callers of this method should catch the {@link AmazonClientException} and
-   * translate it for better error report and easier debugging.
+   * The retry logic here is limited to repeating the write operations
+   * until all items have been written; there is no other attempt
+   * at recovery/retry. Throttling is handled internally.
    * @param keysToDelete primary keys to be deleted; can be null
    * @param itemsToPut new items to be put; can be null
    */
+  @Retries.OnceRaw("Outstanding batch items are updated with backoff")
   private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
       Item[] itemsToPut) throws IOException {
     final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
@@ -575,7 +632,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
       // Check for unprocessed keys in case of exceeding provisioned throughput
       Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
       int retryCount = 0;
-      while (unprocessed.size() > 0) {
+      while (!unprocessed.isEmpty()) {
         retryBackoff(retryCount++);
         res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
         unprocessed = res.getUnprocessedItems();
@@ -603,12 +660,17 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
         LOG.debug("Sleeping {} msec before next retry", action.delayMillis);
         Thread.sleep(action.delayMillis);
       }
+    } catch (InterruptedException e) {
+      throw (IOException)new InterruptedIOException(e.toString()).initCause(e);
+    } catch (IOException e) {
+      throw e;
     } catch (Exception e) {
       throw new IOException("Unexpected exception", e);
     }
   }
 
   @Override
+  @Retries.OnceRaw
   public void put(PathMetadata meta) throws IOException {
     // For a deeply nested path, this method will automatically create the full
     // ancestry and save respective item in DynamoDB table.
@@ -624,6 +686,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
   }
 
   @Override
+  @Retries.OnceRaw
   public void put(Collection<PathMetadata> metas) throws IOException {
     LOG.debug("Saving batch to table {} in region {}", tableName, region);
 
@@ -633,6 +696,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
   /**
    * Helper method to get full path of ancestors that are nonexistent in table.
    */
+  @Retries.OnceRaw
   private Collection<PathMetadata> fullPathsToPut(PathMetadata meta)
       throws IOException {
     checkPathMetadata(meta);
@@ -675,25 +739,34 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
         null, owner, owner, f);
   }
 
+  /**
+   * {@inheritDoc}.
+   * There is retry around building the list of paths to update, but
+   * the call to {@link #processBatchWriteRequest(PrimaryKey[], Item[])}
+   * is only tried once.
+   * @param meta Directory listing metadata.
+   * @throws IOException on a failure
+   */
   @Override
+  @Retries.OnceTranslated("retry(listFullPaths); once(batchWrite)")
   public void put(DirListingMetadata meta) throws IOException {
     LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
 
     // directory path
-    PathMetadata p = new PathMetadata(makeDirStatus(meta.getPath(), username),
+    Path path = meta.getPath();
+    PathMetadata p = new PathMetadata(makeDirStatus(path, username),
         meta.isEmpty(), false);
 
     // First add any missing ancestors...
-    final Collection<PathMetadata> metasToPut = fullPathsToPut(p);
+    final Collection<PathMetadata> metasToPut = invoker.retry(
+        "paths to put", path.toString(), true,
+        () -> fullPathsToPut(p));
 
     // next add all children of the directory
     metasToPut.addAll(meta.getListing());
 
-    try {
-      processBatchWriteRequest(null, pathMetadataToItem(metasToPut));
-    } catch (AmazonClientException e) {
-      throw translateException("put", (String) null, e);
-    }
+    Invoker.once("put", path.toString(),
+        () -> processBatchWriteRequest(null, pathMetadataToItem(metasToPut)));
   }
 
   @Override
@@ -709,6 +782,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
   }
 
   @Override
+  @Retries.OnceTranslated
   public void destroy() throws IOException {
     if (table == null) {
       LOG.info("In destroy(): no table to delete");
@@ -731,10 +805,11 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
       throw new InterruptedIOException("Table " + tableName
           + " in region " + region + " has not been deleted");
     } catch (AmazonClientException e) {
-      throw translateException("destroy", (String) null, e);
+      throw translateException("destroy", tableName, e);
     }
   }
 
+  @Retries.OnceRaw
   private ItemCollection<ScanOutcome> expiredFiles(long modTime) {
     String filterExpression = "mod_time < :mod_time";
     String projectionExpression = "parent,child";
@@ -743,6 +818,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
   }
 
   @Override
+  @Retries.OnceRaw("once(batchWrite)")
   public void prune(long modTime) throws IOException {
     int itemCount = 0;
     try {
@@ -797,6 +873,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
    * or table is being deleted, or any other I/O exception occurred.
    */
   @VisibleForTesting
+  @Retries.OnceRaw
   void initTable() throws IOException {
     table = dynamoDB.getTable(tableName);
     try {
@@ -848,7 +925,7 @@ public class DynamoDBMetadataStore implements MetadataStore 
{
       }
 
     } catch (AmazonClientException e) {
-      throw translateException("initTable", (String) null, e);
+      throw translateException("initTable", tableName, e);
     }
   }
 
@@ -856,8 +933,11 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
    * Get the version mark item in the existing DynamoDB table.
    *
    * As the version marker item may be created by another concurrent thread or
-   * process, we retry a limited times before we fail to get it.
+   * process, we sleep and retry a limited number of times before giving up.
+   * This does not include handling any failure other than "item not found",
+   * so this method is tagged as "OnceRaw".
    */
+  @Retries.OnceRaw
   private Item getVersionMarkerItem() throws IOException {
     final PrimaryKey versionMarkerKey =
         createVersionMarkerPrimaryKey(VERSION_MARKER);
@@ -913,16 +993,20 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
    * @param t table to block on.
    * @throws IOException IO problems
    * @throws InterruptedIOException if the wait was interrupted
+   * @throws IllegalArgumentException if an exception was raised in the waiter
    */
-  private void waitForTableActive(Table t) throws IOException {
+  @Retries.OnceRaw
+  private void waitForTableActive(Table t) throws InterruptedIOException {
     try {
       t.waitForActive();
     } catch (InterruptedException e) {
       LOG.warn("Interrupted while waiting for table {} in region {} active",
           tableName, region, e);
       Thread.currentThread().interrupt();
-      throw (IOException) new InterruptedIOException("DynamoDB table '"
-          + tableName + "' is not active yet in region " + region).initCause(e);
+      throw (InterruptedIOException)
+          new InterruptedIOException("DynamoDB table '"
+          + tableName + "' is not active yet in region " + region)
+              .initCause(e);
     }
   }
 
@@ -933,6 +1017,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
    * @throws IOException on any failure.
    * @throws InterruptedIOException if the wait was interrupted
    */
+  @Retries.OnceRaw
   private void createTable(ProvisionedThroughput capacity) throws IOException {
     try {
       LOG.info("Creating non-existent DynamoDB table {} in region {}",
@@ -960,6 +1045,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
    * @param item item to put
    * @return the outcome.
    */
+  @Retries.OnceRaw
   PutItemOutcome putItem(Item item) {
     LOG.debug("Putting item {}", item);
     return table.putItem(item);
@@ -967,22 +1053,27 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
 
   /**
    * Provision the table with given read and write capacity units.
+   * Call will fail if the table is busy, or the new values match the current
+   * ones.
+   * @param readCapacity read units
+   * @param writeCapacity write units
+   * @throws IOException on a failure
    */
+  @Retries.RetryTranslated
   void provisionTable(Long readCapacity, Long writeCapacity)
       throws IOException {
     final ProvisionedThroughput toProvision = new ProvisionedThroughput()
         .withReadCapacityUnits(readCapacity)
         .withWriteCapacityUnits(writeCapacity);
-    try {
-      final ProvisionedThroughputDescription p =
-          table.updateTable(toProvision).getProvisionedThroughput();
-      LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
-              + "writeCapacityUnits={}",
-          tableName, region, p.getReadCapacityUnits(),
-          p.getWriteCapacityUnits());
-    } catch (AmazonClientException e) {
-      throw translateException("provisionTable", (String) null, e);
-    }
+    invoker.retry("ProvisionTable", tableName, true,
+        () -> {
+          final ProvisionedThroughputDescription p =
+              table.updateTable(toProvision).getProvisionedThroughput();
+          LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+                  + "writeCapacityUnits={}",
+              tableName, region, p.getReadCapacityUnits(),
+              p.getWriteCapacityUnits());
+        });
   }
 
   Table getTable() {
@@ -999,8 +1090,10 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   }
 
   /**
-   * Validates a path object; it must be absolute, and contain a host
-   * (bucket) component.
+   * Validates a path object; it must be absolute, have an s3a:// scheme
+   * and contain a host (bucket) component.
+   * @param path path to check
+   * @return the path passed in
    */
   private Path checkPath(Path path) {
     Preconditions.checkNotNull(path);
@@ -1025,6 +1118,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   }
 
   @Override
+  @Retries.OnceRaw
   public Map<String, String> getDiagnostics() throws IOException {
     Map<String, String> map = new TreeMap<>();
     if (table != null) {
@@ -1052,6 +1146,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
     return map;
   }
 
+  @Retries.OnceRaw
   private TableDescription getTableDescription(boolean forceUpdate) {
     TableDescription desc = table.getDescription();
     if (desc == null || forceUpdate) {
@@ -1061,6 +1156,7 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
   }
 
   @Override
+  @Retries.OnceRaw
   public void updateParameters(Map<String, String> parameters)
       throws IOException {
     Preconditions.checkNotNull(table, "Not initialized");
@@ -1103,4 +1199,46 @@ public class DynamoDBMetadataStore implements 
MetadataStore {
     }
   }
 
+  /**
+   * Callback from {@link Invoker} when an operation is retried.
+   * @param text text of the operation
+   * @param ex exception
+   * @param attempts number of attempts
+   * @param idempotent is the method idempotent
+   */
+  void retryEvent(
+      String text,
+      IOException ex,
+      int attempts,
+      boolean idempotent) {
+    if (S3AUtils.isThrottleException(ex)) {
+      // throttled
+      if (instrumentation != null) {
+        instrumentation.throttled();
+      }
+      int eventCount = throttleEventCount.addAndGet(1);
+      if (attempts == 1 && eventCount < THROTTLE_EVENT_LOG_LIMIT) {
+        LOG.warn("DynamoDB IO limits reached in {};"
+                + " consider increasing capacity: {}", text, ex.toString());
+        LOG.debug("Throttled", ex);
+      } else {
+        // user has been warned already, log at debug only.
+        LOG.debug("DynamoDB IO limits reached in {};"
+                + " consider increasing capacity: {}", text, ex.toString());
+      }
+    } else if (attempts == 1) {
+      // not throttled. Log on the first attempt only
+      LOG.info("Retrying {}: {}", text, ex.toString());
+      LOG.debug("Retrying {}", text, ex);
+    }
+
+    if (instrumentation != null) {
+      // note a retry
+      instrumentation.retrying();
+    }
+    if (owner != null) {
+      owner.metastoreOperationRetried(ex, attempts, idempotent);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index c7c810a..a56b055 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.Tristate;
@@ -83,6 +84,7 @@ public final class S3Guard {
    * @return Reference to new MetadataStore.
    * @throws IOException if the metadata store cannot be instantiated
    */
+  @Retries.OnceTranslated
   public static MetadataStore getMetadataStore(FileSystem fs)
       throws IOException {
     Preconditions.checkNotNull(fs);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
index 4f0e8f7..ace043b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
@@ -960,6 +961,7 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
     public static final String AUTH_FLAG = "auth";
     public static final String NONAUTH_FLAG = "nonauth";
     public static final String ENCRYPTION_FLAG = "encryption";
+    public static final String MAGIC_FLAG = "magic";
 
     public static final String PURPOSE = "provide/check S3Guard information"
         + " about a specific bucket";
@@ -967,11 +969,15 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
         + "\t" + PURPOSE + "\n\n"
         + "Common options:\n"
         + "  -" + GUARDED_FLAG + " - Require S3Guard\n"
+        + "  -" + UNGUARDED_FLAG + " - Require S3Guard to be disabled\n"
+        + "  -" + AUTH_FLAG + " - Require the S3Guard mode to be \"authoritative\"\n"
+        + "  -" + NONAUTH_FLAG + " - Require the S3Guard mode to be \"non-authoritative\"\n"
+        + "  -" + MAGIC_FLAG + " - Require the S3 filesystem to be support the \"magic\" committer\n"
         + "  -" + ENCRYPTION_FLAG
         + " -require {none, sse-s3, sse-kms} - Require encryption policy";
 
     BucketInfo(Configuration conf) {
-      super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG);
+      super(conf, GUARDED_FLAG, UNGUARDED_FLAG, AUTH_FLAG, NONAUTH_FLAG, MAGIC_FLAG);
       CommandFormat format = getCommandFormat();
       format.addOptionWithValue(ENCRYPTION_FLAG);
     }
@@ -1014,6 +1020,11 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
       } else {
         println(out, "Filesystem %s is not using S3Guard", fsUri);
       }
+      boolean magic = fs.hasCapability(
+          CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER);
+      println(out, "The \"magic\" committer %s supported",
+          magic ? "is" : "is not");
+
       println(out, "%nS3A Client");
 
       String endpoint = conf.getTrimmed(ENDPOINT, "");
@@ -1043,6 +1054,9 @@ public abstract class S3GuardTool extends Configured 
implements Tool {
           throw badState("S3Guard is not enabled for %s", fsUri);
         }
       }
+      if (commands.getOpt(MAGIC_FLAG) && !magic) {
+        throw badState("The magic committer is not enabled for %s", fsUri);
+      }
 
       String desiredEncryption = getCommandFormat()
           .getOptValue(ENCRYPTION_FLAG);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to