HADOOP-13130. s3a failures can surface as RTEs, not IOEs. (Steve Loughran)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/39ec1515
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/39ec1515
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/39ec1515

Branch: refs/heads/HDFS-1312
Commit: 39ec1515a205952eda7e171408a8b83eceb4abde
Parents: 500e946
Author: Steve Loughran <ste...@apache.org>
Authored: Sat May 21 16:39:25 2016 +0100
Committer: Steve Loughran <ste...@apache.org>
Committed: Sat May 21 16:39:31 2016 +0100

----------------------------------------------------------------------
 .../hadoop/fs/InvalidRequestException.java      |   4 +
 .../hadoop/fs/PathAccessDeniedException.java    |  12 +-
 .../apache/hadoop/fs/PathNotFoundException.java |  20 +-
 .../hadoop/fs/PathPermissionException.java      |  16 +-
 .../hadoop/fs/s3a/AWSClientIOException.java     |  50 +++
 .../apache/hadoop/fs/s3a/AWSS3IOException.java  |  61 +++
 .../hadoop/fs/s3a/AWSServiceIOException.java    |  72 +++
 .../hadoop/fs/s3a/S3AFastOutputStream.java      |  87 ++--
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 440 +++++++++++++------
 .../apache/hadoop/fs/s3a/S3AInputStream.java    |  33 +-
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   |  28 +-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java | 189 ++++++++
 .../org/apache/hadoop/fs/s3a/package-info.java  |  28 ++
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java  |  67 +++
 .../fs/s3a/TestS3AAWSCredentialsProvider.java   |  26 +-
 .../hadoop/fs/s3a/TestS3AConfiguration.java     |  11 +-
 .../hadoop/fs/s3a/TestS3AFailureHandling.java   | 194 ++++++++
 17 files changed, 1104 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java
index 437276d..081e085 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/InvalidRequestException.java
@@ -29,4 +29,8 @@ public class InvalidRequestException extends IOException {
   public InvalidRequestException(String str) {
     super(str);
   }
+
+  public InvalidRequestException(String message, Throwable cause) {
+    super(message, cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java
index 5277507..e0379b3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathAccessDeniedException.java
@@ -24,4 +24,14 @@ public class PathAccessDeniedException extends PathIOException {
   public PathAccessDeniedException(String path) {
     super(path, "Permission denied");
   }
-}
\ No newline at end of file
+
+  public PathAccessDeniedException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public PathAccessDeniedException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java
index c5a0838..ae3a57f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathNotFoundException.java
@@ -18,12 +18,26 @@
 package org.apache.hadoop.fs;
 
 /**
- * Exception corresponding to Permission denied - ENOENT
+ * Exception corresponding to path not found: ENOENT/ENOFILE
  */
 public class PathNotFoundException extends PathIOException {
   static final long serialVersionUID = 0L;
   /** @param path for the exception */
   public PathNotFoundException(String path) {
     super(path, "No such file or directory");
-  }    
-}
\ No newline at end of file
+  }
+
+  public PathNotFoundException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public PathNotFoundException(String path, String error) {
+    super(path, error);
+  }
+
+  public PathNotFoundException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java
index 483b1de..3c3541c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathPermissionException.java
@@ -26,4 +26,18 @@ public class PathPermissionException extends PathIOException {
   public PathPermissionException(String path) {
     super(path, "Operation not permitted");
   }
-}
\ No newline at end of file
+
+  public PathPermissionException(String path, Throwable cause) {
+    super(path, cause);
+  }
+
+  public PathPermissionException(String path, String error) {
+    super(path, error);
+  }
+
+  public PathPermissionException(String path,
+      String error,
+      Throwable cause) {
+    super(path, error, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
new file mode 100644
index 0000000..a8c01cb
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.google.common.base.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * IOException equivalent of an {@link AmazonClientException}.
+ */
+public class AWSClientIOException extends IOException {
+
+  private final String operation;
+
+  public AWSClientIOException(String operation,
+      AmazonClientException cause) {
+    super(cause);
+    Preconditions.checkArgument(operation != null, "Null 'operation' argument");
+    Preconditions.checkArgument(cause != null, "Null 'cause' argument");
+    this.operation = operation;
+  }
+
+  public AmazonClientException getCause() {
+    return (AmazonClientException) super.getCause();
+  }
+
+  @Override
+  public String getMessage() {
+    return operation + ": " + getCause().getMessage();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java
new file mode 100644
index 0000000..014d217
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSS3IOException.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import java.util.Map;
+
+/**
+ * Wrap a {@link AmazonS3Exception} as an IOE, relaying all
+ * getters.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AWSS3IOException extends AWSServiceIOException {
+
+  /**
+   * Instantiate.
+   * @param operation operation which triggered this
+   * @param cause the underlying cause
+   */
+  public AWSS3IOException(String operation,
+      AmazonS3Exception cause) {
+    super(operation, cause);
+  }
+
+  public AmazonS3Exception getCause() {
+    return (AmazonS3Exception) super.getCause();
+  }
+
+  public String getErrorResponseXml() {
+    return getCause().getErrorResponseXml();
+  }
+
+  public Map<String, String> getAdditionalDetails() {
+    return getCause().getAdditionalDetails();
+  }
+
+  public String getExtendedRequestId() {
+    return getCause().getExtendedRequestId();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java
new file mode 100644
index 0000000..a9c2c98
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceIOException.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A specific exception from AWS operations.
+ * The exception must always be created with an {@link AmazonServiceException}.
+ * The attributes of this exception can all be directly accessed.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AWSServiceIOException extends AWSClientIOException {
+
+  /**
+   * Instantiate.
+   * @param operation operation which triggered this
+   * @param cause the underlying cause
+   */
+  public AWSServiceIOException(String operation,
+      AmazonServiceException cause) {
+    super(operation, cause);
+  }
+
+  public AmazonServiceException getCause() {
+    return (AmazonServiceException) super.getCause();
+  }
+
+  public String getRequestId() {
+    return getCause().getRequestId();
+  }
+
+  public String getServiceName() {
+    return getCause().getServiceName();
+  }
+
+  public String getErrorCode() {
+    return getCause().getErrorCode();
+  }
+
+  public int getStatusCode() {
+    return getCause().getStatusCode();
+  }
+
+  public String getRawResponseContent() {
+    return getCause().getRawResponseContent();
+  }
+
+  public boolean isRetryable() {
+    return getCause().isRetryable();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index bf0c7b9..6a59f3f 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.AmazonServiceException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressListener;
 import com.amazonaws.services.s3.AmazonS3Client;
@@ -54,6 +53,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
  * Upload files/parts asap directly from a memory buffer (instead of buffering
@@ -152,10 +152,8 @@ public class S3AFastOutputStream extends OutputStream {
     this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
     this.multiPartUpload = null;
     this.progressListener = new ProgressableListener(progress);
-    if (LOG.isDebugEnabled()){
-      LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
-          bucket, key);
-    }
+    LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
+        bucket, key);
   }
 
   /**
@@ -210,15 +208,11 @@ public class S3AFastOutputStream extends OutputStream {
        requires multiple parts! */
       final byte[] allBytes = buffer.toByteArray();
       buffer = null; //earlier gc?
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Total length of initial buffer: {}", allBytes.length);
-      }
+      LOG.debug("Total length of initial buffer: {}", allBytes.length);
       int processedPos = 0;
       while ((multiPartThreshold - processedPos) >= partSize) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Initial buffer: processing from byte {} to byte {}",
-              processedPos, (processedPos + partSize - 1));
-        }
+        LOG.debug("Initial buffer: processing from byte {} to byte {}",
+            processedPos, (processedPos + partSize - 1));
         multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
             processedPos, partSize), partSize);
         processedPos += partSize;
@@ -235,7 +229,13 @@ public class S3AFastOutputStream extends OutputStream {
     }
   }
 
-
+  /**
+   * Close the stream. This will not return until the upload is complete
+   * or the attempt to perform the upload has failed.
+   * Exceptions raised in this method are indicative that the write has
+   * failed and data is at risk of being lost.
+   * @throws IOException on any failure.
+   */
   @Override
   public synchronized void close() throws IOException {
     if (closed) {
@@ -258,9 +258,7 @@ public class S3AFastOutputStream extends OutputStream {
       statistics.incrementWriteOps(1);
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
-      }
+      LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
     } finally {
       buffer = null;
       super.close();
@@ -283,20 +281,14 @@ public class S3AFastOutputStream extends OutputStream {
     try {
       return new MultiPartUpload(
           client.initiateMultipartUpload(initiateMPURequest).getUploadId());
-    } catch (AmazonServiceException ase) {
-      throw new IOException("Unable to initiate MultiPartUpload (server side)" +
-          ": " + ase, ase);
     } catch (AmazonClientException ace) {
-      throw new IOException("Unable to initiate MultiPartUpload (client side)" +
-          ": " + ace, ace);
+      throw translateException("initiate MultiPartUpload", key, ace);
     }
   }
 
   private void putObject() throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
-          key);
-    }
+    LOG.debug("Executing regular upload for bucket '{}' key '{}'",
+        bucket, key);
     final ObjectMetadata om = createDefaultMetadata();
     om.setContentLength(buffer.size());
     final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
@@ -317,10 +309,11 @@ public class S3AFastOutputStream extends OutputStream {
       LOG.warn("Interrupted object upload:" + ie, ie);
       Thread.currentThread().interrupt();
     } catch (ExecutionException ee) {
-      throw new IOException("Regular upload failed", ee.getCause());
+      throw extractException("regular upload", key, ee);
     }
   }
 
+
   private class MultiPartUpload {
     private final String uploadId;
     private final List<ListenableFuture<PartETag>> partETagsFutures;
@@ -328,13 +321,11 @@ public class S3AFastOutputStream extends OutputStream {
     public MultiPartUpload(String uploadId) {
       this.uploadId = uploadId;
       this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
-            "id '{}'", bucket, key, uploadId);
-      }
+      LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
+          "id '{}'", bucket, key, uploadId);
     }
 
-    public void uploadPartAsync(ByteArrayInputStream inputStream,
+    private void uploadPartAsync(ByteArrayInputStream inputStream,
         int partSize) {
       final int currentPartNumber = partETagsFutures.size() + 1;
       final UploadPartRequest request =
@@ -346,22 +337,21 @@ public class S3AFastOutputStream extends OutputStream {
           executorService.submit(new Callable<PartETag>() {
             @Override
             public PartETag call() throws Exception {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
-                    uploadId);
-              }
+              LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+                  uploadId);
               return client.uploadPart(request).getPartETag();
             }
           });
       partETagsFutures.add(partETagFuture);
     }
 
-    public List<PartETag> waitForAllPartUploads() throws IOException {
+    private List<PartETag> waitForAllPartUploads() throws IOException {
       try {
         return Futures.allAsList(partETagsFutures).get();
       } catch (InterruptedException ie) {
         LOG.warn("Interrupted partUpload:" + ie, ie);
         Thread.currentThread().interrupt();
+        return null;
       } catch (ExecutionException ee) {
         //there is no way of recovering so abort
         //cancel all partUploads
@@ -370,22 +360,23 @@ public class S3AFastOutputStream extends OutputStream {
         }
         //abort multipartupload
         this.abort();
-        throw new IOException("Part upload failed in multi-part upload with " +
-            "id '" +uploadId + "':" + ee, ee);
+        throw extractException("Multi-part upload with id '" + uploadId + "'",
+            key, ee);
       }
-      //should not happen?
-      return null;
     }
 
-    public void complete(List<PartETag> partETags) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
-            uploadId);
+    private void complete(List<PartETag> partETags) throws IOException {
+      try {
+        LOG.debug("Completing multi-part upload for key '{}', id '{}'",
+            key, uploadId);
+        client.completeMultipartUpload(
+            new CompleteMultipartUploadRequest(bucket,
+                key,
+                uploadId,
+                partETags));
+      } catch (AmazonClientException e) {
+        throw translateException("Completing multi-part upload", key, e);
       }
-      final CompleteMultipartUploadRequest completeRequest =
-          new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
-      client.completeMultipartUpload(completeRequest);
-
     }
 
     public void abort() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 1a043f0..0eb720a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -40,6 +40,7 @@ import com.amazonaws.auth.AWSCredentialsProviderChain;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
@@ -75,6 +76,7 @@ import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.VersionInfo;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -126,85 +128,115 @@ public class S3AFileSystem extends FileSystem {
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
     setConf(conf);
-    instrumentation = new S3AInstrumentation(name);
-
-    uri = URI.create(name.getScheme() + "://" + name.getAuthority());
-    workingDir = new Path("/user", System.getProperty("user.name"))
-        .makeQualified(this.uri, this.getWorkingDirectory());
+    try {
+      instrumentation = new S3AInstrumentation(name);
+
+      uri = URI.create(name.getScheme() + "://" + name.getAuthority());
+      workingDir = new Path("/user", System.getProperty("user.name"))
+          .makeQualified(this.uri, this.getWorkingDirectory());
+
+      bucket = name.getHost();
+
+      AWSCredentialsProvider credentials =
+          getAWSCredentialsProvider(name, conf);
+
+      ClientConfiguration awsConf = new ClientConfiguration();
+      awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+          DEFAULT_MAXIMUM_CONNECTIONS, 1));
+      boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
+          DEFAULT_SECURE_CONNECTIONS);
+      awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
+      awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+          DEFAULT_MAX_ERROR_RETRIES, 0));
+      awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+          DEFAULT_ESTABLISH_TIMEOUT, 0));
+      awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+          DEFAULT_SOCKET_TIMEOUT, 0));
+      String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
+      if (!signerOverride.isEmpty()) {
+        LOG.debug("Signer override = {}", signerOverride);
+        awsConf.setSignerOverride(signerOverride);
+      }
 
-    bucket = name.getHost();
+      initProxySupport(conf, awsConf, secureConnections);
 
-    AWSCredentialsProvider credentials = getAWSCredentialsProvider(name, conf);
+      initUserAgent(conf, awsConf);
 
-    ClientConfiguration awsConf = new ClientConfiguration();
-    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
-        DEFAULT_MAXIMUM_CONNECTIONS, 1));
-    boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
-        DEFAULT_SECURE_CONNECTIONS);
-    awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
-        DEFAULT_MAX_ERROR_RETRIES, 0));
-    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
-        DEFAULT_ESTABLISH_TIMEOUT, 0));
-    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
-        DEFAULT_SOCKET_TIMEOUT, 0));
-    String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-    if (!signerOverride.isEmpty()) {
-      LOG.debug("Signer override = {}", signerOverride);
-      awsConf.setSignerOverride(signerOverride);
-    }
+      initAmazonS3Client(conf, credentials, awsConf);
 
-    initProxySupport(conf, awsConf, secureConnections);
+      maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
+      partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+      if (partSize < 5 * 1024 * 1024) {
+        LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
+        partSize = 5 * 1024 * 1024;
+      }
 
-    initUserAgent(conf, awsConf);
+      multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
+          DEFAULT_MIN_MULTIPART_THRESHOLD);
+      if (multiPartThreshold < 5 * 1024 * 1024) {
+        LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
+        multiPartThreshold = 5 * 1024 * 1024;
+      }
+      //check but do not store the block size
+      longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
+      enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
-    initAmazonS3Client(conf, credentials, awsConf);
+      readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
 
-    maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
-    partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    if (partSize < 5 * 1024 * 1024) {
-      LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
-      partSize = 5 * 1024 * 1024;
-    }
+      int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+      if (maxThreads < 2) {
+        LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+        maxThreads = 2;
+      }
+      int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
+      if (totalTasks < 1) {
+        LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
+        totalTasks = 1;
+      }
+      long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+      threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
+          maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+          "s3a-transfer-shared");
 
-    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-        DEFAULT_MIN_MULTIPART_THRESHOLD);
-    if (multiPartThreshold < 5 * 1024 * 1024) {
-      LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-      multiPartThreshold = 5 * 1024 * 1024;
-    }
-    //check but do not store the block size
-    longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
-    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
-    readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
+      initTransferManager();
 
-    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
-    if (maxThreads < 2) {
-      LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
-      maxThreads = 2;
-    }
-    int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
-    if (totalTasks < 1) {
-      LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
-      totalTasks = 1;
-    }
-    long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
-    threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
-        maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
-        "s3a-transfer-shared");
+      initCannedAcls(conf);
 
-    initTransferManager();
+      verifyBucketExists();
 
-    initCannedAcls(conf);
+      initMultipartUploads(conf);
 
-    if (!s3.doesBucketExist(bucket)) {
-      throw new FileNotFoundException("Bucket " + bucket + " does not exist");
+      serverSideEncryptionAlgorithm =
+          conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+    } catch (AmazonClientException e) {
+      throw translateException("initializing ", new Path(name), e);
     }
 
-    initMultipartUploads(conf);
-
-    serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+  }
 
+  /**
+   * Verify that the bucket exists. This does not check permissions,
+   * not even read access.
+   * @throws FileNotFoundException the bucket is absent
+   * @throws IOException any other problem talking to S3
+   */
+  protected void verifyBucketExists()
+      throws FileNotFoundException, IOException {
+    try {
+      if (!s3.doesBucketExist(bucket)) {
+        throw new FileNotFoundException("Bucket " + bucket + " does not exist");
+      }
+    } catch (AmazonS3Exception e) {
+      // this is a sign of a serious startup problem so do dump everything
+      LOG.warn(stringify(e), e);
+      throw translateException("doesBucketExist", bucket, e);
+    } catch (AmazonServiceException e) {
+      // this is a sign of a serious startup problem so do dump everything
+      LOG.warn(stringify(e), e);
+      throw translateException("doesBucketExist", bucket, e);
+    } catch (AmazonClientException e) {
+      throw translateException("doesBucketExist", bucket, e);
+    }
   }
 
   void initProxySupport(Configuration conf, ClientConfiguration awsConf,
@@ -316,7 +348,7 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
-  private void initMultipartUploads(Configuration conf) {
+  private void initMultipartUploads(Configuration conf) throws IOException {
     boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
         DEFAULT_PURGE_EXISTING_MULTIPART);
     long purgeExistingMultipartAge = longOption(conf,
@@ -331,10 +363,10 @@ public class S3AFileSystem extends FileSystem {
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() == 403) {
           instrumentation.errorIgnored();
-          LOG.debug("Failed to abort multipart uploads against {}," +
+          LOG.debug("Failed to purging multipart uploads against {}," +
               " FS may be read only", bucket, e);
         } else {
-          throw e;
+          throw translateException("purging multipart uploads", bucket, e);
         }
       }
     }
@@ -576,10 +608,28 @@ public class S3AFileSystem extends FileSystem {
    *
    * @param src path to be renamed
    * @param dst new path after rename
-   * @throws IOException on failure
+   * @throws IOException on IO failure
    * @return true if rename is successful
    */
   public boolean rename(Path src, Path dst) throws IOException {
+    try {
+      return innerRename(src, dst);
+    } catch (AmazonClientException e) {
+      throw translateException("rename(" + src +", " + dst + ")", src, e);
+    }
+  }
+
+  /**
+   * The inner rename operation. See {@link #rename(Path, Path)} for
+   * the description of the operation.
+   * @param src path to be renamed
+   * @param dst new path after rename
+   * @return true if rename is successful
+   * @throws IOException on IO failure.
+   * @throws AmazonClientException on failures inside the AWS SDK
+   */
+  private boolean innerRename(Path src, Path dst) throws IOException,
+      AmazonClientException {
     LOG.debug("Rename path {} to {}", src, dst);
 
     String srcKey = pathToKey(src);
@@ -722,7 +772,7 @@ public class S3AFileSystem extends FileSystem {
    *            when set to true
    */
   private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
-          boolean clearKeys) {
+          boolean clearKeys) throws AmazonClientException {
     if (enableMultiObjectsDelete) {
       DeleteObjectsRequest deleteRequest
           = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
@@ -745,7 +795,9 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
-  /** Delete a file.
+  /**
+   * Delete a Path. This operation is at least {@code O(files)}, with
+   * added overheads to enumerate the path. It is also not atomic.
    *
    * @param f the path to delete.
    * @param recursive if path is a directory and set to
@@ -755,6 +807,26 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException due to inability to delete a directory or file.
    */
   public boolean delete(Path f, boolean recursive) throws IOException {
+    try {
+      return innerDelete(f, recursive);
+    } catch (AmazonClientException e) {
+      throw translateException("delete", f, e);
+    }
+  }
+
+  /**
+   * Delete a path. See {@link #delete(Path, boolean)}.
+   *
+   * @param f the path to delete.
+   * @param recursive if path is a directory and set to
+   * true, the directory is deleted else throws an exception. In
+   * case of a file the recursive can be set to either true or false.
+   * @return  true if delete is successful else false.
+   * @throws IOException due to inability to delete a directory or file.
+   * @throws AmazonClientException on failures inside the AWS SDK
+   */
+  private boolean innerDelete(Path f, boolean recursive) throws IOException,
+      AmazonClientException {
     LOG.debug("Delete path {} - recursive {}", f , recursive);
     S3AFileStatus status;
     try {
@@ -835,7 +907,8 @@ public class S3AFileSystem extends FileSystem {
     return true;
   }
 
-  private void createFakeDirectoryIfNecessary(Path f) throws IOException {
+  private void createFakeDirectoryIfNecessary(Path f)
+      throws IOException, AmazonClientException {
     String key = pathToKey(f);
     if (!key.isEmpty() && !exists(f)) {
       LOG.debug("Creating new fake directory at {}", f);
@@ -854,6 +927,25 @@ public class S3AFileSystem extends FileSystem {
    */
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
       IOException {
+    try {
+      return innerListStatus(f);
+    } catch (AmazonClientException e) {
+      throw translateException("listStatus", f, e);
+    }
+  }
+
+  /**
+   * List the statuses of the files/directories in the given path if the path is
+   * a directory.
+   *
+   * @param f given path
+   * @return the statuses of the files/directories in the given path
+   * @throws FileNotFoundException when the path does not exist;
+   * @throws IOException due to an IO problem.
+   * @throws AmazonClientException on failures inside the AWS SDK
+   */
+  public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
+      IOException, AmazonClientException {
     String key = pathToKey(f);
     LOG.debug("List status for path: {}", f);
 
@@ -945,15 +1037,42 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-   * Make the given file and all non-existent parents into
-   * directories. Has the semantics of Unix 'mkdir -p'.
+   *
+   * Make the given path and all non-existent parents into
+   * directories. Has the semantics of Unix {@code 'mkdir -p'}.
    * Existence of the directory hierarchy is not an error.
+   * @param path path to create
+   * @param permission to apply to f
+   * @return true if a directory was created
+   * @throws FileAlreadyExistsException there is a file at the path specified
+   * @throws IOException other IO problems
+   */
+  // TODO: If we have created an empty file at /foo/bar and we then call
+  // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
+  public boolean mkdirs(Path path, FsPermission permission) throws IOException,
+      FileAlreadyExistsException {
+    try {
+      return innerMkdirs(path, permission);
+    } catch (AmazonClientException e) {
+      throw translateException("innerMkdirs", path, e);
+    }
+  }
+  /**
+   *
+   * Make the given path and all non-existent parents into
+   * directories.
+   * See {@link #mkdirs(Path, FsPermission)}
    * @param f path to create
    * @param permission to apply to f
+   * @return true if a directory was created
+   * @throws FileAlreadyExistsException there is a file at the path specified
+   * @throws IOException other IO problems
+   * @throws AmazonClientException on failures inside the AWS SDK
    */
   // TODO: If we have created an empty file at /foo/bar and we then call
   // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
-  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+  private boolean innerMkdirs(Path f, FsPermission permission)
+      throws IOException, FileAlreadyExistsException, AmazonClientException {
     LOG.debug("Making directory: {}", f);
 
     try {
@@ -991,7 +1110,7 @@ public class S3AFileSystem extends FileSystem {
    * @param f The path we want information from
    * @return a FileStatus object
    * @throws java.io.FileNotFoundException when the path does not exist;
-   *         IOException see specific implementation
+   * @throws IOException on other problems.
    */
   public S3AFileStatus getFileStatus(Path f) throws IOException {
     String key = pathToKey(f);
@@ -1015,12 +1134,10 @@ public class S3AFileSystem extends FileSystem {
         }
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() != 404) {
-          printAmazonServiceException(f.toString(), e);
-          throw e;
+          throw translateException("getFileStatus", f, e);
         }
       } catch (AmazonClientException e) {
-        printAmazonClientException(f.toString(), e);
-        throw e;
+        throw translateException("getFileStatus", f, e);
       }
 
       // Necessary?
@@ -1043,12 +1160,10 @@ public class S3AFileSystem extends FileSystem {
           }
         } catch (AmazonServiceException e) {
           if (e.getStatusCode() != 404) {
-            printAmazonServiceException(newKey, e);
-            throw e;
+            throw translateException("getFileStatus", newKey, e);
           }
         } catch (AmazonClientException e) {
-          printAmazonClientException(newKey, e);
-          throw e;
+          throw translateException("getFileStatus", newKey, e);
         }
       }
     }
@@ -1089,12 +1204,10 @@ public class S3AFileSystem extends FileSystem {
       }
     } catch (AmazonServiceException e) {
       if (e.getStatusCode() != 404) {
-        printAmazonServiceException(key, e);
-        throw e;
+        throw translateException("getFileStatus", key, e);
       }
     } catch (AmazonClientException e) {
-      printAmazonClientException(key, e);
-      throw e;
+      throw translateException("getFileStatus", key, e);
     }
 
     LOG.debug("Not Found: {}", f);
@@ -1113,10 +1226,42 @@ public class S3AFileSystem extends FileSystem {
    * @param overwrite whether to overwrite an existing file
    * @param src path
    * @param dst path
+   * @throws IOException IO problem
+   * @throws FileAlreadyExistsException the destination file exists and
+   * overwrite==false
+   * @throws AmazonClientException failure in the AWS SDK
    */
   @Override
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
       Path dst) throws IOException {
+    try {
+      innerCopyFromLocalFile(delSrc, overwrite, src, dst);
+    } catch (AmazonClientException e) {
+      throw translateException("copyFromLocalFile(" + src + ", " + dst + ")",
+          src, e);
+    }
+  }
+
+  /**
+   * The src file is on the local disk.  Add it to FS at
+   * the given dst name.
+   *
+   * This version doesn't need to create a temporary file to calculate the md5.
+   * Sadly this doesn't seem to be used by the shell cp :(
+   *
+   * delSrc indicates if the source should be removed
+   * @param delSrc whether to delete the src
+   * @param overwrite whether to overwrite an existing file
+   * @param src path
+   * @param dst path
+   * @throws IOException IO problem
+   * @throws FileAlreadyExistsException the destination file exists and
+   * overwrite==false
+   * @throws AmazonClientException failure in the AWS SDK
+   */
+  private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
+      Path src, Path dst)
+      throws IOException, FileAlreadyExistsException, AmazonClientException {
     String key = pathToKey(dst);
 
     if (!overwrite && exists(dst)) {
@@ -1166,8 +1311,12 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Close the filesystem. This shuts down all transfers.
+   * @throws IOException IO problem
+   */
   @Override
-  public void close() throws IOException {
+  public synchronized void close() throws IOException {
     try {
       super.close();
     } finally {
@@ -1179,49 +1328,63 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-  * Override getCanonicalServiceName because we don't support token in S3A.
-  */
+   * Override getCanonicalServiceName because we don't support token in S3A.
+   */
   @Override
   public String getCanonicalServiceName() {
     // Does not support Token
     return null;
   }
 
+  /**
+   * Copy a single object in the bucket via a COPY operation.
+   * @param srcKey source object path
+   * @param dstKey destination object path
+   * @param size object size
+   * @throws AmazonClientException on failures inside the AWS SDK
+   * @throws InterruptedIOException the operation was interrupted
+   * @throws IOException Other IO problems
+   */
   private void copyFile(String srcKey, String dstKey, long size)
-      throws IOException {
+      throws IOException, InterruptedIOException, AmazonClientException {
     LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
 
-    ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
-    ObjectMetadata dstom = cloneObjectMetadata(srcom);
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
-    CopyObjectRequest copyObjectRequest =
-        new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
-    copyObjectRequest.setCannedAccessControlList(cannedACL);
-    copyObjectRequest.setNewObjectMetadata(dstom);
-
-    ProgressListener progressListener = new ProgressListener() {
-      public void progressChanged(ProgressEvent progressEvent) {
-        switch (progressEvent.getEventType()) {
-          case TRANSFER_PART_COMPLETED_EVENT:
-            statistics.incrementWriteOps(1);
-            break;
-          default:
-            break;
-        }
+    try {
+      ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
+      ObjectMetadata dstom = cloneObjectMetadata(srcom);
+      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+        dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
       }
-    };
+      CopyObjectRequest copyObjectRequest =
+          new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+      copyObjectRequest.setCannedAccessControlList(cannedACL);
+      copyObjectRequest.setNewObjectMetadata(dstom);
+
+      ProgressListener progressListener = new ProgressListener() {
+        public void progressChanged(ProgressEvent progressEvent) {
+          switch (progressEvent.getEventType()) {
+            case TRANSFER_PART_COMPLETED_EVENT:
+              statistics.incrementWriteOps(1);
+              break;
+            default:
+              break;
+          }
+        }
+      };
 
-    Copy copy = transfers.copy(copyObjectRequest);
-    copy.addProgressListener(progressListener);
-    try {
-      copy.waitForCopyResult();
-      statistics.incrementWriteOps(1);
-      instrumentation.filesCopied(1, size);
-    } catch (InterruptedException e) {
-      throw new InterruptedIOException("Interrupted copying " + srcKey
-          + " to " + dstKey + ", cancelling");
+      Copy copy = transfers.copy(copyObjectRequest);
+      copy.addProgressListener(progressListener);
+      try {
+        copy.waitForCopyResult();
+        statistics.incrementWriteOps(1);
+        instrumentation.filesCopied(1, size);
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException("Interrupted copying " + srcKey
+            + " to " + dstKey + ", cancelling");
+      }
+    } catch (AmazonClientException e) {
+      throw translateException("copyFile("+ srcKey+ ", " + dstKey + ")",
+          srcKey, e);
     }
   }
 
@@ -1240,11 +1403,20 @@ public class S3AFileSystem extends FileSystem {
     return date.getTime();
   }
 
-  public void finishedWrite(String key) throws IOException {
+  /**
+   * Perform post-write actions.
+   * @param key key written to
+   */
+  public void finishedWrite(String key) {
     deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
   }
 
-  private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
+  /**
+   * Delete mock parent directories which are no longer needed.
+   * This code swallows IO exceptions encountered
+   * @param f path
+   */
+  private void deleteUnnecessaryFakeDirectories(Path f) {
     while (true) {
       String key = "";
       try {
@@ -1260,7 +1432,7 @@ public class S3AFileSystem extends FileSystem {
           s3.deleteObject(bucket, key + "/");
           statistics.incrementWriteOps(1);
         }
-      } catch (FileNotFoundException | AmazonServiceException e) {
+      } catch (IOException | AmazonClientException e) {
         LOG.debug("While deleting key {} ", key, e);
         instrumentation.errorIgnored();
       }
@@ -1383,28 +1555,6 @@ public class S3AFileSystem extends FileSystem {
     return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
 
-  private void printAmazonServiceException(String target,
-      AmazonServiceException ase) {
-    LOG.info("{}: caught an AmazonServiceException {}", target, ase);
-    LOG.info("This means your request made it to Amazon S3," +
-        " but was rejected with an error response for some reason.");
-    LOG.info("Error Message: {}", ase.getMessage());
-    LOG.info("HTTP Status Code: {}", ase.getStatusCode());
-    LOG.info("AWS Error Code: {}", ase.getErrorCode());
-    LOG.info("Error Type: {}", ase.getErrorType());
-    LOG.info("Request ID: {}", ase.getRequestId());
-    LOG.info("Class Name: {}", ase.getClass().getName());
-    LOG.info("Exception", ase);
-  }
-
-  private void printAmazonClientException(String target,
-      AmazonClientException ace) {
-    LOG.info("{}: caught an AmazonClientException {}", target, ace);
-    LOG.info("This means the client encountered " +
-        "a problem while trying to communicate with S3, " +
-        "such as not being able to access the network.", ace);
-  }
-
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 27557f8..7b5b7b3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
@@ -29,11 +30,14 @@ import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
+
 import org.slf4j.Logger;
 
 import java.io.EOFException;
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+
 /**
  * The input stream for an S3A object.
  *
@@ -112,7 +116,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * @param reason reason for reopen
    * @param targetPos target position
    * @param length length requested
-   * @throws IOException
+   * @throws IOException on any failure to open the object
    */
   private synchronized void reopen(String reason, long targetPos, long length)
       throws IOException {
@@ -126,13 +130,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
         uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
 
     streamStatistics.streamOpened();
-    GetObjectRequest request = new GetObjectRequest(bucket, key)
-        .withRange(targetPos, requestedStreamLen);
-    wrappedStream = client.getObject(request).getObjectContent();
+    try {
+      GetObjectRequest request = new GetObjectRequest(bucket, key)
+          .withRange(targetPos, requestedStreamLen);
+      wrappedStream = client.getObject(request).getObjectContent();
 
-    if (wrappedStream == null) {
-      throw new IOException("Null IO stream from reopen of (" + reason +  ") "
-          + uri);
+      if (wrappedStream == null) {
+        throw new IOException("Null IO stream from reopen of (" + reason +  ") "
+            + uri);
+      }
+    } catch (AmazonClientException e) {
+      throw translateException("Reopen at position " + targetPos, uri, e);
     }
 
     this.pos = targetPos;
@@ -276,10 +284,10 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       return -1;
     }
 
-    lazySeek(nextReadPos, 1);
 
     int byteRead;
     try {
+      lazySeek(nextReadPos, 1);
       byteRead = wrappedStream.read();
     } catch (EOFException e) {
       return -1;
@@ -337,11 +345,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       return -1;
     }
 
-    lazySeek(nextReadPos, len);
-    streamStatistics.readOperationStarted(nextReadPos, len);
+    try {
+      lazySeek(nextReadPos, len);
+    } catch (EOFException e) {
+      // the end of the file has moved
+      return -1;
+    }
 
     int bytesRead;
     try {
+      streamStatistics.readOperationStarted(nextReadPos, len);
       bytesRead = wrappedStream.read(buf, off, len);
     } catch (EOFException e) {
       throw e;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
index f9ff701..593e9e8 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.amazonaws.AmazonClientException;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
@@ -40,11 +41,13 @@ import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.io.OutputStream;
 
 import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
 import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
 /**
  * Output stream to save data to S3.
@@ -92,13 +95,15 @@ public class S3AOutputStream extends OutputStream {
       lDirAlloc = new LocalDirAllocator("${hadoop.tmp.dir}/s3a");
     }
 
-    backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
+    backupFile = lDirAlloc.createTmpFileForWrite("output-",
+        LocalDirAllocator.SIZE_UNKNOWN, conf);
     closed = false;
 
     LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
         key, backupFile);
 
-    this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
+    this.backupStream = new BufferedOutputStream(
+        new FileOutputStream(backupFile));
   }
 
   @Override
@@ -123,7 +128,8 @@ public class S3AOutputStream extends OutputStream {
       if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
         om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
       }
-      PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, backupFile);
+      PutObjectRequest putObjectRequest =
+          new PutObjectRequest(bucket, key, backupFile);
       putObjectRequest.setCannedAcl(cannedACL);
       putObjectRequest.setMetadata(om);
 
@@ -135,18 +141,20 @@ public class S3AOutputStream extends OutputStream {
 
       upload.waitForUploadResult();
 
-      long delta = upload.getProgress().getBytesTransferred() - listener.getLastBytesTransferred();
+      long delta = upload.getProgress().getBytesTransferred() -
+          listener.getLastBytesTransferred();
       if (statistics != null && delta != 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("S3A write delta changed after finished: " + delta + " bytes");
-        }
+        LOG.debug("S3A write delta changed after finished: {} bytes", delta);
         statistics.incrementBytesWritten(delta);
       }
 
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
     } catch (InterruptedException e) {
-      throw new IOException(e);
+      throw (InterruptedIOException) new InterruptedIOException(e.toString())
+          .initCause(e);
+    } catch (AmazonClientException e) {
+      throw translateException("saving output", key , e);
     } finally {
       if (!backupFile.delete()) {
         LOG.warn("Could not delete temporary s3a file: {}", backupFile);
@@ -154,9 +162,7 @@ public class S3AOutputStream extends OutputStream {
       super.close();
       closed = true;
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("OutputStream for key '" + key + "' upload complete");
-    }
+    LOG.debug("OutputStream for key '{}' upload complete", key);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
new file mode 100644
index 0000000..12d14e2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Utility methods for S3A code.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public final class S3AUtils {
+
+  private S3AUtils() {
+  }
+
+  /**
+   * Translate an exception raised in an operation into an IOException.
+   * The specific type of IOException depends on the class of
+   * {@link AmazonClientException} passed in, and any status codes included
+   * in the operation. That is: HTTP error codes are examined and can be
+   * used to build a more specific response.
+   * @param operation operation
+   * @param path path operated on (must not be null)
+   * @param exception amazon exception raised
+   * @return an IOE which wraps the caught exception.
+   */
+  public static IOException translateException(String operation,
+      Path path,
+      AmazonClientException exception) {
+    return translateException(operation, path.toString(), exception);
+  }
+
+  /**
+   * Translate an exception raised in an operation into an IOException.
+   * The specific type of IOException depends on the class of
+   * {@link AmazonClientException} passed in, and any status codes included
+   * in the operation. That is: HTTP error codes are examined and can be
+   * used to build a more specific response.
+   * @param operation operation
+   * @param path path operated on (may be null)
+   * @param exception amazon exception raised
+   * @return an IOE which wraps the caught exception.
+   */
+  @SuppressWarnings("ThrowableInstanceNeverThrown")
+  public static IOException translateException(String operation,
+      String path,
+      AmazonClientException exception) {
+    String message = String.format("%s%s: %s",
+        operation,
+        path != null ? (" on " + path) : "",
+        exception);
+    if (!(exception instanceof AmazonServiceException)) {
+      return new AWSClientIOException(message, exception);
+    } else {
+
+      IOException ioe;
+      AmazonServiceException ase = (AmazonServiceException) exception;
+      // this exception is non-null if the service exception is an s3 one
+      AmazonS3Exception s3Exception = ase instanceof AmazonS3Exception
+          ? (AmazonS3Exception) ase
+          : null;
+      int status = ase.getStatusCode();
+      switch (status) {
+
+      // permissions
+      case 401:
+      case 403:
+        ioe = new AccessDeniedException(path, null, message);
+        ioe.initCause(ase);
+        break;
+
+      // the object isn't there
+      case 404:
+      case 410:
+        ioe = new FileNotFoundException(message);
+        ioe.initCause(ase);
+        break;
+
+      // out of range. This may happen if an object is overwritten with
+      // a shorter one while it is being read.
+      case 416:
+        ioe = new EOFException(message);
+        break;
+
+      default:
+        // no specific exit code. Choose an IOE subclass based on the class
+        // of the caught exception
+        ioe = s3Exception != null
+            ? new AWSS3IOException(message, s3Exception)
+            : new AWSServiceIOException(message, ase);
+        break;
+      }
+      return ioe;
+    }
+  }
+
+  /**
+   * Extract an exception from a failed future, and convert to an IOE.
+   * @param operation operation which failed
+   * @param path path operated on (may be null)
+   * @param ee execution exception
+   * @return an IOE which can be thrown
+   */
+  public static IOException extractException(String operation,
+      String path,
+      ExecutionException ee) {
+    IOException ioe;
+    Throwable cause = ee.getCause();
+    if (cause instanceof AmazonClientException) {
+      ioe = translateException(operation, path, (AmazonClientException) cause);
+    } else if (cause instanceof IOException) {
+      ioe = (IOException) cause;
+    } else {
+      ioe = new IOException(operation + " failed: " + cause, cause);
+    }
+    return ioe;
+  }
+
+  /**
+   * Get low level details of an amazon exception for logging; multi-line.
+   * @param e exception
+   * @return string details
+   */
+  public static String stringify(AmazonServiceException e) {
+    StringBuilder builder = new StringBuilder(
+        String.format("%s: %s error %d: %s; %s%s%n",
+            e.getErrorType(),
+            e.getServiceName(),
+            e.getStatusCode(),
+            e.getErrorCode(),
+            e.getErrorMessage(),
+            (e.isRetryable() ? " (retryable)": "")
+        ));
+    String rawResponseContent = e.getRawResponseContent();
+    if (rawResponseContent != null) {
+      builder.append(rawResponseContent);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Get low level details of an amazon exception for logging; multi-line.
+   * @param e exception
+   * @return string details
+   */
+  public static String stringify(AmazonS3Exception e) {
+    // get the low level details of an exception,
+    StringBuilder builder = new StringBuilder(
+        stringify((AmazonServiceException) e));
+    Map<String, String> details = e.getAdditionalDetails();
+    if (details != null) {
+      builder.append('\n');
+      for (Map.Entry<String, String> d : details.entrySet()) {
+        builder.append(d.getKey()).append('=')
+            .append(d.getValue()).append('\n');
+      }
+    }
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java
new file mode 100644
index 0000000..a01af5c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/package-info.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * S3A Filesystem. Except for the exceptions, it should
+ * all be hidden as implementation details.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 2308dd4..44bdc02 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -25,6 +25,7 @@ import org.junit.internal.AssumptionViolatedException;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.Callable;
 
 public class S3ATestUtils {
 
@@ -72,4 +73,70 @@ public class S3ATestUtils {
     FileContext fc = FileContext.getFileContext(testURI,conf);
     return fc;
   }
+
+  /**
+   * Invoke a callback repeatedly until it completes without an exception,
+   * the timeout expires, or a {@link FailFastException} is raised.
+   * At least one attempt is always made, and there is a 500 millisecond
+   * sleep after every failed attempt.
+   * This is modeled on ScalaTest's {@code eventually(Closure)} code.
+   * @param timeout timeout in milliseconds
+   * @param callback callback to invoke
+   * @throws FailFastException any fast-failure
+   * @throws Exception the exception raised by the last failed attempt
+   */
+  public static void eventually(int timeout, Callable<Void> callback)
+      throws Exception {
+    final long deadline = System.currentTimeMillis() + timeout;
+    Exception failure;
+    while (true) {
+      try {
+        callback.call();
+        // success: nothing more to do
+        return;
+      } catch (FailFastException fastFailure) {
+        // never retried
+        throw fastFailure;
+      } catch (Exception e) {
+        failure = e;
+      }
+      Thread.sleep(500);
+      if (System.currentTimeMillis() >= deadline) {
+        // out of time: report the most recent failure
+        throw failure;
+      }
+    }
+  }
+
+  /**
+   * Exception which a callback raises to make
+   * {@link #eventually(int, Callable)} give up immediately
+   * rather than retry.
+   */
+  public static class FailFastException extends Exception {
+
+    /**
+     * Create with a message and a cause.
+     * @param message error text
+     * @param cause underlying cause
+     */
+    public FailFastException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    /**
+     * Create with a message only.
+     * @param message error text
+     */
+    public FailFastException(String message) {
+      super(message);
+    }
+
+    /**
+     * Create with a cause only.
+     * @param cause underlying cause
+     */
+    public FailFastException(Throwable cause) {
+      super(cause);
+    }
+
+    /**
+     * Create with neither a message nor a cause.
+     */
+    public FailFastException() {
+    }
+  }
+
+  /**
+   * Verify the class of an exception. If it is not as expected, rethrow it.
+   * Comparison is on the exact class, not subclass-of inference as
+   * offered by {@code instanceof}: an instance of a subclass of
+   * {@code clazz} is rethrown too.
+   * @param clazz the expected exception class
+   * @param ex the exception caught
+   * @return the exception, if it is of the expected class
+   * @throws Exception the exception passed in.
+   */
+  public static Exception verifyExceptionClass(Class<?> clazz,
+      Exception ex)
+      throws Exception {
+    if (!(ex.getClass().equals(clazz))) {
+      // wrong type: surface the original failure, stack trace and all
+      throw ex;
+    }
+    return ex;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
index b20a768..1a11a45 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AAWSCredentialsProvider.java
@@ -23,8 +23,10 @@ import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.net.URI;
+import java.nio.file.AccessDeniedException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 
 import com.amazonaws.auth.AWSCredentials;
@@ -32,7 +34,6 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSCredentialsProviderChain;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.InstanceProfileCredentialsProvider;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,7 +50,7 @@ public class TestS3AAWSCredentialsProvider {
     Configuration conf = new Configuration();
     conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class");
     try {
-      S3ATestUtils.createTestFileSystem(conf);
+      createFailingFS(conf);
     } catch (IOException e) {
       if (!(e.getCause() instanceof ClassNotFoundException)) {
         LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e);
@@ -58,6 +59,18 @@ public class TestS3AAWSCredentialsProvider {
     }
   }
 
+  /**
+   * Instantiate a test filesystem and list its root directory, an
+   * operation sequence which is expected to raise an IOException.
+   * If both steps succeed, the test is failed through an assertion.
+   * @param conf configuration
+   * @throws IOException an expected exception.
+   */
+  private void createFailingFS(Configuration conf) throws IOException {
+    final S3AFileSystem filesystem = S3ATestUtils.createTestFileSystem(conf);
+    final Path root = new Path("/");
+    filesystem.listStatus(root);
+    // only reached if neither creation nor listing failed
+    fail("Expected exception - got " + filesystem);
+  }
+
   static class BadCredentialsProvider implements AWSCredentialsProvider {
 
     @SuppressWarnings("unused")
@@ -79,12 +92,9 @@ public class TestS3AAWSCredentialsProvider {
     Configuration conf = new Configuration();
     conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName());
     try {
-      S3ATestUtils.createTestFileSystem(conf);
-    } catch (AmazonS3Exception e) {
-      if (e.getStatusCode() != 403) {
-        LOG.error("Unexpected status code: {}", e.getStatusCode(), e);
-        throw e;
-      }
+      createFailingFS(conf);
+    } catch (AccessDeniedException e) {
+      // expected
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
index 4f3c7ae..513cae2 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java
@@ -21,11 +21,9 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
-import com.amazonaws.services.s3.model.AmazonS3Exception;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.reflect.FieldUtils;
-import com.amazonaws.AmazonClientException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -43,7 +41,6 @@ import static org.junit.Assert.fail;
 
 import java.io.File;
 import java.net.URI;
-import java.lang.reflect.Field;
 
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.alias.CredentialProvider;
@@ -122,7 +119,7 @@ public class TestS3AConfiguration {
     try {
       fs = S3ATestUtils.createTestFileSystem(conf);
       fail("Expected a connection error for proxy server at " + proxy);
-    } catch (AmazonClientException e) {
+    } catch (AWSClientIOException e) {
       // expected
     }
   }
@@ -155,14 +152,14 @@ public class TestS3AConfiguration {
     try {
       fs = S3ATestUtils.createTestFileSystem(conf);
       fail("Expected a connection error for proxy server");
-    } catch (AmazonClientException e) {
+    } catch (AWSClientIOException e) {
       // expected
     }
     conf.set(Constants.SECURE_CONNECTIONS, "false");
     try {
       fs = S3ATestUtils.createTestFileSystem(conf);
       fail("Expected a connection error for proxy server");
-    } catch (AmazonClientException e) {
+    } catch (AWSClientIOException e) {
       // expected
     }
   }
@@ -376,7 +373,7 @@ public class TestS3AConfiguration {
           clientOptions.isPathStyleAccess());
       byte[] file = ContractTestUtils.toAsciiByteArray("test file");
       ContractTestUtils.writeAndRead(fs, new Path("/path/style/access/testFile"), file, file.length, conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
-    } catch (final AmazonS3Exception e) {
+    } catch (final AWSS3IOException e) {
       LOG.error("Caught exception: ", e);
       // Catch/pass standard path style access behaviour when live bucket
       // isn't in the same region as the s3 client default. See

http://git-wip-us.apache.org/repos/asf/hadoop/blob/39ec1515/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java
new file mode 100644
index 0000000..58ac496
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFailureHandling.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.nio.file.AccessDeniedException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+
+/**
+ * Test S3A Failure translation, including a functional test
+ * generating errors during stream IO.
+ */
+public class TestS3AFailureHandling extends AbstractFSContractTestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestS3AFailureHandling.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Open a file, overwrite it with a shorter one while the stream is open,
+   * and verify that reads past the new end of file report EOF; then delete
+   * the file and verify that reads raise {@link FileNotFoundException}.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testReadFileChanged() throws Throwable {
+    describe("overwrite a file with a shorter one during a read, seek");
+    final int fullLength = 8192;
+    final byte[] fullDataset = dataset(fullLength, 'a', 32);
+    final int shortLen = 4096;
+    final byte[] shortDataset = dataset(shortLen, 'A', 32);
+    final FileSystem fs = getFileSystem();
+    final Path testpath = path("readFileToChange.txt");
+    // initial write
+    writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
+    try(FSDataInputStream instream = fs.open(testpath)) {
+      instream.seek(fullLength - 16);
+      assertTrue("no data to read", instream.read() >= 0);
+      // overwrite
+      writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024,
+          true);
+      // here the file length is less. Probe the file to see if this is true,
+      // with a spin and wait
+      eventually(30 * 1000, new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
+          return null;
+        }
+      });
+      // here length is shorter. Assuming it has propagated to all replicas,
+      // the position of the input stream is now beyond the EOF.
+      // An attempt to seek backwards to a position greater than the
+      // short length will raise an exception from AWS S3, which must be
+      // translated into an EOF
+
+      instream.seek(shortLen + 1024);
+      int c = instream.read();
+      assertIsEOF("read()", c);
+
+      byte[] buf = new byte[256];
+
+      assertIsEOF("read(buffer)", instream.read(buf));
+      assertIsEOF("read(offset)",
+          instream.read(instream.getPos(), buf, 0, buf.length));
+
+      // now do a block read fully, again, backwards from the current pos
+      try {
+        instream.readFully(shortLen + 512, buf);
+        fail("Expected readFully to fail");
+      } catch (EOFException expected) {
+        LOG.debug("Expected EOF: ", expected);
+      }
+
+      assertIsEOF("read(offset)",
+          instream.read(shortLen + 510, buf, 0, buf.length));
+
+      // seek somewhere useful
+      instream.seek(shortLen - 256);
+
+      // delete the file. Reads must fail
+      fs.delete(testpath, false);
+
+      try {
+        int r = instream.read();
+        fail("Expected an exception, got " + r);
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+
+      try {
+        instream.readFully(2048, buf);
+        fail("Expected readFully to fail");
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+
+    }
+  }
+
+  /**
+   * Assert that a read operation returned an EOF value.
+   * The result is reported as an integer, because it is a byte count
+   * rather than a character for the buffer-based reads.
+   * @param operation specific operation
+   * @param readResult result
+   */
+  private void assertIsEOF(String operation, int readResult) {
+    assertEquals("Expected EOF from "+ operation
+        + "; got " + readResult, -1, readResult);
+  }
+
+  /**
+   * An S3 exception with status code 404 must be translated to a
+   * {@link FileNotFoundException}.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void test404isNotFound() throws Throwable {
+    verifyTranslated(FileNotFoundException.class, createS3Exception(404));
+  }
+
+  /**
+   * Translate an AWS exception through {@code translateException()},
+   * using the operation "test" and the path "/", then verify that the
+   * result is of exactly the expected class.
+   * @param clazz the expected class of the translated exception
+   * @param exception the AWS exception to translate
+   * @return the translated exception, if it is of the expected class
+   * @throws Exception the translated exception, if its class differs
+   */
+  protected Exception verifyTranslated(Class<?> clazz,
+      AmazonClientException exception) throws Exception {
+    return verifyExceptionClass(clazz,
+        translateException("test", "/", exception));
+  }
+
+  /**
+   * An S3 exception with status code 401 must be translated to an
+   * {@link AccessDeniedException}.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void test401isNotPermittedFound() throws Throwable {
+    verifyTranslated(AccessDeniedException.class,
+        createS3Exception(401));
+  }
+
+  /**
+   * Create an S3 exception with an empty message and the given status code.
+   * @param code status code to set
+   * @return the new exception
+   */
+  protected AmazonS3Exception createS3Exception(int code) {
+    AmazonS3Exception source = new AmazonS3Exception("");
+    source.setStatusCode(code);
+    return source;
+  }
+
+  /**
+   * An S3 exception whose status code has no specific mapping must become
+   * an {@link AWSS3IOException} which retains the status code.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testGenericS3Exception() throws Throwable {
+    // S3 exception of no known type
+    AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
+        AWSS3IOException.class,
+        createS3Exception(451));
+    assertEquals(451, ex.getStatusCode());
+  }
+
+  /**
+   * A generic service exception must become an
+   * {@link AWSServiceIOException} which retains the status code.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testGenericServiceS3Exception() throws Throwable {
+    // service exception of no known type
+    AmazonServiceException ase = new AmazonServiceException("unwind");
+    ase.setStatusCode(500);
+    AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated(
+        AWSServiceIOException.class,
+        ase);
+    assertEquals(500, ex.getStatusCode());
+  }
+
+  /**
+   * A plain client exception must become an {@link AWSClientIOException}.
+   * @throws Throwable on any failure
+   */
+  @Test
+  public void testGenericClientException() throws Throwable {
+    // Generic Amazon exception
+    verifyTranslated(AWSClientIOException.class,
+        new AmazonClientException(""));
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to