http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
new file mode 100644
index 0000000..b0b8a65
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.StopWatch;
+import org.junit.AfterClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Basic unit test for S3A's blocking executor service.
+ */
+public class ITestBlockingThreadPoolExecutorService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockingThreadPoolExecutorService.class);
+
+  private static final int NUM_ACTIVE_TASKS = 4;
+  private static final int NUM_WAITING_TASKS = 2;
+  private static final int TASK_SLEEP_MSEC = 100;
+  private static final int SHUTDOWN_WAIT_MSEC = 200;
+  private static final int SHUTDOWN_WAIT_TRIES = 5;
+  private static final int BLOCKING_THRESHOLD_MSEC = 50;
+
+  private static final Integer SOME_VALUE = 1337;
+
+  private static BlockingThreadPoolExecutorService tpe = null;
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ensureDestroyed();
+  }
+
+  /**
+   * Basic test of running one trivial task.
+   */
+  @Test
+  public void testSubmitCallable() throws Exception {
+    ensureCreated();
+    ListenableFuture<Integer> f = tpe.submit(callableSleeper);
+    Integer v = f.get();
+    assertEquals(SOME_VALUE, v);
+  }
+
+  /**
+   * More involved test, including detecting blocking when at capacity.
+   */
+  @Test
+  public void testSubmitRunnable() throws Exception {
+    ensureCreated();
+    int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
+    StopWatch stopWatch = new StopWatch().start();
+    for (int i = 0; i < totalTasks; i++) {
+      tpe.submit(sleeper);
+      assertDidntBlock(stopWatch);
+    }
+    tpe.submit(sleeper);
+    assertDidBlock(stopWatch);
+  }
+
+  @Test
+  public void testShutdown() throws Exception {
+    // Cover create / destroy, regardless of when this test case runs
+    ensureCreated();
+    ensureDestroyed();
+
+    // Cover create, execute, destroy, regardless of when test case runs
+    ensureCreated();
+    testSubmitRunnable();
+    ensureDestroyed();
+  }
+
+  // Helper functions, etc.
+
+  private void assertDidntBlock(StopWatch sw) {
+    try {
+      assertFalse("Non-blocking call took too long.",
+          sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
+    } finally {
+      sw.reset().start();
+    }
+  }
+
+  private void assertDidBlock(StopWatch sw) {
+    try {
+      if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
+        throw new RuntimeException("Blocking call returned too fast.");
+      }
+    } finally {
+      sw.reset().start();
+    }
+  }
+
+  private Runnable sleeper = new Runnable() {
+    @Override
+    public void run() {
+      String name = Thread.currentThread().getName();
+      try {
+        Thread.sleep(TASK_SLEEP_MSEC);
+      } catch (InterruptedException e) {
+        LOG.info("Thread {} interrupted.", name);
+        Thread.currentThread().interrupt();
+      }
+    }
+  };
+
+  private Callable<Integer> callableSleeper = new Callable<Integer>() {
+    @Override
+    public Integer call() throws Exception {
+      sleeper.run();
+      return SOME_VALUE;
+    }
+  };
+
+  /**
+   * Helper function to create thread pool under test.
+   */
+  private static void ensureCreated() throws Exception {
+    if (tpe == null) {
+      LOG.debug("Creating thread pool");
+      tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
+          NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
+    }
+  }
+
+  /**
+   * Helper function to terminate thread pool under test, asserting that
+   * shutdown -> terminate works as expected.
+   */
+  private static void ensureDestroyed() throws Exception {
+    if (tpe == null) {
+      return;
+    }
+    int shutdownTries = SHUTDOWN_WAIT_TRIES;
+
+    tpe.shutdown();
+    if (!tpe.isShutdown()) {
+      throw new RuntimeException("Shutdown had no effect.");
+    }
+
+    while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+        TimeUnit.MILLISECONDS)) {
+      LOG.info("Waiting for thread pool shutdown.");
+      if (shutdownTries-- <= 0) {
+        LOG.error("Failed to terminate thread pool gracefully.");
+        break;
+      }
+    }
+    if (!tpe.isTerminated()) {
+      tpe.shutdownNow();
+      if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+          TimeUnit.MILLISECONDS)) {
+        throw new RuntimeException(
+            "Failed to terminate thread pool in timely manner.");
+      }
+    }
+    tpe = null;
+  }
+}
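
The behaviour under test (submissions succeed until every active and queued
slot is taken, after which submit() blocks the caller rather than rejecting
the task) can be reproduced with a counting semaphore gating an ordinary
thread pool. The sketch below is a minimal illustration of that technique
only; it assumes nothing about the real BlockingThreadPoolExecutorService
beyond the constructor arguments visible above, and all names are
illustrative.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    /** Sketch of a submit()-blocking pool; not the Hadoop implementation. */
    class BlockingSubmitterSketch {
      private final ExecutorService pool;
      private final Semaphore slots;

      BlockingSubmitterSketch(int activeTasks, int waitingTasks) {
        this.pool = Executors.newFixedThreadPool(activeTasks);
        // one permit per running task plus one per queue slot
        this.slots = new Semaphore(activeTasks + waitingTasks);
      }

      /** Blocks the caller until a slot frees up, then hands the task over. */
      void submit(final Runnable task) throws InterruptedException {
        slots.acquire();
        try {
          pool.execute(new Runnable() {
            @Override
            public void run() {
              try {
                task.run();
              } finally {
                slots.release();   // slot freed once the task completes
              }
            }
          });
        } catch (RuntimeException e) {
          slots.release();         // submission failed; return the permit
          throw e;
        }
      }
    }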

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java
new file mode 100644
index 0000000..cf8783c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAWSCredentialsProvider.java
@@ -0,0 +1,250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+import java.nio.file.AccessDeniedException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.Timeout;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.auth.AWSCredentialsProviderChain;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.auth.InstanceProfileCredentialsProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.junit.Assert.*;
+
+/**
+ * Tests for {@link Constants#AWS_CREDENTIALS_PROVIDER} logic.
+ *
+ */
+public class ITestS3AAWSCredentialsProvider {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AAWSCredentialsProvider.class);
+
+  @Rule
+  public Timeout testTimeout = new Timeout(1 * 60 * 1000);
+
+  @Rule
+  public ExpectedException exception = ExpectedException.none();
+
+  /**
+   * Declare what exception to raise, and the text which must be found
+   * in it.
+   * @param exceptionClass class of exception
+   * @param text text in exception
+   */
+  private void expectException(Class<? extends Throwable> exceptionClass,
+      String text) {
+    exception.expect(exceptionClass);
+    exception.expectMessage(text);
+  }
+
+  @Test
+  public void testBadConfiguration() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(AWS_CREDENTIALS_PROVIDER, "no.such.class");
+    try {
+      createFailingFS(conf);
+    } catch (IOException e) {
+      if (!(e.getCause() instanceof ClassNotFoundException)) {
+        LOG.error("Unexpected nested cause: {} in {}", e.getCause(), e, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Create a filesystem, expect it to fail by raising an IOException.
+   * Raises an assertion exception if in fact the FS does get instantiated.
+   * @param conf configuration
+   * @throws IOException an expected exception.
+   */
+  private void createFailingFS(Configuration conf) throws IOException {
+    S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf);
+    fs.listStatus(new Path("/"));
+    fail("Expected exception - got " + fs);
+  }
+
+  static class BadCredentialsProvider implements AWSCredentialsProvider {
+
+    @SuppressWarnings("unused")
+    public BadCredentialsProvider(URI name, Configuration conf) {
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+      return new BasicAWSCredentials("bad_key", "bad_secret");
+    }
+
+    @Override
+    public void refresh() {
+    }
+  }
+
+  @Test
+  public void testBadCredentials() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(AWS_CREDENTIALS_PROVIDER, BadCredentialsProvider.class.getName());
+    try {
+      createFailingFS(conf);
+    } catch (AccessDeniedException e) {
+      // expected
+    }
+  }
+
+  static class GoodCredentialsProvider extends AWSCredentialsProviderChain {
+
+    @SuppressWarnings("unused")
+    public GoodCredentialsProvider(URI name, Configuration conf) {
+      super(new BasicAWSCredentialsProvider(conf.get(ACCESS_KEY),
+          conf.get(SECRET_KEY)), new InstanceProfileCredentialsProvider());
+    }
+  }
+
+  @Test
+  public void testGoodProvider() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(AWS_CREDENTIALS_PROVIDER,
+        GoodCredentialsProvider.class.getName());
+    S3ATestUtils.createTestFileSystem(conf);
+  }
+
+  @Test
+  public void testAnonymousProvider() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(AWS_CREDENTIALS_PROVIDER,
+        AnonymousAWSCredentialsProvider.class.getName());
+    Path testFile = new Path(
+        conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE));
+    S3ATestUtils.useCSVDataEndpoint(conf);
+    FileSystem fs = FileSystem.newInstance(testFile.toUri(), conf);
+    assertNotNull(fs);
+    assertTrue(fs instanceof S3AFileSystem);
+    FileStatus stat = fs.getFileStatus(testFile);
+    assertNotNull(stat);
+    assertEquals(testFile, stat.getPath());
+  }
+
+  /**
+   * A credential provider whose constructor signature doesn't match.
+   */
+  static class ConstructorSignatureErrorProvider
+      implements AWSCredentialsProvider {
+
+    @SuppressWarnings("unused")
+    public ConstructorSignatureErrorProvider(String str) {
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+      return null;
+    }
+
+    @Override
+    public void refresh() {
+    }
+  }
+
+  /**
+   * A credential provider whose constructor raises an NPE.
+   */
+  static class ConstructorFailureProvider
+      implements AWSCredentialsProvider {
+
+    @SuppressWarnings("unused")
+    public ConstructorFailureProvider() {
+      throw new NullPointerException("oops");
+    }
+
+    @Override
+    public AWSCredentials getCredentials() {
+      return null;
+    }
+
+    @Override
+    public void refresh() {
+    }
+  }
+
+  @Test
+  public void testProviderWrongClass() throws Exception {
+    expectProviderInstantiationFailure(this.getClass().getName(),
+        NOT_AWS_PROVIDER);
+  }
+
+  @Test
+  public void testProviderNotAClass() throws Exception {
+    expectProviderInstantiationFailure("NoSuchClass",
+        "ClassNotFoundException");
+  }
+
+  private void expectProviderInstantiationFailure(String option,
+      String expectedErrorText) throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(AWS_CREDENTIALS_PROVIDER, option);
+    Path testFile = new Path(
+        conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE));
+    expectException(IOException.class, expectedErrorText);
+    URI uri = testFile.toUri();
+    S3AUtils.createAWSCredentialProviderSet(uri, conf, uri);
+  }
+
+  @Test
+  public void testProviderConstructorError() throws Exception {
+    expectProviderInstantiationFailure(
+        ConstructorSignatureErrorProvider.class.getName(),
+        CONSTRUCTOR_EXCEPTION);
+  }
+
+  @Test
+  public void testProviderFailureError() throws Exception {
+    expectProviderInstantiationFailure(
+        ConstructorFailureProvider.class.getName(),
+        INSTANTIATION_EXCEPTION);
+  }
+
+  @Test
+  public void testInstantiationChain() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(AWS_CREDENTIALS_PROVIDER,
+        TemporaryAWSCredentialsProvider.NAME
+            + ", \t" + SimpleAWSCredentialsProvider.NAME
+            + " ,\n " + AnonymousAWSCredentialsProvider.NAME);
+    Path testFile = new Path(
+        conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE));
+
+    URI uri = testFile.toUri();
+    S3AUtils.createAWSCredentialProviderSet(uri, conf, uri);
+  }
+
+}
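
A usage note: every provider above is selected purely through configuration.
S3A reflectively instantiates whatever class the
fs.s3a.aws.credentials.provider key names, preferring a (URI, Configuration)
constructor as the test providers show, and testInstantiationChain
demonstrates that a comma-separated list is accepted and trimmed. A hedged
wiring sketch; the provider class and bucket are hypothetical:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class ProviderWiringSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Fully qualified class names, comma-separated, tried in order.
        conf.set("fs.s3a.aws.credentials.provider",
            "org.example.MyCredentialsProvider");   // hypothetical provider
        FileSystem fs = FileSystem.newInstance(
            new URI("s3a://example-bucket/"), conf);
        System.out.println("instantiated " + fs.getUri());
      }
    }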

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
new file mode 100644
index 0000000..4444d0c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlockingThreadPool.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Demonstrate that the threadpool blocks additional client requests if
+ * its queue is full (rather than throwing an exception) by initiating an
+ * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
+ * 4th part should not trigger an exception as it would with a
+ * non-blocking threadpool.
+ */
+public class ITestS3ABlockingThreadPool {
+
+  private Configuration conf;
+  private S3AFileSystem fs;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  protected Path getTestPath() {
+    return new Path("/tests3a");
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+    conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+    conf.setInt(Constants.MAX_THREADS, 2);
+    conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(getTestPath(), true);
+    }
+  }
+
+  @Test
+  public void testRegularMultiPartUpload() throws Exception {
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+        1024);
+  }
+
+  @Test
+  public void testFastMultiPartUpload() throws Exception {
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+        1024);
+
+  }
+}
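
The setUp() above deliberately starves the transfer machinery: a 16 MB file
against a 5 MB part size yields four parts, while only two threads and one
queue slot are available, so the final part submission must block. The same
settings expressed with the raw key names behind those Constants fields (key
strings as found in hadoop-aws; verify them against your version):

    import org.apache.hadoop.conf.Configuration;

    public class SmallPoolSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong("fs.s3a.multipart.threshold", 5 * 1024 * 1024);
        conf.setLong("fs.s3a.multipart.size", 5 * 1024 * 1024);
        conf.setInt("fs.s3a.threads.max", 2);
        conf.setInt("fs.s3a.max.total.tasks", 1);
        // A 16 MB write now uploads as 4 parts; the 4th submission blocks
        // until a thread or the queue slot frees, instead of failing.
      }
    }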

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java
new file mode 100644
index 0000000..9a6dae7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ABlocksize.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.fileStatsToString;
+
+/**
+ * S3A tests for configuring block size.
+ */
+public class ITestS3ABlocksize extends AbstractFSContractTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3ABlocksize.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testBlockSize() throws Exception {
+    FileSystem fs = getFileSystem();
+    long defaultBlockSize = fs.getDefaultBlockSize();
+    assertEquals("incorrect blocksize",
+        S3AFileSystem.DEFAULT_BLOCKSIZE, defaultBlockSize);
+    long newBlockSize = defaultBlockSize * 2;
+    fs.getConf().setLong(Constants.FS_S3A_BLOCK_SIZE, newBlockSize);
+
+    Path dir = path("testBlockSize");
+    Path file = new Path(dir, "file");
+    createFile(fs, file, true, dataset(1024, 'a', 'z' - 'a'));
+    FileStatus fileStatus = fs.getFileStatus(file);
+    assertEquals("Double default block size in stat(): " + fileStatus,
+        newBlockSize,
+        fileStatus.getBlockSize());
+
+    // check the listing  & assert that the block size is picked up by
+    // this route too.
+    boolean found = false;
+    FileStatus[] listing = fs.listStatus(dir);
+    for (FileStatus stat : listing) {
+      LOG.info("entry: {}", stat);
+      if (file.equals(stat.getPath())) {
+        found = true;
+        assertEquals("Double default block size in ls(): " + stat,
+            newBlockSize,
+            stat.getBlockSize());
+      }
+    }
+    assertTrue("Did not find " + fileStatsToString(listing, ", "), found);
+  }
+
+  @Test
+  public void testRootFileStatusHasBlocksize() throws Throwable {
+    FileSystem fs = getFileSystem();
+    FileStatus status = fs.getFileStatus(new Path("/"));
+    assertTrue("Invalid root blocksize",
+        status.getBlockSize() >= 0);
+  }
+
+}
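
As the test demonstrates, the S3A block size is advertised metadata only: it
shapes the FileStatus values that split calculators consume, and no
storage-level blocks exist. A minimal sketch of setting it and reading it
back; the bucket and file path are placeholders:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class BlockSizeSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Advertised size only; nothing changes on the wire or at rest.
        conf.setLong("fs.s3a.block.size", 64 * 1024 * 1024);
        FileSystem fs = FileSystem.newInstance(
            new URI("s3a://example-bucket/"), conf);
        FileStatus st = fs.getFileStatus(new Path("/data/example.csv"));
        System.out.println("reported block size: " + st.getBlockSize());
      }
    }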

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
new file mode 100644
index 0000000..4e99339
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.ClientConfiguration;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.S3ClientOptions;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.lang.reflect.FieldUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3native.S3xLoginHelper;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.net.URI;
+
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+import org.apache.hadoop.util.VersionInfo;
+import org.apache.http.HttpStatus;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * S3A tests for configuration.
+ */
+public class ITestS3AConfiguration {
+  private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";
+  private static final String EXAMPLE_KEY =
+      "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";
+
+  private Configuration conf;
+  private S3AFileSystem fs;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AConfiguration.class);
+
+  private static final String TEST_ENDPOINT = "test.fs.s3a.endpoint";
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
+  /**
+   * Test if custom endpoint is picked up.
+   * <p/>
+   * The test expects TEST_ENDPOINT to be defined in the Configuration
+   * describing the endpoint of the bucket to which TEST_FS_S3A_NAME points
+   * (for instance "s3-eu-west-1.amazonaws.com" if the bucket is located in
+   * Ireland). The bucket has to be hosted in the region denoted by the
+   * endpoint for the test to succeed.
+   * <p/>
+   * More info and the list of endpoint identifiers:
+   * http://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testEndpoint() throws Exception {
+    conf = new Configuration();
+    String endpoint = conf.getTrimmed(TEST_ENDPOINT, "");
+    if (endpoint.isEmpty()) {
+      LOG.warn("Custom endpoint test skipped as " + TEST_ENDPOINT + "config " +
+          "setting was not detected");
+    } else {
+      conf.set(Constants.ENDPOINT, endpoint);
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      AmazonS3Client s3 = fs.getAmazonS3Client();
+      String endPointRegion = "";
+      // Differentiate handling of "s3-" and "s3." based endpoint identifiers
+      String[] endpointParts = StringUtils.split(endpoint, '.');
+      if (endpointParts.length == 3) {
+        endPointRegion = endpointParts[0].substring(3);
+      } else if (endpointParts.length == 4) {
+        endPointRegion = endpointParts[1];
+      } else {
+        fail("Unexpected endpoint");
+      }
+      assertEquals("Endpoint config setting and bucket location differ: ",
+          endPointRegion, s3.getBucketLocation(fs.getUri().getHost()));
+    }
+  }
+
+  @Test
+  public void testProxyConnection() throws Exception {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.setInt(Constants.PROXY_PORT, 1);
+    String proxy =
+        conf.get(Constants.PROXY_HOST) + ":" + conf.get(Constants.PROXY_PORT);
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server at " + proxy);
+    } catch (AWSClientIOException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testProxyPortWithoutHost() throws Exception {
+    conf = new Configuration();
+    conf.unset(Constants.PROXY_HOST);
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.setInt(Constants.PROXY_PORT, 1);
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a proxy configuration error");
+    } catch (IllegalArgumentException e) {
+      String msg = e.toString();
+      if (!msg.contains(Constants.PROXY_HOST) &&
+          !msg.contains(Constants.PROXY_PORT)) {
+        throw e;
+      }
+    }
+  }
+
+  @Test
+  public void testAutomaticProxyPortSelection() throws Exception {
+    conf = new Configuration();
+    conf.unset(Constants.PROXY_PORT);
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.set(Constants.SECURE_CONNECTIONS, "true");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server");
+    } catch (AWSClientIOException e) {
+      // expected
+    }
+    conf.set(Constants.SECURE_CONNECTIONS, "false");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server");
+    } catch (AWSClientIOException e) {
+      // expected
+    }
+  }
+
+  @Test
+  public void testUsernameInconsistentWithPassword() throws Exception {
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.setInt(Constants.PROXY_PORT, 1);
+    conf.set(Constants.PROXY_USERNAME, "user");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server");
+    } catch (IllegalArgumentException e) {
+      String msg = e.toString();
+      if (!msg.contains(Constants.PROXY_USERNAME) &&
+          !msg.contains(Constants.PROXY_PASSWORD)) {
+        throw e;
+      }
+    }
+    conf = new Configuration();
+    conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
+    conf.set(Constants.PROXY_HOST, "127.0.0.1");
+    conf.setInt(Constants.PROXY_PORT, 1);
+    conf.set(Constants.PROXY_PASSWORD, "password");
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      fail("Expected a connection error for proxy server");
+    } catch (IllegalArgumentException e) {
+      String msg = e.toString();
+      if (!msg.contains(Constants.PROXY_USERNAME) &&
+          !msg.contains(Constants.PROXY_PASSWORD)) {
+        throw e;
+      }
+    }
+  }
+
+  @Test
+  public void testCredsFromCredentialProvider() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    provisionAccessKeys(conf);
+
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    S3xLoginHelper.Login creds =
+        S3AUtils.getAWSAccessKeys(new URI("s3a://foobar"), conf);
+    assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword());
+  }
+
+  void provisionAccessKeys(final Configuration conf) throws Exception {
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(Constants.ACCESS_KEY,
+        EXAMPLE_ID.toCharArray());
+    provider.createCredentialEntry(Constants.SECRET_KEY,
+        EXAMPLE_KEY.toCharArray());
+    provider.flush();
+  }
+
+  @Test
+  public void testCredsFromUserInfo() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    provisionAccessKeys(conf);
+
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    URI uriWithUserInfo = new URI("s3a://123:456@foobar");
+    S3xLoginHelper.Login creds =
+        S3AUtils.getAWSAccessKeys(uriWithUserInfo, conf);
+    assertEquals("AccessKey incorrect.", "123", creds.getUser());
+    assertEquals("SecretKey incorrect.", "456", creds.getPassword());
+  }
+
+  @Test
+  public void testIDFromUserInfoSecretFromCredentialProvider()
+      throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    provisionAccessKeys(conf);
+
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    URI uriWithUserInfo = new URI("s3a://123@foobar");
+    S3xLoginHelper.Login creds =
+        S3AUtils.getAWSAccessKeys(uriWithUserInfo, conf);
+    assertEquals("AccessKey incorrect.", "123", creds.getUser());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword());
+  }
+
+  @Test
+  public void testSecretFromCredentialProviderIDFromConfig() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(Constants.SECRET_KEY,
+        EXAMPLE_KEY.toCharArray());
+    provider.flush();
+
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID);
+    S3xLoginHelper.Login creds =
+        S3AUtils.getAWSAccessKeys(new URI("s3a://foobar"), conf);
+    assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword());
+  }
+
+  @Test
+  public void testIDFromCredentialProviderSecretFromConfig() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(Constants.ACCESS_KEY,
+        EXAMPLE_ID.toCharArray());
+    provider.flush();
+
+    conf.set(Constants.SECRET_KEY, EXAMPLE_KEY);
+    S3xLoginHelper.Login creds =
+        S3AUtils.getAWSAccessKeys(new URI("s3a://foobar"), conf);
+    assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getUser());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getPassword());
+  }
+
+  @Test
+  public void testExcludingS3ACredentialProvider() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        "jceks://s3a/foobar," + jks.toString());
+
+    // first make sure that the s3a based provider is removed
+    Configuration c = ProviderUtils.excludeIncompatibleCredentialProviders(
+        conf, S3AFileSystem.class);
+    String newPath = c.get(
+        CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH);
+    assertFalse("Provider Path incorrect", newPath.contains("s3a://"));
+
+    // now let's make sure the new path is created by the S3AFileSystem
+    // and the integration still works. Let's provision the keys through
+    // the altered configuration instance and then try and access them
+    // using the original config with the s3a provider in the path.
+    provisionAccessKeys(c);
+
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    URI uriWithUserInfo = new URI("s3a://123:456@foobar");
+    S3xLoginHelper.Login creds =
+        S3AUtils.getAWSAccessKeys(uriWithUserInfo, conf);
+    assertEquals("AccessKey incorrect.", "123", creds.getUser());
+    assertEquals("SecretKey incorrect.", "456", creds.getPassword());
+
+  }
+
+  @Test
+  public void shouldBeAbleToSwitchOnS3PathStyleAccessViaConfigProperty()
+      throws Exception {
+
+    conf = new Configuration();
+    conf.set(Constants.PATH_STYLE_ACCESS, Boolean.toString(true));
+    assertTrue(conf.getBoolean(Constants.PATH_STYLE_ACCESS, false));
+
+    try {
+      fs = S3ATestUtils.createTestFileSystem(conf);
+      assertNotNull(fs);
+      AmazonS3Client s3 = fs.getAmazonS3Client();
+      assertNotNull(s3);
+      S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
+          "clientOptions");
+      assertTrue("Expected to find path style access to be switched on!",
+          clientOptions.isPathStyleAccess());
+      byte[] file = ContractTestUtils.toAsciiByteArray("test file");
+      ContractTestUtils.writeAndRead(fs,
+          new Path("/path/style/access/testFile"), file, file.length,
+          conf.getInt(Constants.FS_S3A_BLOCK_SIZE, file.length), false, true);
+    } catch (final AWSS3IOException e) {
+      LOG.error("Caught exception: ", e);
+      // Catch/pass standard path style access behaviour when live bucket
+      // isn't in the same region as the s3 client default. See
+      // http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html
+      assertEquals(HttpStatus.SC_MOVED_PERMANENTLY, e.getStatusCode());
+    }
+  }
+
+  @Test
+  public void testDefaultUserAgent() throws Exception {
+    conf = new Configuration();
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    assertNotNull(fs);
+    AmazonS3Client s3 = fs.getAmazonS3Client();
+    assertNotNull(s3);
+    ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
+        "clientConfiguration");
+    assertEquals("Hadoop " + VersionInfo.getVersion(), awsConf.getUserAgent());
+  }
+
+  @Test
+  public void testCustomUserAgent() throws Exception {
+    conf = new Configuration();
+    conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    assertNotNull(fs);
+    AmazonS3Client s3 = fs.getAmazonS3Client();
+    assertNotNull(s3);
+    ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
+        "clientConfiguration");
+    assertEquals("MyApp, Hadoop " + VersionInfo.getVersion(),
+        awsConf.getUserAgent());
+  }
+
+  /**
+   * Reads and returns a field from an object using reflection.  If the field
+   * cannot be found, is null, or is not the expected type, then this method
+   * fails the test.
+   *
+   * @param target object to read
+   * @param fieldType type of field to read, which will also be the return type
+   * @param fieldName name of field to read
+   * @return field that was read
+   * @throws IllegalAccessException if access not allowed
+   */
+  private static <T> T getField(Object target, Class<T> fieldType,
+      String fieldName) throws IllegalAccessException {
+    Object obj = FieldUtils.readField(target, fieldName, true);
+    assertNotNull(String.format(
+        "Could not read field named %s in object with class %s.", fieldName,
+        target.getClass().getName()), obj);
+    assertTrue(String.format(
+        "Unexpected type found for field named %s, expected %s, actual %s.",
+        fieldName, fieldType.getName(), obj.getClass().getName()),
+        fieldType.isAssignableFrom(obj.getClass()));
+    return fieldType.cast(obj);
+  }
+}
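
Several of the tests above provision keys through the Hadoop credential
provider API rather than placing them in core-site.xml. The same calls,
condensed into a standalone sketch; the keystore path and key values are
placeholders:

    import java.io.File;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.ProviderUtils;
    import org.apache.hadoop.security.alias.CredentialProvider;
    import org.apache.hadoop.security.alias.CredentialProviderFactory;

    public class ProvisionKeysSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
            new File("/tmp/s3a-test.jks").toURI());
        conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
            jks.toString());
        CredentialProvider provider =
            CredentialProviderFactory.getProviders(conf).get(0);
        // The keys live in the keystore, not in plaintext configuration.
        provider.createCredentialEntry("fs.s3a.access.key",
            "ACCESSKEY".toCharArray());
        provider.createCredentialEntry("fs.s3a.secret.key",
            "SECRETKEY".toCharArray());
        provider.flush();
      }
    }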

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java
new file mode 100644
index 0000000..b3d7abf
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.file.AccessDeniedException;
+
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.TEST_FS_S3A_NAME;
+
+/**
+ * Tests that credentials can go into the URL. This includes a valid
+ * set, and a check that an invalid set does at least get stripped out
+ * of the final URI.
+ */
+public class ITestS3ACredentialsInURL extends Assert {
+  private S3AFileSystem fs;
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3ACredentialsInURL.class);
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @After
+  public void teardown() {
+    IOUtils.closeStream(fs);
+  }
+
+  /**
+   * Test instantiation.
+   * @throws Throwable
+   */
+  @Test
+  public void testInstantiateFromURL() throws Throwable {
+
+    Configuration conf = new Configuration();
+    String accessKey = conf.get(Constants.ACCESS_KEY);
+    String secretKey = conf.get(Constants.SECRET_KEY);
+    String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
+    Assume.assumeNotNull(fsname, accessKey, secretKey);
+    URI original = new URI(fsname);
+    URI secretsURI = createUriWithEmbeddedSecrets(original,
+        accessKey, secretKey);
+    if (secretKey.contains("/")) {
+      assertTrue("test URI encodes the / symbol", secretsURI.toString().
+          contains("%252F"));
+    }
+    if (secretKey.contains("+")) {
+      assertTrue("test URI encodes the + symbol", secretsURI.toString().
+          contains("%252B"));
+    }
+    assertFalse("Does not contain secrets", original.equals(secretsURI));
+
+    conf.set(TEST_FS_S3A_NAME, secretsURI.toString());
+    conf.unset(Constants.ACCESS_KEY);
+    conf.unset(Constants.SECRET_KEY);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    String fsURI = fs.getUri().toString();
+    assertFalse("FS URI contains a @ symbol", fsURI.contains("@"));
+    assertFalse("FS URI contains a % symbol", fsURI.contains("%"));
+    if (!original.toString().startsWith(fsURI)) {
+      fail("Filesystem URI does not match original");
+    }
+    validate("original path", new Path(original));
+    validate("bare path", new Path("/"));
+    validate("secrets path", new Path(secretsURI));
+  }
+
+  private void validate(String text, Path path) throws IOException {
+    try {
+      fs.canonicalizeUri(path.toUri());
+      fs.checkPath(path);
+      assertTrue(text + " Not a directory",
+          fs.getFileStatus(new Path("/")).isDirectory());
+      fs.globStatus(path);
+    } catch (AssertionError e) {
+      throw e;
+    } catch (Exception e) {
+      LOG.debug("{} failure: {}", text, e, e);
+      fail(text + " Test failed");
+    }
+  }
+
+  /**
+   * Set up some invalid credentials, verify login is rejected.
+   * @throws Throwable
+   */
+  @Test
+  public void testInvalidCredentialsFail() throws Throwable {
+    Configuration conf = new Configuration();
+    String fsname = conf.getTrimmed(TEST_FS_S3A_NAME, "");
+    Assume.assumeNotNull(fsname);
+    URI original = new URI(fsname);
+    URI testURI = createUriWithEmbeddedSecrets(original, "user", "//");
+
+    conf.set(TEST_FS_S3A_NAME, testURI.toString());
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    try {
+      S3AFileStatus status = fs.getFileStatus(new Path("/"));
+      fail("Expected an AccessDeniedException, got " + status);
+    } catch (AccessDeniedException e) {
+      // expected
+    }
+
+  }
+
+  private URI createUriWithEmbeddedSecrets(URI original,
+      String accessKey,
+      String secretKey) throws UnsupportedEncodingException {
+    String encodedSecretKey = URLEncoder.encode(secretKey, "UTF-8");
+    String formattedString = String.format("%s://%s:%s@%s/%s/",
+        original.getScheme(),
+        accessKey,
+        encodedSecretKey,
+        original.getHost(),
+        original.getPath());
+    URI testURI;
+    try {
+      testURI = new Path(formattedString).toUri();
+    } catch (IllegalArgumentException e) {
+      // inner cause is stripped to keep any secrets out of stack traces
+      throw new IllegalArgumentException("Could not encode Path");
+    }
+    return testURI;
+  }
+}
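
The createUriWithEmbeddedSecrets() helper is the key detail here: secret keys
frequently contain characters such as / and + that are illegal in a URI's
userinfo field, so they must be URL-encoded before embedding. A sketch of the
encoding step alone, with placeholder keys; note that anything which logs the
resulting URI can leak the credentials, which is why the tests verify the
filesystem strips them out:

    import java.net.URI;
    import java.net.URLEncoder;

    public class EmbeddedSecretsSketch {
      public static void main(String[] args) throws Exception {
        String accessKey = "EXAMPLEACCESSKEY";    // placeholder
        String secretKey = "abc/def+ghi";         // '/' and '+' need escaping
        String encoded = URLEncoder.encode(secretKey, "UTF-8");
        URI uri = new URI(
            "s3a://" + accessKey + ":" + encoded + "@example-bucket/");
        System.out.println(uri);
      }
    }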

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java
new file mode 100644
index 0000000..4543278
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryption.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
+/**
+ * Test whether or not encryption works by turning it on. Some checks
+ * are made for different file sizes as there have been reports that the
+ * file length may be rounded up to match word boundaries.
+ */
+public class ITestS3AEncryption extends AbstractS3ATestBase {
+  private static final String AES256 = Constants.SERVER_SIDE_ENCRYPTION_AES256;
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        AES256);
+    return conf;
+  }
+
+  // File sizes to test, including values just short of powers of two.
+  private static final int[] SIZES = {
+      0, 1, 2, 3, 4, 5, 254, 255, 256, 257,
+      (1 << 10) - 3, (1 << 11) - 2, (1 << 12) - 1
+  };
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.closeStream(getFileSystem());
+  }
+
+  @Test
+  public void testEncryption() throws Throwable {
+    for (int size: SIZES) {
+      validateEncryptionForFilesize(size);
+    }
+  }
+
+  @Test
+  public void testEncryptionOverRename() throws Throwable {
+    skipIfEncryptionTestsDisabled(getConfiguration());
+    Path src = path(createFilename(1024));
+    byte[] data = dataset(1024, 'a', 'z');
+    S3AFileSystem fs = getFileSystem();
+    writeDataset(fs, src, data, data.length, 1024 * 1024, true);
+    ContractTestUtils.verifyFileContents(fs, src, data);
+    Path dest = path(src.getName() + "-copy");
+    fs.rename(src, dest);
+    ContractTestUtils.verifyFileContents(fs, dest, data);
+    assertEncrypted(dest);
+  }
+
+  protected void validateEncryptionForFilesize(int len) throws IOException {
+    skipIfEncryptionTestsDisabled(getConfiguration());
+    describe("Create an encrypted file of size " + len);
+    String src = createFilename(len);
+    Path path = writeThenReadFile(src, len);
+    assertEncrypted(path);
+    rm(getFileSystem(), path, false, false);
+  }
+
+  private String createFilename(int len) {
+    return String.format("%s-%04x", methodName.getMethodName(), len);
+  }
+
+  /**
+   * Assert that the given path references an encrypted blob.
+   * @param path path
+   * @throws IOException on a failure
+   */
+  private void assertEncrypted(Path path) throws IOException {
+    ObjectMetadata md = getFileSystem().getObjectMetadata(path);
+    assertEquals(AES256, md.getSSEAlgorithm());
+  }
+
+}
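
The only switch these tests flip is the encryption algorithm: with SSE-S3
the store encrypts objects at rest and readers transparently receive
plaintext, so verification has to inspect the object metadata just as
assertEncrypted() does. A minimal configuration sketch:

    import org.apache.hadoop.conf.Configuration;

    public class EnableSSESketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // SSE-S3: each object is encrypted with AES256 on the server side.
        conf.set("fs.s3a.server-side-encryption-algorithm", "AES256");
        // Reads stay transparent; only the metadata reveals the algorithm.
      }
    }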

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java
new file mode 100644
index 0000000..81578c2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionAlgorithmPropagation.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+
+/**
+ * Test whether or not encryption settings propagate by choosing an invalid
+ * one. We expect the write to fail with a 400 Bad Request error.
+ */
+public class ITestS3AEncryptionAlgorithmPropagation
+    extends AbstractS3ATestBase {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.disableFilesystemCaching(conf);
+    conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+        "DES");
+    return conf;
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.closeStream(getFileSystem());
+  }
+
+  @Test
+  public void testEncrypt0() throws Throwable {
+    writeThenReadFileToFailure(0);
+  }
+
+  @Test
+  public void testEncrypt256() throws Throwable {
+    writeThenReadFileToFailure(256);
+  }
+
+  /**
+   * Make this a no-op so test setup doesn't fail.
+   * @param path path to create; ignored here
+   * @throws IOException on any failure
+   */
+  @Override
+  protected void mkdirs(Path path) throws IOException {
+
+  }
+
+  protected void writeThenReadFileToFailure(int len) throws IOException {
+    skipIfEncryptionTestsDisabled(getConfiguration());
+    describe("Create an encrypted file of size " + len);
+    try {
+      writeThenReadFile(methodName.getMethodName() + '-' + len, len);
+      fail("Expected an exception about an illegal encryption algorithm");
+    } catch (AWSS3IOException e) {
+      assertStatusCode(e, 400);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java
new file mode 100644
index 0000000..c06fed1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionFastOutputStream.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Run the encryption tests against the Fast output stream.
+ * This verifies that both file writing paths can encrypt their data.
+ */
+public class ITestS3AEncryptionFastOutputStream extends ITestS3AEncryption {
+
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    return conf;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
new file mode 100644
index 0000000..d8e017e
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.services.s3.model.AmazonS3Exception;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.FileNotFoundException;
+import java.nio.file.AccessDeniedException;
+import java.util.concurrent.Callable;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+
+/**
+ * Tests S3A failure translation, including a functional test
+ * that generates errors during stream IO.
+ */
+public class ITestS3AFailureHandling extends AbstractFSContractTestBase {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AFailureHandling.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Test
+  public void testReadFileChanged() throws Throwable {
+    describe("overwrite a file with a shorter one during a read, seek");
+    final int fullLength = 8192;
+    final byte[] fullDataset = dataset(fullLength, 'a', 32);
+    final int shortLen = 4096;
+    final byte[] shortDataset = dataset(shortLen, 'A', 32);
+    final FileSystem fs = getFileSystem();
+    final Path testpath = path("readFileToChange.txt");
+    // initial write
+    writeDataset(fs, testpath, fullDataset, fullDataset.length, 1024, false);
+    try (FSDataInputStream instream = fs.open(testpath)) {
+      instream.seek(fullLength - 16);
+      assertTrue("no data to read", instream.read() >= 0);
+      // overwrite
+      writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024,
+          true);
+      // the file is now shorter. Probe it until the new length is visible,
+      // with a spin and wait
+      eventually(30 * 1000, new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
+          return null;
+        }
+      });
+      // the length is now shorter. Assuming the change has propagated to
+      // all replicas, the position of the input stream is now beyond the
+      // new EOF. An attempt to seek backwards to a position that is still
+      // greater than the short length, then read, will raise an exception
+      // from AWS S3, which must be translated into an EOF.
+
+      instream.seek(shortLen + 1024);
+      int c = instream.read();
+      assertIsEOF("read()", c);
+
+      byte[] buf = new byte[256];
+
+      assertIsEOF("read(buffer)", instream.read(buf));
+      assertIsEOF("read(offset)",
+          instream.read(instream.getPos(), buf, 0, buf.length));
+
+      // now do a block read fully, again, backwards from the current pos
+      try {
+        instream.readFully(shortLen + 512, buf);
+        fail("Expected readFully to fail");
+      } catch (EOFException expected) {
+        LOG.debug("Expected EOF: ", expected);
+      }
+
+      assertIsEOF("read(offset)",
+          instream.read(shortLen + 510, buf, 0, buf.length));
+
+      // seek somewhere useful
+      instream.seek(shortLen - 256);
+
+      // delete the file. Reads must fail
+      fs.delete(testpath, false);
+
+      try {
+        int r = instream.read();
+        fail("Expected an exception, got " + r);
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+
+      try {
+        instream.readFully(2048, buf);
+        fail("Expected readFully to fail");
+      } catch (FileNotFoundException e) {
+        // expected
+      }
+
+    }
+  }
+
+  /**
+   * Assert that a read operation returned an EOF value.
+   * @param operation specific operation
+   * @param readResult result
+   */
+  private void assertIsEOF(String operation, int readResult) {
+    assertEquals("Expected EOF from "+ operation
+        + "; got char " + (char) readResult, -1, readResult);
+  }
+
+  @Test
+  public void test404isNotFound() throws Throwable {
+    verifyTranslated(FileNotFoundException.class, createS3Exception(404));
+  }
+
+  protected Exception verifyTranslated(Class clazz,
+      AmazonClientException exception) throws Exception {
+    return verifyExceptionClass(clazz,
+        translateException("test", "/", exception));
+  }
+
+  @Test
+  public void test401isNotPermitted() throws Throwable {
+    verifyTranslated(AccessDeniedException.class,
+        createS3Exception(401));
+  }
+
+  protected AmazonS3Exception createS3Exception(int code) {
+    AmazonS3Exception source = new AmazonS3Exception("");
+    source.setStatusCode(code);
+    return source;
+  }
+
+  @Test
+  public void testGenericS3Exception() throws Throwable {
+    // S3 exception of no known type
+    AWSS3IOException ex = (AWSS3IOException)verifyTranslated(
+        AWSS3IOException.class,
+        createS3Exception(451));
+    assertEquals(451, ex.getStatusCode());
+  }
+
+  @Test
+  public void testGenericServiceS3Exception() throws Throwable {
+    // service exception of no known type
+    AmazonServiceException ase = new AmazonServiceException("unwind");
+    ase.setStatusCode(500);
+    AWSServiceIOException ex = (AWSServiceIOException)verifyTranslated(
+        AWSServiceIOException.class,
+        ase);
+    assertEquals(500, ex.getStatusCode());
+  }
+
+  @Test
+  public void testGenericClientException() throws Throwable {
+    // Generic Amazon exception
+    verifyTranslated(AWSClientIOException.class,
+        new AmazonClientException(""));
+  }
+
+}

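The three testGeneric* cases above all exercise translateException, whose real implementation lives in S3AUtils. The status-code mapping the assertions depend on can be sketched as follows; this is a simplified illustration of the technique, not the shipped logic:

    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.file.AccessDeniedException;

    import com.amazonaws.AmazonServiceException;

    /** Illustrative sketch of status-code to IOException translation. */
    public final class ExceptionTranslation {
      private ExceptionTranslation() {
      }

      /** Map an AWS service exception to the IOException subclass the tests expect. */
      public static IOException toIOException(String operation, String path,
          AmazonServiceException e) {
        switch (e.getStatusCode()) {
        case 401:                       // unauthenticated
        case 403:                       // forbidden
          return (IOException) new AccessDeniedException(path, null,
              operation + ": " + e.getMessage()).initCause(e);
        case 404:                       // no such key
          return (IOException) new FileNotFoundException(
              operation + " on " + path).initCause(e);
        default:                        // anything else: wrap generically
          return new IOException(operation + " on " + path, e);
        }
      }
    }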
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
new file mode 100644
index 0000000..b5fa1c3
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFastOutputStream.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests regular and multi-part upload functionality for S3AFastOutputStream.
+ * File sizes are kept small to reduce test duration on slow connections.
+ */
+public class ITestS3AFastOutputStream {
+  private FileSystem fs;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+    conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(getTestPath(), true);
+    }
+  }
+
+  protected Path getTestPath() {
+    return new Path("/tests3a");
+  }
+
+  @Test
+  public void testRegularUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
+  }
+
+  @Test
+  public void testMultiPartUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 *
+        1024);
+  }
+}

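The two file sizes are chosen around the 5 MB threshold configured in setUp(): 1 MB stays below it and should go up as a single PUT, while 6 MB crosses it and should be uploaded in parts. The arithmetic behind that expectation, as a hedged sketch (the method name is local to this illustration, not an API in the patch):

    /** Illustrative only: how many uploads a file of a given size needs. */
    static int numberOfParts(long fileSize, long threshold, long partSize) {
      if (fileSize < threshold) {
        return 1;                        // below the threshold: one single PUT
      }
      // at or above the threshold: multipart upload, ceiling-divided by part size
      return (int) ((fileSize + partSize - 1) / partSize);
    }

    // With the values from setUp():
    // numberOfParts(1 << 20, 5 << 20, 5 << 20) == 1   (testRegularUpload)
    // numberOfParts(6 << 20, 5 << 20, 5 << 20) == 2   (testMultiPartUpload)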
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
new file mode 100644
index 0000000..2a6ba0c
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff;
+import static org.apache.hadoop.test.GenericTestUtils.getTestDir;
+
+/**
+ * Use metrics to assert about the cost of file status queries,
+ * in particular {@link S3AFileSystem#getFileStatus(Path)}.
+ */
+public class ITestS3AFileOperationCost extends AbstractFSContractTestBase {
+
+  private MetricDiff metadataRequests;
+  private MetricDiff listRequests;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AFileOperationCost.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return (S3AFileSystem) super.getFileSystem();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
+    listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnFile() throws Throwable {
+    describe("performing getFileStatus on a file");
+    Path simpleFile = path("simple.txt");
+    S3AFileSystem fs = getFileSystem();
+    touch(fs, simpleFile);
+    resetMetricDiffs();
+    S3AFileStatus status = fs.getFileStatus(simpleFile);
+    assertTrue("not a file: " + status, status.isFile());
+    metadataRequests.assertDiffEquals(1);
+    listRequests.assertDiffEquals(0);
+  }
+
+  private void resetMetricDiffs() {
+    reset(metadataRequests, listRequests);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnEmptyDir() throws Throwable {
+    describe("performing getFileStatus on an empty directory");
+    S3AFileSystem fs = getFileSystem();
+    Path dir = path("empty");
+    fs.mkdirs(dir);
+    resetMetricDiffs();
+    S3AFileStatus status = fs.getFileStatus(dir);
+    assertTrue("not empty: " + status, status.isEmptyDirectory());
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(0);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnMissingFile() throws Throwable {
+    describe("performing getFileStatus on a missing file");
+    S3AFileSystem fs = getFileSystem();
+    Path path = path("missing");
+    resetMetricDiffs();
+    try {
+      S3AFileStatus status = fs.getFileStatus(path);
+      fail("Got a status back from a missing file path " + status);
+    } catch (FileNotFoundException expected) {
+      // expected
+    }
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(1);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnMissingSubPath() throws Throwable {
+    describe("performing getFileStatus on a missing file");
+    S3AFileSystem fs = getFileSystem();
+    Path path = path("missingdir/missingpath");
+    resetMetricDiffs();
+    try {
+      S3AFileStatus status = fs.getFileStatus(path);
+      fail("Got a status back from a missing file path " + status);
+    } catch (FileNotFoundException expected) {
+      // expected
+    }
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(1);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnNonEmptyDir() throws Throwable {
+    describe("performing getFileStatus on a non-empty directory");
+    S3AFileSystem fs = getFileSystem();
+    Path dir = path("empty");
+    fs.mkdirs(dir);
+    Path simpleFile = new Path(dir, "simple.txt");
+    touch(fs, simpleFile);
+    resetMetricDiffs();
+    S3AFileStatus status = fs.getFileStatus(dir);
+    if (status.isEmptyDirectory()) {
+      // erroneous state
+      String fsState = fs.toString();
+      fail("FileStatus says directory isempty: " + status
+          + "\n" + ContractTestUtils.ls(fs, dir)
+          + "\n" + fsState);
+    }
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(1);
+  }
+
+  @Test
+  public void testCostOfCopyFromLocalFile() throws Throwable {
+    describe("testCostOfCopyFromLocalFile");
+    File localTestDir = getTestDir("tmp");
+    localTestDir.mkdirs();
+    File tmpFile = File.createTempFile("tests3acost", ".txt",
+        localTestDir);
+    tmpFile.delete();
+    try {
+      URI localFileURI = tmpFile.toURI();
+      FileSystem localFS = FileSystem.get(localFileURI,
+          getFileSystem().getConf());
+      Path localPath = new Path(localFileURI);
+      int len = 10 * 1024;
+      byte[] data = dataset(len, 'A', 'Z');
+      writeDataset(localFS, localPath, data, len, 1024, true);
+      S3AFileSystem s3a = getFileSystem();
+      MetricDiff copyLocalOps = new MetricDiff(s3a,
+          INVOCATION_COPY_FROM_LOCAL_FILE);
+      MetricDiff putRequests = new MetricDiff(s3a,
+          OBJECT_PUT_REQUESTS);
+      MetricDiff putBytes = new MetricDiff(s3a,
+          OBJECT_PUT_BYTES);
+
+      Path remotePath = path("copied");
+      s3a.copyFromLocalFile(false, true, localPath, remotePath);
+      verifyFileContents(s3a, remotePath, data);
+      copyLocalOps.assertDiffEquals(1);
+      putRequests.assertDiffEquals(1);
+      putBytes.assertDiffEquals(len);
+      // print final stats
+      LOG.info("Filesystem {}", s3a);
+    } finally {
+      tmpFile.delete();
+    }
+  }
+}

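Every assertion above follows the same pattern: snapshot a counter in setup, run the operation, then assert on the delta. Reduced to its essentials, the pattern looks like the sketch below; this is a generic illustration, not the S3ATestUtils.MetricDiff class itself, and the Counter interface is invented for the sketch:

    import static org.junit.Assert.assertEquals;

    /** Generic sketch of the snapshot/operate/assert-delta pattern. */
    public class CounterDiff {
      /** Source of the counter value; in S3A this is a filesystem statistic. */
      public interface Counter {
        long value();
      }

      private final Counter counter;
      private long startValue;

      public CounterDiff(Counter counter) {
        this.counter = counter;
        reset();
      }

      /** Snapshot the current value as the new baseline. */
      public void reset() {
        startValue = counter.value();
      }

      /** @return how much the counter moved since the last reset. */
      public long diff() {
        return counter.value() - startValue;
      }

      public void assertDiffEquals(long expected) {
        assertEquals("Unexpected counter change", expected, diff());
      }
    }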
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
new file mode 100644
index 0000000..858ac22
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
+
+/**
+ *  Tests a live S3 system. If your keys and bucket aren't specified, all
+ *  tests are marked as passed.
+ *
+ *  This uses BlockJUnit4ClassRunner because FileSystemContractBaseTest
+ *  extends TestCase, which uses the old JUnit 3 runner. That runner does
+ *  not honor assumptions properly, making it impossible to skip the tests
+ *  if we don't have a valid bucket.
+ **/
+public class ITestS3AFileSystemContract extends FileSystemContractBaseTest {
+
+  protected static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AFileSystemContract.class);
+
+  @Override
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    super.setUp();
+  }
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(path("test"), true);
+    }
+    super.tearDown();
+  }
+
+  @Override
+  public void testMkdirsWithUmask() throws Exception {
+    // not supported
+  }
+
+  @Override
+  public void testRenameFileAsExistingFile() throws Exception {
+    if (!renameSupported()) {
+      return;
+    }
+
+    Path src = path("/test/hadoop/file");
+    createFile(src);
+    Path dst = path("/test/new/newfile");
+    createFile(dst);
+    // S3 doesn't support a no-overwrite rename option;
+    // rename always overwrites the destination.
+    rename(src, dst, true, false, true);
+  }
+
+  @Override
+  public void testRenameDirectoryAsExistingDirectory() throws Exception {
+    if (!renameSupported()) {
+      return;
+    }
+
+    Path src = path("/test/hadoop/dir");
+    fs.mkdirs(src);
+    createFile(path("/test/hadoop/dir/file1"));
+    createFile(path("/test/hadoop/dir/subdir/file2"));
+
+    Path dst = path("/test/new/newdir");
+    fs.mkdirs(dst);
+    rename(src, dst, true, false, true);
+    assertFalse("Nested file1 exists",
+                fs.exists(path("/test/hadoop/dir/file1")));
+    assertFalse("Nested file2 exists",
+                fs.exists(path("/test/hadoop/dir/subdir/file2")));
+    assertTrue("Renamed nested file1 exists",
+               fs.exists(path("/test/new/newdir/file1")));
+    assertTrue("Renamed nested exists",
+               fs.exists(path("/test/new/newdir/subdir/file2")));
+  }
+
+//  @Override
+  public void testMoveDirUnderParent() throws Throwable {
+    // not supported, because rename
+    // fails if dst is a directory that is not empty.
+  }
+}

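The runner caveat in the class javadoc also explains the no-op overrides above: under the JUnit 3 TestCase runner there is no clean skip, so unsupported cases are overridden with empty bodies. With a JUnit 4 runner, an assumption handles the "no test bucket" case cleanly; a hedged sketch (the system property name is illustrative — the real S3A tests read the bucket from XML configuration):

    import org.junit.Assume;
    import org.junit.Test;

    public class AssumeExample {
      @Test
      public void testNeedsBucket() {
        // Illustrative property; not how the S3A tests actually configure this.
        String bucket = System.getProperty("test.fs.s3a.name");
        // JUnit 4 marks the test "skipped" when this assumption fails;
        // the old TestCase runner treats the same failure as a test error.
        Assume.assumeTrue("No test bucket configured", bucket != null);
        // ... test body requiring a live bucket ...
      }
    }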
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
new file mode 100644
index 0000000..360a151
--- /dev/null
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.net.URI;
+
+import com.amazonaws.auth.AWSCredentials;
+import com.amazonaws.auth.AWSCredentialsProvider;
+import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
+import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
+import com.amazonaws.services.securitytoken.model.GetSessionTokenResult;
+import com.amazonaws.services.securitytoken.model.Credentials;
+
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.apache.hadoop.fs.s3native.S3xLoginHelper;
+import org.apache.hadoop.conf.Configuration;
+
+import org.junit.Test;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Tests use of temporary credentials (for example, from AWS STS) with S3.
+ * This test extends a class that "does things to the root directory", and
+ * should only be used against transient filesystems where you don't care about
+ * the data.
+ */
+public class ITestS3ATemporaryCredentials extends AbstractFSContractTestBase {
+  public static final String TEST_STS_ENABLED = "test.fs.s3a.sts.enabled";
+  public static final String TEST_STS_ENDPOINT = "test.fs.s3a.sts.endpoint";
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3ATemporaryCredentials.class);
+
+  private static final String PROVIDER_CLASS
+      = TemporaryAWSCredentialsProvider.NAME;
+
+  private static final long TEST_FILE_SIZE = 1024;
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Test use of STS for requesting temporary credentials.
+   *
+   * The property test.fs.s3a.sts.endpoint can be set to point this at a
+   * different STS endpoint. This test will use the AWS credentials
+   * (if provided) for S3A tests to request temporary credentials, then
+   * attempt to use those credentials instead.
+   *
+   * @throws IOException on a test failure.
+   */
+  @Test
+  public void testSTS() throws IOException {
+    Configuration conf = getContract().getConf();
+    if (!conf.getBoolean(TEST_STS_ENABLED, true)) {
+      skip("STS functional tests disabled");
+    }
+
+    S3xLoginHelper.Login login = S3AUtils.getAWSAccessKeys(
+        URI.create("s3a://foobar"), conf);
+    if (!login.hasLogin()) {
+      skip("testSTS disabled because AWS credentials not configured");
+    }
+    AWSCredentialsProvider parentCredentials = new BasicAWSCredentialsProvider(
+        login.getUser(), login.getPassword());
+
+    String stsEndpoint = conf.getTrimmed(TEST_STS_ENDPOINT, "");
+    AWSSecurityTokenServiceClient stsClient;
+    stsClient = new AWSSecurityTokenServiceClient(parentCredentials);
+    if (!stsEndpoint.isEmpty()) {
+      LOG.debug("STS Endpoint ={}", stsEndpoint);
+      stsClient.setEndpoint(stsEndpoint);
+    }
+    GetSessionTokenRequest sessionTokenRequest = new GetSessionTokenRequest();
+    sessionTokenRequest.setDurationSeconds(900);
+    GetSessionTokenResult sessionTokenResult;
+    sessionTokenResult = stsClient.getSessionToken(sessionTokenRequest);
+    Credentials sessionCreds = sessionTokenResult.getCredentials();
+
+    String childAccessKey = sessionCreds.getAccessKeyId();
+    conf.set(ACCESS_KEY, childAccessKey);
+    String childSecretKey = sessionCreds.getSecretAccessKey();
+    conf.set(SECRET_KEY, childSecretKey);
+    String sessionToken = sessionCreds.getSessionToken();
+    conf.set(SESSION_TOKEN, sessionToken);
+
+    conf.set(AWS_CREDENTIALS_PROVIDER, PROVIDER_CLASS);
+
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      createAndVerifyFile(fs, path("testSTS"), TEST_FILE_SIZE);
+    }
+
+    // now create an invalid set of credentials by changing the session
+    // token
+    conf.set(SESSION_TOKEN, "invalid-" + sessionToken);
+    try (S3AFileSystem fs = S3ATestUtils.createTestFileSystem(conf)) {
+      createAndVerifyFile(fs, path("testSTSInvalidToken"), TEST_FILE_SIZE);
+      fail("Expected an access exception, but file access to "
+          + fs.getUri() + " was allowed: " + fs);
+    } catch (AWSS3IOException ex) {
+      LOG.info("Expected Exception: {}", ex.toString());
+      LOG.debug("Expected Exception: {}", ex, ex);
+    }
+  }
+
+  @Test
+  public void testTemporaryCredentialValidation() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(ACCESS_KEY, "accesskey");
+    conf.set(SECRET_KEY, "secretkey");
+    conf.set(SESSION_TOKEN, "");
+    TemporaryAWSCredentialsProvider provider
+        = new TemporaryAWSCredentialsProvider(getFileSystem().getUri(), conf);
+    try {
+      AWSCredentials credentials = provider.getCredentials();
+      fail("Expected a CredentialInitializationException,"
+          + " got " + credentials);
+    } catch (CredentialInitializationException expected) {
+      // expected
+    }
+  }
+}

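testSTS boils down to a single handoff: obtain a short-lived session from STS, then configure S3A to authenticate with it. Condensed into one hedged helper, using the same SDK and Hadoop classes the test imports (error handling omitted; the helper itself is illustrative, not part of the patch):

    /** Illustrative condensation of the testSTS credential handoff. */
    static Configuration withSessionCredentials(Configuration conf,
        AWSCredentialsProvider longLived, int durationSeconds) {
      // ask STS for temporary session credentials
      AWSSecurityTokenServiceClient sts =
          new AWSSecurityTokenServiceClient(longLived);
      Credentials session = sts.getSessionToken(
          new GetSessionTokenRequest().withDurationSeconds(durationSeconds))
          .getCredentials();
      // wire all three secrets plus the provider class into the configuration
      conf.set(Constants.ACCESS_KEY, session.getAccessKeyId());
      conf.set(Constants.SECRET_KEY, session.getSecretAccessKey());
      conf.set(Constants.SESSION_TOKEN, session.getSessionToken());
      conf.set(Constants.AWS_CREDENTIALS_PROVIDER,
          TemporaryAWSCredentialsProvider.NAME);
      return conf;
    }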
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6f9c346e/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
deleted file mode 100644
index 25a8958..0000000
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.s3a;
-
-import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.hadoop.util.StopWatch;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-
-/**
- * Basic unit test for S3A's blocking executor service.
- */
-public class TestBlockingThreadPoolExecutorService {
-
-  private static final Logger LOG = LoggerFactory.getLogger(
-      BlockingThreadPoolExecutorService.class);
-
-  private static final int NUM_ACTIVE_TASKS = 4;
-  private static final int NUM_WAITING_TASKS = 2;
-  private static final int TASK_SLEEP_MSEC = 100;
-  private static final int SHUTDOWN_WAIT_MSEC = 200;
-  private static final int SHUTDOWN_WAIT_TRIES = 5;
-  private static final int BLOCKING_THRESHOLD_MSEC = 50;
-
-  private static final Integer SOME_VALUE = 1337;
-
-  private static BlockingThreadPoolExecutorService tpe = null;
-
-  @AfterClass
-  public static void afterClass() throws Exception {
-    ensureDestroyed();
-  }
-
-  /**
-   * Basic test of running one trivial task.
-   */
-  @Test
-  public void testSubmitCallable() throws Exception {
-    ensureCreated();
-    ListenableFuture<Integer> f = tpe.submit(callableSleeper);
-    Integer v = f.get();
-    assertEquals(SOME_VALUE, v);
-  }
-
-  /**
-   * More involved test, including detecting blocking when at capacity.
-   */
-  @Test
-  public void testSubmitRunnable() throws Exception {
-    ensureCreated();
-    int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
-    StopWatch stopWatch = new StopWatch().start();
-    for (int i = 0; i < totalTasks; i++) {
-      tpe.submit(sleeper);
-      assertDidntBlock(stopWatch);
-    }
-    tpe.submit(sleeper);
-    assertDidBlock(stopWatch);
-  }
-
-  @Test
-  public void testShutdown() throws Exception {
-    // Cover create / destroy, regardless of when this test case runs
-    ensureCreated();
-    ensureDestroyed();
-
-    // Cover create, execute, destroy, regardless of when test case runs
-    ensureCreated();
-    testSubmitRunnable();
-    ensureDestroyed();
-  }
-
-  // Helper functions, etc.
-
-  private void assertDidntBlock(StopWatch sw) {
-    try {
-      assertFalse("Non-blocking call took too long.",
-          sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
-    } finally {
-      sw.reset().start();
-    }
-  }
-
-  private void assertDidBlock(StopWatch sw) {
-    try {
-      if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
-        throw new RuntimeException("Blocking call returned too fast.");
-      }
-    } finally {
-      sw.reset().start();
-    }
-  }
-
-  private Runnable sleeper = new Runnable() {
-    @Override
-    public void run() {
-      String name = Thread.currentThread().getName();
-      try {
-        Thread.sleep(TASK_SLEEP_MSEC);
-      } catch (InterruptedException e) {
-        LOG.info("Thread {} interrupted.", name);
-        Thread.currentThread().interrupt();
-      }
-    }
-  };
-
-  private Callable<Integer> callableSleeper = new Callable<Integer>() {
-    @Override
-    public Integer call() throws Exception {
-      sleeper.run();
-      return SOME_VALUE;
-    }
-  };
-
-  /**
-   * Helper function to create thread pool under test.
-   */
-  private static void ensureCreated() throws Exception {
-    if (tpe == null) {
-      LOG.debug("Creating thread pool");
-      tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
-          NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
-    }
-  }
-
-  /**
-   * Helper function to terminate thread pool under test, asserting that
-   * shutdown -> terminate works as expected.
-   */
-  private static void ensureDestroyed() throws Exception {
-    if (tpe == null) {
-      return;
-    }
-    int shutdownTries = SHUTDOWN_WAIT_TRIES;
-
-    tpe.shutdown();
-    if (!tpe.isShutdown()) {
-      throw new RuntimeException("Shutdown had no effect.");
-    }
-
-    while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
-        TimeUnit.MILLISECONDS)) {
-      LOG.info("Waiting for thread pool shutdown.");
-      if (shutdownTries-- <= 0) {
-        LOG.error("Failed to terminate thread pool gracefully.");
-        break;
-      }
-    }
-    if (!tpe.isTerminated()) {
-      tpe.shutdownNow();
-      if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
-          TimeUnit.MILLISECONDS)) {
-        throw new RuntimeException(
-            "Failed to terminate thread pool in timely manner.");
-      }
-    }
-    tpe = null;
-  }
-}
\ No newline at end of file

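For reference, the behaviour testSubmitRunnable verifies — submissions block once the active plus waiting slots are exhausted — can be reproduced with a plain executor guarded by a semaphore. A conceptual sketch only, not the BlockingThreadPoolExecutorService implementation:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    /** Conceptual sketch of a bounded, blocking submitter. */
    public class BlockingSubmitter {
      private final ExecutorService pool;
      private final Semaphore permits;

      public BlockingSubmitter(int activeTasks, int waitingTasks) {
        pool = Executors.newFixedThreadPool(activeTasks);
        permits = new Semaphore(activeTasks + waitingTasks);
      }

      public void submit(final Runnable task) throws InterruptedException {
        permits.acquire();          // blocks when active + waiting slots are used
        try {
          pool.execute(new Runnable() {
            @Override
            public void run() {
              try {
                task.run();
              } finally {
                permits.release();  // free a slot as soon as the task completes
              }
            }
          });
        } catch (RuntimeException e) {
          permits.release();        // submission failed: return the permit
          throw e;
        }
      }
    }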
