http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
new file mode 100644
index 0000000..2c348f5
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
@@ -0,0 +1,696 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.google.common.collect.Sets;
+import org.hamcrest.core.StringStartsWith;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.AWSClientIOException;
+import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.Paths.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * The main unit test suite of the staging committer.
+ * Parameterized on thread count and unique filename policy.
+ */
+@RunWith(Parameterized.class)
+public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
+
+  private static final JobID JOB_ID = new JobID("job", 1);
+  private static final TaskAttemptID AID = new TaskAttemptID(
+      new TaskID(JOB_ID, TaskType.REDUCE, 2), 3);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestStagingCommitter.class);
+
+  private final int numThreads;
+  private final boolean uniqueFilenames;
+  private JobContext job = null;
+  private TaskAttemptContext tac = null;
+  private Configuration conf = null;
+  private MockedStagingCommitter jobCommitter = null;
+  private MockedStagingCommitter committer = null;
+
+  // created in Before
+  private S3AFileSystem mockFS = null;
+  private MockS3AFileSystem wrapperFS = null;
+
+  // created in Before
+  private StagingTestBase.ClientResults results = null;
+  private StagingTestBase.ClientErrors errors = null;
+  private AmazonS3 mockClient = null;
+  private File tmpDir;
+
+  /**
+   * Describe a test in the logs.
+   * @param text text to print
+   * @param args arguments to format in the printing
+   */
+  protected void describe(String text, Object... args) {
+    LOG.info("\n\n: {}\n", String.format(text, args));
+  }
+
+  /**
+   * Test array for parameterized test runs: the number of threads to use
+   * and whether unique filenames are enabled.
+   * @return a list of parameter tuples.
+   */
+  @Parameterized.Parameters
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][] {
+        {0, false},
+        {1, true},
+        {3, true},
+    });
+  }
+
+  public TestStagingCommitter(int numThreads, boolean uniqueFilenames) {
+    this.numThreads = numThreads;
+    this.uniqueFilenames = uniqueFilenames;
+  }
+
+  @Before
+  public void setupCommitter() throws Exception {
+    JobConf jobConf = getConfiguration();
+    jobConf.setInt(FS_S3A_COMMITTER_THREADS, numThreads);
+    jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        uniqueFilenames);
+    jobConf.set(FS_S3A_COMMITTER_STAGING_UUID,
+        UUID.randomUUID().toString());
+
+    this.results = new StagingTestBase.ClientResults();
+    this.errors = new StagingTestBase.ClientErrors();
+    this.mockClient = newMockS3Client(results, errors);
+    this.mockFS = createAndBindMockFSInstance(jobConf,
+        Pair.of(results, errors));
+    this.wrapperFS = lookupWrapperFS(jobConf);
+    // and bind the FS
+    wrapperFS.setAmazonS3Client(mockClient);
+
+
+    this.job = new JobContextImpl(jobConf, JOB_ID);
+    this.tac = new TaskAttemptContextImpl(
+        new Configuration(job.getConfiguration()), AID);
+
+    this.jobCommitter = new MockedStagingCommitter(OUTPUT_PATH, tac);
+    jobCommitter.setupJob(job);
+
+    // get the task's configuration copy so modifications take effect
+    this.conf = tac.getConfiguration();
+    this.conf.setInt(MULTIPART_SIZE, 100);
+
+    tmpDir = File.createTempFile("testStagingCommitter", "");
+    tmpDir.delete();
+    tmpDir.mkdirs();
+
+    String tmp = tmpDir.getCanonicalPath();
+    this.conf.set(BUFFER_DIR,
+        String.format("%s/local-0/, %s/local-1 ", tmp, tmp));
+
+    this.committer = new MockedStagingCommitter(OUTPUT_PATH, tac);
+    Paths.resetTempFolderCache();
+  }
+
+  @After
+  public void cleanup() {
+    try {
+      if (tmpDir != null) {
+        FileUtils.deleteDirectory(tmpDir);
+      }
+    } catch (IOException ignored) {
+
+    }
+  }
+
+  @Test
+  public void testUUIDPropagation() throws Exception {
+    Configuration config = new Configuration();
+    String jobUUID = addUUID(config);
+    assertEquals("Upload UUID", jobUUID,
+        StagingCommitter.getUploadUUID(config, JOB_ID));
+  }
+
+  private String addUUID(Configuration config) {
+    String jobUUID = UUID.randomUUID().toString();
+    config.set(FS_S3A_COMMITTER_STAGING_UUID, jobUUID);
+    return jobUUID;
+  }
+
+  @Test
+  public void testAttemptPathConstructionNoSchema() throws Exception {
+    Configuration config = new Configuration();
+    final String jobUUID = addUUID(config);
+    config.set(BUFFER_DIR, "/tmp/mr-local-0,/tmp/mr-local-1");
+    String commonPath = "file:/tmp/mr-local-";
+
+    assertThat("Missing scheme should produce local file paths",
+        getLocalTaskAttemptTempDir(config,
+            jobUUID, tac.getTaskAttemptID()).toString(),
+        StringStartsWith.startsWith(commonPath));
+  }
+
+  @Test
+  public void testAttemptPathConstructionWithSchema() throws Exception {
+    Configuration config = new Configuration();
+    final String jobUUID = addUUID(config);
+    String commonPath = "file:/tmp/mr-local-";
+
+    config.set(BUFFER_DIR,
+        "file:/tmp/mr-local-0,file:/tmp/mr-local-1");
+
+    assertThat("Path should be the same with file scheme",
+        getLocalTaskAttemptTempDir(config,
+            jobUUID, tac.getTaskAttemptID()).toString(),
+        StringStartsWith.startsWith(commonPath));
+  }
+
+  @Test
+  public void testAttemptPathConstructionWrongSchema() throws Exception {
+    Configuration config = new Configuration();
+    final String jobUUID = addUUID(config);
+    config.set(BUFFER_DIR,
+        "hdfs://nn:8020/tmp/mr-local-0,hdfs://nn:8020/tmp/mr-local-1");
+    intercept(IllegalArgumentException.class, "Wrong FS",
+        () -> getLocalTaskAttemptTempDir(config, jobUUID,
+                tac.getTaskAttemptID()));
+  }
+
+  @Test
+  public void testCommitPathConstruction() throws Exception {
+    Path committedTaskPath = committer.getCommittedTaskPath(tac);
+    assertEquals("Path should be in HDFS: " + committedTaskPath,
+        "hdfs", committedTaskPath.toUri().getScheme());
+    String ending = STAGING_UPLOADS + "/_temporary/0/task_job_0001_r_000002";
+    assertTrue("Did not end with \"" + ending + "\" :" + committedTaskPath,
+        committedTaskPath.toString().endsWith(ending));
+  }
+
+  @Test
+  public void testSingleTaskCommit() throws Exception {
+    Path file = new Path(commitTask(committer, tac, 1).iterator().next());
+
+    List<String> uploads = results.getUploads();
+    assertEquals("Should initiate one upload: " + results, 1, uploads.size());
+
+    Path committedPath = committer.getCommittedTaskPath(tac);
+    FileSystem dfs = committedPath.getFileSystem(conf);
+
+    assertEquals("Should commit to HDFS: " + committer, getDFS(), dfs);
+
+    FileStatus[] stats = dfs.listStatus(committedPath);
+    assertEquals("Should produce one commit file: " + results, 1, stats.length);
+    assertEquals("Should name the commits file with the task ID: " + results,
+        "task_job_0001_r_000002", stats[0].getPath().getName());
+
+    PendingSet pending = PendingSet.load(dfs, stats[0].getPath());
+    assertEquals("Should have one pending commit", 1, pending.size());
+    SinglePendingCommit commit = pending.getCommits().get(0);
+    assertEquals("Should write to the correct bucket:" + results,
+        BUCKET, commit.getBucket());
+    assertEquals("Should write to the correct key: " + results,
+        OUTPUT_PREFIX + "/" + file.getName(), commit.getDestinationKey());
+
+    assertValidUpload(results.getTagsByUpload(), commit);
+  }
+
+  /**
+   * This originally verified that empty files weren't PUT. They are now.
+   * @throws Exception on a failure
+   */
+  @Test
+  public void testSingleTaskEmptyFileCommit() throws Exception {
+    committer.setupTask(tac);
+
+    Path attemptPath = committer.getTaskAttemptPath(tac);
+
+    String rand = UUID.randomUUID().toString();
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath, rand, 0);
+
+    committer.commitTask(tac);
+
+    List<String> uploads = results.getUploads();
+    assertEquals("Should initiate one upload", 1, uploads.size());
+
+    Path committedPath = committer.getCommittedTaskPath(tac);
+    FileSystem dfs = committedPath.getFileSystem(conf);
+
+    assertEquals("Should commit to HDFS", getDFS(), dfs);
+
+    assertIsFile(dfs, committedPath);
+    FileStatus[] stats = dfs.listStatus(committedPath);
+    assertEquals("Should produce one commit file", 1, stats.length);
+    assertEquals("Should name the commits file with the task ID",
+        "task_job_0001_r_000002", stats[0].getPath().getName());
+
+    PendingSet pending = PendingSet.load(dfs,
+        stats[0].getPath());
+    assertEquals("Should have one pending commit", 1, pending.size());
+  }
+
+  @Test
+  public void testSingleTaskMultiFileCommit() throws Exception {
+    int numFiles = 3;
+    Set<String> files = commitTask(committer, tac, numFiles);
+
+    List<String> uploads = results.getUploads();
+    assertEquals("Should initiate multiple uploads", numFiles, uploads.size());
+
+    Path committedPath = committer.getCommittedTaskPath(tac);
+    FileSystem dfs = committedPath.getFileSystem(conf);
+
+    assertEquals("Should commit to HDFS", getDFS(), dfs);
+    assertIsFile(dfs, committedPath);
+    FileStatus[] stats = dfs.listStatus(committedPath);
+    assertEquals("Should produce one commit file", 1, stats.length);
+    assertEquals("Should name the commits file with the task ID",
+        "task_job_0001_r_000002", stats[0].getPath().getName());
+
+    List<SinglePendingCommit> pending =
+        PendingSet.load(dfs, stats[0].getPath()).getCommits();
+    assertEquals("Should have correct number of pending commits",
+        files.size(), pending.size());
+
+    Set<String> keys = Sets.newHashSet();
+    for (SinglePendingCommit commit : pending) {
+      assertEquals("Should write to the correct bucket: " + commit,
+          BUCKET, commit.getBucket());
+      assertValidUpload(results.getTagsByUpload(), commit);
+      keys.add(commit.getDestinationKey());
+    }
+
+    assertEquals("Should write to the correct key",
+        files, keys);
+  }
+
+  @Test
+  public void testTaskInitializeFailure() throws Exception {
+    committer.setupTask(tac);
+
+    errors.failOnInit(1);
+
+    Path attemptPath = committer.getTaskAttemptPath(tac);
+    FileSystem fs = attemptPath.getFileSystem(conf);
+
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+
+    intercept(AWSClientIOException.class,
+        "Fail on init 1",
+        "Should fail during init",
+        () -> committer.commitTask(tac));
+
+    assertEquals("Should have initialized one file upload",
+        1, results.getUploads().size());
+    assertEquals("Should abort the upload",
+        new HashSet<>(results.getUploads()),
+        getAbortedIds(results.getAborts()));
+    assertPathDoesNotExist(fs,
+        "Should remove the attempt path",
+        attemptPath);
+  }
+
+  @Test
+  public void testTaskSingleFileUploadFailure() throws Exception {
+    describe("Set up a single file upload to fail on upload 2");
+    committer.setupTask(tac);
+
+    errors.failOnUpload(2);
+
+    Path attemptPath = committer.getTaskAttemptPath(tac);
+    FileSystem fs = attemptPath.getFileSystem(conf);
+
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+
+    intercept((Class<? extends Exception>) AWSClientIOException.class,
+        "Fail on upload 2",
+        "Should fail during upload",
+        () -> {
+          committer.commitTask(tac);
+          return committer.toString();
+        });
+
+    assertEquals("Should have attempted one file upload",
+        1, results.getUploads().size());
+    assertEquals("Should abort the upload",
+        results.getUploads().get(0),
+        results.getAborts().get(0).getUploadId());
+    assertPathDoesNotExist(fs, "Should remove the attempt path",
+        attemptPath);
+  }
+
+  @Test
+  public void testTaskMultiFileUploadFailure() throws Exception {
+    committer.setupTask(tac);
+
+    errors.failOnUpload(5);
+
+    Path attemptPath = committer.getTaskAttemptPath(tac);
+    FileSystem fs = attemptPath.getFileSystem(conf);
+
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+
+    intercept((Class<? extends Exception>) AWSClientIOException.class,
+        "Fail on upload 5",
+        "Should fail during upload",
+        () -> {
+          committer.commitTask(tac);
+          return committer.toString();
+        });
+
+    assertEquals("Should have attempted two file uploads",
+        2, results.getUploads().size());
+    assertEquals("Should abort the upload",
+        new HashSet<>(results.getUploads()),
+        getAbortedIds(results.getAborts()));
+    assertPathDoesNotExist(fs, "Should remove the attempt path",
+        attemptPath);
+  }
+
+  @Test
+  public void testTaskUploadAndAbortFailure() throws Exception {
+    committer.setupTask(tac);
+
+    errors.failOnUpload(5);
+    errors.failOnAbort(0);
+
+    Path attemptPath = committer.getTaskAttemptPath(tac);
+    FileSystem fs = attemptPath.getFileSystem(conf);
+
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+    writeOutputFile(tac.getTaskAttemptID(), attemptPath,
+        UUID.randomUUID().toString(), 10);
+
+    intercept((Class<? extends Exception>) AWSClientIOException.class,
+        "Fail on upload 5",
+        "Should suppress abort failure, propagate upload failure",
+        () -> {
+          committer.commitTask(tac);
+          return committer.toString();
+        });
+
+    assertEquals("Should have attempted two file uploads",
+        2, results.getUploads().size());
+    assertEquals("Should not have succeeded with any aborts",
+        new HashSet<>(),
+        getAbortedIds(results.getAborts()));
+    assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath);
+  }
+
+  @Test
+  public void testSingleTaskAbort() throws Exception {
+    committer.setupTask(tac);
+
+    Path attemptPath = committer.getTaskAttemptPath(tac);
+    FileSystem fs = attemptPath.getFileSystem(conf);
+
+    Path outPath = writeOutputFile(
+        tac.getTaskAttemptID(), attemptPath, UUID.randomUUID().toString(), 10);
+
+    committer.abortTask(tac);
+
+    assertEquals("Should not upload anything",
+        0, results.getUploads().size());
+    assertEquals("Should not upload anything",
+        0, results.getParts().size());
+    assertPathDoesNotExist(fs, "Should remove all attempt data", outPath);
+    assertPathDoesNotExist(fs, "Should remove the attempt path", attemptPath);
+
+  }
+
+  @Test
+  public void testJobCommit() throws Exception {
+    Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
+    FileSystem fs = jobAttemptPath.getFileSystem(conf);
+
+    Set<String> uploads = runTasks(job, 4, 3);
+    assertNotEquals(0, uploads.size());
+
+    assertPathExists(fs, "No job attempt path", jobAttemptPath);
+
+    jobCommitter.commitJob(job);
+    assertEquals("Should have aborted no uploads",
+        0, results.getAborts().size());
+
+    assertEquals("Should have deleted no uploads",
+        0, results.getDeletes().size());
+
+    assertEquals("Should have committed all uploads",
+        uploads, getCommittedIds(results.getCommits()));
+
+    assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
+
+  }
+
+  @Test
+  public void testJobCommitFailure() throws Exception {
+    Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
+    FileSystem fs = jobAttemptPath.getFileSystem(conf);
+
+    Set<String> uploads = runTasks(job, 4, 3);
+
+    assertPathExists(fs, "No job attempt path", jobAttemptPath);
+
+    errors.failOnCommit(5);
+    setMockLogLevel(MockS3AFileSystem.LOG_NAME);
+
+    intercept(AWSClientIOException.class,
+        "Fail on commit 5",
+        "Should propagate the commit failure",
+        () -> {
+          jobCommitter.commitJob(job);
+          return jobCommitter.toString();
+        });
+
+    assertEquals("Should have succeeded to commit some uploads",
+        5, results.getCommits().size());
+
+    assertEquals("Should have deleted the files that succeeded",
+        5, results.getDeletes().size());
+
+    Set<String> commits = results.getCommits()
+        .stream()
+        .map((commit) -> commit.getBucketName() + commit.getKey())
+        .collect(Collectors.toSet());
+
+    Set<String> deletes = results.getDeletes()
+        .stream()
+        .map((delete) -> delete.getBucketName() + delete.getKey())
+        .collect(Collectors.toSet());
+
+    assertEquals("Committed and deleted objects should match",
+        commits, deletes);
+
+    assertEquals("Mismatch in aborted upload count",
+        7, results.getAborts().size());
+
+    Set<String> uploadIds = getCommittedIds(results.getCommits());
+    uploadIds.addAll(getAbortedIds(results.getAborts()));
+
+    assertEquals("Should have committed/deleted or aborted all uploads",
+        uploads, uploadIds);
+
+    assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
+  }
+
+  @Test
+  public void testJobAbort() throws Exception {
+    Path jobAttemptPath = jobCommitter.getJobAttemptPath(job);
+    FileSystem fs = jobAttemptPath.getFileSystem(conf);
+
+    Set<String> uploads = runTasks(job, 4, 3);
+
+    assertPathExists(fs, "No job attempt path", jobAttemptPath);
+    jobCommitter.abortJob(job, JobStatus.State.KILLED);
+    assertEquals("Should have committed no uploads: " + jobCommitter,
+        0, results.getCommits().size());
+
+    assertEquals("Should have deleted no uploads: " + jobCommitter,
+        0, results.getDeletes().size());
+
+    assertEquals("Should have aborted all uploads: " + jobCommitter,
+        uploads, getAbortedIds(results.getAborts()));
+
+    assertPathDoesNotExist(fs, "jobAttemptPath not deleted", jobAttemptPath);
+  }
+
+  /**
+   * Run tasks, return the uploaded dataset. The upload data is
+   * extracted from the {@link #results} field; this is reset
+   * before the operation.
+   * @param jobContext job ctx
+   * @param numTasks number of tasks to run
+   * @param numFiles number of files for each task to generate
+   * @return a set of all uploads
+   * @throws IOException on a failure.
+   */
+  private Set<String> runTasks(JobContext jobContext,
+      int numTasks, int numFiles)
+      throws IOException {
+    results.resetUploads();
+    Set<String> uploads = Sets.newHashSet();
+
+    for (int taskId = 0; taskId < numTasks; taskId += 1) {
+      TaskAttemptID attemptID = new TaskAttemptID(
+          new TaskID(JOB_ID, TaskType.REDUCE, taskId),
+          (taskId * 37) % numTasks);
+      TaskAttemptContext attempt = new TaskAttemptContextImpl(
+          new Configuration(jobContext.getConfiguration()), attemptID);
+      MockedStagingCommitter taskCommitter = new MockedStagingCommitter(
+          OUTPUT_PATH, attempt);
+      commitTask(taskCommitter, attempt, numFiles);
+    }
+
+    uploads.addAll(results.getUploads());
+    return uploads;
+  }
+
+  private static Set<String> getAbortedIds(
+      List<AbortMultipartUploadRequest> aborts) {
+    return aborts.stream()
+        .map(AbortMultipartUploadRequest::getUploadId)
+        .collect(Collectors.toSet());
+  }
+
+  private static Set<String> getCommittedIds(
+      List<CompleteMultipartUploadRequest> commits) {
+    return commits.stream()
+        .map(CompleteMultipartUploadRequest::getUploadId)
+        .collect(Collectors.toSet());
+  }
+
+  private Set<String> commitTask(StagingCommitter staging,
+      TaskAttemptContext attempt,
+      int numFiles)
+      throws IOException {
+    Path attemptPath = staging.getTaskAttemptPath(attempt);
+
+    Set<String> files = Sets.newHashSet();
+    for (int i = 0; i < numFiles; i += 1) {
+      Path outPath = writeOutputFile(
+          attempt.getTaskAttemptID(), attemptPath, UUID.randomUUID().toString(),
+          10 * (i + 1));
+      files.add(OUTPUT_PREFIX +
+          "/" + outPath.getName()
+          + (uniqueFilenames ? ("-" + staging.getUUID()) : ""));
+    }
+
+    staging.commitTask(attempt);
+
+    return files;
+  }
+
+  private static void assertValidUpload(Map<String, List<String>> parts,
+                                        SinglePendingCommit commit) {
+    assertTrue("Should commit a valid uploadId",
+        parts.containsKey(commit.getUploadId()));
+
+    List<String> tags = parts.get(commit.getUploadId());
+    assertEquals("Should commit the correct number of file parts",
+        tags.size(), commit.getPartCount());
+
+    for (int i = 0; i < tags.size(); i += 1) {
+      assertEquals("Should commit the correct part tags",
+          tags.get(i), commit.getEtags().get(i));
+    }
+  }
+
+  private static Path writeOutputFile(TaskAttemptID id, Path dest,
+                                      String content, long copies)
+      throws IOException {
+    String fileName = ((id.getTaskType() == TaskType.REDUCE) ? "r_" : "m_") +
+        id.getTaskID().getId() + "_" + id.getId() + "_" +
+        UUID.randomUUID().toString();
+    Path outPath = new Path(dest, fileName);
+    FileSystem fs = outPath.getFileSystem(getConfiguration());
+
+    try (OutputStream out = fs.create(outPath)) {
+      byte[] bytes = content.getBytes(StandardCharsets.UTF_8);
+      for (int i = 0; i < copies; i += 1) {
+        out.write(bytes);
+      }
+    }
+
+    return outPath;
+  }
+
+  /**
+   * Used during debugging mock test failures; cranks up logging of method
+   * calls.
+   * @param level log level
+   */
+  private void setMockLogLevel(int level) {
+    wrapperFS.setLogEvents(level);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
new file mode 100644
index 0000000..4f0189e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingDirectoryOutputCommitter.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.*;
+
+/** Mocking test of directory committer. */
+public class TestStagingDirectoryOutputCommitter
+    extends StagingTestBase.JobCommitterTest<DirectoryStagingCommitter> {
+
+  @Override
+  DirectoryStagingCommitter newJobCommitter() throws Exception {
+    return new DirectoryStagingCommitter(OUTPUT_PATH,
+        createTaskAttemptForJob());
+  }
+
+  @Test
+  public void testBadConflictMode() throws Throwable {
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, "merge");
+    intercept(IllegalArgumentException.class,
+        "MERGE", "committer conflict",
+        this::newJobCommitter);
+  }
+
+  @Test
+  public void testDefaultConflictResolution() throws Exception {
+    getJob().getConfiguration().unset(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    verifyFailureConflictOutcome();
+  }
+  @Test
+  public void testFailConflictResolution() throws Exception {
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_FAIL);
+    verifyFailureConflictOutcome();
+  }
+
+  protected void verifyFailureConflictOutcome() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+    pathExists(mockS3, OUTPUT_PATH);
+    final DirectoryStagingCommitter committer = newJobCommitter();
+
+    intercept(PathExistsException.class,
+        InternalCommitterConstants.E_DEST_EXISTS,
+        "Should throw an exception because the path exists",
+        () -> committer.setupJob(getJob()));
+
+    intercept(PathExistsException.class,
+        InternalCommitterConstants.E_DEST_EXISTS,
+        "Should throw an exception because the path exists",
+        () -> committer.commitJob(getJob()));
+
+    reset(mockS3);
+    pathDoesNotExist(mockS3, OUTPUT_PATH);
+
+    committer.setupJob(getJob());
+    verifyExistenceChecked(mockS3, OUTPUT_PATH);
+    verifyNoMoreInteractions(mockS3);
+
+    reset(mockS3);
+    pathDoesNotExist(mockS3, OUTPUT_PATH);
+    committer.commitJob(getJob());
+    verifyExistenceChecked(mockS3, OUTPUT_PATH);
+    verifyCompletion(mockS3);
+  }
+
+  @Test
+  public void testAppendConflictResolution() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    pathExists(mockS3, OUTPUT_PATH);
+
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+
+    final DirectoryStagingCommitter committer = newJobCommitter();
+
+    committer.setupJob(getJob());
+    verifyNoMoreInteractions(mockS3);
+
+    Mockito.reset(mockS3);
+    pathExists(mockS3, OUTPUT_PATH);
+
+    committer.commitJob(getJob());
+    verifyCompletion(mockS3);
+  }
+
+  @Test
+  public void testReplaceConflictResolution() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    pathExists(mockS3, OUTPUT_PATH);
+
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
+
+    final DirectoryStagingCommitter committer = newJobCommitter();
+
+    committer.setupJob(getJob());
+    verifyNoMoreInteractions(mockS3);
+
+    Mockito.reset(mockS3);
+    pathExists(mockS3, OUTPUT_PATH);
+    canDelete(mockS3, OUTPUT_PATH);
+
+    committer.commitJob(getJob());
+    verifyDeleted(mockS3, OUTPUT_PATH);
+    verifyCompletion(mockS3);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java
new file mode 100644
index 0000000..139b4e3
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedFileListing.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
+import static org.hamcrest.CoreMatchers.allOf;
+import static org.hamcrest.CoreMatchers.hasItem;
+
+/**
+ * Test partitioned staging committer's logic for putting data in the right
+ * place.
+ */
+public class TestStagingPartitionedFileListing
+    extends TaskCommitterTest<PartitionedStagingCommitter> {
+
+  @Override
+  PartitionedStagingCommitter newJobCommitter() throws IOException {
+    return new PartitionedStagingCommitter(OUTPUT_PATH,
+        createTaskAttemptForJob());
+  }
+
+  @Override
+  PartitionedStagingCommitter newTaskCommitter() throws IOException {
+    return new PartitionedStagingCommitter(OUTPUT_PATH, getTAC());
+  }
+
+  private FileSystem attemptFS;
+  private Path attemptPath;
+
+  @After
+  public void cleanupAttempt() {
+    cleanup("teardown", attemptFS, attemptPath);
+  }
+
+  @Test
+  public void testTaskOutputListing() throws Exception {
+    PartitionedStagingCommitter committer = newTaskCommitter();
+
+    // create files in the attempt path that should be found by getTaskOutput
+    attemptPath = committer.getTaskAttemptPath(getTAC());
+    attemptFS = attemptPath.getFileSystem(
+        getTAC().getConfiguration());
+    attemptFS.delete(attemptPath, true);
+
+    try {
+      List<String> expectedFiles = Lists.newArrayList();
+      for (String dateint : Arrays.asList("20161115", "20161116")) {
+        for (String hour : Arrays.asList("13", "14")) {
+          String relative = "dateint=" + dateint + "/hour=" + hour +
+              "/" + UUID.randomUUID().toString() + ".parquet";
+          expectedFiles.add(relative);
+          attemptFS.create(new Path(attemptPath, relative)).close();
+        }
+      }
+
+      List<String> actualFiles = committer.getTaskOutput(getTAC())
+          .stream()
+          .map(stat -> Paths.getRelativePath(attemptPath,
+              stat.getPath()))
+          .collect(Collectors.toList());
+      Collections.sort(expectedFiles);
+      Collections.sort(actualFiles);
+      assertEquals("File sets should match", expectedFiles, actualFiles);
+    } finally {
+      deleteQuietly(attemptFS, attemptPath, true);
+    }
+
+  }
+
+  @Test
+  public void testTaskOutputListingWithHiddenFiles() throws Exception {
+    PartitionedStagingCommitter committer = newTaskCommitter();
+
+    // create files in the attempt path that should be found by getTaskOutput
+    attemptPath = committer.getTaskAttemptPath(getTAC());
+    attemptFS = attemptPath.getFileSystem(
+        getTAC().getConfiguration());
+    attemptFS.delete(attemptPath, true);
+
+    try {
+      List<String> expectedFiles = Lists.newArrayList();
+      for (String dateint : Arrays.asList("20161115", "20161116")) {
+        String metadata = "dateint=" + dateint + "/" + "_metadata";
+        attemptFS.create(new Path(attemptPath, metadata)).close();
+
+        for (String hour : Arrays.asList("13", "14")) {
+          String relative = "dateint=" + dateint + "/hour=" + hour +
+              "/" + UUID.randomUUID().toString() + ".parquet";
+          expectedFiles.add(relative);
+          attemptFS.create(new Path(attemptPath, relative)).close();
+
+          String partial = "dateint=" + dateint + "/hour=" + hour +
+              "/." + UUID.randomUUID().toString() + ".partial";
+          attemptFS.create(new Path(attemptPath, partial)).close();
+        }
+      }
+
+      List<String> actualFiles = committer.getTaskOutput(getTAC()).stream()
+          .map(stat -> Paths.getRelativePath(attemptPath, stat.getPath()))
+          .collect(Collectors.toList());
+      Collections.sort(expectedFiles);
+      Collections.sort(actualFiles);
+      assertEquals("File sets should match", expectedFiles, actualFiles);
+    } finally {
+      deleteQuietly(attemptFS, attemptPath, true);
+    }
+
+  }
+
+  @Test
+  public void testPartitionsResolution() throws Throwable {
+
+    File tempDir = getTempDir();
+    File partitionsDir = new File(tempDir, "partitions");
+
+    attemptPath = new Path(partitionsDir.toURI());
+    attemptFS = FileSystem.getLocal(getJob().getConfiguration());
+    deleteQuietly(attemptFS, attemptPath, true);
+    attemptFS.mkdirs(attemptPath);
+    // initial partitioning -> empty
+    assertTrue(Paths.getPartitions(attemptPath, new ArrayList<>(0)).isEmpty());
+    String oct2017 = "year=2017/month=10";
+    Path octLog = new Path(attemptPath, oct2017 + "/log-2017-10-04.txt");
+    touch(attemptFS, octLog);
+    assertThat(listPartitions(attemptFS, attemptPath), hasItem(oct2017));
+
+    // add a root entry and it ends up under the table_root entry
+    Path rootFile = new Path(attemptPath, "root.txt");
+    touch(attemptFS, rootFile);
+    assertThat(listPartitions(attemptFS, attemptPath),
+        allOf(hasItem(oct2017),
+            hasItem(StagingCommitterConstants.TABLE_ROOT)));
+  }
+
+  /**
+   * List files in a filesystem using {@code listFiles()},
+   * then get all the partitions.
+   * @param fs filesystem
+   * @param base base of tree
+   * @return a list of partitions
+   * @throws IOException failure
+   */
+  private Set<String> listPartitions(FileSystem fs, Path base)
+      throws IOException {
+    List<FileStatus> statusList = mapLocatedFiles(
+        fs.listFiles(base, true), s -> (FileStatus) s);
+    return Paths.getPartitions(base, statusList);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
new file mode 100644
index 0000000..4df3912
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/** Mocking test of partitioned committer. */
+public class TestStagingPartitionedJobCommit
+    extends StagingTestBase.JobCommitterTest<PartitionedStagingCommitter> {
+
+  @Override
+  public void setupJob() throws Exception {
+    super.setupJob();
+    getWrapperFS().setLogEvents(MockS3AFileSystem.LOG_NAME);
+  }
+
+  @Override
+  PartitionedStagingCommitter newJobCommitter() throws IOException {
+    return new PartitionedStagingCommitterForTesting(createTaskAttemptForJob());
+  }
+
+  /**
+   * Subclass of the Partitioned Staging committer used in the test cases.
+   */
+  private static final class PartitionedStagingCommitterForTesting
+      extends PartitionedCommitterForTesting {
+
+    private boolean aborted = false;
+
+    private PartitionedStagingCommitterForTesting(TaskAttemptContext context)
+        throws IOException {
+      super(OUTPUT_PATH, context);
+    }
+
+    @Override
+    protected List<SinglePendingCommit> listPendingUploadsToCommit(
+        JobContext context) throws IOException {
+      List<SinglePendingCommit> pending = Lists.newArrayList();
+
+      for (String dateint : Arrays.asList("20161115", "20161116")) {
+        for (String hour : Arrays.asList("13", "14")) {
+          String key = OUTPUT_PREFIX + "/dateint=" + dateint + "/hour=" + hour +
+              "/" + UUID.randomUUID().toString() + ".parquet";
+          SinglePendingCommit commit = new SinglePendingCommit();
+          commit.setBucket(BUCKET);
+          commit.setDestinationKey(key);
+          commit.setUri("s3a://" + BUCKET + "/" + key);
+          commit.setUploadId(UUID.randomUUID().toString());
+          commit.setEtags(new ArrayList<>());
+          pending.add(commit);
+        }
+      }
+      return pending;
+    }
+
+    @Override
+    protected void abortJobInternal(JobContext context,
+        boolean suppressExceptions) throws IOException {
+      this.aborted = true;
+      super.abortJobInternal(context, suppressExceptions);
+    }
+  }
+
+  @Test
+  public void testDefaultFailAndAppend() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    // both fail and append don't check. fail is enforced at the task level.
+    for (String mode : Arrays.asList(null, CONFLICT_MODE_FAIL,
+        CONFLICT_MODE_APPEND)) {
+      if (mode != null) {
+        getJob().getConfiguration().set(
+            FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, mode);
+      } else {
+        getJob().getConfiguration().unset(
+            FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+      }
+
+      PartitionedStagingCommitter committer = newJobCommitter();
+
+      // no directories exist
+      committer.commitJob(getJob());
+
+      // parent and peer directories exist
+      reset(mockS3);
+      pathsExist(mockS3, "dateint=20161116",
+          "dateint=20161116/hour=10");
+      committer.commitJob(getJob());
+      verifyCompletion(mockS3);
+
+      // a leaf directory exists.
+      // NOTE: this is not checked during job commit, the commit succeeds.
+      reset(mockS3);
+      pathsExist(mockS3, "dateint=20161115/hour=14");
+      committer.commitJob(getJob());
+      verifyCompletion(mockS3);
+    }
+  }
+
+  @Test
+  public void testBadConflictMode() throws Throwable {
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, "merge");
+    intercept(IllegalArgumentException.class,
+        "MERGE", "committer conflict",
+        this::newJobCommitter);
+  }
+
+  @Test
+  public void testReplace() throws Exception {
+    S3AFileSystem mockS3 = getMockS3A();
+
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
+
+    PartitionedStagingCommitter committer = newJobCommitter();
+
+    committer.commitJob(getJob());
+    verifyReplaceCommitActions(mockS3);
+    verifyCompletion(mockS3);
+
+    // parent and peer directories exist
+    reset(mockS3);
+    pathsExist(mockS3, "dateint=20161115",
+        "dateint=20161115/hour=12");
+
+    committer.commitJob(getJob());
+    verifyReplaceCommitActions(mockS3);
+    verifyCompletion(mockS3);
+
+    // partition directories exist and should be removed
+    reset(mockS3);
+    pathsExist(mockS3, "dateint=20161115/hour=12",
+        "dateint=20161115/hour=13");
+    canDelete(mockS3, "dateint=20161115/hour=13");
+
+    committer.commitJob(getJob());
+    verifyDeleted(mockS3, "dateint=20161115/hour=13");
+    verifyReplaceCommitActions(mockS3);
+    verifyCompletion(mockS3);
+
+    // partition directories exist and should be removed
+    reset(mockS3);
+    pathsExist(mockS3, "dateint=20161116/hour=13",
+        "dateint=20161116/hour=14");
+
+    canDelete(mockS3, "dateint=20161116/hour=13",
+        "dateint=20161116/hour=14");
+
+    committer.commitJob(getJob());
+    verifyReplaceCommitActions(mockS3);
+    verifyDeleted(mockS3, "dateint=20161116/hour=13");
+    verifyDeleted(mockS3, "dateint=20161116/hour=14");
+    verifyCompletion(mockS3);
+  }
+
+
+  /**
+   * Verify the actions performed in replace mode: essentially, deleting
+   * the destination partition directories.
+   * @param mockS3 s3 mock
+   */
+  protected void verifyReplaceCommitActions(FileSystem mockS3)
+      throws IOException {
+    verifyDeleted(mockS3, "dateint=20161115/hour=13");
+    verifyDeleted(mockS3, "dateint=20161115/hour=14");
+    verifyDeleted(mockS3, "dateint=20161116/hour=13");
+    verifyDeleted(mockS3, "dateint=20161116/hour=14");
+  }
+
+  @Test
+  public void testReplaceWithDeleteFailure() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
+
+    final PartitionedStagingCommitter committer = newJobCommitter();
+
+    pathsExist(mockS3, "dateint=20161116/hour=14");
+    when(mockS3
+        .delete(
+            new Path(OUTPUT_PATH, "dateint=20161116/hour=14"),
+            true))
+        .thenThrow(new PathCommitException("fake",
+            "Fake IOException for delete"));
+
+    intercept(PathCommitException.class, "Fake IOException for delete",
+        "Should throw the fake IOException",
+        () -> committer.commitJob(getJob()));
+
+    verifyReplaceCommitActions(mockS3);
+    verifyDeleted(mockS3, "dateint=20161116/hour=14");
+    assertTrue("Should have aborted",
+        ((PartitionedStagingCommitterForTesting) committer).aborted);
+    verifyCompletion(mockS3);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
new file mode 100644
index 0000000..ddcb56e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedTaskCommit.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathExistsException;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.Mockito.*;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingTestBase.*;
+
+/** Mocking test of the partitioned committer. */
+public class TestStagingPartitionedTaskCommit
+    extends StagingTestBase.TaskCommitterTest<PartitionedStagingCommitter> {
+
+  @Override
+  PartitionedStagingCommitter newJobCommitter() throws IOException {
+    return new PartitionedStagingCommitter(OUTPUT_PATH,
+        createTaskAttemptForJob());
+  }
+
+  @Override
+  PartitionedStagingCommitter newTaskCommitter() throws Exception {
+    return new PartitionedStagingCommitter(OUTPUT_PATH, getTAC());
+  }
+
+  // The set of files used by this test
+  private static List<String> relativeFiles = Lists.newArrayList();
+
+  @BeforeClass
+  public static void createRelativeFileList() {
+    for (String dateint : Arrays.asList("20161115", "20161116")) {
+      for (String hour : Arrays.asList("14", "15")) {
+        String relative = "dateint=" + dateint + "/hour=" + hour +
+            "/" + UUID.randomUUID().toString() + ".parquet";
+        relativeFiles.add(relative);
+      }
+    }
+  }
+
+  @Test
+  public void testBadConflictMode() throws Throwable {
+    getJob().getConfiguration().set(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, "merge");
+    intercept(IllegalArgumentException.class,
+        "MERGE", "committer conflict", this::newJobCommitter);
+  }
+
+  @Test
+  public void testDefault() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    JobContext job = getJob();
+    job.getConfiguration().unset(
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+    final PartitionedStagingCommitter committer = newTaskCommitter();
+
+    committer.setupTask(getTAC());
+    assertConflictResolution(committer, job, ConflictResolution.FAIL);
+    createTestOutputFiles(relativeFiles,
+        committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration());
+
+    // test failure when one partition already exists
+    reset(mockS3);
+    pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(0)).getParent());
+
+    intercept(PathExistsException.class,
+        InternalCommitterConstants.E_DEST_EXISTS,
+        "Expected a PathExistsException as a partition already exists",
+        () -> committer.commitTask(getTAC()));
+
+    // test success
+    reset(mockS3);
+
+    committer.commitTask(getTAC());
+    Set<String> files = Sets.newHashSet();
+    for (InitiateMultipartUploadRequest request :
+        getMockResults().getRequests().values()) {
+      assertEquals(BUCKET, request.getBucketName());
+      files.add(request.getKey());
+    }
+    assertEquals("Should have the right number of uploads",
+        relativeFiles.size(), files.size());
+
+    Set<String> expected = buildExpectedList(committer);
+
+    assertEquals("Should have correct paths", expected, files);
+  }
+
+  @Test
+  public void testFail() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    getTAC().getConfiguration()
+        .set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_FAIL);
+
+    final PartitionedStagingCommitter committer = newTaskCommitter();
+
+    committer.setupTask(getTAC());
+    createTestOutputFiles(relativeFiles,
+        committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration());
+
+    // test failure when one partition already exists
+    reset(mockS3);
+    pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(1)).getParent());
+
+    intercept(PathExistsException.class, "",
+        "Should complain because a partition already exists",
+        () -> committer.commitTask(getTAC()));
+
+    // test success
+    reset(mockS3);
+
+    committer.commitTask(getTAC());
+    Set<String> files = Sets.newHashSet();
+    for (InitiateMultipartUploadRequest request :
+        getMockResults().getRequests().values()) {
+      assertEquals(BUCKET, request.getBucketName());
+      files.add(request.getKey());
+    }
+    assertEquals("Should have the right number of uploads",
+        relativeFiles.size(), files.size());
+
+    Set<String> expected = buildExpectedList(committer);
+
+    assertEquals("Should have correct paths", expected, files);
+  }
+
+  @Test
+  public void testAppend() throws Exception {
+    FileSystem mockS3 = getMockS3A();
+
+    getTAC().getConfiguration()
+        .set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
+
+    PartitionedStagingCommitter committer = newTaskCommitter();
+
+    committer.setupTask(getTAC());
+    createTestOutputFiles(relativeFiles,
+        committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration());
+
+    // test success when one partition already exists
+    reset(mockS3);
+    pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(2)).getParent());
+
+    committer.commitTask(getTAC());
+    Set<String> files = Sets.newHashSet();
+    for (InitiateMultipartUploadRequest request :
+        getMockResults().getRequests().values()) {
+      assertEquals(BUCKET, request.getBucketName());
+      files.add(request.getKey());
+    }
+    assertEquals("Should have the right number of uploads",
+        relativeFiles.size(), files.size());
+
+    Set<String> expected = buildExpectedList(committer);
+
+    assertEquals("Should have correct paths", expected, files);
+  }
+
+  @Test
+  public void testReplace() throws Exception {
+    // TODO: this committer needs to delete the data that already exists
+    // This test should assert that the delete was done
+    FileSystem mockS3 = getMockS3A();
+
+    getTAC().getConfiguration()
+        .set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_REPLACE);
+
+    PartitionedStagingCommitter committer = newTaskCommitter();
+
+    committer.setupTask(getTAC());
+    createTestOutputFiles(relativeFiles,
+        committer.getTaskAttemptPath(getTAC()), getTAC().getConfiguration());
+
+    // test success when one partition already exists
+    reset(mockS3);
+    pathExists(mockS3, new Path(OUTPUT_PATH, relativeFiles.get(3)).getParent());
+
+    committer.commitTask(getTAC());
+    Set<String> files = Sets.newHashSet();
+    for (InitiateMultipartUploadRequest request :
+        getMockResults().getRequests().values()) {
+      assertEquals(BUCKET, request.getBucketName());
+      files.add(request.getKey());
+    }
+    assertEquals("Should have the right number of uploads",
+        relativeFiles.size(), files.size());
+
+    Set<String> expected = buildExpectedList(committer);
+
+    assertEquals("Should have correct paths", expected, files);
+  }
+
+  public Set<String> buildExpectedList(StagingCommitter committer) {
+    Set<String> expected = Sets.newHashSet();
+    boolean unique = committer.useUniqueFilenames();
+    for (String relative : relativeFiles) {
+      expected.add(OUTPUT_PREFIX +
+          "/" +
+          (unique ? Paths.addUUID(relative, committer.getUUID()) : relative));
+    }
+    return expected;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
new file mode 100644
index 0000000..c10ebed
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITDirectoryCommitMRJob.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+
+/**
+ * Full integration test for the directory committer.
+ */
+public class ITDirectoryCommitMRJob extends AbstractITCommitMRJob {
+
+  @Override
+  protected String committerName() {
+    return DirectoryStagingCommitter.NAME;
+  }
+}

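ITDirectoryCommitMRJob only binds the shared MR integration test to DirectoryStagingCommitter.NAME. As a hedged sketch of how a job configuration would request that committer by name (the key "fs.s3a.committer.name" is an assumption here, not taken from this patch; check CommitConstants for the authoritative key):

    // Sketch only: the configuration key below is assumed.
    Configuration conf = new Configuration();
    conf.set("fs.s3a.committer.name", DirectoryStagingCommitter.NAME);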
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
new file mode 100644
index 0000000..1c19a95
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITPartitionCommitMRJob.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+
+/**
+ * Full integration test for the partition committer.
+ */
+public class ITPartitionCommitMRJob extends AbstractITCommitMRJob {
+
+  @Override
+  protected String committerName() {
+    return PartitionedStagingCommitter.NAME;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
new file mode 100644
index 0000000..76ad464
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITStagingCommitMRJob.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import org.junit.Test;
+
+import org.hamcrest.core.StringContains;
+import org.hamcrest.core.StringEndsWith;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
+
+/**
+ * Full integration test for the staging committer.
+ */
+public class ITStagingCommitMRJob extends AbstractITCommitMRJob {
+
+  @Override
+  protected String committerName() {
+    return StagingCommitter.NAME;
+  }
+
+  /**
+   * Verify that staging commit dirs are made absolute under the user's
+   * home directory, so that, in a secure cluster, they are private.
+   */
+  @Test
+  public void testStagingDirectory() throws Throwable {
+    FileSystem hdfs = getDFS();
+    Configuration conf = hdfs.getConf();
+    conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH,
+        "private");
+    Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
+    assertThat(dir.toString(), StringEndsWith.endsWith(
+        "UUID/"
+        + StagingCommitterConstants.STAGING_UPLOADS));
+    assertTrue("path unqualified", dir.isAbsolute());
+    String self = UserGroupInformation.getCurrentUser().getShortUserName();
+    assertThat(dir.toString(),
+        StringContains.containsString("/user/" + self + "/private"));
+  }
+
+}

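testStagingDirectory checks that a relative value of the staging tmp path option is made absolute under the submitting user's home directory. A rough sketch of that resolution, inferred from the assertions above rather than from the Paths implementation itself:

    // Sketch only; the exact layout beyond the asserted substrings is assumed.
    conf.set(CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
    Path commits = getMultipartUploadCommitsDirectory(conf, "job-uuid");
    // commits is expected to be absolute and to end with
    //   .../user/<current user>/private/job-uuid/<STAGING_UPLOADS>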
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
new file mode 100644
index 0000000..ea28ecb
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/** ITest of the low level protocol methods. */
+public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
+
+  @Override
+  protected String suitename() {
+    return "ITestDirectoryCommitProtocol";
+  }
+
+  @Override
+  protected String getCommitterName() {
+    return CommitConstants.COMMITTER_NAME_DIRECTORY;
+  }
+
+  @Override
+  protected AbstractS3ACommitter createCommitter(
+      Path outputPath,
+      TaskAttemptContext context)
+      throws IOException {
+    return new DirectoryStagingCommitter(outputPath, context);
+  }
+
+  @Override
+  public AbstractS3ACommitter createFailingCommitter(
+      TaskAttemptContext tContext) throws IOException {
+    return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
+  }
+
+  /**
+   * This class provides an overridden implementation of commitJobInternal
+   * which causes the commit to fail the first time, then succeed.
+   */
+  private static final class CommitterWithFailedThenSucceed extends
+      DirectoryStagingCommitter implements CommitterFaultInjection {
+
+    private final CommitterFaultInjectionImpl injection;
+
+    CommitterWithFailedThenSucceed(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+      injection = new CommitterFaultInjectionImpl(outputPath, context, true);
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      injection.setupJob(context);
+      super.setupJob(context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      injection.abortJob(context, state);
+      super.abortJob(context, state);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public void cleanupJob(JobContext context) throws IOException {
+      injection.cleanupJob(context);
+      super.cleanupJob(context);
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+      injection.setupTask(context);
+      super.setupTask(context);
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      injection.commitTask(context);
+      super.commitTask(context);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      injection.abortTask(context);
+      super.abortTask(context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      injection.commitJob(context);
+      super.commitJob(context);
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context)
+        throws IOException {
+      injection.needsTaskCommit(context);
+      return super.needsTaskCommit(context);
+    }
+
+    @Override
+    public void setFaults(CommitterFaultInjection.Faults... faults) {
+      injection.setFaults(faults);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
new file mode 100644
index 0000000..e3bc150
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+
+/** ITest of the low level protocol methods. */
+public class ITestPartitionedCommitProtocol extends ITestStagingCommitProtocol {
+
+  @Override
+  protected String suitename() {
+    return "ITestPartitionedCommitProtocol";
+  }
+
+  @Override
+  protected String getCommitterName() {
+    return CommitConstants.COMMITTER_NAME_PARTITIONED;
+  }
+
+  @Override
+  protected AbstractS3ACommitter createCommitter(
+      Path outputPath,
+      TaskAttemptContext context)
+      throws IOException {
+    return new PartitionedStagingCommitter(outputPath, context);
+  }
+
+  @Override
+  public AbstractS3ACommitter createFailingCommitter(
+      TaskAttemptContext tContext) throws IOException {
+    return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
+  }
+
+  @Override
+  public void testMapFileOutputCommitter() throws Exception {
+    skip("Partitioned committer is not suitable for Map output");
+  }
+
+  /**
+   * This class provides an overridden implementation of commitJobInternal
+   * which causes the commit to fail the first time, then succeed.
+   */
+  private static final class CommitterWithFailedThenSucceed extends
+      DirectoryStagingCommitter implements CommitterFaultInjection {
+    private final CommitterFaultInjectionImpl injection;
+
+    CommitterWithFailedThenSucceed(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+      injection = new CommitterFaultInjectionImpl(outputPath, context, true);
+    }
+
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      injection.setupJob(context);
+      super.setupJob(context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      injection.abortJob(context, state);
+      super.abortJob(context, state);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public void cleanupJob(JobContext context) throws IOException {
+      injection.cleanupJob(context);
+      super.cleanupJob(context);
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+      injection.setupTask(context);
+      super.setupTask(context);
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      injection.commitTask(context);
+      super.commitTask(context);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      injection.abortTask(context);
+      super.abortTask(context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      injection.commitJob(context);
+      super.commitJob(context);
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context)
+        throws IOException {
+      injection.needsTaskCommit(context);
+      return super.needsTaskCommit(context);
+    }
+
+    @Override
+    public void setFaults(CommitterFaultInjection.Faults... faults) {
+      injection.setFaults(faults);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
new file mode 100644
index 0000000..08c572e
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.staging.integration;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory;
+import org.apache.hadoop.fs.s3a.S3ClientFactory;
+import org.apache.hadoop.fs.s3a.commit.AbstractITCommitProtocol;
+import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjection;
+import org.apache.hadoop.fs.s3a.commit.CommitterFaultInjectionImpl;
+import org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants;
+import org.apache.hadoop.fs.s3a.commit.staging.Paths;
+import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import static org.apache.hadoop.fs.s3a.Constants.S3_CLIENT_FACTORY_IMPL;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+
+/** Test the staging committer's handling of the base protocol operations. */
+public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
+
+  @Override
+  protected String suitename() {
+    return "ITestStagingCommitProtocol";
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setInt(FS_S3A_COMMITTER_THREADS, 1);
+    // switch to the inconsistent filesystem
+    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
+        S3ClientFactory.class);
+
+    // disable unique filenames so that the protocol tests of FileOutputFormat
+    // and this test generate consistent names.
+    conf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES, false);
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    // identify working dir for staging and delete
+    Configuration conf = getConfiguration();
+    String uuid = StagingCommitter.getUploadUUID(conf,
+        getTaskAttempt0().getJobID());
+    Path tempDir = Paths.getLocalTaskAttemptTempDir(conf, uuid,
+        getTaskAttempt0());
+    rmdir(tempDir, conf);
+  }
+
+  @Override
+  protected String getCommitterName() {
+    return InternalCommitterConstants.COMMITTER_NAME_STAGING;
+  }
+
+  @Override
+  protected AbstractS3ACommitter createCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return new StagingCommitter(outputPath, context);
+  }
+
+  public AbstractS3ACommitter createFailingCommitter(
+      TaskAttemptContext tContext) throws IOException {
+    return new CommitterWithFailedThenSucceed(getOutDir(), tContext);
+  }
+
+  @Override
+  protected boolean shouldExpectSuccessMarker() {
+    return false;
+  }
+
+  @Override
+  protected void expectJobCommitToFail(JobContext jContext,
+      AbstractS3ACommitter committer) throws Exception {
+    expectJobCommitFailure(jContext, committer,
+        IOException.class);
+  }
+
+  protected void validateTaskAttemptPathDuringWrite(Path p) throws IOException {
+    // this is expected to be local FS
+    ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
+  }
+
+  protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+    // this is expected to be local FS
+    ContractTestUtils.assertPathExists(getLocalFS(), "task attempt", p);
+  }
+
+  protected FileSystem getLocalFS() throws IOException {
+    return FileSystem.getLocal(getConfiguration());
+  }
+
+  /**
+   * This class provides an overridden implementation of commitJobInternal
+   * which causes the commit to fail the first time, then succeed.
+   */
+  private static final class CommitterWithFailedThenSucceed extends
+      StagingCommitter implements CommitterFaultInjection {
+
+    private final CommitterFaultInjectionImpl injection;
+
+    CommitterWithFailedThenSucceed(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+      injection = new CommitterFaultInjectionImpl(outputPath, context, true);
+    }
+    @Override
+    public void setupJob(JobContext context) throws IOException {
+      injection.setupJob(context);
+      super.setupJob(context);
+    }
+
+    @Override
+    public void abortJob(JobContext context, JobStatus.State state)
+        throws IOException {
+      injection.abortJob(context, state);
+      super.abortJob(context, state);
+    }
+
+    @Override
+    @SuppressWarnings("deprecation")
+    public void cleanupJob(JobContext context) throws IOException {
+      injection.cleanupJob(context);
+      super.cleanupJob(context);
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext context) throws IOException {
+      injection.setupTask(context);
+      super.setupTask(context);
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext context) throws IOException {
+      injection.commitTask(context);
+      super.commitTask(context);
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext context) throws IOException {
+      injection.abortTask(context);
+      super.abortTask(context);
+    }
+
+    @Override
+    public void commitJob(JobContext context) throws IOException {
+      injection.commitJob(context);
+      super.commitJob(context);
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext context)
+        throws IOException {
+      injection.needsTaskCommit(context);
+      return super.needsTaskCommit(context);
+    }
+
+    @Override
+    public void setFaults(Faults... faults) {
+      injection.setFaults(faults);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
index a33c001..22a028a 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/AbstractS3GuardToolTestBase.java
@@ -35,11 +35,14 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
 import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.StringUtils;
 
+import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.E_BAD_STATE;
 import static org.apache.hadoop.fs.s3a.s3guard.S3GuardTool.SUCCESS;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -235,6 +238,26 @@ public abstract class AbstractS3GuardToolTestBase extends AbstractS3ATestBase {
         });
   }
 
+  @Test
+  public void testProbeForMagic() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    String name = fs.getUri().toString();
+    S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(
+        getConfiguration());
+    if (fs.hasCapability(
+        CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER)) {
+      // if the FS is magic, expect this to work
+      exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name);
+    } else {
+      // if the FS isn't magic, expect the probe to fail
+      ExitUtil.ExitException e = intercept(ExitUtil.ExitException.class,
+          () -> exec(cmd, S3GuardTool.BucketInfo.MAGIC_FLAG, name));
+      if (e.getExitCode() != E_BAD_STATE) {
+        throw e;
+      }
+    }
+  }
+
   /**
    * Get the test CSV file; assume() that it is not modified (i.e. we haven't
    * switched to a new storage infrastructure where the bucket is no longer

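testProbeForMagic drives the bucket-info command in-process and only tolerates an exit code of E_BAD_STATE when the bucket lacks magic committer support. A rough sketch of the same probe via the Tool API (a fragment, assuming a Configuration bound to the target bucket; the literal value of MAGIC_FLAG is not shown in this diff):

    S3GuardTool.BucketInfo cmd = new S3GuardTool.BucketInfo(conf);
    // succeeds when the magic committer is enabled on the bucket; otherwise the
    // probe fails, surfacing as an ExitUtil.ExitException with E_BAD_STATE in
    // the test harness above. Tool.run() is declared to throw Exception.
    int ret = cmd.run(new String[] {S3GuardTool.BucketInfo.MAGIC_FLAG, "s3a://example-bucket/"});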
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
index 02eb7b8..5763b83 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
@@ -537,7 +537,7 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
   }
 
   @Test
-  public void testDeleteTable() throws IOException {
+  public void testDeleteTable() throws Exception {
     final String tableName = "testDeleteTable";
     final S3AFileSystem s3afs = getFileSystem();
     final Configuration conf = s3afs.getConf();
@@ -553,7 +553,6 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
      // delete table once more; be ResourceNotFoundException swallowed silently
       ddbms.destroy();
       verifyTableNotExist(tableName);
-
       try {
         // we can no longer list the destroyed table
         ddbms.listChildren(new Path(S3URI));
@@ -582,13 +581,9 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
    *
    * This should not rely on the {@link DynamoDBMetadataStore} implementation.
    */
-  private static void verifyTableNotExist(String tableName) {
-    final Table table = dynamoDB.getTable(tableName);
-    try {
-      table.describe();
-      fail("Expecting ResourceNotFoundException for table '" + tableName + "'");
-    } catch (ResourceNotFoundException ignored) {
-    }
+  private static void verifyTableNotExist(String tableName) throws Exception {
+    intercept(ResourceNotFoundException.class,
+        () -> dynamoDB.getTable(tableName).describe());
   }
 
 }

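The rewritten verifyTableNotExist adopts the LambdaTestUtils.intercept() idiom: name the expected exception class and pass the probing call as a lambda; the helper fails the test if nothing, or the wrong exception, is thrown, and returns the caught exception for further assertions. A small fragment showing the pattern, reusing identifiers from the diff above:

    // assumes: import static org.apache.hadoop.test.LambdaTestUtils.intercept;
    ResourceNotFoundException ex = intercept(ResourceNotFoundException.class,
        () -> dynamoDB.getTable("no-such-table").describe());
    // ex can then be inspected, e.g. for its error message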
