This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f44abc3  HADOOP-16207 Improved S3A MR tests.
f44abc3 is described below

commit f44abc3e11676579bdea94fce045d081ae38e6c3
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Fri Oct 4 14:11:22 2019 +0100

    HADOOP-16207 Improved S3A MR tests.
    
    Contributed by Steve Loughran.
    
    Replaces the committer-specific terasort and MR test jobs with
    parameterization of the (now single) tests and use of file:// over
    hdfs:// as the cluster FS.
    
    The parameterization ensures that only one of the specific committer tests
    runs at a time; overloads of the test machines are less likely, and so the
    suites can be pulled back into the parallel phase.
    
    There's also more detailed validation of the stage outputs of the
    terasorting; if one test fails the rest are all skipped. This and the fact
    that job output is stored under target/yarn-${timestamp} mean failures
    should be more debuggable.
    
    Change-Id: Iefa370ba73c6419496e6e69dd6673d00f37ff095
---
 hadoop-tools/hadoop-aws/pom.xml                    |   4 -
 .../fs/s3a/commit/staging/StagingCommitter.java    |   3 +-
 .../commit/staging/StagingCommitterConstants.java  |   2 +-
 .../hadoop/fs/s3a/commit/AbstractCommitITest.java  |  18 +-
 .../fs/s3a/commit/AbstractITCommitMRJob.java       | 223 -------
 .../fs/s3a/commit/AbstractYarnClusterITest.java    | 196 +++++--
 .../commit/integration/ITestS3ACommitterMRJob.java | 644 +++++++++++++++++++++
 .../fs/s3a/commit/magic/ITestMagicCommitMRJob.java | 120 ----
 .../integration/ITestDirectoryCommitMRJob.java     |  61 --
 .../integration/ITestPartitionCommitMRJob.java     |  62 --
 .../integration/ITestStagingCommitMRJob.java       |  94 ---
 .../ITestStagingCommitMRJobBadDest.java            |  89 ---
 .../terasort/ITestTerasortDirectoryCommitter.java  |  62 --
 .../terasort/ITestTerasortMagicCommitter.java      |  73 ---
 ...mmitTerasortIT.java => ITestTerasortOnS3A.java} | 238 ++++++--
 .../hadoop-aws/src/test/resources/log4j.properties |   2 +-
 16 files changed, 987 insertions(+), 904 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index ff330e5..bd204b0 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -188,8 +188,6 @@
                     <exclude>**/ITestDynamoDBMetadataStoreScale.java</exclude>
                     <!-- Terasort MR jobs spawn enough processes that they use up all RAM -->
                     <exclude>**/ITestTerasort*.java</exclude>
-                    <!-- MR jobs spawn enough processes that they use up all RAM -->
-                    <exclude>**/ITest*CommitMRJob.java</exclude>
                     <!-- operations across the metastore -->
                     <exclude>**/ITestS3GuardDDBRootOperations.java</exclude>
                   </excludes>
@@ -231,8 +229,6 @@
                     <!--  the local FS. Running them sequentially guarantees isolation -->
                     <!-- and that they don't conflict with the other MR jobs for RAM -->
                     <include>**/ITestTerasort*.java</include>
-                    <!-- MR jobs spawn enough processes that they use up all RAM -->
-                    <include>**/ITest*CommitMRJob.java</include>
                     <!-- operations across the metastore -->
                     <include>**/ITestS3AContractRootDir.java</include>
                     <include>**/ITestS3GuardDDBRootOperations.java</include>
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
index 7ec4478..833edd4 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java
@@ -677,7 +677,8 @@ public class StagingCommitter extends AbstractS3ACommitter {
     // we will try to abort the ones that had already succeeded.
     int commitCount = taskOutput.size();
     final Queue<SinglePendingCommit> commits = new ConcurrentLinkedQueue<>();
-    LOG.info("{}: uploading from staging directory to S3", getRole());
+    LOG.info("{}: uploading from staging directory to S3 {}", getRole(),
+        attemptPath);
     LOG.info("{}: Saving pending data information to {}",
         getRole(), commitsAttemptPath);
     if (taskOutput.isEmpty()) {
diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
index c5fb967..c41715b 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitterConstants.java
@@ -34,7 +34,7 @@ public final class StagingCommitterConstants {
   /**
    * The temporary path for staging data, if not explicitly set.
    * By using an unqualified path, this will be qualified to be relative
-   * to the users' home directory, so protectec from access for others.
+   * to the users' home directory, so protected from access for others.
    */
   public static final String FILESYSTEM_TEMP_PATH = "tmp/staging";
 
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
index 6023daa..1cf3fb4 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractCommitITest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import com.amazonaws.services.s3.AmazonS3;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -111,7 +112,9 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
         MAGIC_COMMITTER_ENABLED,
         S3A_COMMITTER_FACTORY_KEY,
         FS_S3A_COMMITTER_NAME,
-        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE);
+        FS_S3A_COMMITTER_STAGING_CONFLICT_MODE,
+        FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
+        FAST_UPLOAD_BUFFER);
 
     conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
     conf.setLong(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
@@ -209,6 +212,7 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
    */
   @Override
   public void teardown() throws Exception {
+    Thread.currentThread().setName("teardown");
     LOG.info("AbstractCommitITest::teardown");
     waitForConsistency();
     // make sure there are no failures any more
@@ -359,7 +363,7 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
    * @throws IOException IO Failure
    */
   protected SuccessData verifySuccessMarker(Path dir) throws IOException {
-    return validateSuccessFile(dir, "", getFileSystem(), "query");
+    return validateSuccessFile(dir, "", getFileSystem(), "query", 0);
   }
 
   /**
@@ -437,13 +441,15 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
    * @param committerName name of committer to match, or ""
    * @param fs filesystem
    * @param origin origin (e.g. "teragen" for messages)
+   * @param minimumFileCount minimum number of files to have been created
    * @return the success data
    * @throws IOException IO failure
    */
   public static SuccessData validateSuccessFile(final Path outputPath,
       final String committerName,
       final S3AFileSystem fs,
-      final String origin) throws IOException {
+      final String origin,
+      final int minimumFileCount) throws IOException {
     SuccessData successData = loadSuccessFile(fs, outputPath, origin);
     String commitDetails = successData.toString();
     LOG.info("Committer name " + committerName + "\n{}",
@@ -456,6 +462,9 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
       assertEquals("Wrong committer in " + commitDetails,
           committerName, successData.getCommitter());
     }
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Files committed")
+        .hasSizeGreaterThanOrEqualTo(minimumFileCount);
     return successData;
   }
 
@@ -485,8 +494,9 @@ public abstract class AbstractCommitITest extends 
AbstractS3ATestBase {
         status.isFile());
     assertTrue("0 byte success file "
             + success + " from " + origin
-            + "; a s3guard committer was not used",
+            + "; an S3A committer was not used",
         status.getLen() > 0);
+    LOG.info("Loading committer success file {}", success);
     return SuccessData.load(fs, success);
   }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
deleted file mode 100644
index 1a51847..0000000
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitMRJob.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
-import com.google.common.collect.Sets;
-import org.assertj.core.api.Assertions;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.S3AUtils;
-import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.util.DurationInfo;
-
-import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
-import static 
org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
-import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
-
-/**
- * Test for an MR Job with all the different committers.
- */
-public abstract class AbstractITCommitMRJob extends AbstractYarnClusterITest {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(AbstractITCommitMRJob.class);
-
-  @Override
-  protected Configuration createConfiguration() {
-    Configuration conf = super.createConfiguration();
-    disableFilesystemCaching(conf);
-    return conf;
-  }
-
-  @Rule
-  public final TemporaryFolder temp = new TemporaryFolder();
-
-  @Test
-  public void testMRJob() throws Exception {
-    describe("Run a simple MR Job");
-
-    S3AFileSystem fs = getFileSystem();
-    // final dest is in S3A
-    Path outputPath = path(getMethodName());
-    // create and delete to force in a tombstone marker -see HADOOP-16207
-    fs.mkdirs(outputPath);
-    fs.delete(outputPath, true);
-
-    String commitUUID = UUID.randomUUID().toString();
-    String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
-    int numFiles = getTestFileCount();
-    List<String> expectedFiles = new ArrayList<>(numFiles);
-    Set<String> expectedKeys = Sets.newHashSet();
-    for (int i = 0; i < numFiles; i += 1) {
-      File file = temp.newFile(i + ".text");
-      try (FileOutputStream out = new FileOutputStream(file)) {
-        out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
-      }
-      String filename = String.format("part-m-%05d%s", i, suffix);
-      Path path = new Path(outputPath, filename);
-      expectedFiles.add(path.toString());
-      expectedKeys.add("/" + fs.pathToKey(path));
-    }
-    Collections.sort(expectedFiles);
-
-    Job mrJob = createJob();
-    JobConf jobConf = (JobConf) mrJob.getConfiguration();
-
-    mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
-    FileOutputFormat.setOutputPath(mrJob, outputPath);
-
-    File mockResultsFile = temp.newFile("committer.bin");
-    mockResultsFile.delete();
-    String committerPath = "file:" + mockResultsFile;
-    jobConf.set("mock-results-file", committerPath);
-    jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
-    jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "/staging");
-
-    mrJob.setInputFormatClass(TextInputFormat.class);
-    FileInputFormat.addInputPath(mrJob, new Path(temp.getRoot().toURI()));
-
-    mrJob.setMapperClass(MapClass.class);
-    mrJob.setNumReduceTasks(0);
-
-    // an attempt to set up log4j properly, which clearly doesn't work
-    URL log4j = getClass().getClassLoader().getResource("log4j.properties");
-    if (log4j != null && log4j.getProtocol().equals("file")) {
-      Path log4jPath = new Path(log4j.toURI());
-      LOG.debug("Using log4j path {}", log4jPath);
-      mrJob.addFileToClassPath(log4jPath);
-      String sysprops = String.format("-Xmx256m -Dlog4j.configuration=%s",
-          log4j);
-      jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops);
-      jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops);
-      jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops);
-    }
-
-    applyCustomConfigOptions(jobConf);
-    // fail fast if anything goes wrong
-    mrJob.setMaxMapAttempts(1);
-
-    mrJob.submit();
-    try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
-      boolean succeeded = mrJob.waitForCompletion(true);
-      assertTrue("MR job failed", succeeded);
-    }
-
-    waitForConsistency();
-    verifyPathExists(fs,
-        "MR job Output directory not found,"
-            + " even though the job did not report a failure",
-        outputPath);
-    assertIsDirectory(outputPath);
-    FileStatus[] results = fs.listStatus(outputPath,
-        S3AUtils.HIDDEN_FILE_FILTER);
-    int fileCount = results.length;
-    List<String> actualFiles = new ArrayList<>(fileCount);
-    assertTrue("No files in output directory", fileCount != 0);
-    LOG.info("Found {} files", fileCount);
-    for (FileStatus result : results) {
-      LOG.debug("result: {}", result);
-      actualFiles.add(result.getPath().toString());
-    }
-    Collections.sort(actualFiles);
-
-    SuccessData successData = validateSuccessFile(outputPath, committerName(),
-        fs, "MR job");
-    List<String> successFiles = successData.getFilenames();
-    String commitData = successData.toString();
-    assertFalse("No filenames in " + commitData,
-        successFiles.isEmpty());
-
-    Assertions.assertThat(actualFiles)
-        .describedAs("Committed files in the job output directory")
-        .containsExactlyInAnyOrderElementsOf(expectedFiles);
-
-    Assertions.assertThat(successFiles)
-        .describedAs("List of committed files in %s", commitData)
-        .containsExactlyInAnyOrderElementsOf(expectedKeys);
-
-    assertPathDoesNotExist("temporary dir",
-        new Path(outputPath, CommitConstants.TEMPORARY));
-    customPostExecutionValidation(outputPath, successData);
-  }
-
-  /**
-   *  Test Mapper.
-   *  This is executed in separate process, and must not make any assumptions
-   *  about external state.
-   */
-  public static class MapClass
-      extends Mapper<LongWritable, Text, LongWritable, Text> {
-
-    private int operations;
-    private String id = "";
-    private LongWritable l = new LongWritable();
-    private Text t = new Text();
-
-    @Override
-    protected void setup(Context context)
-        throws IOException, InterruptedException {
-      super.setup(context);
-      // force in Log4J logging
-      org.apache.log4j.BasicConfigurator.configure();
-      boolean scaleMap = context.getConfiguration()
-          .getBoolean(KEY_SCALE_TESTS_ENABLED, false);
-      operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
-      id = context.getTaskAttemptID().toString();
-    }
-
-    @Override
-    protected void map(LongWritable key, Text value, Context context)
-        throws IOException, InterruptedException {
-      for (int i = 0; i < operations; i++) {
-        l.set(i);
-        t.set(String.format("%s:%05d", id, i));
-        context.write(l, t);
-      }
-    }
-  }
-
-}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
index 2e8f1f0..783c626 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractYarnClusterITest.java
@@ -19,8 +19,12 @@
 package org.apache.hadoop.fs.s3a.commit;
 
 import java.io.IOException;
-import java.util.UUID;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
 
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,19 +37,22 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.deployService;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.prepareTestConfiguration;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.terminateService;
+import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
 import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES;
 
 /**
  * Full integration test MR jobs.
  *
- * This is all done on shared static mini YARN and HDFS clusters, set up before
- * any of the tests methods run.
+ * This is all done on shared static mini YARN and (optionally) HDFS clusters,
+ * set up before any of the tests methods run.
  *
  * To isolate tests properly for parallel test runs, that static state
  * needs to be stored in the final classes implementing the tests, and
@@ -61,38 +68,54 @@ import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_S
  * If two subclasses of this class are instantiated in the same JVM, in order,
  * they are guaranteed to be isolated.
  *
- * History: this is a superclass extracted from
- * {@link AbstractITCommitMRJob} while adding support for testing terasorting.
- *
  */
 public abstract class AbstractYarnClusterITest extends AbstractCommitITest {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractYarnClusterITest.class);
 
-  private static final int TEST_FILE_COUNT = 2;
-  private static final int SCALE_TEST_FILE_COUNT = 50;
+  private static final int TEST_FILE_COUNT = 1;
+  private static final int SCALE_TEST_FILE_COUNT = 10;
 
-  public static final int SCALE_TEST_KEYS = 1000;
+  public static final int SCALE_TEST_KEYS = 100;
   public static final int BASE_TEST_KEYS = 10;
 
+  public static final int NO_OF_NODEMANAGERS = 2;
+
   private boolean scaleTest;
 
-  private boolean uniqueFilenames = false;
+  /**
+   * The static cluster binding with the lifecycle of this test; served
+   * through instance-level methods for sharing across methods in the
+   * suite.
+   */
+  @SuppressWarnings("StaticNonFinalField")
+  private static ClusterBinding clusterBinding;
+
+
+  @AfterClass
+  public static void teardownClusters() throws IOException {
+    terminateCluster(clusterBinding);
+    clusterBinding = null;
+  }
 
   /**
    * This is the cluster binding which every subclass must create.
    */
   protected static final class ClusterBinding {
 
+    private String clusterName;
+
     private final MiniDFSClusterService hdfs;
 
     private final MiniMRYarnCluster yarn;
 
     public ClusterBinding(
+        final String clusterName,
         final MiniDFSClusterService hdfs,
         final MiniMRYarnCluster yarn) {
-      this.hdfs = checkNotNull(hdfs);
+      this.clusterName = clusterName;
+      this.hdfs = hdfs;
       this.yarn = checkNotNull(yarn);
     }
 
@@ -100,6 +123,18 @@ public abstract class AbstractYarnClusterITest extends 
AbstractCommitITest {
       return hdfs;
     }
 
+    /**
+     * Get the cluster FS, which will either be HDFS or the local FS.
+     * @return a filesystem.
+     * @throws IOException failure
+     */
+    public FileSystem getClusterFS() throws IOException {
+      MiniDFSClusterService miniCluster = getHdfs();
+      return miniCluster != null
+          ? miniCluster.getClusterFS()
+          : FileSystem.getLocal(yarn.getConfig());
+    }
+
     public MiniMRYarnCluster getYarn() {
       return yarn;
     }
@@ -108,6 +143,10 @@ public abstract class AbstractYarnClusterITest extends 
AbstractCommitITest {
       return getYarn().getConfig();
     }
 
+    public String getClusterName() {
+      return clusterName;
+    }
+
     public void terminate() {
       terminateService(getYarn());
       terminateService(getHdfs());
@@ -115,74 +154,111 @@ public abstract class AbstractYarnClusterITest extends 
AbstractCommitITest {
   }
 
   /**
-   * Create the cluster binding. This must be done in
-   * class setup of the (final) subclass.
-   * The HDFS and YARN clusters share the same configuration, so
+   * Create the cluster binding.
+   * The configuration will be patched by propagating down options
+   * from the maven build (S3Guard binding etc) and turning off unwanted
+   * YARN features.
+   *
+   * If an HDFS cluster is requested,
+   * the HDFS and YARN clusters will share the same configuration, so
    * the HDFS cluster binding is implicitly propagated to YARN.
+   * If one is not requested, the local filesystem is used as the cluster FS.
    * @param conf configuration to start with.
+   * @param useHDFS should an HDFS cluster be instantiated.
    * @return the cluster binding.
    * @throws IOException failure.
    */
-  protected static ClusterBinding createCluster(JobConf conf)
-      throws IOException {
-
+  protected static ClusterBinding createCluster(
+      final JobConf conf,
+      final boolean useHDFS) throws IOException {
+    prepareTestConfiguration(conf);
     conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
     conf.setLong(CommonConfigurationKeys.FS_DU_INTERVAL_KEY, Long.MAX_VALUE);
-
-    // create a unique cluster name.
-    String clusterName = "yarn-" + UUID.randomUUID();
-    MiniDFSClusterService miniDFSClusterService = deployService(conf,
-        new MiniDFSClusterService());
+    // minicluster tests overreact to not enough disk space.
+    conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
+    conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
+    // create a unique cluster name based on the current time in millis.
+    String timestamp = LocalDateTime.now().format(
+        DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss.SS"));
+    String clusterName = "yarn-" + timestamp;
+    MiniDFSClusterService miniDFSClusterService =
+        useHDFS
+            ? deployService(conf, new MiniDFSClusterService())
+            : null;
     MiniMRYarnCluster yarnCluster = deployService(conf,
-        new MiniMRYarnCluster(clusterName, 2));
-    return new ClusterBinding(miniDFSClusterService, yarnCluster);
+        new MiniMRYarnCluster(clusterName, NO_OF_NODEMANAGERS));
+    return new ClusterBinding(clusterName, miniDFSClusterService, yarnCluster);
   }
 
-  protected static void terminateCluster(ClusterBinding clusterBinding) {
-    if (clusterBinding != null) {
-      clusterBinding.terminate();
+  /**
+   * Terminate the cluster if it is not null.
+   * @param cluster the cluster
+   */
+  protected static void terminateCluster(ClusterBinding cluster) {
+    if (cluster != null) {
+      cluster.terminate();
     }
   }
 
   /**
-   * Get the cluster binding for this subclass
-   * @return
+   * Get the cluster binding for this subclass.
+   * @return the cluster binding
    */
-  protected abstract ClusterBinding getClusterBinding();
-
-  protected MiniDFSClusterService getHdfs() {
-    return getClusterBinding().getHdfs();
+  protected ClusterBinding getClusterBinding() {
+    return clusterBinding;
   }
 
-
   protected MiniMRYarnCluster getYarn() {
     return getClusterBinding().getYarn();
   }
 
-  public FileSystem getLocalFS() {
-    return getHdfs().getLocalFS();
+  /**
+   * Get the cluster filesystem -hdfs or local.
+   * @return the filesystem shared across the yarn nodes.
+   */
+  protected FileSystem getClusterFS() throws IOException {
+    return getClusterBinding().getClusterFS();
   }
 
-  protected FileSystem getDFS() {
-    return getHdfs().getClusterFS();
-  }
+
+  /**
+   * We stage work into a temporary directory rather than directly under
+   * the user's home directory, as that is often rejected by CI test
+   * runners.
+   */
+  @Rule
+  public final TemporaryFolder stagingFilesDir = new TemporaryFolder();
 
   /**
    * The name of the committer as returned by
-   * {@link AbstractS3ACommitter#getName()} and used for committer 
construction.
+   * {@link AbstractS3ACommitter#getName()}
+   * and used for committer construction.
    */
   protected abstract String committerName();
 
+  /**
+   * binding on demand rather than in a BeforeClass static method.
+   * Subclasses can override this to change the binding options.
+   * @return the cluster binding
+   */
+  protected ClusterBinding demandCreateClusterBinding() throws Exception {
+    return createCluster(new JobConf(), false);
+  }
+
   @Override
   public void setup() throws Exception {
     super.setup();
-    assertNotNull("cluster is not bound",
-        getClusterBinding());
 
     scaleTest = getTestPropertyBool(
         getConfiguration(),
         KEY_SCALE_TESTS_ENABLED,
         DEFAULT_SCALE_TESTS_ENABLED);
+    if (getClusterBinding() == null) {
+      clusterBinding = demandCreateClusterBinding();
+    }
+    assertNotNull("cluster is not bound",
+        getClusterBinding());
+
   }
 
   @Override
@@ -190,28 +266,46 @@ public abstract class AbstractYarnClusterITest extends 
AbstractCommitITest {
     return SCALE_TEST_TIMEOUT_SECONDS * 1000;
   }
 
-  protected JobConf newJobConf() {
-    return new JobConf(getYarn().getConfig());
+  /**
+   * Create a job configuration.
+   * This creates a new job conf from the yarn
+   * cluster configuration then calls
+   * {@link #applyCustomConfigOptions(JobConf)} to allow it to be customized.
+   * @return the new job configuration.
+   * @throws IOException failure
+   */
+  protected JobConf newJobConf() throws IOException {
+    JobConf jobConf = new JobConf(getYarn().getConfig());
+    jobConf.addResource(getConfiguration());
+    applyCustomConfigOptions(jobConf);
+    return jobConf;
   }
 
 
-  protected Job createJob() throws IOException {
-    Configuration jobConf = getClusterBinding().getConf();
-    jobConf.addResource(getConfiguration());
+  protected Job createJob(Configuration jobConf) throws IOException {
     Job mrJob = Job.getInstance(jobConf, getMethodName());
     patchConfigurationForCommitter(mrJob.getConfiguration());
     return mrJob;
   }
 
+  /**
+   * Patch the (job) configuration for this committer.
+   * @param jobConf configuration to patch
+   * @return a configuration which will run this configuration.
+   */
   protected Configuration patchConfigurationForCommitter(
       final Configuration jobConf) {
     jobConf.setBoolean(FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
-        uniqueFilenames);
+        isUniqueFilenames());
     bindCommitter(jobConf,
         CommitConstants.S3A_COMMITTER_FACTORY,
         committerName());
     // pass down the scale test flag
-    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, scaleTest);
+    jobConf.setBoolean(KEY_SCALE_TESTS_ENABLED, isScaleTest());
+    // and fix the commit dir to the local FS across all workers.
+    String staging = stagingFilesDir.getRoot().getAbsolutePath();
+    LOG.info("Staging temp dir is {}", staging);
+    jobConf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, staging);
     return jobConf;
   }
 
@@ -220,7 +314,7 @@ public abstract class AbstractYarnClusterITest extends 
AbstractCommitITest {
    * @return the number of mappers to create.
    */
   public int getTestFileCount() {
-    return scaleTest ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
+    return isScaleTest() ? SCALE_TEST_FILE_COUNT : TEST_FILE_COUNT;
   }
 
   /**
@@ -258,6 +352,6 @@ public abstract class AbstractYarnClusterITest extends 
AbstractCommitITest {
   }
 
   public boolean isUniqueFilenames() {
-    return uniqueFilenames;
+    return false;
   }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
new file mode 100644
index 0000000..e3e4449
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/integration/ITestS3ACommitterMRJob.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.integration;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Sets;
+import org.assertj.core.api.Assertions;
+import org.junit.FixMethodOrder;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AUtils;
+import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
+import org.apache.hadoop.fs.s3a.commit.LoggingTextOutputFormat;
+import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
+import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_STAGING_UUID;
+import static org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
+import static org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test an MR Job with all the different committers.
+ * <p>
+ * This is a fairly complex parameterization: it is designed to
+ * avoid the overhead of starting a Yarn cluster for
+ * individual committer types, so speed up operations.
+ * <p>
+ * It also implicitly guarantees that there is never more than one of these
+ * MR jobs active at a time, so avoids overloading the test machine with too
+ * many processes.
+ * How the binding works:
+ * <ol>
+ *   <li>
+ *     Each parameterized suite is configured through its own
+ *     {@link CommitterTestBinding} subclass.
+ *   </li>
+ *   <li>
+ *     JUnit runs these test suites one parameterized binding at a time.
+ *   </li>
+ *   <li>
+ *     The test methods are declared to be executed in ascending order, so
+ *     that for a specific binding, the order is {@link #test_000()},
+ *     {@link #test_100()}, {@link #test_200_execute()} and finally
+ *     {@link #test_500()}.
+ *   </li>
+ *   <li>
+ *     {@link #test_000()} calls {@link CommitterTestBinding#validate()} so
+ *     as to validate the state of the committer. This is primarily to
+ *     verify that the binding setup mechanism is working.
+ *   </li>
+ *   <li>
+ *     {@link #test_100()} is relayed to
+ *     {@link CommitterTestBinding#test_100()},
+ *     for any preflight tests.
+ *   </li>
+ *   <li>
+ *     The {@link #test_200_execute()} test runs the MR job for that
+ *     particular binding with standard reporting and verification of the
+ *     outcome.
+ *   </li>
+ *   <li>
+ *     {@link #test_500()} test is relayed to
+ *     {@link CommitterTestBinding#test_500()}, for any post-MR-job tests.
+ * </ol>
+ *
+ * A new S3A FileSystem instance is created for each test_ method, so the
+ * pre-execute and post-execute validators cannot inspect the state of the
+ * FS as part of their tests.
+ * However, as the MR workers and AM all run in their own processes, there's
+ * generally no useful information about the job in the local S3AFileSystem
+ * instance.
+ */
+@RunWith(Parameterized.class)
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class ITestS3ACommitterMRJob extends AbstractYarnClusterITest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3ACommitterMRJob.class);
+
+  /**
+   * Test array for parameterized test runs.
+   *
+   * @return the committer bindings, one per parameterized run.
+   */
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {new DirectoryCommitterTestBinding()},
+        {new PartitionCommitterTestBinding()},
+        {new MagicCommitterTestBinding()},
+    });
+  }
+
+  /**
+   * The committer binding for this instance.
+   */
+  private final CommitterTestBinding committerTestBinding;
+
+  /**
+   * Parameterized constructor.
+   * @param committerTestBinding binding for the test.
+   */
+  public ITestS3ACommitterMRJob(
+      final CommitterTestBinding committerTestBinding) {
+    this.committerTestBinding = committerTestBinding;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    // configure the test binding for this specific test case.
+    committerTestBinding.setup(getClusterBinding(), getFileSystem());
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    disableFilesystemCaching(conf);
+    return conf;
+  }
+
+  @Rule
+  public final TemporaryFolder localFilesDir = new TemporaryFolder();
+
+  @Override
+  protected String committerName() {
+    return committerTestBinding.getCommitterName();
+  }
+
+  @Override
+  public boolean useInconsistentClient() {
+    return committerTestBinding.useInconsistentClient();
+  }
+
+  /**
+   * Verify that the committer binding is happy.
+   */
+  @Test
+  public void test_000() throws Throwable {
+    committerTestBinding.validate();
+
+  }
+  @Test
+  public void test_100() throws Throwable {
+    committerTestBinding.test_100();
+  }
+
+  @Test
+  public void test_200_execute() throws Exception {
+    describe("Run an MR with committer %s", committerName());
+
+    S3AFileSystem fs = getFileSystem();
+    // final dest is in S3A
+    // we can't use the method name as the template places square braces into
+    // that and URI creation fails.
+
+    Path outputPath = path("ITestS3ACommitterMRJob-execute-"+ committerName());
+    // create and delete to force in a tombstone marker -see HADOOP-16207
+    fs.mkdirs(outputPath);
+    fs.delete(outputPath, true);
+
+    String commitUUID = UUID.randomUUID().toString();
+    String suffix = isUniqueFilenames() ? ("-" + commitUUID) : "";
+    int numFiles = getTestFileCount();
+
+    // create all the input files on the local FS.
+    List<String> expectedFiles = new ArrayList<>(numFiles);
+    Set<String> expectedKeys = Sets.newHashSet();
+    for (int i = 0; i < numFiles; i += 1) {
+      File file = localFilesDir.newFile(i + ".text");
+      try (FileOutputStream out = new FileOutputStream(file)) {
+        out.write(("file " + i).getBytes(StandardCharsets.UTF_8));
+      }
+      String filename = String.format("part-m-%05d%s", i, suffix);
+      Path path = new Path(outputPath, filename);
+      expectedFiles.add(path.toString());
+      expectedKeys.add("/" + fs.pathToKey(path));
+    }
+    Collections.sort(expectedFiles);
+
+    Job mrJob = createJob(newJobConf());
+    JobConf jobConf = (JobConf) mrJob.getConfiguration();
+
+    mrJob.setOutputFormatClass(LoggingTextOutputFormat.class);
+    FileOutputFormat.setOutputPath(mrJob, outputPath);
+
+    File mockResultsFile = localFilesDir.newFile("committer.bin");
+    mockResultsFile.delete();
+    String committerPath = "file:" + mockResultsFile;
+    jobConf.set("mock-results-file", committerPath);
+
+    // setting up staging options is harmless for other committers
+    jobConf.set(FS_S3A_COMMITTER_STAGING_UUID, commitUUID);
+
+    mrJob.setInputFormatClass(TextInputFormat.class);
+    FileInputFormat.addInputPath(mrJob,
+        new Path(localFilesDir.getRoot().toURI()));
+
+    mrJob.setMapperClass(MapClass.class);
+    mrJob.setNumReduceTasks(0);
+
+    // an attempt to set up log4j properly, which clearly doesn't work
+    URL log4j = getClass().getClassLoader().getResource("log4j.properties");
+    if (log4j != null && "file".equals(log4j.getProtocol())) {
+      Path log4jPath = new Path(log4j.toURI());
+      LOG.debug("Using log4j path {}", log4jPath);
+      mrJob.addFileToClassPath(log4jPath);
+      String sysprops = String.format("-Xmx128m -Dlog4j.configuration=%s",
+          log4j);
+      jobConf.set(JobConf.MAPRED_MAP_TASK_JAVA_OPTS, sysprops);
+      jobConf.set(JobConf.MAPRED_REDUCE_TASK_JAVA_OPTS, sysprops);
+      jobConf.set("yarn.app.mapreduce.am.command-opts", sysprops);
+    }
+
+    applyCustomConfigOptions(jobConf);
+    // fail fast if anything goes wrong
+    mrJob.setMaxMapAttempts(1);
+
+    try (DurationInfo ignore = new DurationInfo(LOG, "Job Submit")) {
+      mrJob.submit();
+    }
+    String jobID = mrJob.getJobID().toString();
+    String logLocation = "logs under "
+        + getYarn().getTestWorkDir().getAbsolutePath();
+    try (DurationInfo ignore = new DurationInfo(LOG, "Job Execution")) {
+      mrJob.waitForCompletion(true);
+    }
+    JobStatus status = mrJob.getStatus();
+    if (!mrJob.isSuccessful()) {
+      // failure of job.
+      // be as meaningful as possible.
+      String message =
+          String.format("Job %s failed in state %s with cause %s.\n"
+                  + "Consult %s",
+              jobID,
+              status.getState(),
+              status.getFailureInfo(),
+              logLocation);
+      LOG.error(message);
+      fail(message);
+    }
+
+    waitForConsistency();
+    Path successPath = new Path(outputPath, _SUCCESS);
+    SuccessData successData = validateSuccessFile(outputPath,
+        committerName(),
+        fs,
+        "MR job " + jobID,
+        1);
+    String commitData = successData.toString();
+
+    FileStatus[] results = fs.listStatus(outputPath,
+        S3AUtils.HIDDEN_FILE_FILTER);
+    int fileCount = results.length;
+    Assertions.assertThat(fileCount)
+        .describedAs("No files from job %s in output directory %s; see %s",
+            jobID,
+            outputPath,
+            logLocation)
+        .isNotEqualTo(0);
+
+    List<String> actualFiles = Arrays.stream(results)
+        .map(s -> s.getPath().toString())
+        .sorted()
+        .collect(Collectors.toList());
+
+    Assertions.assertThat(actualFiles)
+        .describedAs("Files found in %s", outputPath)
+        .isEqualTo(expectedFiles);
+
+    Assertions.assertThat(successData.getFilenames())
+        .describedAs("Success files listed in %s:%s",
+            successPath, commitData)
+        .isNotEmpty()
+        .containsExactlyInAnyOrderElementsOf(expectedKeys);
+
+    assertPathDoesNotExist("temporary dir should only be from"
+            + " classic file committers",
+        new Path(outputPath, CommitConstants.TEMPORARY));
+    customPostExecutionValidation(outputPath, successData);
+  }
+
+  @Override
+  protected void applyCustomConfigOptions(final JobConf jobConf)
+      throws IOException {
+    committerTestBinding.applyCustomConfigOptions(jobConf);
+  }
+
+  @Override
+  protected void customPostExecutionValidation(final Path destPath,
+      final SuccessData successData) throws Exception {
+    committerTestBinding.validateResult(destPath, successData);
+  }
+
+  /**
+   * This is the extra test which committer test bindings can add.
+   */
+  @Test
+  public void test_500() throws Throwable {
+    committerTestBinding.test_500();
+  }
+
+  /**
+   *  Test Mapper.
+   *  This is executed in a separate process, and must not make any assumptions
+   *  about external state.
+   */
+  public static class MapClass
+      extends Mapper<LongWritable, Text, LongWritable, Text> {
+
+    private int operations;
+
+    private String id = "";
+
+    private LongWritable l = new LongWritable();
+
+    private Text t = new Text();
+
+    @Override
+    protected void setup(Context context)
+        throws IOException, InterruptedException {
+      super.setup(context);
+      // force in Log4J logging
+      org.apache.log4j.BasicConfigurator.configure();
+      // and pick up scale test flag as passed down
+      boolean scaleMap = context.getConfiguration()
+          .getBoolean(KEY_SCALE_TESTS_ENABLED, false);
+      operations = scaleMap ? SCALE_TEST_KEYS : BASE_TEST_KEYS;
+      id = context.getTaskAttemptID().toString();
+    }
+
+    @Override
+    protected void map(LongWritable key, Text value, Context context)
+        throws IOException, InterruptedException {
+      for (int i = 0; i < operations; i++) {
+        l.set(i);
+        t.set(String.format("%s:%05d", id, i));
+        context.write(l, t);
+      }
+    }
+  }
+
+  /**
+   * A binding class for committer tests.
+   * Subclasses of this will be instantiated and drive the parameterized
+   * test suite.
+   *
+   * These classes will be instantiated in a static array of the suite, and
+   * not bound to a cluster binding or filesystem.
+   *
+   * The per-method test {@link #setup()} method will call
+   * {@link #setup(ClusterBinding, S3AFileSystem)}, to link the instance
+   * to the specific test cluster <i>and test filesystem</i> in use
+   * in that test.
+   */
+  private abstract static class CommitterTestBinding {
+
+    /**
+     * Name.
+     */
+    private final String committerName;
+
+    /**
+     * Cluster binding.
+     */
+    private ClusterBinding binding;
+
+    /**
+     * The S3A filesystem.
+     */
+    private S3AFileSystem remoteFS;
+
+    /**
+     * Constructor.
+     * @param committerName name of the committer for messages.
+     */
+    protected CommitterTestBinding(final String committerName) {
+      this.committerName = committerName;
+    }
+
+    /**
+     * Set up the test binding: this is called during test setup.
+     * @param cluster the active test cluster.
+     * @param fs the S3A Filesystem used as a destination.
+     */
+    private void setup(
+        ClusterBinding cluster,
+        S3AFileSystem fs) {
+      this.binding = cluster;
+      this.remoteFS = fs;
+    }
+
+    protected String getCommitterName() {
+      return committerName;
+    }
+
+    protected ClusterBinding getBinding() {
+      return binding;
+    }
+
+    protected S3AFileSystem getRemoteFS() {
+      return remoteFS;
+    }
+
+    protected FileSystem getClusterFS() throws IOException {
+      return getBinding().getClusterFS();
+    }
+
+    @Override
+    public String toString() {
+      return committerName;
+    }
+
+    /**
+     * Override point to let implementations tune the MR Job conf.
+     * @param jobConf configuration
+     */
+    protected void applyCustomConfigOptions(JobConf jobConf)
+        throws IOException {
+    }
+
+    /**
+     * Should the inconsistent S3A client be used?
+     * @return true for inconsistent listing
+     */
+    public abstract boolean useInconsistentClient();
+
+    /**
+     * Override point for any committer specific validation operations;
+     * called after the base assertions have all passed.
+     * @param destPath destination of work
+     * @param successData loaded success data
+     * @throws Exception failure
+     */
+    protected void validateResult(Path destPath,
+        SuccessData successData)
+        throws Exception {
+
+    }
+
+    /**
+     * A test to run before the main {@link #test_200_execute()} test is
+     * invoked.
+     * @throws Throwable failure.
+     */
+    void test_100() throws Throwable {
+
+    }
+
+    /**
+     * A test to run after the main {@link #test_200_execute()} test is
+     * invoked.
+     * @throws Throwable failure.
+     */
+    void test_500() throws Throwable {
+
+    }
+
+    /**
+     * Validate the state of the binding.
+     * This is called in {@link #test_000()} so will
+     * fail independently of the other tests.
+     * @throws Throwable failure.
+     */
+    public void validate() throws Throwable {
+      assertNotNull("Not bound to a cluster", binding);
+      assertNotNull("No cluster filesystem", getClusterFS());
+      assertNotNull("No yarn cluster", binding.getYarn());
+    }
+  }
+
+  /**
+   * The directory staging committer.
+   */
+  private static final class DirectoryCommitterTestBinding
+      extends CommitterTestBinding {
+
+    private DirectoryCommitterTestBinding() {
+      super(DirectoryStagingCommitter.NAME);
+    }
+
+    /**
+     * @return true for inconsistent listing
+     */
+    public boolean useInconsistentClient() {
+      return true;
+    }
+
+    /**
+     * Verify that staging commit dirs are made absolute under the user's
+     * home directory, so, in a secure cluster, private.
+     */
+    @Override
+    void test_100() throws Throwable {
+      FileSystem fs = getClusterFS();
+      Configuration conf = fs.getConf();
+      String pri = "private";
+      conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, pri);
+      Path dir = getMultipartUploadCommitsDirectory(conf, "uuid");
+      Assertions.assertThat(dir.isAbsolute())
+          .describedAs("non-absolute path")
+          .isTrue();
+      String stagingTempDir = dir.toString().toLowerCase(Locale.ENGLISH);
+      String self = UserGroupInformation.getCurrentUser()
+          .getShortUserName().toLowerCase(Locale.ENGLISH);
+      Assertions.assertThat(stagingTempDir)
+          .describedAs("Staging committer temp path in cluster")
+          .contains(pri + "/" + self)
+          .endsWith("uuid/" + STAGING_UPLOADS);
+    }
+  }
+
+  /**
+   * The partition committer test binding.
+   */
+  private static final class PartitionCommitterTestBinding
+      extends CommitterTestBinding {
+
+    private PartitionCommitterTestBinding() {
+      super(PartitionedStagingCommitter.NAME);
+    }
+
+    /**
+     * @return true for inconsistent listing
+     */
+    public boolean useInconsistentClient() {
+      return true;
+    }
+  }
+
+  /**
+   * The magic committer test binding.
+   * This includes extra result validation.
+   */
+  private static final class MagicCommitterTestBinding
+      extends CommitterTestBinding {
+
+    private MagicCommitterTestBinding() {
+      super(MagicS3GuardCommitter.NAME);
+    }
+
+    /**
+     * @return we need a consistent store.
+     */
+    public boolean useInconsistentClient() {
+      return false;
+    }
+
+    /**
+     * Verify that no __magic directory remains under the destination
+     * after the job; the destination tree is logged before the check.
+     * @param destPath destination of work
+     * @param successData loaded success data
+     * @throws Exception failure
+     */
+    @Override
+    protected void validateResult(final Path destPath,
+        final SuccessData successData)
+        throws Exception {
+      Path magicDir = new Path(destPath, MAGIC);
+
+      // if an FNFE isn't raised on getFileStatus, the closure below lists
+      // the magic directory tree into the diagnostic string it returns
+      S3AFileSystem fs = getRemoteFS();
+      // log the contents
+      lsR(fs, destPath, true);
+      intercept(FileNotFoundException.class, () -> {
+        final FileStatus st = fs.getFileStatus(magicDir);
+        StringBuilder result = new StringBuilder("Found magic dir which should"
+            + " have been deleted at ").append(st).append('\n');
+        result.append("[");
+        applyLocatedFiles(fs.listFiles(magicDir, true),
+            (status) -> result.append(status.getPath()).append('\n'));
+        result.append("]");
+        return result.toString();
+      });
+    }
+  }
+
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
deleted file mode 100644
index e403ab4..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitMRJob.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.magic;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
-import org.apache.hadoop.mapred.JobConf;
-
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
-import static org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
-import static org.apache.hadoop.test.LambdaTestUtils.intercept;
-
-/**
- * Full integration test for the Magic Committer.
- *
- * There's no need to disable the committer setting for the filesystem here,
- * because the committers are being instantiated in their own processes;
- * the settings in {@link AbstractITCommitMRJob#applyCustomConfigOptions(JobConf)} are
- * passed down to these processes.
- */
-public final class ITestMagicCommitMRJob extends AbstractITCommitMRJob {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());
-  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    terminateCluster(clusterBinding);
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-
-  /**
-   * Need consistency here.
-   * @return false
-   */
-  @Override
-  public boolean useInconsistentClient() {
-    return false;
-  }
-
-  @Override
-  protected String committerName() {
-    return MagicS3GuardCommitter.NAME;
-  }
-
-  /**
-   * Turn on the magic commit support for the FS, else nothing will work.
-   * @param conf configuration
-   */
-  @Override
-  protected void applyCustomConfigOptions(JobConf conf) {
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
-  }
-
-  /**
-   * Check that the magic dir was cleaned up.
-   * {@inheritDoc}
-   */
-  @Override
-  protected void customPostExecutionValidation(Path destPath,
-      SuccessData successData) throws Exception {
-    Path magicDir = new Path(destPath, MAGIC);
-
-    // if an FNFE isn't raised on getFileStatus, list out the directory
-    // tree
-    S3AFileSystem fs = getFileSystem();
-    // log the contents
-    lsR(fs, destPath, true);
-    intercept(FileNotFoundException.class, () -> {
-      final FileStatus st = fs.getFileStatus(magicDir);
-      StringBuilder result = new StringBuilder("Found magic dir which should"
-          + " have been deleted at ").append(st).append('\n');
-      result.append("[");
-      applyLocatedFiles(fs.listFiles(magicDir, true),
-          (status) -> result.append(status.getPath()).append('\n'));
-      result.append("[");
-      return result.toString();
-    });
-  }
-}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
deleted file mode 100644
index 1e44086..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitMRJob.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.staging.integration;
-
-import java.io.IOException;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * Full integration test for the directory committer.
- */
-public final class ITestDirectoryCommitMRJob extends AbstractITCommitMRJob {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    terminateCluster(clusterBinding);
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-
-  @Override
-  protected String committerName() {
-    return DirectoryStagingCommitter.NAME;
-  }
-}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
deleted file mode 100644
index 6106974..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionCommitMRJob.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.staging.integration;
-
-import java.io.IOException;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.staging.PartitionedStagingCommitter;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * Full integration test for the partition committer.
- */
-public final class ITestPartitionCommitMRJob extends AbstractITCommitMRJob {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());
-  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    terminateCluster(clusterBinding);
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-
-  @Override
-  protected String committerName() {
-    return PartitionedStagingCommitter.NAME;
-  }
-}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
deleted file mode 100644
index 218c72a..0000000
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJob.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.staging.integration;
-
-import java.io.IOException;
-
-import org.hamcrest.core.StringContains;
-import org.hamcrest.core.StringEndsWith;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_TMP_PATH;
-import static 
org.apache.hadoop.fs.s3a.commit.staging.Paths.getMultipartUploadCommitsDirectory;
-import static 
org.apache.hadoop.fs.s3a.commit.staging.StagingCommitterConstants.STAGING_UPLOADS;
-
-/**
- * Full integration test for the staging committer.
- */
-public final class ITestStagingCommitMRJob extends AbstractITCommitMRJob {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());
-  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    terminateCluster(clusterBinding);
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-
-  @Override
-  protected String committerName() {
-    return StagingCommitter.NAME;
-  }
-
-  /**
-   * Verify that staging commit dirs are made absolute under the user's
-   * home directory, so, in a secure cluster, private.
-   */
-  @Test
-  public void testStagingDirectory() throws Throwable {
-    FileSystem hdfs = getDFS();
-    Configuration conf = hdfs.getConf();
-    conf.set(FS_S3A_COMMITTER_STAGING_TMP_PATH, "private");
-    Path dir = getMultipartUploadCommitsDirectory(conf, "UUID");
-    assertThat("Directory " + dir + " path is wrong",
-        dir.toString(),
-        StringEndsWith.endsWith("UUID/"
-        + STAGING_UPLOADS));
-    assertTrue("path unqualified", dir.isAbsolute());
-    String self = UserGroupInformation.getCurrentUser().getShortUserName();
-    assertThat(dir.toString(),
-        StringContains.containsString("/user/" + self + "/private"));
-  }
-
-}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
deleted file mode 100644
index 7248813..0000000
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitMRJobBadDest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.staging.integration;
-
-import java.io.IOException;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
-import org.apache.hadoop.fs.s3a.commit.AbstractITCommitMRJob;
-import org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.test.LambdaTestUtils;
-
-/**
- * This is a test to verify that the committer will fail if the destination
- * directory exists, and that this happens in job setup.
- */
-public final class ITestStagingCommitMRJobBadDest extends 
AbstractITCommitMRJob {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());
-  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    terminateCluster(clusterBinding);
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-
-  @Override
-  protected String committerName() {
-    return StagingCommitter.NAME;
-  }
-
-  /**
-   * create the destination directory and expect a failure.
-   * @param conf configuration
-   */
-  @Override
-  protected void applyCustomConfigOptions(JobConf conf) throws IOException {
-    // This is the destination in the S3 FS
-    String outdir = conf.get(FileOutputFormat.OUTDIR);
-    S3AFileSystem fs = getFileSystem();
-    Path outputPath = new Path(outdir);
-    fs.mkdirs(outputPath);
-  }
-
-  @Override
-  public void testMRJob() throws Exception {
-    LambdaTestUtils.intercept(FileAlreadyExistsException.class,
-        "Output directory",
-        super::testMRJob);
-  }
-
-}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
deleted file mode 100644
index cb9cdd0..0000000
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortDirectoryCommitter.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.terasort;
-
-import java.io.IOException;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
-import org.apache.hadoop.mapred.JobConf;
-
-/**
- * Terasort with the directory committer.
- */
-public final class ITestTerasortDirectoryCommitter extends 
AbstractCommitTerasortIT {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());
-  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    clusterBinding.terminate();
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-
-  @Override
-  protected String committerName() {
-    return DirectoryStagingCommitter.NAME;
-  }
-
-}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
deleted file mode 100644
index e1b4eac..0000000
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortMagicCommitter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a.commit.terasort;
-
-import java.io.IOException;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
-import org.apache.hadoop.mapred.JobConf;
-
-import static 
org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
-
-/**
- * Terasort with the magic committer.
- */
-public final class ITestTerasortMagicCommitter
-    extends AbstractCommitTerasortIT {
-
-  /**
-   * The static cluster binding with the lifecycle of this test; served
-   * through instance-level methods for sharing across methods in the
-   * suite.
-   */
-  @SuppressWarnings("StaticNonFinalField")
-  private static ClusterBinding clusterBinding;
-
-  @BeforeClass
-  public static void setupClusters() throws IOException {
-    clusterBinding = createCluster(new JobConf());
-  }
-
-  @AfterClass
-  public static void teardownClusters() throws IOException {
-    clusterBinding.terminate();
-  }
-
-  @Override
-  public ClusterBinding getClusterBinding() {
-    return clusterBinding;
-  }
-  @Override
-  protected String committerName() {
-    return MagicS3GuardCommitter.NAME;
-  }
-
-  /**
-   * Turn on the magic commit support for the FS, else nothing will work.
-   * @param conf configuration
-   */
-  @Override
-  protected void applyCustomConfigOptions(JobConf conf) {
-    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
-  }
-
-}
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
similarity index 50%
rename from 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
rename to 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
index 479b3c8..dc6c6d1 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/AbstractCommitTerasortIT.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/terasort/ITestTerasortOnS3A.java
@@ -19,24 +19,33 @@
 package org.apache.hadoop.fs.s3a.commit.terasort;
 
 import java.io.File;
-import java.nio.charset.Charset;
+import java.io.FileNotFoundException;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Optional;
-import java.util.function.BiConsumer;
+import java.util.function.Consumer;
 
+import org.junit.Assume;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
+import org.junit.runner.RunWith;
 import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.terasort.TeraGen;
 import org.apache.hadoop.examples.terasort.TeraSort;
 import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
 import org.apache.hadoop.examples.terasort.TeraValidate;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.commit.AbstractYarnClusterITest;
+import org.apache.hadoop.fs.s3a.commit.magic.MagicS3GuardCommitter;
+import org.apache.hadoop.fs.s3a.commit.staging.DirectoryStagingCommitter;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.StringUtils;
@@ -44,46 +53,80 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
 import static java.util.Optional.empty;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
-import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 
 /**
  * Runs Terasort against S3A.
  *
- * This is all done on a shared mini YARN and HDFS clusters, set up before
- * any of the tests methods run.
- *
+ * Parameterized by committer name, using a YARN cluster
+ * shared across all test runs.
  * The tests run in sequence, so each operation is isolated.
- * This also means that the test paths deleted in test
+ * This also means that the test paths are deleted in test
  * teardown; shared variables must all be static.
+ *
+ * The test is a scale test; for each parameter it takes a few minutes to
+ * run the full suite.
+ * Before anyone calls that out as slow: try running the test with the file
+ * committer.
  */
 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@RunWith(Parameterized.class)
 @SuppressWarnings("StaticNonFinalField")
-public abstract class AbstractCommitTerasortIT extends
-    AbstractYarnClusterITest {
+public class ITestTerasortOnS3A extends AbstractYarnClusterITest {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(AbstractCommitTerasortIT.class);
+      LoggerFactory.getLogger(ITestTerasortOnS3A.class);
 
-  // all the durations are optional as they only get filled in when
-  // a test run successfully completes. Failed tests don't have numbers.
-  private static Optional<DurationInfo> terasortDuration = empty();
+  public static final int EXPECTED_PARTITION_COUNT = 10;
 
-  private static Optional<DurationInfo> teragenStageDuration = empty();
+  public static final int PARTITION_SAMPLE_SIZE = 1000;
 
-  private static Optional<DurationInfo> terasortStageDuration = empty();
+  public static final int ROW_COUNT = 1000;
 
-  private static Optional<DurationInfo> teravalidateStageDuration = empty();
+  /**
+   * Duration tracker created in the first of the test cases and closed
+   * in {@link #test_140_teracomplete()}.
+   */
+  private static Optional<DurationInfo> terasortDuration = empty();
 
+  /**
+   * Tracker of which stages are completed and how long they took.
+   */
+  private static Map<String, DurationInfo> completedStages = new HashMap<>();
+
+  /** Name of the committer for this run. */
+  private final String committerName;
+
+  /** Base path for all the terasort input and output paths. */
   private Path terasortPath;
 
+  /** Input (teragen) path. */
   private Path sortInput;
 
+  /** Path where sorted data goes. */
   private Path sortOutput;
 
+  /** Path for validated job's output. */
   private Path sortValidate;
 
   /**
+   * Test array for parameterized test runs.
+   *
+   * @return the committer names to run the test suite against.
+   */
+  @Parameterized.Parameters(name = "{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {DirectoryStagingCommitter.NAME},
+        {MagicS3GuardCommitter.NAME}});
+  }
+
+  public ITestTerasortOnS3A(final String committerName) {
+    this.committerName = committerName;
+  }
+
+  /**
    * Not using special paths here.
    * @return false
    */
@@ -93,6 +136,11 @@ public abstract class AbstractCommitTerasortIT extends
   }
 
   @Override
+  protected String committerName() {
+    return committerName;
+  }
+
+  @Override
   public void setup() throws Exception {
     super.setup();
     requireScaleTestsEnabled();
@@ -100,44 +148,88 @@ public abstract class AbstractCommitTerasortIT extends
   }
 
   /**
-   * Set up for terasorting by initializing paths.
-   * The paths used must be unique across parallel runs.
+   * Set up the job conf with the options for terasort chosen by the scale
+   * options.
+   * @param conf configuration
    */
-  private void prepareToTerasort() {
+  @Override
+  protected void applyCustomConfigOptions(JobConf conf) {
     // small sample size for faster runs
-    Configuration yarnConfig = getYarn().getConfig();
-    yarnConfig.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
-    yarnConfig.setBoolean(
-        TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
-        true);
-    yarnConfig.setBoolean(
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
+    conf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(),
+        getSampleSizeForEachPartition());
+    conf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(),
+        getExpectedPartitionCount());
+    conf.setBoolean(
         TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(),
         false);
-    terasortPath = new Path("/terasort-" + getClass().getSimpleName())
+  }
+
+  private int getExpectedPartitionCount() {
+    return EXPECTED_PARTITION_COUNT;
+  }
+
+  private int getSampleSizeForEachPartition() {
+    return PARTITION_SAMPLE_SIZE;
+  }
+
+  protected int getRowCount() {
+    return ROW_COUNT;
+  }
+
+  /**
+   * Set up the terasort by initializing the path variables.
+   * The paths used must be unique across parameterized runs but
+   * common across all test cases in a single parameterized run.
+   */
+  private void prepareToTerasort() {
+    // all paths live under a base directory named after the committer
+    terasortPath = new Path("/terasort-" + committerName)
         .makeQualified(getFileSystem());
     sortInput = new Path(terasortPath, "sortin");
     sortOutput = new Path(terasortPath, "sortout");
     sortValidate = new Path(terasortPath, "validate");
-    if (!terasortDuration.isPresent()) {
-      terasortDuration = Optional.of(new DurationInfo(LOG, "Terasort"));
-    }
+
+  }
+
+  /**
+   * Declare that a stage has completed.
+   * @param stage stage name/key in the map
+   * @param d duration.
+   */
+  private static void completedStage(final String stage,
+      final DurationInfo d) {
+    completedStages.put(stage, d);
   }
 
   /**
-   * Execute a single stage in the terasort,
-   * @param stage Stage name for messages/assertions.
+   * Declare a stage which is required for this test case.
+   * @param stage stage name
+   */
+  private static void requireStage(final String stage) {
+    Assume.assumeTrue(
+        "Required stage was not completed: " + stage,
+        completedStages.get(stage) != null);
+  }
+
+  /**
+   * Execute a single stage in the terasort.
+   * Updates the completed stages map with the stage duration -if successful.
+   * @param stage Stage name for the stages map.
    * @param jobConf job conf
-   * @param dest destination directory -the _SUCCESS File will be expected here.
+   * @param dest destination directory -the _SUCCESS file will be expected here.
    * @param tool tool to run.
    * @param args args for the tool.
+   * @param minimumFileCount minimum number of files to have been created
    * @throws Exception any failure
    */
-  private Optional<DurationInfo> executeStage(
+  private void executeStage(
       final String stage,
       final JobConf jobConf,
       final Path dest,
       final Tool tool,
-      final String[] args) throws Exception {
+      final String[] args,
+      final int minimumFileCount) throws Exception {
     int result;
     DurationInfo d = new DurationInfo(LOG, stage);
     try {
@@ -145,22 +237,30 @@ public abstract class AbstractCommitTerasortIT extends
     } finally {
       d.close();
     }
+    dumpOutputTree(dest);
     assertEquals(stage
         + "(" + StringUtils.join(", ", args) + ")"
         + " failed", 0, result);
-    validateSuccessFile(dest, committerName(), getFileSystem(), stage);
-    return Optional.of(d);
+    validateSuccessFile(dest, committerName(), getFileSystem(), stage,
+        minimumFileCount);
+    completedStage(stage, d);
   }
 
   /**
    * Set up terasort by cleaning out the destination, and note the initial
    * time before any of the jobs are executed.
+   *
+   * This is executed first <i>for each parameterized run</i>.
+   * It is where all variables which need to be reset for each run need
+   * to be reset.
    */
   @Test
   public void test_100_terasort_setup() throws Throwable {
     describe("Setting up for a terasort");
 
     getFileSystem().delete(terasortPath, true);
+    completedStages = new HashMap<>();
+    terasortDuration = Optional.of(new DurationInfo(LOG, false, "Terasort"));
   }
 
   @Test
@@ -170,42 +270,46 @@ public abstract class AbstractCommitTerasortIT extends
 
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
-    teragenStageDuration = executeStage("Teragen",
+    executeStage("teragen",
         jobConf,
         sortInput,
         new TeraGen(),
-        new String[]{Integer.toString(SCALE_TEST_KEYS), sortInput.toString()});
+        new String[]{Integer.toString(getRowCount()), sortInput.toString()},
+        1);
   }
 
+
   @Test
   public void test_120_terasort() throws Throwable {
     describe("Terasort from %s to %s", sortInput, sortOutput);
+    requireStage("teragen");
     getFileSystem().delete(sortOutput, true);
 
     loadSuccessFile(getFileSystem(), sortInput, "previous teragen stage");
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
-    // this job adds some data, so skip it.
-    jobConf.set(FS_S3A_COMMITTER_STAGING_CONFLICT_MODE, CONFLICT_MODE_APPEND);
-    terasortStageDuration = executeStage("TeraSort",
+    executeStage("terasort",
         jobConf,
         sortOutput,
         new TeraSort(),
-        new String[]{sortInput.toString(), sortOutput.toString()});
+        new String[]{sortInput.toString(), sortOutput.toString()},
+        1);
   }
 
   @Test
   public void test_130_teravalidate() throws Throwable {
     describe("TeraValidate from %s to %s", sortOutput, sortValidate);
+    requireStage("terasort");
     getFileSystem().delete(sortValidate, true);
     loadSuccessFile(getFileSystem(), sortOutput, "previous terasort stage");
     JobConf jobConf = newJobConf();
     patchConfigurationForCommitter(jobConf);
-    teravalidateStageDuration = executeStage("TeraValidate",
+    executeStage("teravalidate",
         jobConf,
         sortValidate,
         new TeraValidate(),
-        new String[]{sortOutput.toString(), sortValidate.toString()});
+        new String[]{sortOutput.toString(), sortValidate.toString()},
+        1);
   }
 
   /**
@@ -214,7 +318,10 @@ public abstract class AbstractCommitTerasortIT extends
    */
   @Test
   public void test_140_teracomplete() throws Throwable {
-    terasortDuration.get().close();
+    terasortDuration.ifPresent(d -> {
+      d.close();
+      completedStage("overall", d);
+    });
 
     final StringBuilder results = new StringBuilder();
     results.append("\"Operation\"\t\"Duration\"\n");
@@ -222,19 +329,20 @@ public abstract class AbstractCommitTerasortIT extends
     // this is how you dynamically create a function in a method
     // for use afterwards.
     // Works because there's no IOEs being raised in this sequence.
-    BiConsumer<String, Optional<DurationInfo>> stage =
-        (s, od) ->
-            results.append(String.format("\"%s\"\t\"%s\"\n",
-                s,
-                od.map(DurationInfo::getDurationString).orElse("")));
-
-    stage.accept("Generate", teragenStageDuration);
-    stage.accept("Terasort", terasortStageDuration);
-    stage.accept("Validate", teravalidateStageDuration);
-    stage.accept("Completed", terasortDuration);
+    Consumer<String> stage = (s) -> {
+      DurationInfo duration = completedStages.get(s);
+      results.append(String.format("\"%s\"\t\"%s\"\n",
+          s,
+          duration == null ? "" : duration));
+    };
+
+    stage.accept("teragen");
+    stage.accept("terasort");
+    stage.accept("teravalidate");
+    stage.accept("overall");
     String text = results.toString();
     File resultsFile = File.createTempFile("results", ".csv");
-    FileUtils.write(resultsFile, text, Charset.forName("UTF-8"));
+    FileUtils.write(resultsFile, text, StandardCharsets.UTF_8);
     LOG.info("Results are in {}\n{}", resultsFile, text);
   }
 
@@ -252,4 +360,18 @@ public abstract class AbstractCommitTerasortIT extends
   public void test_200_directory_deletion() throws Throwable {
     getFileSystem().delete(terasortPath, true);
   }
+
+  /**
+   * Dump the files under a path -but do not fail if the path is not present.
+   * @param path path to dump
+   * @throws Exception any failure.
+   */
+  protected void dumpOutputTree(Path path) throws Exception {
+    LOG.info("Files under output directory {}", path);
+    try {
+      lsR(getFileSystem(), path, true);
+    } catch (FileNotFoundException e) {
+      LOG.info("Output directory {} not found", path);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
index f616264..6e20fbc 100644
--- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties
@@ -58,7 +58,7 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN
 # Log S3Guard classes
 #log4j.logger.org.apache.hadoop.fs.s3a.s3guard=DEBUG
 # if set to debug, this will log the PUT/DELETE operations on a store
-#log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG
+log4j.logger.org.apache.hadoop.fs.s3a.s3guard.Operations=DEBUG
 
 # Log Committer classes
 #log4j.logger.org.apache.hadoop.fs.s3a.commit=DEBUG


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to