HADOOP-13186. Multipart Uploader API. Contributed by Ewan Higgs

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/980031bb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/980031bb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/980031bb

Branch: refs/heads/HDDS-4
Commit: 980031bb043dd026a6bf42b0e71d304ac89294a5
Parents: 3905fdb
Author: Chris Douglas <cdoug...@apache.org>
Authored: Sun Jun 17 11:54:26 2018 -0700
Committer: Chris Douglas <cdoug...@apache.org>
Committed: Sun Jun 17 11:54:26 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/BBPartHandle.java |  58 +++++++
 .../org/apache/hadoop/fs/BBUploadHandle.java    |  57 +++++++
 .../hadoop/fs/FileSystemMultipartUploader.java  | 132 ++++++++++++++++
 .../hadoop/fs/LocalFileSystemPathHandle.java    | 100 +++++++++++++
 .../org/apache/hadoop/fs/MultipartUploader.java |  90 +++++++++++
 .../hadoop/fs/MultipartUploaderFactory.java     |  65 ++++++++
 .../java/org/apache/hadoop/fs/PartHandle.java   |  45 ++++++
 .../apache/hadoop/fs/RawLocalFileSystem.java    |  61 +++++++-
 .../UnsupportedMultipartUploaderException.java  |  41 +++++
 .../java/org/apache/hadoop/fs/UploadHandle.java |  47 ++++++
 .../hadoop-common/src/main/proto/FSProtos.proto |   8 +
 ...rg.apache.hadoop.fs.MultipartUploaderFactory |  16 ++
 .../fs/AbstractSystemMultipartUploaderTest.java | 143 ++++++++++++++++++
 .../TestLocalFileSystemMultipartUploader.java   |  65 ++++++++
 .../AbstractContractPathHandleTest.java         |   6 +
 .../TestRawlocalContractPathHandle.java         |  40 +++++
 .../src/test/resources/contract/rawlocal.xml    |   5 +
 .../hdfs/DFSMultipartUploaderFactory.java       |  40 +++++
 ...rg.apache.hadoop.fs.MultipartUploaderFactory |  16 ++
 .../hadoop/fs/TestHDFSMultipartUploader.java    |  76 ++++++++++
 .../hadoop/fs/s3a/S3AMultipartUploader.java     | 150 +++++++++++++++++++
 ...rg.apache.hadoop.fs.MultipartUploaderFactory |  15 ++
 .../org.apache.hadoop.fs.MultipartUploader      |  16 ++
 23 files changed, 1290 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java
new file mode 100644
index 0000000..e1336b8
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBPartHandle.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Byte array backed part handle.
+ */
+public final class BBPartHandle implements PartHandle {
+
+  private static final long serialVersionUID = 0x23ce3eb1;
+
+  private final byte[] bytes;
+
+  private BBPartHandle(ByteBuffer byteBuffer){
+    this.bytes = byteBuffer.array();
+  }
+
+  public static PartHandle from(ByteBuffer byteBuffer) {
+    return new BBPartHandle(byteBuffer);
+  }
+
+  @Override
+  public ByteBuffer bytes() {
+    return ByteBuffer.wrap(bytes);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(bytes);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof PartHandle)) {
+      return false;
+    }
+    PartHandle o = (PartHandle) other;
+    return bytes().equals(o.bytes());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java
new file mode 100644
index 0000000..6430c14
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BBUploadHandle.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Byte array backed upload handle.
+ */
+public final class BBUploadHandle implements UploadHandle {
+
+  private static final long serialVersionUID = 0x69d5509b;
+
+  private final byte[] bytes;
+
+  private BBUploadHandle(ByteBuffer byteBuffer){
+    this.bytes = byteBuffer.array();
+  }
+
+  public static UploadHandle from(ByteBuffer byteBuffer) {
+    return new BBUploadHandle(byteBuffer);
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(bytes);
+  }
+
+  @Override
+  public ByteBuffer bytes() {
+    return ByteBuffer.wrap(bytes);
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof UploadHandle)) {
+      return false;
+    }
+    UploadHandle o = (UploadHandle) other;
+    return bytes().equals(o.bytes());
+  }
+}
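
For reference, a minimal sketch of how these byte-buffer handles round-trip;
the class and identifier below are illustrative, not part of the patch:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.UploadHandle;

public class HandleRoundTrip {
  public static void main(String[] args) {
    // Wrap an opaque identifier (e.g. a backend uploadId) in a handle.
    // from() keeps the buffer's backing array, so pass a buffer wrapping
    // exactly the bytes of interest.
    byte[] id = "example-upload-id".getBytes(StandardCharsets.UTF_8);
    UploadHandle handle = BBUploadHandle.from(ByteBuffer.wrap(id));

    // toByteArray() is the default method on UploadHandle; it copies the
    // remaining bytes of bytes() into a fresh array.
    byte[] restored = handle.toByteArray();
    System.out.println(Arrays.equals(id, restored)); // true
  }
}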

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
new file mode 100644
index 0000000..b57ff3d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import com.google.common.base.Charsets;
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * A MultipartUploader that uses the basic FileSystem commands.
+ * This is done in three stages:
+ * Init - create a temp _multipart directory.
+ * PutPart - copy the individual parts of the file to the temp directory.
+ * Complete - use {@link FileSystem#concat} to merge the files, then delete
+ * the temp directory.
+ */
+public class FileSystemMultipartUploader extends MultipartUploader {
+
+  private final FileSystem fs;
+
+  public FileSystemMultipartUploader(FileSystem fs) {
+    this.fs = fs;
+  }
+
+  @Override
+  public UploadHandle initialize(Path filePath) throws IOException {
+    Path collectorPath = createCollectorPath(filePath);
+    fs.mkdirs(collectorPath, FsPermission.getDirDefault());
+
+    ByteBuffer byteBuffer = ByteBuffer.wrap(
+        collectorPath.toString().getBytes(Charsets.UTF_8));
+    return BBUploadHandle.from(byteBuffer);
+  }
+
+  @Override
+  public PartHandle putPart(Path filePath, InputStream inputStream,
+      int partNumber, UploadHandle uploadId, long lengthInBytes)
+      throws IOException {
+
+    byte[] uploadIdByteArray = uploadId.toByteArray();
+    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
+        uploadIdByteArray.length, Charsets.UTF_8));
+    Path partPath =
+        Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR),
+            new Path(Integer.toString(partNumber) + ".part")));
+    FSDataOutputStreamBuilder outputStream = fs.createFile(partPath);
+    FSDataOutputStream fsDataOutputStream = outputStream.build();
+    IOUtils.copy(inputStream, fsDataOutputStream, 4096);
+    fsDataOutputStream.close();
+    return BBPartHandle.from(ByteBuffer.wrap(
+        partPath.toString().getBytes(Charsets.UTF_8)));
+  }
+
+  private Path createCollectorPath(Path filePath) {
+    return Path.mergePaths(filePath.getParent(),
+        Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
+            Path.mergePaths(new Path("_multipart"),
+                new Path(Path.SEPARATOR))));
+  }
+
+  @Override
+  @SuppressWarnings("deprecation") // rename w/ OVERWRITE
+  public PathHandle complete(Path filePath,
+      List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
+      throws IOException {
+    handles.sort(Comparator.comparing(Pair::getKey));
+    List<Path> partHandles = handles
+        .stream()
+        .map(pair -> {
+          byte[] byteArray = pair.getValue().toByteArray();
+          return new Path(new String(byteArray, 0, byteArray.length,
+              Charsets.UTF_8));
+        })
+        .collect(Collectors.toList());
+
+    Path collectorPath = createCollectorPath(filePath);
+    Path filePathInsideCollector = Path.mergePaths(collectorPath,
+        new Path(Path.SEPARATOR + filePath.getName()));
+    fs.create(filePathInsideCollector).close();
+    fs.concat(filePathInsideCollector,
+        partHandles.toArray(new Path[handles.size()]));
+    fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
+    fs.delete(collectorPath, true);
+    FileStatus status = fs.getFileStatus(filePath);
+    return fs.getPathHandle(status);
+  }
+
+  @Override
+  public void abort(Path filePath, UploadHandle uploadId) throws IOException {
+    byte[] uploadIdByteArray = uploadId.toByteArray();
+    Path collectorPath = new Path(new String(uploadIdByteArray, 0,
+        uploadIdByteArray.length, Charsets.UTF_8));
+    fs.delete(collectorPath, true);
+  }
+
+  /**
+   * Factory for creating MultipartUploader objects for file://
+   * filesystems.
+   */
+  public static class Factory extends MultipartUploaderFactory {
+    protected MultipartUploader createMultipartUploader(FileSystem fs,
+        Configuration conf) {
+      if (fs.getScheme().equals("file")) {
+        return new FileSystemMultipartUploader(fs);
+      }
+      return null;
+    }
+  }
+}
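
To make the staging scheme above concrete, a small sketch that mirrors the
createCollectorPath() logic for a hypothetical target path (class name and
path are illustrative):

import org.apache.hadoop.fs.Path;

public class CollectorPathDemo {
  public static void main(String[] args) {
    Path filePath = new Path("/data/out.txt");
    // Same mergePaths chain as createCollectorPath() above.
    Path collector = Path.mergePaths(filePath.getParent(),
        Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
            Path.mergePaths(new Path("_multipart"),
                new Path(Path.SEPARATOR))));
    // Prints /data/out/_multipart. Parts are staged there as 1.part,
    // 2.part, ..., concatenated into out.txt, renamed over /data/out.txt,
    // and the staging directory is deleted on complete().
    System.out.println(collector);
  }
}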

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java
new file mode 100644
index 0000000..a6b37b3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystemPathHandle.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.fs.FSProtos.LocalFileSystemPathHandleProto;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Opaque handle to an entity in a FileSystem.
+ */
+public class LocalFileSystemPathHandle implements PathHandle {
+
+  private final String path;
+  private final Long mtime;
+
+  public LocalFileSystemPathHandle(String path, Optional<Long> mtime) {
+    this.path = path;
+    this.mtime = mtime.orElse(null);
+  }
+
+  public LocalFileSystemPathHandle(ByteBuffer bytes) throws IOException {
+    if (null == bytes) {
+      throw new IOException("Missing PathHandle");
+    }
+    LocalFileSystemPathHandleProto p =
+        LocalFileSystemPathHandleProto.parseFrom(ByteString.copyFrom(bytes));
+    path = p.hasPath()   ? p.getPath()  : null;
+    mtime = p.hasMtime() ? p.getMtime() : null;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
+  public void verify(FileStatus stat) throws InvalidPathHandleException {
+    if (null == stat) {
+      throw new InvalidPathHandleException("Could not resolve handle");
+    }
+    if (mtime != null && mtime != stat.getModificationTime()) {
+      throw new InvalidPathHandleException("Content changed");
+    }
+  }
+
+  @Override
+  public ByteBuffer bytes() {
+    LocalFileSystemPathHandleProto.Builder b =
+        LocalFileSystemPathHandleProto.newBuilder();
+    b.setPath(path);
+    if (mtime != null) {
+      b.setMtime(mtime);
+    }
+    return b.build().toByteString().asReadOnlyByteBuffer();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    LocalFileSystemPathHandle that = (LocalFileSystemPathHandle) o;
+    return Objects.equals(path, that.path) &&
+        Objects.equals(mtime, that.mtime);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(path, mtime);
+  }
+
+  @Override
+  public String toString() {
+    return "LocalFileSystemPathHandle{" +
+        "path='" + path + '\'' +
+        ", mtime=" + mtime +
+        '}';
+  }
+
+}
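
A minimal round-trip sketch of the proto-backed serialization above; the
path and mtime values are hypothetical:

import java.nio.ByteBuffer;
import java.util.Optional;
import org.apache.hadoop.fs.LocalFileSystemPathHandle;

public class PathHandleSerDemo {
  public static void main(String[] args) throws Exception {
    // Bind a handle to a path plus an mtime; verify() later rejects the
    // handle if the referent's modification time no longer matches.
    LocalFileSystemPathHandle original =
        new LocalFileSystemPathHandle("/tmp/f.txt", Optional.of(1234L));

    // bytes() serializes through LocalFileSystemPathHandleProto, so the
    // handle survives a trip across any byte channel.
    ByteBuffer wire = original.bytes();
    LocalFileSystemPathHandle restored = new LocalFileSystemPathHandle(wire);
    System.out.println(original.equals(restored)); // true
  }
}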

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
new file mode 100644
index 0000000..24a9216
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MultipartUploader is an API for copying files multipart and across
+ * multiple nodes. Users should:
+ * 1. Initialize an upload.
+ * 2. Upload parts in any order.
+ * 3. Complete the upload to materialize it in the destination FS.
+ *
+ * Implementations of complete() must reorder the parts if the destination
+ * FS doesn't already do it for them.
+ */
+public abstract class MultipartUploader {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MultipartUploader.class);
+
+  /**
+   * Initialize a multipart upload.
+   * @param filePath Target path for upload.
+   * @return unique identifier associating part uploads.
+   * @throws IOException IO failure
+   */
+  public abstract UploadHandle initialize(Path filePath) throws IOException;
+
+  /**
+   * Put part as part of a multipart upload. It should be possible to have
+   * parts uploaded in any order (or in parallel).
+   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+   * @param inputStream Data for this part.
+   * @param partNumber Index of the part relative to others.
+   * @param uploadId Identifier from {@link #initialize(Path)}.
+   * @param lengthInBytes Target length to read from the stream.
+   * @return unique PartHandle identifier for the uploaded part.
+   * @throws IOException IO failure
+   */
+  public abstract PartHandle putPart(Path filePath, InputStream inputStream,
+      int partNumber, UploadHandle uploadId, long lengthInBytes)
+      throws IOException;
+
+  /**
+   * Complete a multipart upload.
+   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+   * @param handles Identifiers with associated part numbers from
+   *          {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
+   *          Depending on the backend, the list order may be significant.
+   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+   * @return unique PathHandle identifier for the uploaded file.
+   * @throws IOException IO failure
+   */
+  public abstract PathHandle complete(Path filePath,
+      List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
+      throws IOException;
+
+  /**
+   * Aborts a multipart upload.
+   * @param filePath Target path for upload (same as {@link #initialize(Path)}).
+   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+   * @throws IOException IO failure
+   */
+  public abstract void abort(Path filePath, UploadHandle multipartUploadId)
+      throws IOException;
+
+}
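
Putting the lifecycle together, a minimal usage sketch against the local
filesystem, mirroring what the tests below do (the demo class and paths are
hypothetical):

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadDemo {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path dest = new Path("/tmp/mpu-demo/out.txt");
    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);

    // 1. Initialize the upload.
    UploadHandle upload = mpu.initialize(dest);

    // 2. Upload parts; order doesn't matter, complete() sorts by number.
    List<Pair<Integer, PartHandle>> parts = new ArrayList<>();
    for (int i = 1; i <= 3; i++) {
      byte[] data = ("part " + i + "\n").getBytes(StandardCharsets.UTF_8);
      InputStream in = new ByteArrayInputStream(data);
      parts.add(Pair.of(i, mpu.putPart(dest, in, i, upload, data.length)));
    }

    // 3. Complete; the returned PathHandle can reopen the merged file.
    PathHandle fd = mpu.complete(dest, parts, upload);
    try (FSDataInputStream in = fs.open(fd)) {
      System.out.println((char) in.read()); // 'p'
    }
  }
}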

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
new file mode 100644
index 0000000..b0fa798
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploaderFactory.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * {@link ServiceLoader}-driven uploader API for storage services supporting
+ * multipart uploads.
+ */
+public abstract class MultipartUploaderFactory {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(MultipartUploaderFactory.class);
+
+  /**
+   * Multipart Uploaders listed as services.
+   */
+  private static ServiceLoader<MultipartUploaderFactory> serviceLoader =
+      ServiceLoader.load(MultipartUploaderFactory.class,
+          MultipartUploaderFactory.class.getClassLoader());
+
+  // Iterate through the serviceLoader to avoid lazy loading.
+  // Lazy loading would require synchronization in concurrent use cases.
+  static {
+    Iterator<MultipartUploaderFactory> iterServices = serviceLoader.iterator();
+    while (iterServices.hasNext()) {
+      iterServices.next();
+    }
+  }
+
+  public static MultipartUploader get(FileSystem fs, Configuration conf)
+      throws IOException {
+    MultipartUploader mpu = null;
+    for (MultipartUploaderFactory factory : serviceLoader) {
+      mpu = factory.createMultipartUploader(fs, conf);
+      if (mpu != null) {
+        break;
+      }
+    }
+    return mpu;
+  }
+
+  protected abstract MultipartUploader createMultipartUploader(FileSystem fs,
+      Configuration conf) throws IOException;
+}
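
New backends plug in through the standard ServiceLoader mechanism: ship a
factory subclass plus a line naming it in
META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory, as the
file:// and hdfs:// entries in this patch do. A sketch for a hypothetical
"demo" scheme:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemMultipartUploader;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;

public class DemoMultipartUploaderFactory extends MultipartUploaderFactory {
  @Override
  protected MultipartUploader createMultipartUploader(FileSystem fs,
      Configuration conf) throws IOException {
    if ("demo".equals(fs.getScheme())) {
      // Reuse the generic concat-based uploader, as the HDFS factory does.
      return new FileSystemMultipartUploader(fs);
    }
    // Returning null lets MultipartUploaderFactory.get() keep probing the
    // other registered factories.
    return null;
  }
}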

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
new file mode 100644
index 0000000..df70b74
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Opaque, serializable reference to a part id for multipart uploads.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface PartHandle extends Serializable {
+  /**
+   * @return Serialized form in bytes.
+   */
+  default byte[] toByteArray() {
+    ByteBuffer bb = bytes();
+    byte[] ret = new byte[bb.remaining()];
+    bb.get(ret);
+    return ret;
+  }
+
+  ByteBuffer bytes();
+
+  @Override
+  boolean equals(Object other);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index c0f8199..bd003ae 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -40,6 +40,7 @@ import java.nio.file.attribute.BasicFileAttributeView;
 import java.nio.file.attribute.FileTime;
 import java.util.Arrays;
 import java.util.EnumSet;
+import java.util.Optional;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -212,7 +213,19 @@ public class RawLocalFileSystem extends FileSystem {
     return new FSDataInputStream(new BufferedFSInputStream(
         new LocalFSFileInputStream(f), bufferSize));
   }
-  
+
+  @Override
+  public FSDataInputStream open(PathHandle fd, int bufferSize)
+      throws IOException {
+    if (!(fd instanceof LocalFileSystemPathHandle)) {
+      fd = new LocalFileSystemPathHandle(fd.bytes());
+    }
+    LocalFileSystemPathHandle id = (LocalFileSystemPathHandle) fd;
+    id.verify(getFileStatus(new Path(id.getPath())));
+    return new FSDataInputStream(new BufferedFSInputStream(
+        new LocalFSFileInputStream(new Path(id.getPath())), bufferSize));
+  }
+
   /*********************************************************
    * For create()'s FSOutputStream.
    *********************************************************/
@@ -246,7 +259,7 @@ public class RawLocalFileSystem extends FileSystem {
         }
       }
     }
-    
+
     /*
      * Just forward to the fos
      */
@@ -351,6 +364,18 @@ public class RawLocalFileSystem extends FileSystem {
   }
 
   @Override
+  public void concat(final Path trg, final Path [] psrcs) throws IOException {
+    final int bufferSize = 4096;
+    try(FSDataOutputStream out = create(trg)) {
+      for (Path src : psrcs) {
+        try(FSDataInputStream in = open(src)) {
+          IOUtils.copyBytes(in, out, bufferSize, false);
+        }
+      }
+    }
+  }
+
+  @Override
   public boolean rename(Path src, Path dst) throws IOException {
     // Attempt rename using Java API.
     File srcFile = pathToFile(src);
@@ -863,6 +888,38 @@ public class RawLocalFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Hook to implement support for {@link PathHandle} operations.
+   * @param stat Referent in the target FileSystem
+   * @param opts Constraints that determine the validity of the
+   *            {@link PathHandle} reference.
+   */
+  protected PathHandle createPathHandle(FileStatus stat,
+      Options.HandleOpt... opts) {
+    if (stat.isDirectory() || stat.isSymlink()) {
+      throw new IllegalArgumentException("PathHandle only available for 
files");
+    }
+    String authority = stat.getPath().toUri().getAuthority();
+    if (authority != null && !authority.equals("file://")) {
+      throw new IllegalArgumentException("Wrong FileSystem: " + 
stat.getPath());
+    }
+    Options.HandleOpt.Data data =
+        Options.HandleOpt.getOpt(Options.HandleOpt.Data.class, opts)
+            .orElse(Options.HandleOpt.changed(false));
+    Options.HandleOpt.Location loc =
+        Options.HandleOpt.getOpt(Options.HandleOpt.Location.class, opts)
+            .orElse(Options.HandleOpt.moved(false));
+    if (loc.allowChange()) {
+      throw new UnsupportedOperationException("Tracking file movement in " +
+          "basic FileSystem is not supported");
+    }
+    final Path p = stat.getPath();
+    final Optional<Long> mtime = !data.allowChange()
+        ? Optional.of(stat.getModificationTime())
+        : Optional.empty();
+    return new LocalFileSystemPathHandle(p.toString(), mtime);
+  }
+
   @Override
   public boolean supportsSymlinks() {
     return true;
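
A brief sketch of the resulting PathHandle workflow on the raw local
filesystem (the file path is hypothetical and must already exist):

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.RawLocalFileSystem;

public class PathHandleDemo {
  public static void main(String[] args) throws Exception {
    RawLocalFileSystem fs = new RawLocalFileSystem();
    fs.initialize(URI.create("file:///"), new Configuration());

    FileStatus stat = fs.getFileStatus(new Path("/tmp/handle-demo.txt"));

    // changed(false): the handle captures the mtime, so a later open by
    // handle throws InvalidPathHandleException if the content changed.
    PathHandle fd = fs.getPathHandle(stat, Options.HandleOpt.changed(false));

    // Reopen by handle rather than by path.
    try (FSDataInputStream in = fs.open(fd, 4096)) {
      System.out.println(in.read());
    }
  }
}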

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java
new file mode 100644
index 0000000..5606a80
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UnsupportedMultipartUploaderException.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * MultipartUploader for a given file system name/scheme is not supported.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class UnsupportedMultipartUploaderException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * Constructs exception with the specified detail message.
+   *
+   * @param message exception message.
+   */
+  public UnsupportedMultipartUploaderException(final String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java
new file mode 100644
index 0000000..143b4d1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/UploadHandle.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+
+/**
+ * Opaque, serializable reference to an uploadId for multipart uploads.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface UploadHandle extends Serializable {
+
+  /**
+   * @return Serialized form in bytes.
+   */
+  default byte[] toByteArray() {
+    ByteBuffer bb = bytes();
+    byte[] ret = new byte[bb.remaining()];
+    bb.get(ret);
+    return ret;
+  }
+
+  ByteBuffer bytes();
+
+  @Override
+  boolean equals(Object other);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto b/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto
index 5b8c45d..c3b768a 100644
--- a/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto
+++ b/hadoop-common-project/hadoop-common/src/main/proto/FSProtos.proto
@@ -68,3 +68,11 @@ message FileStatusProto {
   optional bytes ec_data                = 17;
   optional uint32 flags                 = 18 [default = 0];
 }
+
+/**
+ * Placeholder type for consistent basic FileSystem operations.
+ */
+message LocalFileSystemPathHandleProto {
+    optional uint64 mtime = 1;
+    optional string path = 2;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
new file mode 100644
index 0000000..f0054fe
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.FileSystemMultipartUploader$Factory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
new file mode 100644
index 0000000..f132089
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+
+import org.junit.Test;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public abstract class AbstractSystemMultipartUploaderTest {
+
+  abstract FileSystem getFS() throws IOException;
+
+  abstract Path getBaseTestPath();
+
+  @Test
+  public void testMultipartUpload() throws Exception {
+    FileSystem fs = getFS();
+    Path file = new Path(getBaseTestPath(), "some-file");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 1; i <= 100; ++i) {
+      String contents = "ThisIsPart" + i + "\n";
+      sb.append(contents);
+      int len = contents.getBytes().length;
+      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
+    byte[] fileData = IOUtils.toByteArray(fs.open(file));
+    String readString = new String(fdData);
+    assertEquals(sb.toString(), readString);
+    assertArrayEquals(fdData, fileData);
+  }
+
+  @Test
+  public void testMultipartUploadReverseOrder() throws Exception {
+    FileSystem fs = getFS();
+    Path file = new Path(getBaseTestPath(), "some-file");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 1; i <= 100; ++i) {
+      String contents = "ThisIsPart" + i + "\n";
+      sb.append(contents);
+    }
+    for (int i = 100; i > 0; --i) {
+      String contents = "ThisIsPart" + i + "\n";
+      int len = contents.getBytes().length;
+      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
+    byte[] fileData = IOUtils.toByteArray(fs.open(file));
+    String readString = new String(fdData);
+    assertEquals(sb.toString(), readString);
+    assertArrayEquals(fdData, fileData);
+  }
+
+  @Test
+  public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
+      throws Exception {
+    FileSystem fs = getFS();
+    Path file = new Path(getBaseTestPath(), "some-file");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    StringBuilder sb = new StringBuilder();
+    for (int i = 2; i <= 200; i += 2) {
+      String contents = "ThisIsPart" + i + "\n";
+      sb.append(contents);
+    }
+    for (int i = 200; i > 0; i -= 2) {
+      String contents = "ThisIsPart" + i + "\n";
+      int len = contents.getBytes().length;
+      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
+    byte[] fileData = IOUtils.toByteArray(fs.open(file));
+    String readString = new String(fdData);
+    assertEquals(sb.toString(), readString);
+    assertArrayEquals(fdData, fileData);
+  }
+
+  @Test
+  public void testMultipartUploadAbort() throws Exception {
+    FileSystem fs = getFS();
+    Path file = new Path(getBaseTestPath(), "some-file");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    for (int i = 100; i >= 50; --i) {
+      String contents = "ThisIsPart" + i + "\n";
+      int len = contents.getBytes().length;
+      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
+    }
+    mpu.abort(file, uploadHandle);
+
+    String contents = "ThisIsPart49\n";
+    int len = contents.getBytes().length;
+    InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+
+    try {
+      mpu.putPart(file, is, 49, uploadHandle, len);
+      fail("putPart should have thrown an exception");
+    } catch (IOException ok) {
+      // ignore
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
new file mode 100644
index 0000000..21d01b6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Test the FileSystemMultipartUploader on local file system.
+ */
+public class TestLocalFileSystemMultipartUploader
+    extends AbstractSystemMultipartUploaderTest {
+
+  private static FileSystem fs;
+  private File tmp;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    fs = LocalFileSystem.getLocal(new Configuration());
+  }
+
+  @Before
+  public void setup() throws IOException {
+    tmp = getRandomizedTestDir();
+    tmp.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    tmp.delete();
+  }
+
+  @Override
+  public FileSystem getFS() {
+    return fs;
+  }
+
+  @Override
+  public Path getBaseTestPath() {
+    return new Path(tmp.getAbsolutePath());
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java
index fbe28c3..36cfa6c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractPathHandleTest.java
@@ -123,6 +123,12 @@ public abstract class AbstractContractPathHandleTest
     HandleOpt.Data data = HandleOpt.getOpt(HandleOpt.Data.class, opts)
         .orElseThrow(IllegalArgumentException::new);
     FileStatus stat = testFile(B1);
+    try {
+      // Temporary workaround while RawLocalFS supports only second precision
+      Thread.sleep(1000);
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
     // modify the file by appending data
     appendFile(getFileSystem(), stat.getPath(), B2);
     byte[] b12 = Arrays.copyOf(B1, B1.length + B2.length);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java
new file mode 100644
index 0000000..3c088d2
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/rawlocal/TestRawlocalContractPathHandle.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.rawlocal;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.contract.AbstractContractPathHandleTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
+import org.apache.hadoop.fs.contract.rawlocal.RawlocalFSContract;
+
+public class TestRawlocalContractPathHandle
+    extends AbstractContractPathHandleTest {
+
+  public TestRawlocalContractPathHandle(String testname,
+      Options.HandleOpt[] opts, boolean serialized) {
+    super(testname, opts, serialized);
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new RawlocalFSContract(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
index a0d1d21..8cbd4a0 100644
--- a/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
+++ b/hadoop-common-project/hadoop-common/src/test/resources/contract/rawlocal.xml
@@ -122,4 +122,9 @@
     <value>true</value>
   </property>
 
+  <property>
+    <name>fs.contract.supports-content-check</name>
+    <value>true</value>
+  </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
new file mode 100644
index 0000000..e9959c1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSMultipartUploaderFactory.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemMultipartUploader;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * Support for HDFS multipart uploads, built on
+ * {@link FileSystem#concat(Path, Path[])}.
+ */
+public class DFSMultipartUploaderFactory extends MultipartUploaderFactory {
+  protected MultipartUploader createMultipartUploader(FileSystem fs,
+      Configuration conf) {
+    if (fs.getScheme().equals(HdfsConstants.HDFS_URI_SCHEME)) {
+      return new FileSystemMultipartUploader(fs);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
new file mode 100644
index 0000000..b153fd9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.hdfs.DFSMultipartUploaderFactory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
new file mode 100644
index 0000000..96c5093
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+
+public class TestHDFSMultipartUploader
+    extends AbstractSystemMultipartUploaderTest {
+
+  private static MiniDFSCluster cluster;
+  private Path tmp;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    cluster = new MiniDFSCluster.Builder(conf,
+          GenericTestUtils.getRandomizedTestDir())
+        .numDataNodes(1)
+        .build();
+    cluster.waitClusterUp();
+  }
+
+  @AfterClass
+  public static void cleanup() throws IOException {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Before
+  public void setup() throws IOException {
+    tmp = new Path(cluster.getFileSystem().getWorkingDirectory(),
+        name.getMethodName());
+    cluster.getFileSystem().mkdirs(tmp);
+  }
+
+  @Override
+  public FileSystem getFS() throws IOException {
+    return cluster.getFileSystem();
+  }
+
+  @Override
+  public Path getBaseTestPath() {
+    return tmp;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
new file mode 100644
index 0000000..34c88d4
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.base.Charsets;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BBPartHandle;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * MultipartUploader for S3AFileSystem. This uses the S3 multipart
+ * upload mechanism.
+ */
+public class S3AMultipartUploader extends MultipartUploader {
+
+  private final S3AFileSystem s3a;
+
+  public S3AMultipartUploader(FileSystem fs, Configuration conf) {
+    if (!(fs instanceof S3AFileSystem)) {
+      throw new IllegalArgumentException(
+          "S3A MultipartUploads must use S3AFileSystem");
+    }
+    s3a = (S3AFileSystem) fs;
+  }
+
+  @Override
+  public UploadHandle initialize(Path filePath) throws IOException {
+    String key = s3a.pathToKey(filePath);
+    InitiateMultipartUploadRequest request =
+        new InitiateMultipartUploadRequest(s3a.getBucket(), key);
+    LOG.debug("initialize request: {}", request);
+    InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request);
+    String uploadId = result.getUploadId();
+    return BBUploadHandle.from(ByteBuffer.wrap(
+        uploadId.getBytes(Charsets.UTF_8)));
+  }
+
+  @Override
+  public PartHandle putPart(Path filePath, InputStream inputStream,
+      int partNumber, UploadHandle uploadId, long lengthInBytes) {
+    String key = s3a.pathToKey(filePath);
+    UploadPartRequest request = new UploadPartRequest();
+    byte[] uploadIdBytes = uploadId.toByteArray();
+    request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8));
+    request.setInputStream(inputStream);
+    request.setPartSize(lengthInBytes);
+    request.setPartNumber(partNumber);
+    request.setBucketName(s3a.getBucket());
+    request.setKey(key);
+    LOG.debug("putPart request: {}", request);
+    UploadPartResult result = s3a.uploadPart(request);
+    String eTag = result.getETag();
+    return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8)));
+  }
+
+  @Override
+  public PathHandle complete(Path filePath,
+      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) {
+    String key = s3a.pathToKey(filePath);
+    CompleteMultipartUploadRequest request =
+        new CompleteMultipartUploadRequest();
+    request.setBucketName(s3a.getBucket());
+    request.setKey(key);
+    byte[] uploadIdBytes = uploadId.toByteArray();
+    request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8));
+    List<PartETag> eTags = handles
+        .stream()
+        .map(handle -> {
+          byte[] partEtagBytes = handle.getRight().toByteArray();
+          return new PartETag(handle.getLeft(),
+              new String(partEtagBytes, 0, partEtagBytes.length,
+                  Charsets.UTF_8));
+        })
+        .collect(Collectors.toList());
+    request.setPartETags(eTags);
+    LOG.debug("Complete request: {}", request);
+    CompleteMultipartUploadResult completeMultipartUploadResult =
+        s3a.getAmazonS3Client().completeMultipartUpload(request);
+
+    byte[] eTag = DFSUtilClient.string2Bytes(
+        completeMultipartUploadResult.getETag());
+    return (PathHandle) () -> ByteBuffer.wrap(eTag);
+  }
+
+  @Override
+  public void abort(Path filePath, UploadHandle uploadId) {
+    String key = s3a.pathToKey(filePath);
+    byte[] uploadIdBytes = uploadId.toByteArray();
+    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a
+        .getBucket(), key, uploadIdString);
+    LOG.debug("Abort request: {}", request);
+    s3a.getAmazonS3Client().abortMultipartUpload(request);
+  }
+
+  /**
+   * Factory for creating MultipartUploader objects for s3a:// FileSystems.
+   */
+  public static class Factory extends MultipartUploaderFactory {
+    @Override
+    protected MultipartUploader createMultipartUploader(FileSystem fs,
+        Configuration conf) {
+      if (fs.getScheme().equals("s3a")) {
+        return new S3AMultipartUploader(fs, conf);
+      }
+      return null;
+    }
+  }
+}
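
The opaque handles above are simply byte buffers holding UTF-8 text:
initialize() wraps the S3 uploadId into a BBUploadHandle, and putPart() wraps
each part's ETag into a BBPartHandle, which complete() and abort() decode
again. A small round-trip sketch of that encoding, using the handle types from
this patch (StandardCharsets.UTF_8 here is equivalent to the Guava Charsets
constant the patch uses; the sample id string is made up):

import org.apache.hadoop.fs.BBUploadHandle;
import org.apache.hadoop.fs.UploadHandle;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HandleRoundTrip {
  public static void main(String[] args) {
    String uploadId = "example-upload-id";  // stand-in for an S3 uploadId
    // Encode, as S3AMultipartUploader.initialize() does...
    UploadHandle handle = BBUploadHandle.from(
        ByteBuffer.wrap(uploadId.getBytes(StandardCharsets.UTF_8)));
    // ...and decode, as putPart()/complete()/abort() do.
    byte[] raw = handle.toByteArray();
    String recovered = new String(raw, 0, raw.length, StandardCharsets.UTF_8);
    System.out.println(uploadId.equals(recovered));  // true
  }
}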

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
new file mode 100644
index 0000000..2e4bc24
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory
@@ -0,0 +1,15 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.hadoop.fs.s3a.S3AMultipartUploader$Factory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/980031bb/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader
new file mode 100644
index 0000000..d16846b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.s3a.S3AMultipartUploader
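
Both registration files follow the java.util.ServiceLoader convention of
listing implementation classes in a file named after the service interface.
Note, though, that the factory entry in the previous file sits directly under
META-INF/ rather than META-INF/services/, and this second file names
S3AMultipartUploader itself rather than its nested Factory, so discovery
presumably goes through whatever scanning MultipartUploaderFactory performs
(not shown in this section). A hedged sketch of the conventional lookup
pattern, placed in package org.apache.hadoop.fs because
createMultipartUploader is protected; the find() helper is hypothetical, not
the patch's actual entry point:

package org.apache.hadoop.fs;

import java.util.ServiceLoader;

import org.apache.hadoop.conf.Configuration;

public class FactoryLookupSketch {
  /** Hypothetical helper: pick the first factory that serves this FS. */
  static MultipartUploader find(FileSystem fs, Configuration conf) {
    for (MultipartUploaderFactory factory
        : ServiceLoader.load(MultipartUploaderFactory.class)) {
      // Factories return null for schemes they do not handle; e.g.
      // S3AMultipartUploader.Factory only matches fs.getScheme() "s3a".
      MultipartUploader uploader = factory.createMultipartUploader(fs, conf);
      if (uploader != null) {
        return uploader;
      }
    }
    return null;  // no registered factory supports this FileSystem
  }
}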

