http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
new file mode 100644
index 0000000..4d7f524
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java
@@ -0,0 +1,1371 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
+
+/**
+ * Test the job/task commit actions of an S3A Committer, including trying to
+ * simulate some failure and retry conditions.
+ * Derived from
+ * {@code org.apache.hadoop.mapreduce.lib.output.TestFileOutputCommitter}.
+ *
+ * This is a complex test suite as it tries to explore the full lifecycle
+ * of committers, and is designed for subclassing.
+ */
+@SuppressWarnings({"unchecked", "ThrowableNotThrown", "unused"})
+public abstract class AbstractITCommitProtocol extends AbstractCommitITest {
+  private Path outDir;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AbstractITCommitProtocol.class);
+
+  private static final String SUB_DIR = "SUB_DIR";
+
+  protected static final String PART_00000 = "part-m-00000";
+
+  /**
+   * Job ID; generated from the test fork ID in {@link #randomJobId()} to
+   * guarantee that even in parallel test runs, no two jobs have the same
+   * ID.
+   */
+  private String jobId;
+
+  // A random task attempt id for testing.
+  private String attempt0;
+  private TaskAttemptID taskAttempt0;
+
+  private String attempt1;
+  private TaskAttemptID taskAttempt1;
+
+  private static final Text KEY_1 = new Text("key1");
+  private static final Text KEY_2 = new Text("key2");
+  private static final Text VAL_1 = new Text("val1");
+  private static final Text VAL_2 = new Text("val2");
+
+  /** Jobs to abort in test case teardown. */
+  private List<JobData> abortInTeardown = new ArrayList<>(1);
+
+  private final StandardCommitterFactory
+      standardCommitterFactory = new StandardCommitterFactory();
+
+  private void cleanupDestDir() throws IOException {
+    rmdir(this.outDir, getConfiguration());
+  }
+
+  /**
+   * This must return the name of a suite which is unique to the non-abstract
+   * test.
+   * @return a string which must be unique and a valid path.
+   */
+  protected abstract String suitename();
+
+  /**
+   * Get the log; can be overridden for test case log.
+   * @return a log.
+   */
+  public Logger log() {
+    return LOG;
+  }
+
+  /**
+   * Overridden method returns the suitename as well as the method name,
+   * so if more than one committer test is run in parallel, paths are
+   * isolated.
+   * @return a name for a method, unique across the suites and test cases.
+   */
+  @Override
+  protected String getMethodName() {
+    return suitename() + "-" + super.getMethodName();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    jobId = randomJobId();
+    attempt0 = "attempt_" + jobId + "_m_000000_0";
+    taskAttempt0 = TaskAttemptID.forName(attempt0);
+    attempt1 = "attempt_" + jobId + "_m_000001_0";
+    taskAttempt1 = TaskAttemptID.forName(attempt1);
+
+    outDir = path(getMethodName());
+    S3AFileSystem fileSystem = getFileSystem();
+    bindFileSystem(fileSystem, outDir, fileSystem.getConf());
+    abortMultipartUploadsUnderPath(outDir);
+    cleanupDestDir();
+  }
+
+  /**
+   * Create a random Job ID using the fork ID as part of the number.
+   * @return a job ID string in a format parseable by Jobs
+   * @throws Exception failure
+   */
+  private String randomJobId() throws Exception {
+    String testUniqueForkId = System.getProperty(TEST_UNIQUE_FORK_ID, "0001");
+    int l = testUniqueForkId.length();
+    String trailingDigits = testUniqueForkId.substring(l - 4, l);
+    try {
+      int digitValue = Integer.parseInt(trailingDigits);
+      return String.format("20070712%04d_%04d",
+          (long)(Math.random() * 1000),
+          digitValue);
+    } catch (NumberFormatException e) {
+      throw new Exception("Failed to parse " + trailingDigits, e);
+    }
+  }
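+
+  // Worked example (a sketch, not from the source): with fork ID "0003",
+  // randomJobId() may return "200707120456_0003"; setup() then derives
+  // task attempt IDs such as "attempt_200707120456_0003_m_000000_0".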
+
+  @Override
+  public void teardown() throws Exception {
+    describe("teardown");
+    abortInTeardown.forEach(this::abortJobQuietly);
+    if (outDir != null) {
+      try {
+        abortMultipartUploadsUnderPath(outDir);
+        cleanupDestDir();
+      } catch (IOException e) {
+        log().info("Exception during cleanup", e);
+      }
+    }
+    S3AFileSystem fileSystem = getFileSystem();
+    if (fileSystem != null) {
+      log().info("Statistics for {}:\n{}", fileSystem.getUri(),
+          fileSystem.getInstrumentation().dump("  ", " =  ", "\n", true));
+    }
+
+    super.teardown();
+  }
+
+  /**
+   * Add the specified job to the current list of jobs to abort in teardown.
+   * @param jobData job data.
+   */
+  protected void abortInTeardown(JobData jobData) {
+    abortInTeardown.add(jobData);
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    disableFilesystemCaching(conf);
+    bindCommitter(conf);
+    return conf;
+  }
+
+  /**
+   * Bind a path to the FS in the cache.
+   * @param fs filesystem
+   * @param path s3 path
+   * @param conf configuration
+   * @throws IOException any problem
+   */
+  private void bindFileSystem(FileSystem fs, Path path, Configuration conf)
+      throws IOException {
+    FileSystemTestHelper.addFileSystemForTesting(path.toUri(), conf, fs);
+  }
+
+  /**
+   * Bind to the committer from the methods of
+   * {@link #getCommitterFactoryName()} and {@link #getCommitterName()}.
+   * @param conf configuration to set up
+   */
+  protected void bindCommitter(Configuration conf) {
+    super.bindCommitter(conf, getCommitterFactoryName(), getCommitterName());
+  }
+
+  /**
+   * Create a committer for a task.
+   * @param context task context
+   * @return new committer
+   * @throws IOException failure
+   */
+  protected AbstractS3ACommitter createCommitter(
+      TaskAttemptContext context) throws IOException {
+    return createCommitter(getOutDir(), context);
+  }
+
+  /**
+   * Create a committer for a task and a given output path.
+   * @param outputPath path
+   * @param context task context
+   * @return new committer
+   * @throws IOException failure
+   */
+  protected abstract AbstractS3ACommitter createCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException;
+
+
+  protected String getCommitterFactoryName() {
+    return CommitConstants.S3A_COMMITTER_FACTORY;
+  }
+
+  protected abstract String getCommitterName();
+
+  protected Path getOutDir() {
+    return outDir;
+  }
+
+  protected String getJobId() {
+    return jobId;
+  }
+
+  protected String getAttempt0() {
+    return attempt0;
+  }
+
+  protected TaskAttemptID getTaskAttempt0() {
+    return taskAttempt0;
+  }
+
+  protected String getAttempt1() {
+    return attempt1;
+  }
+
+  protected TaskAttemptID getTaskAttempt1() {
+    return taskAttempt1;
+  }
+
+  /**
+   * Functional interface for creating committers, designed to allow
+   * different factories to be used to create different failure modes.
+   */
+  @FunctionalInterface
+  public interface CommitterFactory {
+
+    /**
+     * Create a committer for a task.
+     * @param context task context
+     * @return new committer
+     * @throws IOException failure
+     */
+    AbstractS3ACommitter createCommitter(
+        TaskAttemptContext context) throws IOException;
+  }
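+
+  // Being a @FunctionalInterface, a factory can also be supplied inline;
+  // as a sketch, startJob(this::createFailingCommitter, true) would be
+  // equivalent to startJob(new FailingCommitterFactory(), true).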
+
+  /**
+   * The normal committer creation factory, uses the abstract methods
+   * in the class.
+   */
+  public class StandardCommitterFactory implements CommitterFactory {
+    @Override
+    public AbstractS3ACommitter createCommitter(TaskAttemptContext context)
+        throws IOException {
+      return AbstractITCommitProtocol.this.createCommitter(context);
+    }
+  }
+
+  /**
+   * Write some text out.
+   * @param context task
+   * @throws IOException IO failure
+   * @throws InterruptedException write interrupted
+   */
+  protected void writeTextOutput(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    describe("write output");
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Writing Text output for task %s", context.getTaskAttemptID())) {
+      writeOutput(new LoggingTextOutputFormat().getRecordWriter(context),
+          context);
+    }
+  }
+
+  /**
+   * Write the standard output.
+   * @param writer record writer
+   * @param context task context
+   * @throws IOException IO failure
+   * @throws InterruptedException write interrupted
+   */
+  private void writeOutput(RecordWriter writer,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    NullWritable nullWritable = NullWritable.get();
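+    // Note: text output treats null and NullWritable alike: a null key
+    // prints only the value, a null value only the key, and a record where
+    // both are null is skipped; validateContent() relies on this mapping.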
+    try (CloseWriter cw = new CloseWriter(writer, context)) {
+      writer.write(KEY_1, VAL_1);
+      writer.write(null, nullWritable);
+      writer.write(null, VAL_1);
+      writer.write(nullWritable, VAL_2);
+      writer.write(KEY_2, nullWritable);
+      writer.write(KEY_1, null);
+      writer.write(null, null);
+      writer.write(KEY_2, VAL_2);
+      writer.close(context);
+    }
+  }
+
+  /**
+   * Write the output of a map.
+   * @param writer record writer
+   * @param context task context
+   * @throws IOException IO failure
+   * @throws InterruptedException write interrupted
+   */
+  private void writeMapFileOutput(RecordWriter writer,
+      TaskAttemptContext context) throws IOException, InterruptedException {
+    describe("\nWrite map output");
+    try (DurationInfo d = new DurationInfo(LOG,
+        "Writing Text output for task %s", context.getTaskAttemptID());
+         CloseWriter cw = new CloseWriter(writer, context)) {
+      for (int i = 0; i < 10; ++i) {
+        Text val = ((i & 1) == 1) ? VAL_1 : VAL_2;
+        writer.write(new LongWritable(i), val);
+      }
+      writer.close(context);
+    }
+  }
+
+  /**
+   * Details on a job for use in {@code startJob} and elsewhere.
+   */
+  public static class JobData {
+    private final Job job;
+    private final JobContext jContext;
+    private final TaskAttemptContext tContext;
+    private final AbstractS3ACommitter committer;
+    private final Configuration conf;
+
+    public JobData(Job job,
+        JobContext jContext,
+        TaskAttemptContext tContext,
+        AbstractS3ACommitter committer) {
+      this.job = job;
+      this.jContext = jContext;
+      this.tContext = tContext;
+      this.committer = committer;
+      conf = job.getConfiguration();
+    }
+  }
+
+  /**
+   * Create a new job. Sets the task attempt ID,
+   * and output dir; asks for a success marker.
+   * @return the new job
+   * @throws IOException failure
+   */
+  public Job newJob() throws IOException {
+    return newJob(outDir, getConfiguration(), attempt0);
+  }
+
+  /**
+   * Create a new job. Sets the task attempt ID,
+   * and output dir; asks for a success marker.
+   * @param dir dest dir
+   * @param configuration config to get the job from
+   * @param taskAttemptId task attempt
+   * @return the new job
+   * @throws IOException failure
+   */
+  private Job newJob(Path dir, Configuration configuration,
+      String taskAttemptId) throws IOException {
+    Job job = Job.getInstance(configuration);
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId);
+    conf.setBoolean(CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
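+    // the marker option asks the committer to write a "_SUCCESS" file in
+    // the destination directory on job commit; verifySuccessMarker() in
+    // the tests below probes for it.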
+    FileOutputFormat.setOutputPath(job, dir);
+    return job;
+  }
+
+  /**
+   * Start a job with a committer; optionally write the test data.
+   * Always register the job to be aborted (quietly) in teardown.
+   * From an "OO-purity" perspective this is the wrong kind of method:
+   * it sets things up, mixes functionality, and registers for teardown.
+   * Its aim is simple though: a common body of code for starting work
+   * in test cases.
+   * @param writeText should the text be written?
+   * @return the job data 4-tuple
+   * @throws IOException IO problems
+   * @throws InterruptedException interruption during write
+   */
+  protected JobData startJob(boolean writeText)
+      throws IOException, InterruptedException {
+    return startJob(standardCommitterFactory, writeText);
+  }
+
+  /**
+   * Start a job with a committer; optionally write the test data.
+   * Always register the job to be aborted (quietly) in teardown.
+   * From an "OO-purity" perspective this is the wrong kind of method:
+   * it sets things up, mixes functionality, and registers for teardown.
+   * Its aim is simple though: a common body of code for starting work
+   * in test cases.
+   * @param factory the committer factory to use
+   * @param writeText should the text be written?
+   * @return the job data 4-tuple
+   * @throws IOException IO problems
+   * @throws InterruptedException interruption during write
+   */
+  protected JobData startJob(CommitterFactory factory, boolean writeText)
+      throws IOException, InterruptedException {
+    Job job = newJob();
+    Configuration conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        taskAttempt0);
+    AbstractS3ACommitter committer = factory.createCommitter(tContext);
+
+    // setup
+    JobData jobData = new JobData(job, jContext, tContext, committer);
+    setup(jobData);
+    abortInTeardown(jobData);
+
+    if (writeText) {
+      // write output
+      writeTextOutput(tContext);
+    }
+    return jobData;
+  }
+
+  /**
+   * Set up the job and task.
+   * @param jobData job data
+   * @throws IOException problems
+   */
+  protected void setup(JobData jobData) throws IOException {
+    AbstractS3ACommitter committer = jobData.committer;
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    describe("\nsetup job");
+    try (DurationInfo d = new DurationInfo(LOG,
+        "setup job %s", jContext.getJobID())) {
+      committer.setupJob(jContext);
+    }
+    try (DurationInfo d = new DurationInfo(LOG,
+        "setup task %s", tContext.getTaskAttemptID())) {
+      committer.setupTask(tContext);
+    }
+    describe("setup complete\n");
+  }
+
+  /**
+   * Abort a job quietly.
+   * @param jobData job info
+   */
+  protected void abortJobQuietly(JobData jobData) {
+    abortJobQuietly(jobData.committer, jobData.jContext, jobData.tContext);
+  }
+
+  /**
+   * Abort a job quietly: first task, then job.
+   * @param committer committer
+   * @param jContext job context
+   * @param tContext task context
+   */
+  protected void abortJobQuietly(AbstractS3ACommitter committer,
+      JobContext jContext,
+      TaskAttemptContext tContext) {
+    describe("\naborting task");
+    try {
+      committer.abortTask(tContext);
+    } catch (IOException e) {
+      log().warn("Exception aborting task:", e);
+    }
+    describe("\naborting job");
+    try {
+      committer.abortJob(jContext, JobStatus.State.KILLED);
+    } catch (IOException e) {
+      log().warn("Exception aborting job", e);
+    }
+  }
+
+  /**
+   * Commit up the task and then the job.
+   * @param committer committer
+   * @param jContext job context
+   * @param tContext task context
+   * @throws IOException problems
+   */
+  protected void commit(AbstractS3ACommitter committer,
+      JobContext jContext,
+      TaskAttemptContext tContext) throws IOException {
+    try (DurationInfo d = new DurationInfo(LOG,
+        "committing work", jContext.getJobID())) {
+      describe("\ncommitting task");
+      committer.commitTask(tContext);
+      describe("\ncommitting job");
+      committer.commitJob(jContext);
+      describe("commit complete\n");
+    }
+  }
+
+  /**
+   * Execute work as part of a test, after creating the job.
+   * After the execution, {@link #abortJobQuietly(JobData)} is
+   * called for abort/cleanup.
+   * @param name name of work (for logging)
+   * @param action action to execute
+   * @throws Exception failure
+   */
+  protected void executeWork(String name, ActionToTest action)
+      throws Exception {
+    executeWork(name, startJob(false), action);
+  }
+
+  /**
+   * Execute work as part of a test, against the created job.
+   * After the execution, {@link #abortJobQuietly(JobData)} is
+   * called for abort/cleanup.
+   * @param name name of work (for logging)
+   * @param jobData job info
+   * @param action action to execute
+   * @throws Exception failure
+   */
+  public void executeWork(String name,
+      JobData jobData,
+      ActionToTest action) throws Exception {
+    try (DurationInfo d = new DurationInfo(LOG, "Executing %s", name)) {
+      action.exec(jobData.job,
+          jobData.jContext,
+          jobData.tContext,
+          jobData.committer);
+    } finally {
+      abortJobQuietly(jobData);
+    }
+  }
+
+  /**
+   * Verify that recovery doesn't work for these committers.
+   */
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testRecoveryAndCleanup() throws Exception {
+    describe("Test (unsupported) task recovery.");
+    JobData jobData = startJob(true);
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+
+    assertNotNull("null workPath in committer " + committer,
+        committer.getWorkPath());
+    assertNotNull("null outputPath in committer " + committer,
+        committer.getOutputPath());
+
+    // Commit the task. This will promote data and metadata to where
+    // job commits will pick it up on commit or abort.
+    committer.commitTask(tContext);
+    assertTaskAttemptPathDoesNotExist(committer, tContext);
+
+    Configuration conf2 = jobData.job.getConfiguration();
+    conf2.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 2);
+    JobContext jContext2 = new JobContextImpl(conf2, taskAttempt0.getJobID());
+    TaskAttemptContext tContext2 = new TaskAttemptContextImpl(conf2,
+        taskAttempt0);
+    AbstractS3ACommitter committer2 = createCommitter(tContext2);
+    committer2.setupJob(tContext2);
+
+    assertFalse("recoverySupported in " + committer2,
+        committer2.isRecoverySupported());
+    intercept(PathCommitException.class, "recover",
+        () -> committer2.recoverTask(tContext2));
+
+    // at this point, task attempt 0 has failed to recover
+    // it should be abortable though. This will be a no-op as it already
+    // committed
+    describe("aborting task attempt 2; expect nothing to clean up");
+    committer2.abortTask(tContext2);
+    describe("Aborting job 2; expect pending commits to be aborted");
+    committer2.abortJob(jContext2, JobStatus.State.KILLED);
+    // after the aborts, the system must not have any pending uploads left
+    assertNoMultipartUploadsPending(outDir);
+  }
+
+  protected void assertTaskAttemptPathDoesNotExist(
+      AbstractS3ACommitter committer, TaskAttemptContext context)
+      throws IOException {
+    Path attemptPath = committer.getTaskAttemptPath(context);
+    ContractTestUtils.assertPathDoesNotExist(
+        attemptPath.getFileSystem(context.getConfiguration()),
+        "task attempt dir",
+        attemptPath);
+  }
+
+  protected void assertJobAttemptPathDoesNotExist(
+      AbstractS3ACommitter committer, JobContext context)
+      throws IOException {
+    Path attemptPath = committer.getJobAttemptPath(context);
+    ContractTestUtils.assertPathDoesNotExist(
+        attemptPath.getFileSystem(context.getConfiguration()),
+        "job attempt dir",
+        attemptPath);
+  }
+
+  /**
+   * Verify the output of the directory.
+   * That includes the {@code part-m-00000-*}
+   * file existence and contents, as well as optionally, the success marker.
+   * @param dir directory to scan.
+   * @param expectSuccessMarker check the success marker?
+   * @throws Exception failure.
+   */
+  private void validateContent(Path dir, boolean expectSuccessMarker)
+      throws Exception {
+    if (expectSuccessMarker) {
+      verifySuccessMarker(dir);
+    }
+    Path expectedFile = getPart0000(dir);
+    log().debug("Validating content in {}", expectedFile);
+    StringBuffer expectedOutput = new StringBuffer();
+    expectedOutput.append(KEY_1).append('\t').append(VAL_1).append("\n");
+    expectedOutput.append(VAL_1).append("\n");
+    expectedOutput.append(VAL_2).append("\n");
+    expectedOutput.append(KEY_2).append("\n");
+    expectedOutput.append(KEY_1).append("\n");
+    expectedOutput.append(KEY_2).append('\t').append(VAL_2).append("\n");
+    String output = readFile(expectedFile);
+    assertEquals("Content of " + expectedFile,
+        expectedOutput.toString(), output);
+  }
+
+  /**
+   * Identify any path under the directory which begins with the
+   * {@code "part-m-00000"} sequence. There's some compensation for
+   * eventual consistency here.
+   * @param dir directory to scan
+   * @return the full path
+   * @throws FileNotFoundException the path is missing.
+   * @throws Exception failure.
+   */
+  protected Path getPart0000(final Path dir) throws Exception {
+    final FileSystem fs = dir.getFileSystem(getConfiguration());
+    return eventually(CONSISTENCY_WAIT, CONSISTENCY_PROBE_INTERVAL,
+        () -> getPart0000Immediately(fs, dir));
+  }
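+
+  // eventually() (from LambdaTestUtils) retries the listing probe until it
+  // stops throwing, for at most CONSISTENCY_WAIT ms with a probe every
+  // CONSISTENCY_PROBE_INTERVAL ms; this absorbs listing inconsistency.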
+
+  /**
+   * Identify any path under the directory which begins with the
+   * {@code "part-m-00000"} sequence. There's some compensation for
+   * eventual consistency here.
+   * @param fs FS to probe
+   * @param dir directory to scan
+   * @return the full path
+   * @throws FileNotFoundException the path is missing.
+   * @throws IOException failure.
+   */
+  private Path getPart0000Immediately(FileSystem fs, Path dir)
+      throws IOException {
+    FileStatus[] statuses = fs.listStatus(dir,
+        path -> path.getName().startsWith(PART_00000));
+    if (statuses.length != 1) {
+      // fail, with a listing of the parent dir
+      ContractTestUtils.assertPathExists(fs, "Output file",
+          new Path(dir, PART_00000));
+    }
+    return statuses[0].getPath();
+  }
+
+  /**
+   * Look for the part-m-00000 subdirectory of the output dir and validate it.
+   * @param fs filesystem
+   * @param dir output dir
+   * @throws Exception failure.
+   */
+  private void validateMapFileOutputContent(
+      FileSystem fs, Path dir) throws Exception {
+    // map output is a directory with index and data files
+    assertPathExists("Map output", dir);
+    Path expectedMapDir = getPart0000(dir);
+    assertPathExists("Map output", expectedMapDir);
+    assertIsDirectory(expectedMapDir);
+    FileStatus[] files = fs.listStatus(expectedMapDir);
+    assertTrue("No files found in " + expectedMapDir, files.length > 0);
+    assertPathExists("index file in " + expectedMapDir,
+        new Path(expectedMapDir, MapFile.INDEX_FILE_NAME));
+    assertPathExists("data file in " + expectedMapDir,
+        new Path(expectedMapDir, MapFile.DATA_FILE_NAME));
+  }
+
+  /**
+   * Dump all MPUs in the filesystem.
+   * @throws IOException IO failure
+   */
+  protected void dumpMultipartUploads() throws IOException {
+    countMultipartUploads("");
+  }
+
+  /**
+   * Full test of the expected lifecycle: start job, task, write, commit task,
+   * commit job.
+   * @throws Exception on a failure
+   */
+  @Test
+  public void testCommitLifecycle() throws Exception {
+    describe("Full test of the expected lifecycle:\n" +
+        " start job, task, write, commit task, commit job.\n" +
+        "Verify:\n" +
+        "* no files are visible after task commit\n" +
+        "* the expected file is visible after job commit\n" +
+        "* no outstanding MPUs after job commit");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+
+    // write output
+    describe("1. Writing output");
+    writeTextOutput(tContext);
+
+    dumpMultipartUploads();
+    describe("2. Committing task");
+    assertTrue("No files to commit were found by " + committer,
+        committer.needsTaskCommit(tContext));
+    committer.commitTask(tContext);
+
+    // this is only task commit; there MUST be no part- files in the dest dir
+    waitForConsistency();
+    try {
+      applyLocatedFiles(getFileSystem().listFiles(outDir, false),
+          (status) ->
+              assertFalse("task committed file to dest :" + status,
+                  status.getPath().toString().contains("part")));
+    } catch (FileNotFoundException ignored) {
+      log().info("Outdir {} is not created by task commit phase ",
+          outDir);
+    }
+
+    describe("3. Committing job");
+    assertMultipartUploadsPending(outDir);
+    committer.commitJob(jContext);
+
+    // validate output
+    describe("4. Validating content");
+    validateContent(outDir, shouldExpectSuccessMarker());
+    assertNoMultipartUploadsPending(outDir);
+  }
+
+  @Test
+  public void testCommitterWithDuplicatedCommit() throws Exception {
+    describe("Call a task then job commit twice;" +
+        "expect the second task commit to fail.");
+    JobData jobData = startJob(true);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+
+    // do commit
+    commit(committer, jContext, tContext);
+
+    // validate output
+    validateContent(outDir, shouldExpectSuccessMarker());
+
+    assertNoMultipartUploadsPending(outDir);
+
+    // commit task to fail on retry
+    expectFNFEonTaskCommit(committer, tContext);
+  }
+
+  protected boolean shouldExpectSuccessMarker() {
+    return true;
+  }
+
+  /**
+   * Simulate a failure on the first job commit; expect the
+   * second to succeed.
+   */
+  @Test
+  public void testCommitterWithFailure() throws Exception {
+    describe("Fail the first job commit then retry");
+    JobData jobData = startJob(new FailingCommitterFactory(), true);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+
+    // do commit
+    committer.commitTask(tContext);
+
+    // now fail job
+    expectSimulatedFailureOnJobCommit(jContext, committer);
+
+    committer.commitJob(jContext);
+
+    // but the data got there, due to the order of operations.
+    validateContent(outDir, shouldExpectSuccessMarker());
+    expectJobCommitToFail(jContext, committer);
+  }
+
+  /**
+   * Override point: the failure expected on the attempt to commit a failed
+   * job.
+   * @param jContext job context
+   * @param committer committer
+   * @throws Exception any unexpected failure.
+   */
+  protected void expectJobCommitToFail(JobContext jContext,
+      AbstractS3ACommitter committer) throws Exception {
+    // next attempt will fail as there is no longer a directory to commit
+    expectJobCommitFailure(jContext, committer,
+        FileNotFoundException.class);
+  }
+
+  /**
+   * Expect a job commit operation to fail with a specific exception.
+   * @param jContext job context
+   * @param committer committer
+   * @param clazz class of exception
+   * @return the caught exception
+   * @throws Exception any unexpected failure.
+   */
+  protected static <E extends IOException> E expectJobCommitFailure(
+      JobContext jContext,
+      AbstractS3ACommitter committer,
+      Class<E> clazz)
+      throws Exception {
+
+    return intercept(clazz,
+        () -> {
+          committer.commitJob(jContext);
+          return committer.toString();
+        });
+  }
+
+  protected static void expectFNFEonTaskCommit(
+      AbstractS3ACommitter committer,
+      TaskAttemptContext tContext) throws Exception {
+    intercept(FileNotFoundException.class,
+        () -> {
+          committer.commitTask(tContext);
+          return committer.toString();
+        });
+  }
+
+  /**
+   * Have a task and job with no outputs; expect the task commit to
+   * succeed and the task attempt path to be cleaned up.
+   */
+  @Test
+  public void testCommitterWithNoOutputs() throws Exception {
+    describe("Have a task and job with no outputs: expect success");
+    JobData jobData = startJob(new FailingCommitterFactory(), false);
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+
+    // do commit
+    committer.commitTask(tContext);
+    assertTaskAttemptPathDoesNotExist(committer, tContext);
+  }
+
+  protected static void expectSimulatedFailureOnJobCommit(JobContext jContext,
+      AbstractS3ACommitter committer) throws Exception {
+    ((CommitterFaultInjection) committer).setFaults(
+        CommitterFaultInjection.Faults.commitJob);
+    expectJobCommitFailure(jContext, committer,
+        CommitterFaultInjectionImpl.Failure.class);
+  }
+
+  @Test
+  public void testMapFileOutputCommitter() throws Exception {
+    describe("Test that the committer generates map output into a directory\n" 
+
+        "starting with the prefix part-");
+    JobData jobData = startJob(false);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+    Configuration conf = jobData.conf;
+
+    // write output
+    writeMapFileOutput(new MapFileOutputFormat().getRecordWriter(tContext),
+        tContext);
+
+    // do commit
+    commit(committer, jContext, tContext);
+    S3AFileSystem fs = getFileSystem();
+    waitForConsistency();
+    lsR(fs, outDir, true);
+    String ls = ls(outDir);
+    describe("\nvalidating");
+
+    // validate output
+    verifySuccessMarker(outDir);
+
+    describe("validate output of %s", outDir);
+    validateMapFileOutputContent(fs, outDir);
+
+    // Ensure getReaders call works and also ignores
+    // hidden filenames (_ or . prefixes)
+    describe("listing");
+    FileStatus[] filtered = fs.listStatus(outDir, HIDDEN_FILE_FILTER);
+    assertEquals("listed children under " + ls,
+        1, filtered.length);
+    FileStatus fileStatus = filtered[0];
+    assertTrue("Not the part file: " + fileStatus,
+        fileStatus.getPath().getName().startsWith(PART_00000));
+
+    describe("getReaders()");
+    assertEquals("Number of MapFile.Reader entries with shared FS "
+            + outDir + " : " + ls,
+        1, getReaders(fs, outDir, conf).length);
+
+    describe("getReaders(new FS)");
+    FileSystem fs2 = FileSystem.get(outDir.toUri(), conf);
+    assertEquals("Number of MapFile.Reader entries with shared FS2 "
+            + outDir + " : " + ls,
+        1, getReaders(fs2, outDir, conf).length);
+
+    describe("MapFileOutputFormat.getReaders");
+    assertEquals("Number of MapFile.Reader entries with new FS in "
+            + outDir + " : " + ls,
+        1, MapFileOutputFormat.getReaders(outDir, conf).length);
+  }
+
+  /** Open the output generated by this format. */
+  @SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
+  private static MapFile.Reader[] getReaders(FileSystem fs,
+      Path dir,
+      Configuration conf) throws IOException {
+    Path[] names = FileUtil.stat2Paths(fs.listStatus(dir, HIDDEN_FILE_FILTER));
+
+    // sort names, so that hash partitioning works
+    Arrays.sort(names);
+
+    MapFile.Reader[] parts = new MapFile.Reader[names.length];
+    for (int i = 0; i < names.length; i++) {
+      parts[i] = new MapFile.Reader(names[i], conf);
+    }
+    return parts;
+  }
+
+  /**
+   * A functional interface which an action to test must implement.
+   */
+  @FunctionalInterface
+  public interface ActionToTest {
+    void exec(Job job, JobContext jContext, TaskAttemptContext tContext,
+        AbstractS3ACommitter committer) throws Exception;
+  }
+
+  @Test
+  public void testAbortTaskNoWorkDone() throws Exception {
+    executeWork("abort task no work",
+        (job, jContext, tContext, committer) ->
+            committer.abortTask(tContext));
+  }
+
+  @Test
+  public void testAbortJobNoWorkDone() throws Exception {
+    executeWork("abort task no work",
+        (job, jContext, tContext, committer) ->
+            committer.abortJob(jContext, JobStatus.State.RUNNING));
+  }
+
+  @Test
+  public void testCommitJobButNotTask() throws Exception {
+    executeWork("commit a job while a task's work is pending, " +
+            "expect task writes to be cancelled.",
+        (job, jContext, tContext, committer) -> {
+          // step 1: write the text
+          writeTextOutput(tContext);
+          // step 2: commit the job
+          createCommitter(tContext).commitJob(tContext);
+          // verify that no output can be observed
+          assertPart0000DoesNotExist(outDir);
+          // that includes no pending MPUs; commitJob is expected to
+          // cancel any.
+          assertNoMultipartUploadsPending(outDir);
+        }
+    );
+  }
+
+  @Test
+  public void testAbortTaskThenJob() throws Exception {
+    JobData jobData = startJob(true);
+    AbstractS3ACommitter committer = jobData.committer;
+
+    // do abort
+    committer.abortTask(jobData.tContext);
+
+    intercept(FileNotFoundException.class, "",
+        () -> getPart0000(committer.getWorkPath()));
+
+    committer.abortJob(jobData.jContext, JobStatus.State.FAILED);
+    assertJobAbortCleanedUp(jobData);
+  }
+
+  /**
+   * Extension point: assert that the job was all cleaned up after an abort.
+   * Base assertions
+   * <ul>
+   *   <li>Output dir is absent or, if present, empty</li>
+   *   <li>No pending MPUs to/under the output dir</li>
+   * </ul>
+   * @param jobData job data
+   * @throws Exception failure
+   */
+  public void assertJobAbortCleanedUp(JobData jobData) throws Exception {
+    // special handling of magic directory; harmless in staging
+    S3AFileSystem fs = getFileSystem();
+    try {
+      FileStatus[] children = listChildren(fs, outDir);
+      if (children.length != 0) {
+        lsR(fs, outDir, true);
+      }
+      assertArrayEquals("Output directory not empty " + ls(outDir),
+          new FileStatus[0], children);
+    } catch (FileNotFoundException e) {
+      // this is a valid failure mode; it means the dest dir doesn't exist yet.
+    }
+    assertNoMultipartUploadsPending(outDir);
+  }
+
+  @Test
+  public void testFailAbort() throws Exception {
+    describe("Abort the task, then job (failed), abort the job again");
+    JobData jobData = startJob(true);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+    AbstractS3ACommitter committer = jobData.committer;
+
+    // do abort
+    committer.abortTask(tContext);
+
+    committer.getJobAttemptPath(jContext);
+    committer.getTaskAttemptPath(tContext);
+    assertPart0000DoesNotExist(outDir);
+    assertSuccessMarkerDoesNotExist(outDir);
+    describe("Aborting job into %s", outDir);
+
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+
+    assertTaskAttemptPathDoesNotExist(committer, tContext);
+    assertJobAttemptPathDoesNotExist(committer, jContext);
+
+    // try again; expect abort to be idempotent.
+    committer.abortJob(jContext, JobStatus.State.FAILED);
+    assertNoMultipartUploadsPending(outDir);
+  }
+
+  public void assertPart0000DoesNotExist(Path dir) throws Exception {
+    intercept(FileNotFoundException.class,
+        () -> getPart0000(dir));
+    assertPathDoesNotExist("expected output file", new Path(dir, PART_00000));
+  }
+
+  @Test
+  public void testAbortJobNotTask() throws Exception {
+    executeWork("abort task no work",
+        (job, jContext, tContext, committer) -> {
+          // write output
+          writeTextOutput(tContext);
+          committer.abortJob(jContext, JobStatus.State.RUNNING);
+          assertTaskAttemptPathDoesNotExist(
+              committer, tContext);
+          assertJobAttemptPathDoesNotExist(
+              committer, jContext);
+          assertNoMultipartUploadsPending(outDir);
+        });
+  }
+
+  /**
+   * This looks at what happens with concurrent commits.
+   * However, the failure condition it looks for (subdir under subdir)
+   * is the kind of failure you see on a rename-based commit.
+   *
+   * What it will not detect is the fact that both tasks will each commit
+   * to the destination directory. That is: whichever commits last wins.
+   *
+   * There's no way to stop this. Instead it is a requirement that the
+   * task commit operation is only executed for those tasks which the
+   * committer knows have succeeded, aborting those which have not.
+   * @throws Exception failure
+   */
+  @Test
+  public void testConcurrentCommitTaskWithSubDir() throws Exception {
+    Job job = newJob();
+    FileOutputFormat.setOutputPath(job, outDir);
+    final Configuration conf = job.getConfiguration();
+
+    final JobContext jContext =
+        new JobContextImpl(conf, taskAttempt0.getJobID());
+    AbstractS3ACommitter amCommitter = createCommitter(
+        new TaskAttemptContextImpl(conf, taskAttempt0));
+    amCommitter.setupJob(jContext);
+
+    final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
+    taCtx[0] = new TaskAttemptContextImpl(conf, taskAttempt0);
+    taCtx[1] = new TaskAttemptContextImpl(conf, taskAttempt1);
+
+    final TextOutputFormat[] tof = new LoggingTextOutputFormat[2];
+    for (int i = 0; i < tof.length; i++) {
+      tof[i] = new LoggingTextOutputFormat() {
+        @Override
+        public Path getDefaultWorkFile(
+            TaskAttemptContext context,
+            String extension) throws IOException {
+          final AbstractS3ACommitter foc = (AbstractS3ACommitter)
+              getOutputCommitter(context);
+          return new Path(new Path(foc.getWorkPath(), SUB_DIR),
+              getUniqueFile(context, getOutputName(context), extension));
+        }
+      };
+    }
+
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
+    try {
+      for (int i = 0; i < taCtx.length; i++) {
+        final int taskIdx = i;
+        executor.submit(() -> {
+          final OutputCommitter outputCommitter =
+              tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
+          outputCommitter.setupTask(taCtx[taskIdx]);
+          final RecordWriter rw =
+              tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
+          writeOutput(rw, taCtx[taskIdx]);
+          describe("Committing Task %d", taskIdx);
+          outputCommitter.commitTask(taCtx[taskIdx]);
+          return null;
+        });
+      }
+    } finally {
+      executor.shutdown();
+      while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
+        log().info("Awaiting thread termination!");
+      }
+    }
+
+    // if we commit here then all tasks will be committed, so there will
+    // be contention for that final directory: both parts will go in.
+
+    describe("\nCommitting Job");
+    amCommitter.commitJob(jContext);
+    assertPathExists("base output directory", outDir);
+    assertPart0000DoesNotExist(outDir);
+    Path outSubDir = new Path(outDir, SUB_DIR);
+    assertPathDoesNotExist("Must not end up with sub_dir/sub_dir",
+        new Path(outSubDir, SUB_DIR));
+
+    // validate output
+    // There's no success marker in the subdirectory
+    validateContent(outSubDir, false);
+  }
+
+  /**
+   * Create a committer which fails; the class
+   * {@link CommitterFaultInjectionImpl} implements the logic.
+   * @param tContext task context
+   * @return committer instance
+   * @throws IOException failure to instantiate
+   */
+  protected abstract AbstractS3ACommitter createFailingCommitter(
+      TaskAttemptContext tContext) throws IOException;
+
+  /**
+   * Factory for failing committers.
+   */
+  public class FailingCommitterFactory implements CommitterFactory {
+    @Override
+    public AbstractS3ACommitter createCommitter(TaskAttemptContext context)
+        throws IOException {
+      return createFailingCommitter(context);
+    }
+  }
+
+  @Test
+  public void testOutputFormatIntegration() throws Throwable {
+    Configuration conf = getConfiguration();
+    Job job = newJob();
+    job.setOutputFormatClass(LoggingTextOutputFormat.class);
+    conf = job.getConfiguration();
+    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    JobContext jContext = new JobContextImpl(conf, taskAttempt0.getJobID());
+    TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
+        taskAttempt0);
+    LoggingTextOutputFormat outputFormat = (LoggingTextOutputFormat)
+        ReflectionUtils.newInstance(tContext.getOutputFormatClass(), conf);
+    AbstractS3ACommitter committer = (AbstractS3ACommitter)
+        outputFormat.getOutputCommitter(tContext);
+
+    // setup
+    JobData jobData = new JobData(job, jContext, tContext, committer);
+    setup(jobData);
+    abortInTeardown(jobData);
+    LoggingTextOutputFormat.LoggingLineRecordWriter recordWriter
+        = outputFormat.getRecordWriter(tContext);
+    IntWritable iw = new IntWritable(1);
+    recordWriter.write(iw, iw);
+    Path dest = recordWriter.getDest();
+    validateTaskAttemptPathDuringWrite(dest);
+    recordWriter.close(tContext);
+    // at this point
+    validateTaskAttemptPathAfterWrite(dest);
+    assertTrue("Committer does not have data to commit " + committer,
+        committer.needsTaskCommit(tContext));
+    committer.commitTask(tContext);
+    committer.commitJob(jContext);
+    // validate output
+    verifySuccessMarker(outDir);
+  }
+
+  /**
+   * Create a committer through reflection then use it to abort
+   * a task. This mimics the action of an AM when a container fails and
+   * the AM wants to abort the task attempt.
+   */
+  @Test
+  public void testAMWorkflow() throws Throwable {
+    describe("Create a committer with a null output path & use as an AM");
+    JobData jobData = startJob(true);
+    JobContext jContext = jobData.jContext;
+    TaskAttemptContext tContext = jobData.tContext;
+
+    TaskAttemptContext newAttempt = taskAttemptForJob(
+        MRBuilderUtils.newJobId(1, 1, 1), jContext);
+    Configuration conf = jContext.getConfiguration();
+
+    // bind
+    LoggingTextOutputFormat.bind(conf);
+
+    OutputFormat<?, ?> outputFormat
+        = ReflectionUtils.newInstance(newAttempt
+        .getOutputFormatClass(), conf);
+    Path outputPath = FileOutputFormat.getOutputPath(newAttempt);
+    assertNotNull("null output path in new task attempt", outputPath);
+
+    AbstractS3ACommitter committer2 = (AbstractS3ACommitter)
+        outputFormat.getOutputCommitter(newAttempt);
+    committer2.abortTask(tContext);
+    assertNoMultipartUploadsPending(getOutDir());
+  }
+
+
+  @Test
+  public void testParallelJobsToAdjacentPaths() throws Throwable {
+
+    describe("Run two jobs in parallel, assert they both complete");
+    JobData jobData = startJob(true);
+    Job job1 = jobData.job;
+    AbstractS3ACommitter committer1 = jobData.committer;
+    JobContext jContext1 = jobData.jContext;
+    TaskAttemptContext tContext1 = jobData.tContext;
+
+    // now build up a second job
+    String jobId2 = randomJobId();
+    String attempt20 = "attempt_" + jobId2 + "_m_000000_0";
+    TaskAttemptID taskAttempt20 = TaskAttemptID.forName(attempt20);
+    String attempt21 = "attempt_" + jobId2 + "_m_000001_0";
+    TaskAttemptID taskAttempt21 = TaskAttemptID.forName(attempt21);
+
+    Path job1Dest = outDir;
+    Path job2Dest = new Path(getOutDir().getParent(),
+        getMethodName() + "job2Dest");
+    // little safety check
+    assertNotEquals(job1Dest, job2Dest);
+
+    // create the second job
+    Job job2 = newJob(job2Dest, new JobConf(getConfiguration()), attempt20);
+    Configuration conf2 = job2.getConfiguration();
+    conf2.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    try {
+      JobContext jContext2 = new JobContextImpl(conf2,
+          taskAttempt20.getJobID());
+      TaskAttemptContext tContext2 =
+          new TaskAttemptContextImpl(conf2, taskAttempt20);
+      AbstractS3ACommitter committer2 = createCommitter(job2Dest, tContext2);
+      JobData jobData2 = new JobData(job2, jContext2, tContext2, committer2);
+      setup(jobData2);
+      abortInTeardown(jobData2);
+      // make sure the directories are different
+      assertEquals(job2Dest, committer2.getOutputPath());
+
+      // job2 setup, write some data there
+      writeTextOutput(tContext2);
+
+      // at this point, job1 and job2 both have uncommitted tasks
+
+      // commit tasks in order task 2, task 1.
+      committer2.commitTask(tContext2);
+      committer1.commitTask(tContext1);
+
+      assertMultipartUploadsPending(job1Dest);
+      assertMultipartUploadsPending(job2Dest);
+
+      // commit jobs in order job 1, job 2
+      committer1.commitJob(jContext1);
+      assertNoMultipartUploadsPending(job1Dest);
+      getPart0000(job1Dest);
+      assertMultipartUploadsPending(job2Dest);
+
+      committer2.commitJob(jContext2);
+      getPart0000(job2Dest);
+      assertNoMultipartUploadsPending(job2Dest);
+    } finally {
+      // uncommitted files to this path need to be deleted in tests which fail
+      abortMultipartUploadsUnderPath(job2Dest);
+    }
+
+  }
+
+  protected void validateTaskAttemptPathDuringWrite(Path p)
+      throws IOException {
+
+  }
+
+  protected void validateTaskAttemptPathAfterWrite(Path p) throws IOException {
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java
new file mode 100644
index 0000000..7dd1b0a
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjection.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+/**
+ * Support for adding fault injection: all the failing committers in the IT
+ * tests must implement this.
+ */
+public interface CommitterFaultInjection {
+  String COMMIT_FAILURE_MESSAGE = "oops";
+
+  void setFaults(Faults... faults);
+
+  /**
+   * Operations which can fail.
+   */
+  enum Faults {
+    abortJob,
+    abortTask,
+    cleanupJob,
+    commitJob,
+    commitTask,
+    getWorkPath,
+    needsTaskCommit,
+    setupJob,
+    setupTask,
+  }
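+
+  // Typical usage (a sketch mirroring expectSimulatedFailureOnJobCommit in
+  // AbstractITCommitProtocol): arm a fault, then expect the next matching
+  // operation to raise the injected failure:
+  //   ((CommitterFaultInjection) committer)
+  //       .setFaults(CommitterFaultInjection.Faults.commitJob);
+  //   intercept(CommitterFaultInjectionImpl.Failure.class,
+  //       () -> committer.commitJob(jContext));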
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java
new file mode 100644
index 0000000..d423398
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/CommitterFaultInjectionImpl.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+
+/**
+ * Implementation of the fault injection lifecycle.
+ * Can reset a fault on failure or always raise it.
+ */
+public final class CommitterFaultInjectionImpl
+    extends PathOutputCommitter implements CommitterFaultInjection {
+
+  private Set<Faults> faults;
+  private boolean resetOnFailure;
+
+  public CommitterFaultInjectionImpl(Path outputPath,
+      JobContext context,
+      boolean resetOnFailure,
+      Faults... faults) throws IOException {
+    super(outputPath, context);
+    setFaults(faults);
+    this.resetOnFailure = resetOnFailure;
+  }
+
+  @Override
+  public void setFaults(Faults... faults) {
+    this.faults = new HashSet<>(faults.length);
+    Collections.addAll(this.faults, faults);
+  }
+
+  /**
+   * Fail if the condition is in the set of faults, may optionally reset
+   * it before failing.
+   * @param condition condition to check for
+   * @throws Failure if the condition is faulting
+   */
+  private void maybeFail(Faults condition) throws Failure {
+    if (faults.contains(condition)) {
+      if (resetOnFailure) {
+        faults.remove(condition);
+      }
+      throw new Failure();
+    }
+  }
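+
+  // Example: constructed with resetOnFailure=true and Faults.commitJob
+  // armed, the first commitJob() call throws Failure and clears the fault,
+  // so a retry of commitJob() succeeds; with resetOnFailure=false every
+  // commitJob() call fails until setFaults() is invoked again.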
+
+  @Override
+  public Path getWorkPath() throws IOException {
+    maybeFail(Faults.getWorkPath);
+    return null;
+  }
+
+  @Override
+  public Path getOutputPath() {
+    return null;
+  }
+
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    maybeFail(Faults.setupJob);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    maybeFail(Faults.setupTask);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext)
+      throws IOException {
+    maybeFail(Faults.needsTaskCommit);
+    return false;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    maybeFail(Faults.commitTask);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    maybeFail(Faults.abortTask);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    maybeFail(Faults.commitJob);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state)
+      throws IOException {
+    maybeFail(Faults.abortJob);
+  }
+
+  /**
+   * The exception raised on failure.
+   */
+  public static class Failure extends IOException {
+    public Failure() {
+      super(COMMIT_FAILURE_MESSAGE);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
new file mode 100644
index 0000000..2a98382
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
@@ -0,0 +1,545 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import com.amazonaws.services.s3.model.PartETag;
+import org.junit.Assume;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitterFactory;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
+import static org.apache.hadoop.fs.s3a.commit.CommitUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*;
+
+/**
+ * Test the low-level binding of the S3A FS to the magic commit mechanism,
+ * and handling of the commit operations.
+ * This is done with an inconsistent client.
+ */
+public class ITestCommitOperations extends AbstractCommitITest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestCommitOperations.class);
+  private static final byte[] DATASET = dataset(1000, 'a', 32);
+  private static final String S3A_FACTORY_KEY = String.format(
+      COMMITTER_FACTORY_SCHEME_PATTERN, "s3a");
+
+  /**
+   * A compile-time flag which allows you to disable failure reset before
+   * assertions and teardown.
+   * As S3A is now required to be resilient to failure on all FS operations,
+   * setting it to false ensures that even the assertions are checking
+   * the resilience codepaths.
+   */
+  private static final boolean RESET_FAILURES_ENABLED = false;
+
+  private static final float HIGH_THROTTLE = 0.25f;
+
+  private static final float FULL_THROTTLE = 1.0f;
+
+  private static final int STANDARD_FAILURE_LIMIT = 2;
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    bindCommitter(conf, CommitConstants.S3A_COMMITTER_FACTORY,
+        CommitConstants.COMMITTER_NAME_MAGIC);
+    return conf;
+  }
+
+  @Override
+  public boolean useInconsistentClient() {
+    return true;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    FileSystem.closeAll();
+    super.setup();
+    verifyIsMagicCommitFS(getFileSystem());
+    // enable request throttling with the standard failure limit
+    setThrottling(HIGH_THROTTLE, STANDARD_FAILURE_LIMIT);
+  }
+
+  @Test
+  public void testCreateTrackerNormalPath() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    MagicCommitIntegration integration
+        = new MagicCommitIntegration(fs, true);
+    String filename = "notdelayed.txt";
+    Path destFile = methodPath(filename);
+    String origKey = fs.pathToKey(destFile);
+    PutTracker tracker = integration.createTracker(destFile, origKey);
+    assertFalse("wrong type: " + tracker + " for " + destFile,
+        tracker instanceof MagicCommitTracker);
+  }
+
+  /**
+   * On a magic path, the magic tracker is returned.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCreateTrackerMagicPath() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    MagicCommitIntegration integration
+        = new MagicCommitIntegration(fs, true);
+    String filename = "delayed.txt";
+    Path destFile = methodPath(filename);
+    String origKey = fs.pathToKey(destFile);
+    Path pendingPath = makeMagic(destFile);
+    verifyIsMagicCommitPath(fs, pendingPath);
+    String pendingPathKey = fs.pathToKey(pendingPath);
+    assertTrue("wrong path of " + pendingPathKey,
+        pendingPathKey.endsWith(filename));
+    final List<String> elements = splitPathToElements(pendingPath);
+    assertEquals("splitPathToElements()", filename, lastElement(elements));
+    List<String> finalDestination = finalDestination(elements);
+    assertEquals("finalDestination()",
+        filename,
+        lastElement(finalDestination));
+    final String destKey = elementsToKey(finalDestination);
+    assertEquals("destination key", origKey, destKey);
+
+    PutTracker tracker = integration.createTracker(pendingPath,
+        pendingPathKey);
+    assertTrue("wrong type: " + tracker + " for " + pendingPathKey,
+        tracker instanceof MagicCommitTracker);
+    assertEquals("tracker destination key", origKey, tracker.getDestKey());
+
+    Path pendingSuffixedPath = new Path(pendingPath,
+        "part-0000" + PENDING_SUFFIX);
+    assertFalse("still a delayed complete path " + pendingSuffixedPath,
+        fs.isMagicCommitPath(pendingSuffixedPath));
+    Path pendingSet = new Path(pendingPath,
+        "part-0000" + PENDINGSET_SUFFIX);
+    assertFalse("still a delayed complete path " + pendingSet,
+        fs.isMagicCommitPath(pendingSet));
+  }
+
+  @Test
+  public void testCreateAbortEmptyFile() throws Throwable {
+    describe("create then abort an empty file; throttled");
+    S3AFileSystem fs = getFileSystem();
+    String filename = "empty-abort.txt";
+    Path destFile = methodPath(filename);
+    Path pendingFilePath = makeMagic(destFile);
+    touch(fs, pendingFilePath);
+    validateIntermediateAndFinalPaths(pendingFilePath, destFile);
+    Path pendingDataPath = validatePendingCommitData(filename,
+        pendingFilePath);
+
+    CommitOperations actions = newCommitOperations();
+    // abort; rethrow on failure
+    fullThrottle();
+    LOG.info("Abort call");
+    actions.abortAllSinglePendingCommits(pendingDataPath.getParent(), true)
+        .maybeRethrow();
+    resetFailures();
+    assertPathDoesNotExist("pending file not deleted", pendingDataPath);
+    assertPathDoesNotExist("dest file was created", destFile);
+  }
+
+  private void fullThrottle() {
+    setThrottling(FULL_THROTTLE, STANDARD_FAILURE_LIMIT);
+  }
+
+  private CommitOperations newCommitOperations() {
+    return new CommitOperations(getFileSystem());
+  }
+
+  @Override
+  protected void resetFailures() {
+    // only reset when explicitly enabled: leaving failures in place means
+    // the assertions themselves exercise the resilience codepaths
+    if (RESET_FAILURES_ENABLED) {
+      super.resetFailures();
+    }
+  }
+
+  /**
+   * Create a new path which has the same filename as the dest file, but
+   * is in a magic directory under the destination dir.
+   * @param destFile final destination file
+   * @return magic path
+   */
+  private static Path makeMagic(Path destFile) {
+    return new Path(destFile.getParent(),
+        MAGIC + '/' + destFile.getName());
+  }
+
+  @Test
+  public void testCommitEmptyFile() throws Throwable {
+    describe("create then commit an empty file");
+    createCommitAndVerify("empty-commit.txt", new byte[0]);
+  }
+
+  @Test
+  public void testCommitSmallFile() throws Throwable {
+    describe("create then commit an empty file");
+    createCommitAndVerify("small-commit.txt", DATASET);
+  }
+
+  @Test
+  public void testAbortNonexistentDir() throws Throwable {
+    describe("Attempt to abort a directory that does not exist");
+    Path destFile = methodPath("testAbortNonexistentPath");
+    newCommitOperations()
+        .abortAllSinglePendingCommits(destFile, true)
+        .maybeRethrow();
+  }
+
+  @Test
+  public void testCommitterFactoryDefault() throws Throwable {
+    Configuration conf = new Configuration();
+    Path dest = methodPath();
+    conf.set(COMMITTER_FACTORY_CLASS,
+        MagicS3GuardCommitterFactory.CLASSNAME);
+    PathOutputCommitterFactory factory
+        = getCommitterFactory(dest, conf);
+    PathOutputCommitter committer = factory.createOutputCommitter(
+        methodPath(),
+        new TaskAttemptContextImpl(getConfiguration(),
+            new TaskAttemptID(new TaskID(), 1)));
+    assertEquals("Wrong committer",
+        MagicS3GuardCommitter.class, committer.getClass());
+  }
+
+  @Test
+  public void testCommitterFactorySchema() throws Throwable {
+    Configuration conf = new Configuration();
+    Path dest = methodPath();
+
+    conf.set(S3A_FACTORY_KEY,
+        MagicS3GuardCommitterFactory.CLASSNAME);
+    PathOutputCommitterFactory factory
+        = getCommitterFactory(dest, conf);
+    // the casting is an implicit type check
+    MagicS3GuardCommitter s3a = (MagicS3GuardCommitter)
+        factory.createOutputCommitter(
+            methodPath(),
+            new TaskAttemptContextImpl(getConfiguration(),
+            new TaskAttemptID(new TaskID(), 1)));
+    // the cast above is the real check; this guards against a null return
+    assertNotNull(s3a);
+  }
+
+  @Test
+  public void testBaseRelativePath() throws Throwable {
+    describe("Test creating file with a __base marker and verify that it ends" 
+
+        " up in where expected");
+    Path destDir = methodPath("testBaseRelativePath");
+    Path pendingBaseDir = new Path(destDir, MAGIC + "/child/" + BASE);
+    String child = "subdir/child.txt";
+    Path pendingChildPath = new Path(pendingBaseDir, child);
+    Path expectedDestPath = new Path(destDir, child);
+    createFile(getFileSystem(), pendingChildPath, true, DATASET);
+    commit("child.txt", pendingChildPath, expectedDestPath, 0, 0);
+  }
+
+  private void createCommitAndVerify(String filename, byte[] data)
+      throws Exception {
+    S3AFileSystem fs = getFileSystem();
+    Path destFile = methodPath(filename);
+    Path magicDest = makeMagic(destFile);
+    try (FSDataOutputStream stream = fs.create(magicDest, true)) {
+      assertTrue(stream.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
+      if (data != null && data.length > 0) {
+        stream.write(data);
+      }
+    }
+    FileStatus status = getFileStatusEventually(fs, magicDest,
+        CONSISTENCY_WAIT);
+    assertEquals("Non-empty marker file: " + status, 0, status.getLen());
+
+    commit(filename, destFile, HIGH_THROTTLE, 0);
+    verifyFileContents(fs, destFile, data);
+  }
+
+  /**
+   * Commit the file, with before and after checks on the dest and magic
+   * values.
+   * Failures can be set; they'll be reset after the commit.
+   * @param filename filename of file
+   * @param destFile destination path of file
+   * @param throttle probability of commit throttling
+   * @param failures failure limit
+   * @throws Exception any failure of the operation
+   */
+  private void commit(String filename,
+      Path destFile,
+      float throttle,
+      int failures) throws Exception {
+    commit(filename, makeMagic(destFile), destFile, throttle, failures);
+  }
+
+  /**
+   * Commit a write to {@code magicFile} which is expected to
+   * be saved to {@code destFile}.
+   * Failures can be set; they'll be reset after the commit.
+   * @param filename short name of the committed file
+   * @param magicFile path to write to
+   * @param destFile destination to verify
+   * @param throttle probability of commit throttling
+   * @param failures failure limit
+   * @throws IOException failure to load, commit or verify the data
+   */
+  private void commit(String filename,
+      Path magicFile,
+      Path destFile,
+      float throttle, int failures)
+      throws IOException {
+    resetFailures();
+    validateIntermediateAndFinalPaths(magicFile, destFile);
+    SinglePendingCommit commit = SinglePendingCommit.load(getFileSystem(),
+        validatePendingCommitData(filename, magicFile));
+    CommitOperations actions = newCommitOperations();
+    setThrottling(throttle, failures);
+    actions.commitOrFail(commit);
+    resetFailures();
+    verifyCommitExists(commit);
+  }
+
+  /**
+   * Validate the intermediate and final paths: currently just checks
+   * that the destination file has not yet been created.
+   * @param magicFilePath path to magic file
+   * @param destFile ultimate destination file
+   * @throws IOException IO failure
+   */
+  private void validateIntermediateAndFinalPaths(Path magicFilePath,
+      Path destFile)
+      throws IOException {
+    assertPathDoesNotExist("dest file was created", destFile);
+  }
+
+  /**
+   * Verify that the path at the end of a commit exists.
+   * This does not validate the size.
+   * @param commit commit to verify
+   * @throws FileNotFoundException dest doesn't exist
+   * @throws ValidationFailure commit arg is invalid
+   * @throws IOException invalid commit, IO failure
+   */
+  private void verifyCommitExists(SinglePendingCommit commit)
+      throws FileNotFoundException, ValidationFailure, IOException {
+    commit.validate();
+    // this will force an existence check
+    Path path = getFileSystem().keyToQualifiedPath(commit.getDestinationKey());
+    FileStatus status = getFileSystem().getFileStatus(path);
+    LOG.debug("Destination entry: {}", status);
+    if (!status.isFile()) {
+      throw new PathCommitException(path, "Not a file: " + status);
+    }
+  }
+
+  /**
+   * Validate that a pending commit data file exists, load it and validate
+   * its contents.
+   * @param filename short file name
+   * @param magicFile path the file appears to have been written to
+   * @return the path to the pending set
+   * @throws IOException IO problems
+   */
+  private Path validatePendingCommitData(String filename,
+      Path magicFile) throws IOException {
+    S3AFileSystem fs = getFileSystem();
+    Path pendingDataPath = new Path(magicFile.getParent(),
+        filename + PENDING_SUFFIX);
+    FileStatus fileStatus = verifyPathExists(fs, "no pending file",
+        pendingDataPath);
+    assertTrue("No data in " + fileStatus, fileStatus.getLen() > 0);
+    String data = read(fs, pendingDataPath);
+    LOG.info("Contents of {}: \n{}", pendingDataPath, data);
+    // really read it in and parse
+    SinglePendingCommit persisted = SinglePendingCommit.serializer()
+        .load(fs, pendingDataPath);
+    persisted.validate();
+    assertTrue("created timestamp wrong in " + persisted,
+        persisted.getCreated() > 0);
+    assertTrue("saved timestamp wrong in " + persisted,
+        persisted.getSaved() > 0);
+    List<String> etags = persisted.getEtags();
+    assertEquals("etag list " + persisted, 1, etags.size());
+    List<PartETag> partList = CommitOperations.toPartEtags(etags);
+    assertEquals("part list " + persisted, 1, partList.size());
+    return pendingDataPath;
+  }
+
+  /**
+   * Get a method-relative path.
+   * @param filename filename
+   * @return new path
+   * @throws IOException failure to create/parse the path.
+   */
+  private Path methodPath(String filename) throws IOException {
+    return new Path(methodPath(), filename);
+  }
+
+  /**
+   * Get a unique path for a method.
+   * @return a path
+   * @throws IOException failure to create/parse the path
+   */
+  protected Path methodPath() throws IOException {
+    return path(getMethodName());
+  }
+
+  @Test
+  public void testUploadEmptyFile() throws Throwable {
+    File tempFile = File.createTempFile("commit", ".txt");
+    CommitOperations actions = newCommitOperations();
+    Path dest = methodPath("testUploadEmptyFile");
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(dest, false);
+    fullThrottle();
+
+    SinglePendingCommit pendingCommit =
+        actions.uploadFileToPendingCommit(tempFile,
+            dest, null,
+            DEFAULT_MULTIPART_SIZE);
+    resetFailures();
+    assertPathDoesNotExist("pending commit", dest);
+    fullThrottle();
+    actions.commitOrFail(pendingCommit);
+    resetFailures();
+    FileStatus status = verifyPathExists(fs,
+        "uploaded file commit", dest);
+    assertEquals("File length in " + status, 0, status.getLen());
+  }
+
+  @Test
+  public void testUploadSmallFile() throws Throwable {
+    File tempFile = File.createTempFile("commit", ".txt");
+    String text = "hello, world";
+    FileUtils.write(tempFile, text, "UTF-8");
+    CommitOperations actions = newCommitOperations();
+    Path dest = methodPath("testUploadSmallFile");
+    S3AFileSystem fs = getFileSystem();
+    fullThrottle();
+    SinglePendingCommit pendingCommit =
+        actions.uploadFileToPendingCommit(tempFile,
+            dest, null,
+            DEFAULT_MULTIPART_SIZE);
+    resetFailures();
+    assertPathDoesNotExist("pending commit", dest);
+    fullThrottle();
+    actions.commitOrFail(pendingCommit);
+    resetFailures();
+    String s = readUTF8(fs, dest, -1);
+    assertEquals(text, s);
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testUploadMissingFile() throws Throwable {
+    File tempFile = File.createTempFile("commit", ".txt");
+    tempFile.delete();
+    CommitOperations actions = newCommitOperations();
+    Path dest = methodPath("testUploadMissingile");
+    fullThrottle();
+    actions.uploadFileToPendingCommit(tempFile, dest, null,
+        DEFAULT_MULTIPART_SIZE);
+  }
+
+  @Test
+  public void testRevertCommit() throws Throwable {
+    Path destFile = methodPath("part-0000");
+    S3AFileSystem fs = getFileSystem();
+    touch(fs, destFile);
+    CommitOperations actions = newCommitOperations();
+    SinglePendingCommit commit = new SinglePendingCommit();
+    commit.setDestinationKey(fs.pathToKey(destFile));
+    fullThrottle();
+    actions.revertCommit(commit);
+    resetFailures();
+    assertPathExists("parent of reverted commit", destFile.getParent());
+  }
+
+  @Test
+  public void testRevertMissingCommit() throws Throwable {
+    Path destFile = methodPath("part-0000");
+    S3AFileSystem fs = getFileSystem();
+    fs.delete(destFile, false);
+    CommitOperations actions = newCommitOperations();
+    SinglePendingCommit commit = new SinglePendingCommit();
+    commit.setDestinationKey(fs.pathToKey(destFile));
+    fullThrottle();
+    actions.revertCommit(commit);
+    assertPathExists("parent of reverted (nonexistent) commit",
+        destFile.getParent());
+  }
+
+  @Test
+  public void testFailuresInAbortListing() throws Throwable {
+    CommitOperations actions = newCommitOperations();
+    Path path = path("testFailuresInAbort");
+    getFileSystem().mkdirs(path);
+    setThrottling(HIGH_THROTTLE);
+    LOG.info("Aborting");
+    actions.abortPendingUploadsUnderPath(path);
+    LOG.info("Abort completed");
+    resetFailures();
+  }
+
+  /**
+   * Test that a normal stream still works as expected in a magic filesystem,
+   * using {@code hasCapability()} to check that it is not a magic stream.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testWriteNormalStream() throws Throwable {
+    S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
+
+    Path destFile = path("normal");
+    try (FSDataOutputStream out = fs.create(destFile, true)) {
+      out.writeChars("data");
+      assertFalse("stream has magic output: " + out,
+          out.hasCapability(STREAM_CAPABILITY_MAGIC_OUTPUT));
+    }
+    FileStatus status = getFileStatusEventually(fs, destFile,
+        CONSISTENCY_WAIT);
+    assertTrue("Empty marker file: " + status, status.getLen() > 0);
+  }
+
+}
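
For reference, a short sketch of the path mapping these tests exercise,
assuming MAGIC resolves to "__magic" and BASE to "__base" (their usual
values in CommitConstants); the bucket and filenames are illustrative:

    // destination:  s3a://bucket/work/part-0000
    // magic path:   s3a://bucket/work/__magic/part-0000
    Path dest = new Path("s3a://bucket/work/part-0000");
    Path magic = new Path(dest.getParent(), "__magic/" + dest.getName());

    // with a __base marker, the subtree below it is re-rooted at the
    // destination directory (see testBaseRelativePath):
    //   s3a://bucket/work/__magic/child/__base/subdir/child.txt
    //     -> s3a://bucket/work/subdir/child.txt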

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
new file mode 100644
index 0000000..1ac8038
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/LoggingTextOutputFormat.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A subclass of {@link TextOutputFormat} which logs what is happening, and
+ * returns a {@link LoggingLineRecordWriter} which allows the caller
+ * to get the destination path.
+ * @param <K> key
+ * @param <V> value
+ */
+public class LoggingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LoggingTextOutputFormat.class);
+
+  public static final String NAME
+      = "org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat";
+
+  @Override
+  public LoggingLineRecordWriter<K, V> getRecordWriter(TaskAttemptContext job)
+      throws IOException, InterruptedException {
+    Configuration conf = job.getConfiguration();
+    boolean isCompressed = getCompressOutput(job);
+    String keyValueSeparator = conf.get(SEPARATOR, "\t");
+    CompressionCodec codec = null;
+    String extension = "";
+    if (isCompressed) {
+      Class<? extends CompressionCodec> codecClass =
+          getOutputCompressorClass(job, GzipCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, conf);
+      extension = codec.getDefaultExtension();
+    }
+    Path file = getDefaultWorkFile(job, extension);
+    FileSystem fs = file.getFileSystem(conf);
+    FSDataOutputStream fileOut = fs.create(file, false);
+    LOG.debug("Creating LineRecordWriter with destination {}", file);
+    if (isCompressed) {
+      return new LoggingLineRecordWriter<>(
+          file, new DataOutputStream(codec.createOutputStream(fileOut)),
+          keyValueSeparator);
+    } else {
+      return new LoggingLineRecordWriter<>(file, fileOut, keyValueSeparator);
+    }
+  }
+
+  /**
+   * Write a line; counts the number of lines written and logs the total
+   * at debug level in the {@code close()} call.
+   * @param <K> key
+   * @param <V> value
+   */
+  public static class LoggingLineRecordWriter<K, V>
+      extends LineRecordWriter<K, V> {
+    private final Path dest;
+    private long lines;
+
+    public LoggingLineRecordWriter(Path dest, DataOutputStream out,
+        String keyValueSeparator) {
+      super(out, keyValueSeparator);
+      this.dest = dest;
+    }
+
+    public LoggingLineRecordWriter(DataOutputStream out, Path dest) {
+      super(out);
+      this.dest = dest;
+    }
+
+    @Override
+    public synchronized void write(K key, V value) throws IOException {
+      super.write(key, value);
+      lines++;
+    }
+
+    @Override
+    public synchronized void close(TaskAttemptContext context)
+        throws IOException {
+      LOG.debug("Closing output file {} with {} lines: {}",
+          dest, lines, out);
+      out.close();
+    }
+
+    public Path getDest() {
+      return dest;
+    }
+
+    public long getLines() {
+      return lines;
+    }
+  }
+
+  /**
+   * Bind to a configuration for job submission.
+   * @param conf configuration
+   */
+  public static void bind(Configuration conf) {
+    conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR,
+        LoggingTextOutputFormat.class,
+        OutputFormat.class);
+  }
+}
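
A brief usage sketch for the bind() helper above; the job name and output
path are illustrative, the rest is standard MapReduce job setup:

    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "logging-text-demo");
    // install LoggingTextOutputFormat as the job's output format
    LoggingTextOutputFormat.bind(job.getConfiguration());
    FileOutputFormat.setOutputPath(job, new Path("s3a://bucket/out"));
    // each task now writes through a LoggingLineRecordWriter, which logs
    // its destination and line count at debug level in close()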

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java
new file mode 100644
index 0000000..7f689e0
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/MiniDFSClusterService.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.service.AbstractService;
+
+/**
+ * MiniDFS Cluster, encapsulated for use in different test suites.
+ */
+public class MiniDFSClusterService extends AbstractService {
+
+  public MiniDFSClusterService() {
+    super("MiniDFSTestCluster");
+  }
+
+  private MiniDFSCluster cluster = null;
+  private FileSystem clusterFS = null;
+  private LocalFileSystem localFS = null;
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    conf.setBoolean("dfs.webhdfs.enabled", false);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .format(true)
+        .racks(null)
+        .build();
+    clusterFS = cluster.getFileSystem();
+    localFS = FileSystem.getLocal(clusterFS.getConf());
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    clusterFS = null;
+    localFS = null;
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  public MiniDFSCluster getCluster() {
+    return cluster;
+  }
+
+  public FileSystem getClusterFS() {
+    return clusterFS;
+  }
+
+  public LocalFileSystem getLocalFS() {
+    return localFS;
+  }
+}
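
A minimal lifecycle sketch for the service above, following the standard
init/start/stop sequence of org.apache.hadoop.service.Service; the test
path is illustrative:

    MiniDFSClusterService hdfs = new MiniDFSClusterService();
    try {
      hdfs.init(new Configuration());
      hdfs.start();                  // builds the single-DataNode cluster
      FileSystem fs = hdfs.getClusterFS();
      fs.mkdirs(new Path("/test"));  // example use of the cluster FS
    } finally {
      hdfs.stop();                   // shuts the MiniDFSCluster down
    }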

