HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints.
Contributed by Steve Loughran and Ryan Blue.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de8b6ca5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de8b6ca5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de8b6ca5

Branch: refs/heads/YARN-6592
Commit: de8b6ca5ef8614de6d6277b7617e27c788b0555c
Parents: 782ba3b
Author: Steve Loughran <ste...@apache.org>
Authored: Wed Nov 22 15:28:12 2017 +0000
Committer: Steve Loughran <ste...@apache.org>
Committed: Wed Nov 22 15:28:12 2017 +0000

----------------------------------------------------------------------
 .../dev-support/findbugsExcludeFile.xml         |    7 +
 .../apache/hadoop/fs/FSDataOutputStream.java    |    9 +
 .../apache/hadoop/fs/PathExistsException.java   |    4 +-
 .../org/apache/hadoop/fs/StorageStatistics.java |    5 +
 .../apache/hadoop/util/JsonSerialization.java   |  299 +++
 .../src/main/resources/core-default.xml         |  117 +-
 .../hadoop/fs/contract/ContractTestUtils.java   |   51 +-
 .../apache/hadoop/test/GenericTestUtils.java    |   29 +-
 .../org/apache/hadoop/test/HadoopTestBase.java  |   51 +-
 .../org/apache/hadoop/test/LambdaTestUtils.java |  144 +-
 .../hadoop/util/TestJsonSerialization.java      |  185 ++
 .../mapreduce/TestMapreduceConfigFields.java    |   27 +-
 .../lib/output/BindingPathOutputCommitter.java  |  184 ++
 .../lib/output/FileOutputCommitter.java         |   12 +-
 .../lib/output/FileOutputCommitterFactory.java  |   38 +
 .../mapreduce/lib/output/FileOutputFormat.java  |   10 +-
 .../lib/output/NamedCommitterFactory.java       |   79 +
 .../lib/output/PathOutputCommitter.java         |   17 +
 .../lib/output/PathOutputCommitterFactory.java  |  204 ++
 .../src/main/resources/mapred-default.xml       |   22 +
 .../lib/output/TestPathOutputCommitter.java     |   24 +-
 .../output/TestPathOutputCommitterFactory.java  |  495 +++++
 hadoop-tools/hadoop-aws/pom.xml                 |   46 +-
 .../hadoop/fs/s3a/AWSBadRequestException.java   |   42 +
 .../hadoop/fs/s3a/AWSClientIOException.java     |    3 +-
 .../hadoop/fs/s3a/AWSNoResponseException.java   |   31 +
 .../hadoop/fs/s3a/AWSRedirectException.java     |   38 +
 .../fs/s3a/AWSServiceThrottledException.java    |   42 +
 .../hadoop/fs/s3a/AWSStatus500Exception.java    |   37 +
 .../s3a/BlockingThreadPoolExecutorService.java  |    2 +-
 .../org/apache/hadoop/fs/s3a/Constants.java     |   72 +-
 .../fs/s3a/InconsistentAmazonS3Client.java      |  232 ++-
 .../java/org/apache/hadoop/fs/s3a/Invoker.java  |  485 +++++
 .../java/org/apache/hadoop/fs/s3a/Listing.java  |   26 +-
 .../java/org/apache/hadoop/fs/s3a/Retries.java  |   92 +
 .../hadoop/fs/s3a/S3ABlockOutputStream.java     |  307 +--
 .../org/apache/hadoop/fs/s3a/S3ADataBlocks.java |    2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  940 +++++----
 .../apache/hadoop/fs/s3a/S3AInputStream.java    |   56 +-
 .../hadoop/fs/s3a/S3AInstrumentation.java       |  231 ++-
 .../apache/hadoop/fs/s3a/S3ARetryPolicy.java    |  246 +++
 .../hadoop/fs/s3a/S3AStorageStatistics.java     |   12 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java |  324 ++-
 .../org/apache/hadoop/fs/s3a/S3ListRequest.java |   11 +
 .../hadoop/fs/s3a/S3ObjectAttributes.java       |   10 +-
 .../org/apache/hadoop/fs/s3a/Statistic.java     |   56 +-
 .../hadoop/fs/s3a/WriteOperationHelper.java     |  474 +++++
 .../fs/s3a/commit/AbstractS3ACommitter.java     |  756 +++++++
 .../s3a/commit/AbstractS3ACommitterFactory.java |   90 +
 .../hadoop/fs/s3a/commit/CommitConstants.java   |  240 +++
 .../hadoop/fs/s3a/commit/CommitOperations.java  |  596 ++++++
 .../hadoop/fs/s3a/commit/CommitUtils.java       |  129 ++
 .../hadoop/fs/s3a/commit/CommitUtilsWithMR.java |  192 ++
 .../apache/hadoop/fs/s3a/commit/Duration.java   |   60 +
 .../hadoop/fs/s3a/commit/DurationInfo.java      |   59 +
 .../s3a/commit/InternalCommitterConstants.java  |  100 +
 .../hadoop/fs/s3a/commit/LocalTempDir.java      |   80 +
 .../fs/s3a/commit/MagicCommitIntegration.java   |  182 ++
 .../hadoop/fs/s3a/commit/MagicCommitPaths.java  |  229 ++
 .../fs/s3a/commit/PathCommitException.java      |   43 +
 .../apache/hadoop/fs/s3a/commit/PutTracker.java |  100 +
 .../fs/s3a/commit/S3ACommitterFactory.java      |  129 ++
 .../org/apache/hadoop/fs/s3a/commit/Tasks.java  |  410 ++++
 .../hadoop/fs/s3a/commit/ValidationFailure.java |   53 +
 .../hadoop/fs/s3a/commit/files/PendingSet.java  |  192 ++
 .../s3a/commit/files/PersistentCommitData.java  |   69 +
 .../s3a/commit/files/SinglePendingCommit.java   |  432 ++++
 .../hadoop/fs/s3a/commit/files/SuccessData.java |  322 +++
 .../fs/s3a/commit/files/package-info.java       |   45 +
 .../fs/s3a/commit/magic/MagicCommitTracker.java |  161 ++
 .../s3a/commit/magic/MagicS3GuardCommitter.java |  288 +++
 .../magic/MagicS3GuardCommitterFactory.java     |   47 +
 .../fs/s3a/commit/magic/package-info.java       |   27 +
 .../hadoop/fs/s3a/commit/package-info.java      |   28 +
 .../s3a/commit/staging/ConflictResolution.java  |   33 +
 .../staging/DirectoryStagingCommitter.java      |  116 ++
 .../DirectoryStagingCommitterFactory.java       |   48 +
 .../staging/PartitionedStagingCommitter.java    |  159 ++
 .../PartitionedStagingCommitterFactory.java     |   48 +
 .../hadoop/fs/s3a/commit/staging/Paths.java     |  300 +++
 .../fs/s3a/commit/staging/StagingCommitter.java |  851 ++++++++
 .../staging/StagingCommitterConstants.java      |   64 +
 .../commit/staging/StagingCommitterFactory.java |   49 +
 .../fs/s3a/commit/staging/package-info.java     |   27 +
 .../fs/s3a/s3guard/DynamoDBMetadataStore.java   |  368 ++--
 .../apache/hadoop/fs/s3a/s3guard/S3Guard.java   |    2 +
 .../hadoop/fs/s3a/s3guard/S3GuardTool.java      |   16 +-
 .../tools/hadoop-aws/committer_architecture.md  | 1951 ++++++++++++++++++
 .../markdown/tools/hadoop-aws/committers.md     |  819 ++++++++
 .../src/site/markdown/tools/hadoop-aws/index.md |  188 +-
 .../site/markdown/tools/hadoop-aws/s3guard.md   |    7 +-
 .../site/markdown/tools/hadoop-aws/testing.md   |   60 +
 .../tools/hadoop-aws/troubleshooting_s3a.md     |  124 ++
 .../s3a/ITestS3AContractGetFileStatus.java      |   11 +
 .../hadoop/fs/s3a/AbstractS3AMockTest.java      |    5 +-
 .../hadoop/fs/s3a/AbstractS3ATestBase.java      |   23 +-
 .../hadoop/fs/s3a/ITestS3AConfiguration.java    |    8 +-
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java   |    6 +-
 .../hadoop/fs/s3a/ITestS3AFailureHandling.java  |   49 +-
 .../fs/s3a/ITestS3AFileOperationCost.java       |    2 +
 .../fs/s3a/ITestS3ATemporaryCredentials.java    |   15 +-
 .../fs/s3a/ITestS3GuardListConsistency.java     |   45 +-
 .../apache/hadoop/fs/s3a/MockS3AFileSystem.java |  322 +++
 .../hadoop/fs/s3a/MockS3ClientFactory.java      |    8 +
 .../apache/hadoop/fs/s3a/S3ATestConstants.java  |    6 +
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |  132 +-
 .../hadoop/fs/s3a/StorageStatisticsTracker.java |   99 +
 .../org/apache/hadoop/fs/s3a/TestInvoker.java   |  460 +++++
 .../org/apache/hadoop/fs/s3a/TestListing.java   |    2 +-
 .../fs/s3a/TestS3AExceptionTranslation.java     |   84 +-
 .../fs/s3a/commit/AbstractCommitITest.java      |  412 ++++
 .../fs/s3a/commit/AbstractITCommitMRJob.java    |  324 +++
 .../fs/s3a/commit/AbstractITCommitProtocol.java | 1371 ++++++++++++
 .../fs/s3a/commit/CommitterFaultInjection.java  |   44 +
 .../s3a/commit/CommitterFaultInjectionImpl.java |  131 ++
 .../fs/s3a/commit/ITestCommitOperations.java    |  545 +++++
 .../fs/s3a/commit/LoggingTextOutputFormat.java  |  133 ++
 .../fs/s3a/commit/MiniDFSClusterService.java    |   79 +
 .../fs/s3a/commit/TestMagicCommitPaths.java     |  246 +++
 .../apache/hadoop/fs/s3a/commit/TestTasks.java  |  550 +++++
 .../fs/s3a/commit/magic/ITMagicCommitMRJob.java |   70 +
 .../commit/magic/ITestMagicCommitProtocol.java  |  190 ++
 .../commit/magic/ITestS3AHugeMagicCommits.java  |  195 ++
 .../commit/staging/MockedStagingCommitter.java  |   98 +
 .../staging/PartitionedCommitterForTesting.java |   58 +
 .../fs/s3a/commit/staging/StagingTestBase.java  |  724 +++++++
 .../hadoop/fs/s3a/commit/staging/TestPaths.java |  127 ++
 .../commit/staging/TestStagingCommitter.java    |  696 +++++++
 .../TestStagingDirectoryOutputCommitter.java    |  138 ++
 .../TestStagingPartitionedFileListing.java      |  186 ++
 .../TestStagingPartitionedJobCommit.java        |  236 +++
 .../TestStagingPartitionedTaskCommit.java       |  237 +++
 .../integration/ITDirectoryCommitMRJob.java     |   33 +
 .../integration/ITPartitionCommitMRJob.java     |   33 +
 .../integration/ITStagingCommitMRJob.java       |   66 +
 .../ITestDirectoryCommitProtocol.java           |  131 ++
 .../ITestPartitionedCommitProtocol.java         |  139 ++
 .../integration/ITestStagingCommitProtocol.java |  190 ++
 .../s3guard/AbstractS3GuardToolTestBase.java    |   23 +
 .../s3a/s3guard/TestDynamoDBMetadataStore.java  |   13 +-
 .../fs/s3a/scale/AbstractSTestS3AHugeFiles.java |  191 +-
 .../hadoop-aws/src/test/resources/core-site.xml |    9 +-
 .../src/test/resources/log4j.properties         |   42 +-
 .../registry/client/binding/JsonSerDeser.java   |  224 +-
 144 files changed, 24108 insertions(+), 1172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
index 4bafd8e..c056d21 100644
--- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml
@@ -446,4 +446,11 @@
     <Method name="setInstance"/>
     <Bug pattern="ME_ENUM_FIELD_SETTER"/>
   </Match>
+
+  <!-- findbugs is complaining that a stream isn't closed. It will be. -->
+  <Match>
+    <Class name="org.apache.hadoop.util.JsonSerialization"/>
+    <Method name="save"/>
+    <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
+  </Match>
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
index 1d95cd3..5970373 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
@@ -101,6 +101,15 @@ public class FSDataOutputStream extends DataOutputStream
     out.close(); // This invokes PositionCache.close()
   }
 
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "FSDataOutputStream{");
+    sb.append("wrappedStream=").append(wrappedStream);
+    sb.append('}');
+    return sb.toString();
+  }
+
   /**
    * Get a reference to the wrapped output stream.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java
index ccc1f0c..cd9f70a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathExistsException.java
@@ -27,7 +27,7 @@ public class PathExistsException extends PathIOException {
     super(path, "File exists");
   }
   
-  protected PathExistsException(String path, String error) {
+  public PathExistsException(String path, String error) {
     super(path, error);
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
index d987ad0..5a3d736 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageStatistics.java
@@ -97,6 +97,11 @@ public abstract class StorageStatistics {
     public long getValue() {
       return value;
     }
+
+    @Override
+    public String toString() {
+      return name + " = " + value;
+    }
   }
 
   private final String name;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
new file mode 100644
index 0000000..15f4fef
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Support for marshalling objects to and from JSON.
+ *
+ * It constructs an ObjectMapper as an instance field
+ * and synchronizes access to those methods
+ * which use the mapper.
+ *
+ * This class was extracted from
+ * {@code org.apache.hadoop.registry.client.binding.JsonSerDeser},
+ * which is now a subclass of this class.
+ * @param <T> Type to marshal.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class JsonSerialization<T> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(JsonSerialization.class);
+  private static final String UTF_8 = "UTF-8";
+
+  private final Class<T> classType;
+  private final ObjectMapper mapper;
+
+  /**
+   * Create an instance bound to a specific type.
+   * @param classType class to marshal
+   * @param failOnUnknownProperties fail if an unknown property is encountered.
+   * @param pretty generate pretty (indented) output?
+   */
+  public JsonSerialization(Class<T> classType,
+      boolean failOnUnknownProperties, boolean pretty) {
+    Preconditions.checkArgument(classType != null, "null classType");
+    this.classType = classType;
+    this.mapper = new ObjectMapper();
+    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
+        failOnUnknownProperties);
+    mapper.configure(SerializationFeature.INDENT_OUTPUT, pretty);
+  }
+
+  /**
+   * Get the simple name of the class type to be marshalled.
+   * @return the name of the class being marshalled
+   */
+  public String getName() {
+    return classType.getSimpleName();
+  }
+
+  /**
+   * Convert from JSON.
+   *
+   * @param json input
+   * @return the parsed JSON
+   * @throws IOException IO problems
+   * @throws JsonParseException If the input is not well-formatted
+   * @throws JsonMappingException failure to map from the JSON to this class
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized T fromJson(String json)
+      throws IOException, JsonParseException, JsonMappingException {
+    if (json.isEmpty()) {
+      throw new EOFException("No data");
+    }
+    try {
+      return mapper.readValue(json, classType);
+    } catch (IOException e) {
+      LOG.error("Exception while parsing json : {}\n{}", e, json, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Read from an input stream.
+   * @param stream stream to read from
+   * @return the parsed entity
+   * @throws IOException IO problems
+   * @throws JsonParseException If the input is not well-formatted
+   * @throws JsonMappingException failure to map from the JSON to this class
+   */
+  public synchronized T fromJsonStream(InputStream stream) throws IOException {
+    return mapper.readValue(stream, classType);
+  }
+
+  /**
+   * Load from a JSON text file.
+   * @param jsonFile input file
+   * @return the parsed JSON
+   * @throws IOException IO problems
+   * @throws JsonParseException If the input is not well-formatted
+   * @throws JsonMappingException failure to map from the JSON to this class
+   */
+  @SuppressWarnings("unchecked")
+  public synchronized T load(File jsonFile)
+      throws IOException, JsonParseException, JsonMappingException {
+    if (!jsonFile.isFile()) {
+      throw new FileNotFoundException("Not a file: " + jsonFile);
+    }
+    if (jsonFile.length() == 0) {
+      throw new EOFException("File is empty: " + jsonFile);
+    }
+    try {
+      return mapper.readValue(jsonFile, classType);
+    } catch (IOException e) {
+      LOG.error("Exception while parsing json file {}", jsonFile, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Save to a local file. Any existing file is overwritten unless
+   * the OS blocks that.
+   * @param file file to save to
+   * @param instance instance to save
+   * @throws IOException IO exception
+   */
+  public void save(File file, T instance) throws
+      IOException {
+    writeJsonAsBytes(instance, new FileOutputStream(file));
+  }
+
+  /**
+   * Convert from a JSON file.
+   * @param resource input file
+   * @return the parsed JSON
+   * @throws IOException IO problems
+   * @throws JsonParseException If the input is not well-formatted
+   * @throws JsonMappingException failure to map from the JSON to this class
+   */
+  @SuppressWarnings({"IOResourceOpenedButNotSafelyClosed"})
+  public synchronized T fromResource(String resource)
+      throws IOException, JsonParseException, JsonMappingException {
+    try (InputStream resStream = this.getClass()
+        .getResourceAsStream(resource)) {
+      if (resStream == null) {
+        throw new FileNotFoundException(resource);
+      }
+      return mapper.readValue(resStream, classType);
+    } catch (IOException e) {
+      LOG.error("Exception while parsing json resource {}", resource, e);
+      throw e;
+    }
+  }
+
+  /**
+   * Clone by converting to JSON and back again.
+   * This is much less efficient than any Java clone process.
+   * @param instance instance to duplicate
+   * @return a new instance
+   * @throws IOException IO problems.
+   */
+  public T fromInstance(T instance) throws IOException {
+    return fromJson(toJson(instance));
+  }
+
+  /**
+   * Load from a Hadoop filesystem.
+   * There's a check for data availability after the file is open, by
+   * raising an EOFException if stream.available == 0.
+   * This allows for a meaningful exception without the round trip overhead
+   * of a getFileStatus call before opening the file. It may be brittle
+   * against an FS stream which doesn't return a value here, but the
+   * standard filesystems all do.
+   * JSON parsing and mapping problems
+   * are converted to IOEs.
+   * @param fs filesystem
+   * @param path path
+   * @return a loaded object
+   * @throws IOException IO or JSON parse problems
+   */
+  public T load(FileSystem fs, Path path) throws IOException {
+    try (FSDataInputStream dataInputStream = fs.open(path)) {
+      // throw an EOF exception if there is no data available.
+      if (dataInputStream.available() == 0) {
+        throw new EOFException("No data in " + path);
+      }
+      return fromJsonStream(dataInputStream);
+    } catch (JsonProcessingException e) {
+      throw new IOException(
+          String.format("Failed to read JSON file \"%s\": %s", path, e),
+          e);
+    }
+  }
+
+  /**
+   * Save to a Hadoop filesystem.
+   * @param fs filesystem
+   * @param path path
+   * @param instance instance to save
+   * @param overwrite should any existing file be overwritten
+   * @throws IOException IO exception
+   */
+  public void save(FileSystem fs, Path path, T instance,
+      boolean overwrite) throws
+      IOException {
+    writeJsonAsBytes(instance, fs.create(path, overwrite));
+  }
+
+  /**
+   * Write the JSON as bytes, then close the file.
+   * @param instance instance to write
+   * @param dataOutputStream an output stream that will always be closed
+   * @throws IOException on any failure
+   */
+  private void writeJsonAsBytes(T instance,
+      OutputStream dataOutputStream) throws IOException {
+    try {
+      dataOutputStream.write(toBytes(instance));
+    } finally {
+      dataOutputStream.close();
+    }
+  }
+
+  /**
+   * Convert JSON to bytes.
+   * @param instance instance to convert
+   * @return a byte array
+   * @throws IOException IO problems
+   */
+  public byte[] toBytes(T instance) throws IOException {
+    return mapper.writeValueAsBytes(instance);
+  }
+
+  /**
+   * Deserialize from a byte array.
+   * @param bytes byte array
+   * @return the deserialized instance
+   * @throws IOException IO problems
+   * @throws EOFException not enough data
+   */
+  public T fromBytes(byte[] bytes) throws IOException {
+    return fromJson(new String(bytes, 0, bytes.length, UTF_8));
+  }
+
+  /**
+   * Convert an instance to a JSON string.
+   * @param instance instance to convert
+   * @return a JSON string description
+   * @throws JsonProcessingException Json generation problems
+   */
+  public synchronized String toJson(T instance) throws JsonProcessingException {
+    return mapper.writeValueAsString(instance);
+  }
+
+  /**
+   * Convert an instance to a string form for output. This is a robust
+   * operation which will convert any JSON-generating exceptions into
+   * error text.
+   * @param instance non-null instance
+   * @return a JSON string
+   */
+  public String toString(T instance) {
+    Preconditions.checkArgument(instance != null, "Null instance argument");
+    try {
+      return toJson(instance);
+    } catch (JsonProcessingException e) {
+      return "Failed to convert to a string: " + e;
+    }
+  }
+}
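
For reference, a minimal usage sketch of this new helper (the KeyVal bean
mirrors the one in TestJsonSerialization below; the filesystem "fs" and the
path are illustrative, not part of the patch):

    // Bind a serializer to a bean type: fail on unknown properties,
    // pretty-print the output.
    JsonSerialization<KeyVal> serDeser =
        new JsonSerialization<>(KeyVal.class, true, true);

    // Round-trip through a JSON string.
    String json = serDeser.toJson(new KeyVal("key", "1"));
    KeyVal parsed = serDeser.fromJson(json);

    // Persist to and reload from a Hadoop filesystem path,
    // overwriting any existing file.
    Path p = new Path("/tmp/keyval.json");
    serDeser.save(fs, p, parsed, true);
    KeyVal loaded = serDeser.load(fs, p);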

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 8db9f44..183faa5 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1123,7 +1123,8 @@
 <property>
   <name>fs.s3a.multipart.purge.age</name>
   <value>86400</value>
-  <description>Minimum age in seconds of multipart uploads to purge.
+  <description>Minimum age in seconds of multipart uploads to purge
+    on startup if "fs.s3a.multipart.purge" is true.
   </description>
 </property>
 
@@ -1345,6 +1346,120 @@
 </property>
 
 <property>
+  <name>fs.s3a.retry.limit</name>
+  <value>${fs.s3a.attempts.maximum}</value>
+  <description>
+    Number of times to retry any repeatable S3 client request on failure,
+    excluding throttling requests.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.interval</name>
+  <value>500ms</value>
+  <description>
+    Interval between attempts to retry operations for any reason other
+    than S3 throttle errors.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.throttle.limit</name>
+  <value>${fs.s3a.attempts.maximum}</value>
+  <description>
+    Number of times to retry any throttled request.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.throttle.interval</name>
+  <value>1000ms</value>
+  <description>
+    Interval between retry attempts on throttled requests.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.name</name>
+  <value>file</value>
+  <description>
+    Committer to create for output to S3A, one of:
+    "file", "directory", "partitioned", "magic".
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.magic.enabled</name>
+  <value>false</value>
+  <description>
+    Enable support in the filesystem for the S3 "Magic" committer.
+    When working with AWS S3, S3Guard must be enabled for the destination
+    bucket, as consistent metadata listings are required.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.threads</name>
+  <value>8</value>
+  <description>
+    Number of threads in committers for parallel operations on files
+    (upload, commit, abort, delete...)
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.tmp.path</name>
+  <value>tmp/staging</value>
+  <description>
+    Path in the cluster filesystem for temporary data.
+    This is for HDFS, not the local filesystem.
+    It is only for the summary data of each file, not the actual
+    data being committed.
+    Using an unqualified path guarantees that the full path will be
+    generated relative to the home directory of the user creating the job,
+    hence private (assuming home directory permissions are secure).
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.unique-filenames</name>
+  <value>true</value>
+  <description>
+    Option for final files to have a unique name through job attempt info,
+    or the value of fs.s3a.committer.staging.uuid.
+    When writing data with the "append" conflict option, this guarantees
+    that new data will not overwrite any existing data.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.conflict-mode</name>
+  <value>fail</value>
+  <description>
+    Staging committer conflict resolution policy.
+    Supported: "fail", "append", "replace".
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.abort.pending.uploads</name>
+  <value>true</value>
+  <description>
+    Should the staging committers abort all pending uploads to the destination
+    directory?
+
+    Disable this if more than one partitioned committer is
+    writing to the same destination tree simultaneously; otherwise
+    the first job to complete will cancel all outstanding uploads from the
+    others. However, disabling it may lead to leaked uploads from failed
+    tasks. If disabled, configure the bucket lifecycle to remove uploads
+    after a time period, and/or set up a workflow to explicitly delete
+    entries. Otherwise there is a risk that uncommitted uploads may run up
+    bills.
+  </description>
+</property>
+
+<property>
   <name>fs.AbstractFileSystem.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3A</value>
  <description>The implementation class of the S3A AbstractFileSystem.</description>
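
As an illustration of the new committer options above, a job could select
the staging directory committer and its conflict policy programmatically;
a sketch using only the properties documented above:

    Configuration conf = new Configuration();
    // Use the staging "directory" committer for S3A output.
    conf.set("fs.s3a.committer.name", "directory");
    // Fail the job if the destination directory already has data.
    conf.set("fs.s3a.committer.staging.conflict-mode", "fail");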

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index e0cc7d6..d2cbca0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -725,6 +725,8 @@ public class ContractTestUtils extends Assert {
 
   /**
    * Read in "length" bytes, convert to an ascii string.
+   * This uses {@link #toChar(byte)} to escape bytes, so cannot be used
+   * for round trip operations.
    * @param fs filesystem
    * @param path path to read
    * @param length #of bytes to read.
@@ -742,6 +744,28 @@ public class ContractTestUtils extends Assert {
   }
 
   /**
+   * Read in "length" bytes, convert to a UTF-8 string.
+   * @param fs filesystem
+   * @param path path to read
+   * @param length #of bytes to read. If -1: use file length.
+   * @return the bytes read and converted to a string
+   * @throws IOException IO problems
+   */
+  public static String readUTF8(FileSystem fs,
+                                  Path path,
+                                  int length) throws IOException {
+    if (length < 0) {
+      FileStatus status = fs.getFileStatus(path);
+      length = (int) status.getLen();
+    }
+    try (FSDataInputStream in = fs.open(path)) {
+      byte[] buf = new byte[length];
+      in.readFully(0, buf);
+      return new String(buf, "UTF-8");
+    }
+  }
+
+  /**
    * Take an array of filestats and convert to a string
    * (prefixed with/ a [%02d] counter).
    * @param stats array of stats
@@ -857,11 +881,30 @@ public class ContractTestUtils extends Assert {
    */
   public static void assertPathExists(FileSystem fileSystem, String message,
                                Path path) throws IOException {
-    if (!fileSystem.exists(path)) {
+    verifyPathExists(fileSystem, message, path);
+  }
+
+  /**
+   * Verify that a path exists, returning the file status of the path.
+   *
+   * @param fileSystem filesystem to examine
+   * @param message message to include in the assertion failure message
+   * @param path path in the filesystem
+   * @throws FileNotFoundException raised if the path is missing
+   * @throws IOException IO problems
+   */
+  public static FileStatus verifyPathExists(FileSystem fileSystem,
+      String message,
+      Path path) throws IOException {
+    try {
+      return fileSystem.getFileStatus(path);
+    } catch (FileNotFoundException e) {
       //failure, report it
-      ls(fileSystem, path.getParent());
-      throw new FileNotFoundException(message + ": not found " + path
-                                      + " in " + path.getParent());
+      LOG.error("{}: not found {}; parent listing is:\n{}",
+          message, path, ls(fileSystem, path.getParent()));
+      throw (IOException)new FileNotFoundException(
+          message + ": not found " + path + " in " + path.getParent())
+          .initCause(e);
     }
   }
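
A sketch of the new status-returning check (the filesystem and path names
are illustrative):

    // Asserts the path exists and hands back its FileStatus in one
    // call, avoiding a second getFileStatus() round trip.
    FileStatus st = ContractTestUtils.verifyPathExists(
        fs, "job output missing", outputPath);
    assertTrue("not a file: " + st, st.isFile());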
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index 4cb9f8b..0db6c73 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -316,20 +316,37 @@ public abstract class GenericTestUtils {
   /**
    * Assert that an exception's <code>toString()</code> value
    * contained the expected text.
-   * @param string expected string
+   * @param expectedText expected string
    * @param t thrown exception
    * @throws AssertionError if the expected string is not found
    */
-  public static void assertExceptionContains(String string, Throwable t) {
+  public static void assertExceptionContains(String expectedText, Throwable t) {
+    assertExceptionContains(expectedText, t, "");
+  }
+
+  /**
+   * Assert that an exception's <code>toString()</code> value
+   * contained the expected text.
+   * @param expectedText expected string
+   * @param t thrown exception
+   * @param message any extra text for the string
+   * @throws AssertionError if the expected string is not found
+   */
+  public static void assertExceptionContains(String expectedText,
+      Throwable t,
+      String message) {
     Assert.assertNotNull(E_NULL_THROWABLE, t);
     String msg = t.toString();
     if (msg == null) {
       throw new AssertionError(E_NULL_THROWABLE_STRING, t);
     }
-    if (!msg.contains(string)) {
-      throw new AssertionError("Expected to find '" + string + "' "
-          + E_UNEXPECTED_EXCEPTION + ":"
-          + StringUtils.stringifyException(t),
+    if (expectedText != null && !msg.contains(expectedText)) {
+      String prefix = org.apache.commons.lang.StringUtils.isEmpty(message)
+          ? "" : (message + ": ");
+      throw new AssertionError(
+          String.format("%s Expected to find '%s' %s: %s",
+              prefix, expectedText, E_UNEXPECTED_EXCEPTION,
+              StringUtils.stringifyException(t)),
           t);
     }
   }  
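
A sketch of the new three-argument overload (the exception, text and helper
method here are illustrative):

    try {
      operationExpectedToFail();   // hypothetical operation
    } catch (IOException e) {
      // On a mismatch the AssertionError message is prefixed
      // with "open stage: ".
      GenericTestUtils.assertExceptionContains("timeout", e, "open stage");
    }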

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java
index 43d5be8..cb7df4b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/HadoopTestBase.java
@@ -17,29 +17,37 @@
  */
 package org.apache.hadoop.test;
 
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Rule;
+import org.junit.rules.TestName;
 import org.junit.rules.Timeout;
 
 /**
  * A base class for JUnit4 tests that sets a default timeout for all tests
- * that subclass this test
+ * that subclass this test.
+ *
+ * Threads are named after the method being executed, for ease of diagnostics
+ * in logs and thread dumps.
  */
-public abstract class HadoopTestBase {
+public abstract class HadoopTestBase extends Assert {
+
   /**
-   * System property name to set the test timeout: {@value}
+   * System property name to set the test timeout: {@value}.
    */
   public static final String PROPERTY_TEST_DEFAULT_TIMEOUT =
-    "test.default.timeout";
+      "test.default.timeout";
 
   /**
    * The default timeout (in milliseconds) if the system property
    * {@link #PROPERTY_TEST_DEFAULT_TIMEOUT}
-   * is not set: {@value}
+   * is not set: {@value}.
    */
   public static final int TEST_DEFAULT_TIMEOUT_VALUE = 100000;
 
   /**
-   * The JUnit rule that sets the default timeout for tests
+   * The JUnit rule that sets the default timeout for tests.
    */
   @Rule
   public Timeout defaultTimeout = retrieveTestTimeout();
@@ -64,4 +72,35 @@ public abstract class HadoopTestBase {
     }
     return new Timeout(millis);
   }
+
+  /**
+   * The method name.
+   */
+  @Rule
+  public TestName methodName = new TestName();
+
+  /**
+   * Get the method name; defaults to the value of {@link #methodName}.
+   * Subclasses may wish to override it, which will tune the thread naming.
+   * @return the name of the method.
+   */
+  protected String getMethodName() {
+    return methodName.getMethodName();
+  }
+
+  /**
+   * Class setup: names this thread "JUnit".
+   */
+  @BeforeClass
+  public static void nameTestThread() {
+    Thread.currentThread().setName("JUnit");
+  }
+
+  /**
+   * Before each method, the thread is renamed to match the method name.
+   */
+  @Before
+  public void nameThreadToMethod() {
+    Thread.currentThread().setName("JUnit-" + getMethodName());
+  }
 }
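
A subclass now inherits the timeout rule, the JUnit Assert methods and the
per-test thread naming; a tiny sketch (test class and method names are
illustrative):

    public class TestExample extends HadoopTestBase {
      @Test
      public void testThreadNaming() {
        // nameThreadToMethod() has already renamed the worker thread.
        assertTrue(Thread.currentThread().getName()
            .startsWith("JUnit-"));
      }
    }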

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
index 3ea9ab8..22208f7 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.test;
 
 import com.google.common.base.Preconditions;
+import org.junit.Assert;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.util.Time;
 
+import java.util.Optional;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeoutException;
 
@@ -43,7 +45,7 @@ import java.util.concurrent.TimeoutException;
  * check; useful when checking the contents of the exception.
  */
 public final class LambdaTestUtils {
-  public static final Logger LOG =
+  private static final Logger LOG =
       LoggerFactory.getLogger(LambdaTestUtils.class);
 
   private LambdaTestUtils() {
@@ -60,6 +62,7 @@ public final class LambdaTestUtils {
    * Interface to implement for converting a timeout into some form
    * of exception to raise.
    */
+  @FunctionalInterface
   public interface TimeoutHandler {
 
     /**
@@ -371,16 +374,11 @@ public final class LambdaTestUtils {
       Class<E> clazz,
       Callable<T> eval)
       throws Exception {
-    try {
-      T result = eval.call();
-      throw new AssertionError("Expected an exception, got "
-          + robustToString(result));
-    } catch (Throwable e) {
-      if (clazz.isAssignableFrom(e.getClass())) {
-        return (E)e;
-      }
-      throw e;
-    }
+    return intercept(clazz,
+        null,
+        "Expected a " + clazz.getName() + " to be thrown," +
+            " but got the result: ",
+        eval);
   }
 
   /**
@@ -451,6 +449,59 @@ public final class LambdaTestUtils {
   }
 
   /**
+   * Intercept an exception; throw an {@code AssertionError} if one is not raised.
+   * The caught exception is rethrown if it is of the wrong class or
+   * does not contain the text defined in {@code contained}.
+   * <p>
+   * Example: expect deleting a nonexistent file to raise a
+   * {@code FileNotFoundException} with the {@code toString()} value
+   * containing the text {@code "missing"}.
+   * <pre>
+   * FileNotFoundException ioe = intercept(FileNotFoundException.class,
+   *   "missing",
+   *   "path should not be found",
+   *   () -> {
+   *     filesystem.delete(new Path("/missing"), false);
+   *   });
+   * </pre>
+   *
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param message any message to include in exception/log messages
+   * @param eval expression to eval
+   * @param <T> return type of expression
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type and contents
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   * The error includes the {@code toString()} value of the result, if this
+   * can be determined.
+   * @see GenericTestUtils#assertExceptionContains(String, Throwable)
+   */
+  public static <T, E extends Throwable> E intercept(
+      Class<E> clazz,
+      String contained,
+      String message,
+      Callable<T> eval)
+      throws Exception {
+    E ex;
+    try {
+      T result = eval.call();
+      throw new AssertionError(message + ": " + robustToString(result));
+    } catch (Throwable e) {
+      if (!clazz.isAssignableFrom(e.getClass())) {
+        throw e;
+      } else {
+        ex = (E) e;
+      }
+    }
+    GenericTestUtils.assertExceptionContains(contained, ex, message);
+    return ex;
+  }
+
+  /**
    * Variant of {@link #intercept(Class, Callable)} to simplify void
    * invocations.
    * @param clazz class of exception; the raised exception must be this class
@@ -468,9 +519,41 @@ public final class LambdaTestUtils {
       String contained,
       VoidCallable eval)
       throws Exception {
-    E ex = intercept(clazz, eval);
-    GenericTestUtils.assertExceptionContains(contained, ex);
-    return ex;
+    return intercept(clazz, contained,
+        "Expecting " + clazz.getName()
+        + (contained != null? (" with text " + contained) : "")
+        + " but got ",
+        () -> {
+          eval.call();
+          return "void";
+        });
+  }
+
+  /**
+   * Variant of {@link #intercept(Class, Callable)} to simplify void
+   * invocations.
+   * @param clazz class of exception; the raised exception must be this class
+   * <i>or a subclass</i>.
+   * @param contained string which must be in the {@code toString()} value
+   * of the exception
+   * @param message any message to include in exception/log messages
+   * @param eval expression to eval
+   * @param <E> exception class
+   * @return the caught exception if it was of the expected type
+   * @throws Exception any other exception raised
+   * @throws AssertionError if the evaluation call didn't raise an exception.
+   */
+  public static <E extends Throwable> E intercept(
+      Class<E> clazz,
+      String contained,
+      String message,
+      VoidCallable eval)
+      throws Exception {
+    return intercept(clazz, contained, message,
+        () -> {
+          eval.call();
+          return "void";
+        });
   }
 
   /**
@@ -495,6 +578,38 @@ public final class LambdaTestUtils {
   }
 
   /**
+   * Assert that an optional value matches an expected one;
+   * checks include null and empty on the actual value.
+   * @param message message text
+   * @param expected expected value
+   * @param actual actual optional value
+   * @param <T> type
+   */
+  public static <T> void assertOptionalEquals(String message,
+      T expected,
+      Optional<T> actual) {
+    Assert.assertNotNull(message, actual);
+    Assert.assertTrue(message + " - not present", actual.isPresent());
+    Assert.assertEquals(message, expected, actual.get());
+  }
+
+  /**
+   * Assert that an optional value is unset (not present);
+   * the check includes a null check on the actual value.
+   * @param message message text
+   * @param actual actual optional value
+   * @param <T> type
+   */
+  public static <T> void assertOptionalUnset(String message,
+      Optional<T> actual) {
+    Assert.assertNotNull(message, actual);
+    if (actual.isPresent()) {
+      Assert.fail("Expected empty option, got " + actual.get().toString());
+    }
+  }
+
+  /**
    * Returns {@code TimeoutException} on a timeout. If
    * there was a inner class passed in, includes it as the
    * inner failure.
@@ -638,6 +753,7 @@ public final class LambdaTestUtils {
    * A simple interface for lambdas, which returns nothing; this exists
    * to simplify lambda tests on operations with no return value.
    */
+  @FunctionalInterface
   public interface VoidCallable {
     void call() throws Exception;
   }
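
The new four-argument intercept adds a caller-supplied message to the
failure text; a usage sketch (the filesystem and path are illustrative):

    // Expect delete() of a missing path to raise FileNotFoundException
    // whose toString() contains "missing"; the message is used in the
    // AssertionError if no exception surfaces.
    FileNotFoundException ex = intercept(
        FileNotFoundException.class,
        "missing",
        "path should not be found",
        () -> filesystem.delete(new Path("/missing"), false));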

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java
new file mode 100644
index 0000000..991697d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestJsonSerialization.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util;
+
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.fasterxml.jackson.core.JsonParseException;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+/**
+ * Test the JSON serialization helper.
+ */
+public class TestJsonSerialization extends HadoopTestBase {
+
+  private final JsonSerialization<KeyVal> serDeser =
+      new JsonSerialization<>(KeyVal.class, true, true);
+
+  private final KeyVal source = new KeyVal("key", "1");
+
+  private static class KeyVal implements Serializable {
+    private String name;
+    private String value;
+
+    KeyVal(String name, String value) {
+      this.name = name;
+      this.value = value;
+    }
+
+    KeyVal() {
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("KeyVal{");
+      sb.append("name='").append(name).append('\'');
+      sb.append(", value='").append(value).append('\'');
+      sb.append('}');
+      return sb.toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      KeyVal that = (KeyVal) o;
+      return Objects.equals(name, that.name) &&
+          Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(name, value);
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public void setName(String name) {
+      this.name = name;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+  }
+
+  @Test
+  public void testStringRoundTrip() throws Throwable {
+    String wire = serDeser.toJson(source);
+    KeyVal unmarshalled = serDeser.fromJson(wire);
+    assertEquals("Failed to unmarshall: " + wire, source, unmarshalled);
+  }
+
+  @Test
+  public void testBytesRoundTrip() throws Throwable {
+    byte[] wire = serDeser.toBytes(source);
+    KeyVal unmarshalled = serDeser.fromBytes(wire);
+    assertEquals(source, unmarshalled);
+  }
+
+  @Test
+  public void testBadBytesRoundTrip() throws Throwable {
+    LambdaTestUtils.intercept(JsonParseException.class,
+        "token",
+        () -> serDeser.fromBytes(new byte[]{'a'}));
+  }
+
+  @Test
+  public void testCloneViaJson() throws Throwable {
+    KeyVal unmarshalled = serDeser.fromInstance(source);
+    assertEquals(source, unmarshalled);
+  }
+
+  @Test
+  public void testFileRoundTrip() throws Throwable {
+    File tempFile = File.createTempFile("Keyval", ".json");
+    tempFile.delete();
+    try {
+      serDeser.save(tempFile, source);
+      assertEquals(source, serDeser.load(tempFile));
+    } finally {
+      tempFile.delete();
+    }
+  }
+
+  @Test
+  public void testEmptyFile() throws Throwable {
+    File tempFile = File.createTempFile("Keyval", ".json");
+    try {
+      LambdaTestUtils.intercept(EOFException.class,
+          "empty",
+          () -> serDeser.load(tempFile));
+    } finally {
+      tempFile.delete();
+    }
+  }
+
+  @Test
+  public void testFileSystemRoundTrip() throws Throwable {
+    File tempFile = File.createTempFile("Keyval", ".json");
+    tempFile.delete();
+    Path tempPath = new Path(tempFile.toURI());
+    LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+    try {
+      serDeser.save(fs, tempPath, source, false);
+      assertEquals(source, serDeser.load(fs, tempPath));
+    } finally {
+      fs.delete(tempPath, false);
+    }
+  }
+
+  @Test
+  public void testFileSystemEmptyPath() throws Throwable {
+    File tempFile = File.createTempFile("Keyval", ".json");
+    Path tempPath = new Path(tempFile.toURI());
+    LocalFileSystem fs = FileSystem.getLocal(new Configuration());
+    try {
+      LambdaTestUtils.intercept(EOFException.class,
+          () -> serDeser.load(fs, tempPath));
+      fs.delete(tempPath, false);
+      LambdaTestUtils.intercept(FileNotFoundException.class,
+          () -> serDeser.load(fs, tempPath));
+    } finally {
+      fs.delete(tempPath, false);
+    }
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
index 5d42fbf..f8aaab7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/TestMapreduceConfigFields.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 
 /**
@@ -53,14 +54,23 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
   @SuppressWarnings("deprecation")
   @Override
   public void initializeMemberVariables() {
-    xmlFilename = new String("mapred-default.xml");
-    configurationClasses = new Class[] { MRJobConfig.class, MRConfig.class,
-        JHAdminConfig.class, ShuffleHandler.class, FileOutputFormat.class,
-       FileInputFormat.class, Job.class, NLineInputFormat.class,
-       JobConf.class, FileOutputCommitter.class };
+    xmlFilename = "mapred-default.xml";
+    configurationClasses = new Class[] {
+        MRJobConfig.class,
+        MRConfig.class,
+        JHAdminConfig.class,
+        ShuffleHandler.class,
+        FileOutputFormat.class,
+        FileInputFormat.class,
+        Job.class,
+        NLineInputFormat.class,
+        JobConf.class,
+        FileOutputCommitter.class,
+        PathOutputCommitterFactory.class
+    };
 
     // Initialize used variables
-    configurationPropsToSkipCompare = new HashSet<String>();
+    configurationPropsToSkipCompare = new HashSet<>();
 
     // Set error modes
     errorIfMissingConfigProps = true;
@@ -82,6 +92,11 @@ public class TestMapreduceConfigFields extends TestConfigurationFieldsBase {
         MRJobConfig.MAP_RESOURCE_TYPE_PREFIX);
     configurationPropsToSkipCompare.add(
         MRJobConfig.REDUCE_RESOURCE_TYPE_PREFIX);
+
+    // PathOutputCommitterFactory values
+    xmlPrefixToSkipCompare = new HashSet<>();
+    xmlPrefixToSkipCompare.add(
+        PathOutputCommitterFactory.COMMITTER_FACTORY_SCHEME);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
new file mode 100644
index 0000000..f12678b
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/BindingPathOutputCommitter.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This is a special committer which creates the factory for the committer and
+ * runs off that. Why does it exist? So that you can explicitly instantiate
+ * a committer by classname and yet still have the actual implementation
+ * driven dynamically by the factory options and destination filesystem.
+ * This simplifies integration
+ * with existing code which takes the classname of a committer.
+ * There's no factory for this, as that would lead to a loop.
+ *
+ * All commit protocol methods and accessors are delegated to the
+ * wrapped committer.
+ *
+ * How to use:
+ *
+ * <ol>
+ *   <li>
+ *     In applications which take a classname of committer in
+ *     a configuration option, set it to the canonical name of this class
+ *     (see {@link #NAME}). When this class is instantiated, it will
+ *     use the factory mechanism to locate the configured committer for the
+ *     destination.
+ *   </li>
+ *   <li>
+ *     In code, explicitly create an instance of this committer through
+ *     its constructor, then invoke commit lifecycle operations on it.
+ *     The dynamically configured committer will be created in the constructor
+ *     and have the lifecycle operations relayed to it.
+ *   </li>
+ * </ol>
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class BindingPathOutputCommitter extends PathOutputCommitter {
+
+  /**
+   * The classname for use in configurations.
+   */
+  public static final String NAME
+      = BindingPathOutputCommitter.class.getCanonicalName();
+
+  /**
+   * The bound committer.
+   */
+  private final PathOutputCommitter committer;
+
+  /**
+   * Instantiate.
+   * @param outputPath output path (may be null)
+   * @param context task context
+   * @throws IOException on any failure.
+   */
+  public BindingPathOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    super(outputPath, context);
+    committer = PathOutputCommitterFactory.getCommitterFactory(outputPath,
+        context.getConfiguration())
+        .createOutputCommitter(outputPath, context);
+  }
+
+  @Override
+  public Path getOutputPath() {
+    return committer.getOutputPath();
+  }
+
+  @Override
+  public Path getWorkPath() throws IOException {
+    return committer.getWorkPath();
+  }
+
+  @Override
+  public void setupJob(JobContext jobContext) throws IOException {
+    committer.setupJob(jobContext);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext taskContext) throws IOException {
+    committer.setupTask(taskContext);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext)
+      throws IOException {
+    return committer.needsTaskCommit(taskContext);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext taskContext) throws IOException {
+    committer.commitTask(taskContext);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext taskContext) throws IOException {
+    committer.abortTask(taskContext);
+  }
+
+  @Override
+  @SuppressWarnings("deprecation")
+  public void cleanupJob(JobContext jobContext) throws IOException {
+    super.cleanupJob(jobContext);
+  }
+
+  @Override
+  public void commitJob(JobContext jobContext) throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public void abortJob(JobContext jobContext, JobStatus.State state)
+      throws IOException {
+    committer.abortJob(jobContext, state);
+  }
+
+  @SuppressWarnings("deprecation")
+  @Override
+  public boolean isRecoverySupported() {
+    return committer.isRecoverySupported();
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext jobContext)
+      throws IOException {
+    return committer.isCommitJobRepeatable(jobContext);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return committer.isRecoverySupported(jobContext);
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext taskContext) throws IOException {
+    committer.recoverTask(taskContext);
+  }
+
+  @Override
+  public boolean hasOutputPath() {
+    return committer.hasOutputPath();
+  }
+
+  @Override
+  public String toString() {
+    return "BindingPathOutputCommitter{"
+        + "committer=" + committer +
+        '}';
+  }
+
+  /**
+   * Get the inner committer.
+   * @return the bound committer.
+   */
+  public PathOutputCommitter getCommitter() {
+    return committer;
+  }
+}
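
A minimal usage sketch; "example.output.committer.class" is a hypothetical
option name standing in for whatever configuration key an application uses
to accept a committer classname (it is not a key defined by this patch):

  import java.io.IOException;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;
  import org.apache.hadoop.mapreduce.lib.output.BindingPathOutputCommitter;
  import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

  // In configuration: point the application's classname option at this
  // committer, e.g. conf.set("example.output.committer.class",
  // BindingPathOutputCommitter.NAME).
  //
  // In code: instantiate directly; the constructor resolves the real
  // committer for the destination through the factory mechanism.
  static PathOutputCommitter bind(Path outputPath, TaskAttemptContext context)
      throws IOException {
    BindingPathOutputCommitter committer =
        new BindingPathOutputCommitter(outputPath, context);
    // getCommitter() exposes whichever committer the factory produced;
    // all lifecycle calls on the binding committer are relayed to it.
    System.out.println("Bound to " + committer.getCommitter());
    return committer;
  }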

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 325b2e7..86af2cf 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -155,17 +155,11 @@ public class FileOutputCommitter extends PathOutputCommitter {
    * @return the path where final output of the job should be placed.  This
    * could also be considered the committed application attempt path.
    */
-  private Path getOutputPath() {
+  @Override
+  public Path getOutputPath() {
     return this.outputPath;
   }
-  
-  /**
-   * @return true if we have an output path set, else false.
-   */
-  private boolean hasOutputPath() {
-    return this.outputPath != null;
-  }
-  
+
   /**
    * @return the path where the output of pending job attempts are
    * stored.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java
new file mode 100644
index 0000000..12b2841
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitterFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * Creates a {@link FileOutputCommitter}, always.
+ */
+public final class FileOutputCommitterFactory
+    extends PathOutputCommitterFactory {
+
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return createFileOutputCommitter(outputPath, context);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
index 0e7efa3..bbda26a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java
@@ -328,12 +328,14 @@ public abstract class FileOutputFormat<K, V> extends OutputFormat<K, V> {
     job.getConfiguration().set(BASE_OUTPUT_NAME, name);
   }
 
-  public synchronized 
-     OutputCommitter getOutputCommitter(TaskAttemptContext context
-                                        ) throws IOException {
+  public synchronized
+      OutputCommitter getOutputCommitter(TaskAttemptContext context)
+      throws IOException {
     if (committer == null) {
       Path output = getOutputPath(context);
-      committer = new FileOutputCommitter(output, context);
+      committer = PathOutputCommitterFactory.getCommitterFactory(
+          output,
+          context.getConfiguration()).createOutputCommitter(output, context);
     }
     return committer;
   }
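
With this change every FileOutputFormat subclass resolves its committer
through the factory chain. A sketch, using only keys shipped in this patch,
of redirecting all work written to s3a: destinations without touching job
or output-format code:

  import org.apache.hadoop.conf.Configuration;

  static void bindS3ACommitterFactory(Configuration conf) {
    // Bind a factory to the destination scheme; getOutputCommitter()
    // above will now return whatever this factory creates for s3a paths.
    conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
        "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
  }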

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
new file mode 100644
index 0000000..b7378af
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NamedCommitterFactory.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * A factory which creates any named committer identified
+ * in the option {@link PathOutputCommitterFactory#NAMED_COMMITTER_CLASS}.
+ */
+public final class NamedCommitterFactory extends
+    PathOutputCommitterFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(NamedCommitterFactory.class);
+
+  @SuppressWarnings("JavaReflectionMemberAccess")
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    Class<? extends PathOutputCommitter> clazz = loadCommitterClass(context);
+    LOG.debug("Using PathOutputCommitter implementation {}", clazz);
+    try {
+      Constructor<? extends PathOutputCommitter> ctor
+          = clazz.getConstructor(Path.class, TaskAttemptContext.class);
+      return ctor.newInstance(outputPath, context);
+    } catch (NoSuchMethodException
+        | InstantiationException
+        | IllegalAccessException
+        | InvocationTargetException e) {
+      throw new IOException("Failed to create " + clazz
+          + ":" + e, e);
+    }
+  }
+
+  /**
+   * Load the class named in {@link #NAMED_COMMITTER_CLASS}.
+   * @param context job or task context
+   * @return the committer class
+   * @throws IOException if no committer was defined.
+   */
+  private Class<? extends PathOutputCommitter> loadCommitterClass(
+      JobContext context) throws IOException {
+    Preconditions.checkNotNull(context, "null context");
+    Configuration conf = context.getConfiguration();
+    String value = conf.get(NAMED_COMMITTER_CLASS, "");
+    if (value.isEmpty()) {
+      throw new IOException("No committer defined in " + NAMED_COMMITTER_CLASS);
+    }
+    return conf.getClass(NAMED_COMMITTER_CLASS,
+        FileOutputCommitter.class, PathOutputCommitter.class);
+  }
+}
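
A configuration sketch for this factory; FileOutputCommitter stands in for
any committer exposing the (Path, TaskAttemptContext) constructor which
createOutputCommitter() reflects on:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

  static Configuration namedCommitterConf() {
    Configuration conf = new Configuration();
    // Select the named-committer factory...
    conf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS,
        PathOutputCommitterFactory.NAMED_COMMITTER_FACTORY);
    // ...and declare the committer it must instantiate. An unset or empty
    // value here makes createOutputCommitter() fail with an IOException.
    conf.set(PathOutputCommitterFactory.NAMED_COMMITTER_CLASS,
        "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter");
    return conf;
  }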

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
index 2df30ba..3679d9f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitter.java
@@ -76,9 +76,26 @@ public abstract class PathOutputCommitter extends OutputCommitter {
   }
 
   /**
+   * Get the final directory where work will be placed once the job
+   * is committed. This may be null, in which case there is no output
+   * path to write data to.
+   * @return the path where final output of the job should be placed.
+   */
+  public abstract Path getOutputPath();
+
+  /**
+   * Predicate: is there an output path?
+   * @return true if we have an output path set, else false.
+   */
+  public boolean hasOutputPath() {
+    return getOutputPath() != null;
+  }
+
+  /**
    * Get the directory that the task should write results into.
    * Warning: there's no guarantee that this work path is on the same
    * FS as the final output, or that it's visible across machines.
+   * May be null.
    * @return the work directory
    * @throws IOException IO problem
    */
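
A small sketch of the new accessor pair in use, assuming a committer
already obtained from one of the factories:

  import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;

  static String describe(PathOutputCommitter committer) {
    // getOutputPath() may legitimately be null; hasOutputPath() is the
    // null-safe probe derived from it in the base class.
    return committer.hasOutputPath()
        ? "output goes to " + committer.getOutputPath()
        : "committer has no output path";
  }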

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
new file mode 100644
index 0000000..0df14d1
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PathOutputCommitterFactory.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A factory for committers implementing the {@link PathOutputCommitter}
+ * methods, and so can be used from {@link FileOutputFormat}.
+ * The base implementation returns {@link FileOutputCommitter} instances.
+ *
+ * Algorithm:
+ * <ol>
+ *   <li>If an explicit committer factory is named, it is used.</li>
+ *   <li>The output path is examined.
+ *   If it is non-null and a factory is declared for that filesystem's
+ *   scheme, that factory is instantiated.</li>
+ *   <li>Otherwise, an instance of {@link FileOutputCommitter} is
+ *   created.</li>
+ * </ol>
+ *
+ * In {@link FileOutputFormat}, the created factory has its method
+ * {@link #createOutputCommitter(Path, TaskAttemptContext)} invoked with
+ * a task attempt context and a possibly null path.
+ *
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class PathOutputCommitterFactory extends Configured {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PathOutputCommitterFactory.class);
+
+  /**
+   * Name of the configuration option used to configure the
+   * output committer factory to use unless there is a specific
+   * one for a scheme.
+   */
+  public static final String COMMITTER_FACTORY_CLASS =
+      "mapreduce.outputcommitter.factory.class";
+
+  /**
+   * Scheme prefix for per-filesystem scheme committers.
+   */
+  public static final String COMMITTER_FACTORY_SCHEME =
+      "mapreduce.outputcommitter.factory.scheme";
+
+  /**
+   * String format pattern for per-filesystem scheme committers.
+   */
+  public static final String COMMITTER_FACTORY_SCHEME_PATTERN =
+      COMMITTER_FACTORY_SCHEME + ".%s";
+
+
+  /**
+   * The {@link FileOutputCommitter} factory.
+   */
+  public static final String FILE_COMMITTER_FACTORY  =
+      "org.apache.hadoop.mapreduce.lib.output.FileOutputCommitterFactory";
+
+  /**
+   * The {@link NamedCommitterFactory} factory.
+   */
+  public static final String NAMED_COMMITTER_FACTORY  =
+      "org.apache.hadoop.mapreduce.lib.output.NamedCommitterFactory";
+
+  /**
+   * The option used to declare the classname of the committer which
+   * {@link NamedCommitterFactory} is to create.
+   */
+  public static final String NAMED_COMMITTER_CLASS =
+      "mapreduce.outputcommitter.named.classname";
+
+  /**
+   * Default committer factory name: {@value}.
+   */
+  public static final String COMMITTER_FACTORY_DEFAULT =
+      FILE_COMMITTER_FACTORY;
+
+  /**
+   * Create an output committer for a task attempt.
+   * @param outputPath output path. This may be null.
+   * @param context context
+   * @return a new committer
+   * @throws IOException problems instantiating the committer
+   */
+  public PathOutputCommitter createOutputCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return createFileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Create an instance of the default committer, a {@link FileOutputCommitter}
+   * for a task.
+   * @param outputPath the task's output path, or null if no output path
+   * has been defined.
+   * @param context the task attempt context
+   * @return the committer to use
+   * @throws IOException problems instantiating the committer
+   */
+  protected final PathOutputCommitter createFileOutputCommitter(
+      Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    LOG.debug("Creating FileOutputCommitter for path {} and context {}",
+        outputPath, context);
+    return new FileOutputCommitter(outputPath, context);
+  }
+
+  /**
+   * Get the committer factory for a configuration.
+   * @param outputPath the job's output path. If null, it means that the
+   * scheme is unknown and a per-scheme factory cannot be determined.
+   * @param conf configuration
+   * @return an instantiated committer factory
+   */
+  public static PathOutputCommitterFactory getCommitterFactory(
+      Path outputPath,
+      Configuration conf) {
+    // determine which key to look up: the overall one or a
+    // scheme-specific key
+    LOG.debug("Looking for committer factory for path {}", outputPath);
+    String key = COMMITTER_FACTORY_CLASS;
+    if (StringUtils.isEmpty(conf.getTrimmed(key)) && outputPath != null) {
+      // there is no explicit factory and there's an output path
+      // Get the scheme of the destination
+      String scheme = outputPath.toUri().getScheme();
+
+      // and see if it has a key
+      String schemeKey = String.format(COMMITTER_FACTORY_SCHEME_PATTERN,
+          scheme);
+      if (StringUtils.isNotEmpty(conf.getTrimmed(schemeKey))) {
+        // it does, so use that key in the classname lookup
+        LOG.debug("Using schema-specific factory for {}", outputPath);
+        key = schemeKey;
+      } else {
+        LOG.debug("No scheme-specific factory defined in {}", schemeKey);
+      }
+    }
+
+    // create the factory. Before using Configuration.getClass, check
+    // for an empty configuration value, as that raises ClassNotFoundException.
+    Class<? extends PathOutputCommitterFactory> factory;
+    String trimmedValue = conf.getTrimmed(key, "");
+    if (StringUtils.isEmpty(trimmedValue)) {
+      // empty/null value, use default
+      LOG.debug("No output committer factory defined,"
+          + " defaulting to FileOutputCommitterFactory");
+      factory = FileOutputCommitterFactory.class;
+    } else {
+      // key is set, get the class
+      factory = conf.getClass(key,
+          FileOutputCommitterFactory.class,
+          PathOutputCommitterFactory.class);
+      LOG.debug("Using OutputCommitter factory class {} from key {}",
+          factory, key);
+    }
+    return ReflectionUtils.newInstance(factory, conf);
+  }
+
+  /**
+   * Create the committer factory for a task attempt & destination, then
+   * create the committer from it.
+   * @param outputPath the task's output path, or null if no output path
+   * has been defined.
+   * @param context the task attempt context
+   * @return the committer to use
+   * @throws IOException problems instantiating the committer
+   */
+  public static PathOutputCommitter createCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return getCommitterFactory(outputPath,
+        context.getConfiguration())
+        .createOutputCommitter(outputPath, context);
+  }
+
+}
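
End to end, the lookup order implemented above is reachable through the
static helper; a sketch with the resolution steps noted inline:

  import java.io.IOException;

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.mapreduce.TaskAttemptContext;
  import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
  import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;

  static PathOutputCommitter resolve(Path dest, TaskAttemptContext context)
      throws IOException {
    // 1. mapreduce.outputcommitter.factory.class, if set, wins outright;
    // 2. else mapreduce.outputcommitter.factory.scheme.<scheme of dest>;
    // 3. else FileOutputCommitterFactory, i.e. a classic
    //    FileOutputCommitter.
    return PathOutputCommitterFactory.createCommitter(dest, context);
  }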

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 9d166c7..1e432ce 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -2043,4 +2043,26 @@
   <name>mapreduce.job.send-token-conf</name>
   <value></value>
 </property>
+
+<property>
+  <description>
+    The name of an output committer factory for MRv2 FileOutputFormat to use
+    for committing work. If set, overrides any per-filesystem committer
+    defined for the destination filesystem.
+  </description>
+  <name>mapreduce.outputcommitter.factory.class</name>
+  <value></value>
+</property>
+
+
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+  <description>
+    The committer factory to use when writing data to S3A filesystems.
+    If mapreduce.outputcommitter.factory.class is set, it will
+    override this property.
+  </description>
+</property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
index 9cff82f..3b73934 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitter.java
@@ -109,14 +109,34 @@ public class TestPathOutputCommitter extends Assert {
     public void abortTask(TaskAttemptContext taskContext) throws IOException {
 
     }
+
+    @Override
+    public Path getOutputPath() {
+      return null;
+    }
   }
 
   /**
    * Stub task context.
+   * The {@link #getConfiguration()} method returns the configuration supplied
+   * in the constructor; while {@link #setOutputCommitter(OutputCommitter)}
+   * sets the committer returned in {@link #getOutputCommitter()}.
+   * Otherwise, the methods are all no-ops.
    */
-  public class TaskContext
+  public static class TaskContext
       implements TaskInputOutputContext<String, String, String, String> {
 
+    private final Configuration configuration;
+
+    public TaskContext() {
+      this(new Configuration());
+    }
+
+    public TaskContext(Configuration conf) {
+      this.configuration = conf;
+    }
+
+
     private OutputCommitter outputCommitter;
 
     public void setOutputCommitter(OutputCommitter outputCommitter) {
@@ -180,7 +200,7 @@ public class TestPathOutputCommitter extends Assert {
 
     @Override
     public Configuration getConfiguration() {
-      return null;
+      return configuration;
     }
 
     @Override

