This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit e18d0a4d26da1415304dd6d65202ee340903e03f
Author: fuatbasik <fuatba...@gmail.com>
AuthorDate: Wed Dec 18 10:06:56 2024 +0000

    HADOOP-19348. Add initial support for Analytics Accelerator Library for Amazon S3 (#7192)
---
 hadoop-tools/hadoop-aws/pom.xml                    |  11 ++
 .../java/org/apache/hadoop/fs/s3a/Constants.java   |  34 ++++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  74 ++++++++-
 .../apache/hadoop/fs/s3a/S3ASeekableStream.java    | 183 +++++++++++++++++++++
 .../fs/contract/s3a/ITestS3AContractCreate.java    |  13 +-
 .../fs/contract/s3a/ITestS3AContractDistCp.java    |   9 +
 .../fs/contract/s3a/ITestS3AContractRename.java    |   8 +
 .../contract/s3a/ITestS3AContractVectoredRead.java |  12 ++
 .../apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java   |   3 +
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java      |   6 +-
 .../hadoop/fs/s3a/ITestS3AFSMainOperations.java    |  18 +-
 .../hadoop/fs/s3a/ITestS3AFileSystemContract.java  |  11 +-
 .../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java |   5 +
 .../hadoop/fs/s3a/ITestS3AInputStreamLeakage.java  |   7 +-
 .../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java  |   5 +
 .../fs/s3a/ITestS3APrefetchingCacheFiles.java      |   8 +-
 .../fs/s3a/ITestS3APrefetchingInputStream.java     |   6 +-
 .../fs/s3a/ITestS3APrefetchingLruEviction.java     |   6 +-
 .../hadoop/fs/s3a/ITestS3AS3SeekableStream.java    | 130 +++++++++++++++
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  15 ++
 .../fs/s3a/commit/ITestCommitOperationCost.java    |   7 +
 .../fs/s3a/commit/ITestS3ACommitterFactory.java    |   3 +
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |   3 +
 .../integration/ITestDirectoryCommitProtocol.java  |   9 +
 .../ITestPartitionedCommitProtocol.java            |   9 +
 .../integration/ITestStagingCommitProtocol.java    |   4 +
 .../ITestStagingCommitProtocolFailure.java         |  12 +-
 .../fileContext/ITestS3AFileContextStatistics.java |   4 +
 .../fs/s3a/impl/ITestConnectionTimeouts.java       |  11 ++
 .../fs/s3a/performance/ITestS3AOpenCost.java       |   8 +-
 .../fs/s3a/performance/ITestUnbufferDraining.java  |   4 +
 .../ITestS3AContractStreamIOStatistics.java        |   8 +
 .../statistics/ITestS3AFileSystemStatistic.java    |   6 +
 33 files changed, 620 insertions(+), 32 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index e3c1b023e77..a6f56d419d1 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -472,6 +472,17 @@
       <artifactId>amazon-s3-encryption-client-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+      <artifactId>analyticsaccelerator-s3</artifactId>
+      <version>0.0.2</version>
+      <scope>compile</scope>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk.crt</groupId>
+      <artifactId>aws-crt</artifactId>
+      <version>0.29.10</version>
+    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index e695e918c95..0204b217808 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1780,4 +1780,38 @@ private Constants() {
    * Value: {@value}.
    */
   public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+
+  /**
+   * Prefix to configure Analytics Accelerator Library.
+   */
+  public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
+          "fs.s3a.analytics.accelerator";
+
+  /**
+   * Config to enable Analytics Accelerator Library for Amazon S3.
+   * https://github.com/awslabs/analytics-accelerator-s3
+   */
+  public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";
+
+  /**
+   * Config to enable usage of the CRT client with the Analytics Accelerator Library.
+   * Enabled by default.
+   */
+  public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
+          "fs.s3a.analytics.accelerator.crt.client";
+
+  /**
+   * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY}.
+   * Value: {@value}.
+   */
+  public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
+
+  /**
+   * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED}.
+   * Value: {@value}.
+   */
+  public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
+
 }
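
For reference, a minimal sketch of driving the two new constants from client
code; the chosen values are illustrative, not the defaults set by this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.Constants;

    public class EnableAnalyticsAcceleratorSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Opt in to the Analytics Accelerator input stream.
        conf.setBoolean(Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
        // Optionally stay on the standard SDK async client; per the
        // S3AFileSystem change below, this also disables multipart upload.
        conf.setBoolean(Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED, false);
      }
    }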
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 14031ed007e..24ad025b8e5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -53,7 +53,9 @@
 
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
@@ -86,6 +88,11 @@
 import software.amazon.awssdk.transfer.s3.model.Copy;
 import software.amazon.awssdk.transfer.s3.model.CopyRequest;
 
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -313,6 +320,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private S3Client s3Client;
 
+  /**
+   * CRT-based S3 client, created if the analytics accelerator library is
+   * enabled, and managed by the S3AStoreImpl. The analytics accelerator
+   * library can be enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}.
+   */
+  private S3AsyncClient s3AsyncClient;
+
   // initial callback policy is fail-once; it's there just to assist
   // some mock tests and other codepaths trying to call the low level
   // APIs on an uninitialized filesystem.
@@ -340,6 +354,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   // If true, the prefetching input stream is used for reads.
   private boolean prefetchEnabled;
 
+  // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
+  private boolean analyticsAcceleratorEnabled;
+
+  private boolean analyticsAcceleratorCRTEnabled;
+
   // Size in bytes of a single prefetch block.
   private int prefetchBlockSize;
 
@@ -515,6 +534,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean s3AccessGrantsEnabled;
 
+  /**
+   * Factory to create S3SeekableInputStream if {@link #analyticsAcceleratorEnabled} is true.
+   */
+  private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -670,8 +694,21 @@ public void initialize(URI name, Configuration originalConf)
       this.prefetchBlockSize = (int) prefetchBlockSizeLong;
       this.prefetchBlockCount =
           intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
+
+      this.analyticsAcceleratorEnabled =
+          conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+      this.analyticsAcceleratorCRTEnabled =
+          conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
+              ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+
       this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
-          DEFAULT_MULTIPART_UPLOAD_ENABLED);
+              DEFAULT_MULTIPART_UPLOAD_ENABLED);
+
+      if (this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+        // Temp change: Analytics Accelerator with S3AsyncClient does not support multipart upload.
+        this.isMultipartUploadEnabled = false;
+      }
+
       // multipart copy and upload are the same; this just makes it explicit
       this.isMultipartCopyEnabled = isMultipartUploadEnabled;
 
@@ -794,6 +831,27 @@ public void initialize(URI name, Configuration originalConf)
       // directly through the client manager.
       // this is to aid mocking.
       s3Client = store.getOrCreateS3Client();
+
+      if (this.analyticsAcceleratorEnabled) {
+        LOG.info("Using S3SeekableInputStream");
+        if (this.analyticsAcceleratorCRTEnabled) {
+          LOG.info("Using S3 CRT client for analytics accelerator S3");
+          this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
+        } else {
+          LOG.info("Using S3 async client for analytics accelerator S3");
+          this.s3AsyncClient = store.getOrCreateAsyncClient();
+        }
+
+        ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+            ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+        S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
+            S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+        this.s3SeekableInputStreamFactory =
+            new S3SeekableInputStreamFactory(
+                new S3SdkObjectClient(this.s3AsyncClient),
+                seekableInputStreamConfiguration);
+      }
+
       // The filesystem is now ready to perform operations against
       // S3
       // This initiates a probe against S3 for the bucket existing.
@@ -1861,6 +1919,8 @@ private FSDataInputStream executeOpen(
       final Path path,
       final OpenFileSupport.OpenFileInformation fileInformation)
       throws IOException {
+
+
     // create the input stream statistics before opening
     // the file so that the time to prepare to open the file is included.
     S3AInputStreamStatistics inputStreamStats =
@@ -1877,6 +1937,14 @@ private FSDataInputStream executeOpen(
     fileInformation.applyOptions(readContext);
     LOG.debug("Opening '{}'", readContext);
 
+    if (this.analyticsAcceleratorEnabled) {
+      return new FSDataInputStream(
+              new S3ASeekableStream(
+                      this.bucket,
+                      pathToKey(path),
+                      s3SeekableInputStreamFactory));
+    }
+
     if (this.prefetchEnabled) {
       Configuration configuration = getConf();
       initLocalDirAllocatorIfNotInitialized(configuration);
@@ -4354,9 +4422,11 @@ public void close() throws IOException {
   protected synchronized void stopAllServices() {
     try {
       trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(), () -> {
-        closeAutocloseables(LOG, store);
+        closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
         store = null;
         s3Client = null;
+        s3AsyncClient = null;
+        s3SeekableInputStreamFactory = null;
 
         // At this point the S3A client is shut down,
         // now the executor pools are closed
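
The client selection that initialize() now performs can be restated as the
following sketch; the helper method is illustrative and not part of the
commit, while the calls themselves match the diff above:

    // Illustrative fragment, assuming the surrounding S3AFileSystem fields.
    // The CRT path builds a dedicated client; otherwise the store's shared
    // async client is reused.
    private S3AsyncClient selectAsyncClient(boolean crtEnabled) throws IOException {
      if (crtEnabled) {
        return S3CrtAsyncClient.builder().maxConcurrency(600).build();
      }
      return store.getOrCreateAsyncClient();
    }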
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
new file mode 100644
index 00000000000..ef6a2990815
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSInputStream;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+
+  private S3SeekableInputStream inputStream;
+  private long lastReadCurrentPos = 0;
+  private final String key;
+  private volatile boolean closed;
+
+  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+
+  public S3ASeekableStream(String bucket, String key,
+                           S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+    this.key = key;
+  }
+
+  /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+   */
+  @Override
+  public boolean hasCapability(String capability) {
+    return false;
+  }
+
+  @Override
+  public int read() throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read();
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    throwIfClosed();
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+          + " " + pos);
+    }
+    inputStream.seek(pos);
+  }
+
+
+  @Override
+  public synchronized long getPos() {
+    if (!closed) {
+      lastReadCurrentPos = inputStream.getPos();
+    }
+    return lastReadCurrentPos;
+  }
+
+
+  /**
+   * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+   * reached. Leaves the position of the stream unaltered.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+   * @return the total number of bytes read into the buffer
+   * @throws IOException if an I/O error occurs
+   */
+  public int readTail(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.readTail(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws IOException {
+    throwIfClosed();
+    int bytesRead;
+    try {
+      bytesRead = inputStream.read(buf, off, len);
+    } catch (IOException ioe) {
+      onReadFailure(ioe);
+      throw ioe;
+    }
+    return bytesRead;
+  }
+
+
+  @Override
+  public boolean seekToNewSource(long l) throws IOException {
+    return false;
+  }
+
+  @Override
+  public int available() throws IOException {
+    throwIfClosed();
+    return super.available();
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed) {
+      closed = true;
+      try {
+        inputStream.close();
+        inputStream = null;
+        super.close();
+      } catch (IOException ioe) {
+        LOG.debug("Failure closing stream {}: ", key);
+        throw ioe;
+      }
+    }
+  }
+
+  /**
+   * Close the stream on read failure.
+   * No attempt is made to recover from the failure.
+   *
+   * @param ioe exception caught.
+   */
+  @Retries.OnceTranslated
+  private void onReadFailure(IOException ioe) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got exception while trying to read from stream {}, "
+          + "not trying to recover:", key, ioe);
+    } else {
+      LOG.info("Got exception while trying to read from stream {}, "
+          + "not trying to recover:", key, ioe);
+    }
+    this.close();
+  }
+
+
+  protected void throwIfClosed() throws IOException {
+    if (closed) {
+      throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+}
\ No newline at end of file
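
A hedged usage sketch of the new stream through the public FileSystem API; the
path below is a placeholder, and readTail(), which is not part of the
FSInputStream contract, is reached by unwrapping the FSDataInputStream:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.s3a.Constants;
    import org.apache.hadoop.fs.s3a.S3ASeekableStream;

    public class SeekableStreamUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
        Path path = new Path("s3a://example-bucket/data.bin"); // placeholder
        try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
             FSDataInputStream in = fs.open(path)) {
          in.seek(42);            // delegates to S3ASeekableStream.seek()
          int first = in.read();  // single-byte read path shown above
          byte[] tail = new byte[16];
          // readTail() leaves the stream position unaltered.
          ((S3ASeekableStream) in.getWrappedStream()).readTail(tail, 0, tail.length);
        }
      }
    }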
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index 033c2d94c7b..3e5c67e5894 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -31,9 +31,8 @@
 
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_ENABLED;
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
+
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 
 /**
  * S3A contract tests creating files.
@@ -93,6 +92,14 @@ protected Configuration createConfiguration() {
     return conf;
   }
 
+  @Override
+  public void testOverwriteExistingFile() throws Throwable {
+    // Will remove this when Analytics Accelerator supports overwrites
+    skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
+        "Analytics Accelerator does not support overwrites yet");
+    super.testOverwriteExistingFile();
+  }
+
   @Override
   public void testOverwriteNonEmptyDirectory() throws Throwable {
     try {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index e761e0d14bf..f6127700b5f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -20,6 +20,7 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageStatistics;
@@ -78,6 +79,14 @@ public void testNonDirectWrite() throws Exception {
         getRenameOperationCount() - renames);
   }
 
+  @Override
+  public void testDistCpUpdateCheckFileSkip() throws Exception {
+    // Will remove this when Analytics Accelerator supports overwrites
+    skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+        "Analytics Accelerator Library does not support update to existing files");
+    super.testDistCpUpdateCheckFileSkip();
+  }
+
   private long getRenameOperationCount() {
     return getFileSystem().getStorageStatistics()
         .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java
index d3ba7373cc9..9f5246d9dde 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java
@@ -36,6 +36,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.S3A_TEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 /**
  * S3A contract tests covering rename.
@@ -45,6 +46,13 @@ public class ITestS3AContractRename extends AbstractContractRenameTest {
   public static final Logger LOG = LoggerFactory.getLogger(
       ITestS3AContractRename.class);
 
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+        "Analytics Accelerator does not support rename");
+  }
+
   @Override
   protected int getTestTimeoutMillis() {
     return S3A_TEST_TIMEOUT;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index fbb6d5a04d2..87e3b23cd71 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -63,6 +63,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
 import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.io.Sizes.S_1M;
@@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
 
+  /**
+   * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+   * @throws Exception failure during setup.
+   */
+  @Override
+  public void setup() throws Exception {
+    skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+        "Analytics Accelerator does not support vectored reads");
+    super.setup();
+  }
+
   /**
    * Verify response to a vector read request which is beyond the
    * real length of the file.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
index ca9d185c3e9..4793092b717 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
@@ -36,6 +36,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 /**
  * Tests behavior of a FileNotFound error that happens after open(), i.e. on
@@ -65,6 +66,8 @@ protected Configuration createConfiguration() {
    */
   @Test
   public void testNotFoundFirstRead() throws Exception {
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Temporarily disabling to fix Exception handling on Analytics 
Accelerator");
     S3AFileSystem fs = getFileSystem();
     ChangeDetectionPolicy changeDetectionPolicy =
         fs.getChangeDetectionPolicy();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index d22de3b06d8..12e5ef3841a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -38,9 +38,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -93,6 +91,8 @@ protected Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Analytics Accelerator does not support SSEC");
     assumeEnabled();
     // although not a root dir test, this confuses paths enough it shouldn't be run in
     // parallel with other jobs
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index 0281c57f5cb..1d3806e75e3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -29,9 +29,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 
 /**
  * S3A Test suite for the FSMainOperationsBaseTest tests.
@@ -78,6 +76,20 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
       throws Exception {
   }
 
+  @Override
+  public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+    // Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
+    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+        "Analytics Accelerator does not support overwrites");
+  }
+
+  @Override
+  public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+    // Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
+    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+        "Analytics Accelerator does not support overwrites");
+  }
+
   @Override
   public void testOverwrite() throws IOException {
     boolean createPerformance = isCreatePerformanceEnabled(fSys);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
index 48081457658..32d9a511a41 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -34,8 +34,7 @@
 import org.apache.hadoop.fs.Path;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assume.*;
 import static org.junit.Assert.*;
@@ -160,4 +159,12 @@ public void testOverwrite() throws IOException {
       }
     }
   }
+
+  @Override
+  public void testOverWriteAndRead() throws Exception {
+    // Will remove this when Analytics Accelerator supports overwrites
+    skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
+        "Analytics Accelerator does not support overwrites");
+    super.testOverWriteAndRead();
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index 70dc5ee476c..005b2fbf91d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -43,6 +43,7 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -77,6 +78,10 @@ protected Configuration createConfiguration() {
   public void setup() throws Exception {
     super.setup();
     executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+    // TODO: Add IOStatistics Support to S3SeekableStream
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3SeekableStream does not support IOStatisticsContext");
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
index 4b871c6a197..4d8956d38e7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
@@ -34,6 +34,7 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
 import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
@@ -58,7 +59,7 @@ public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
   @Override
   public void setup() throws Exception {
     super.setup();
-    assume("Stream leak detection not avaialable",
+    assume("Stream leak detection not available",
         getFileSystem().hasCapability(STREAM_LEAKS));
   }
 
@@ -89,6 +90,10 @@ public void setup() throws Exception {
   @Test
   public void testFinalizer() throws Throwable {
     Path path = methodPath();
+    // TODO: Add Leak Detection to S3SeekableStream
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3SeekableStream does not support leak detection");
+
     final S3AFileSystem fs = getFileSystem();
 
     ContractTestUtils.createFile(fs, path, true, DATASET);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 3bfe69c2bca..61d15999635 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 
 /**
@@ -51,6 +52,10 @@ public void testMetricsRegister()
 
   @Test
   public void testStreamStatistics() throws IOException {
+    // TODO: Add StreamStatistics support to S3SeekableStream
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3SeekableStream does not support stream statistics");
+
     S3AFileSystem fs = getFileSystem();
     Path file = path("testStreamStatistics");
     byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
index e7c9921824c..b52a73fd6ab 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
@@ -37,10 +37,7 @@
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 
-import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
-import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -73,7 +70,6 @@ public void setUp() throws Exception {
     super.setup();
     // Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
     conf = createConfiguration();
-
     testFile = getExternalData(conf);
     prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
     fs = FileSystem.get(testFile.toUri(), conf);
@@ -94,6 +90,8 @@ public Configuration createConfiguration() {
     final String bufferDirBase = configuration.get(BUFFER_DIR);
     bufferDir = bufferDirBase + "/" + UUID.randomUUID();
     configuration.set(BUFFER_DIR, bufferDir);
+    // When both prefetching and Analytics Accelerator are enabled, Analytics Accelerator is used.
+    configuration.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return configuration;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 28c85419465..544c71e114f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -34,8 +34,7 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
@@ -71,6 +70,7 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
   private static final int INTERVAL_MILLIS = 500;
   private static final int BLOCK_SIZE = S_1K * 10;
 
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -78,6 +78,8 @@ public Configuration createConfiguration() {
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
     conf.setBoolean(PREFETCH_ENABLED_KEY, true);
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // When both prefetching and Analytics Accelerator are enabled, Analytics Accelerator is used.
+    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
index 7375105909b..4e7bc24a432 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -42,9 +42,7 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
@@ -92,6 +90,8 @@ public Configuration createConfiguration() {
     conf.setBoolean(PREFETCH_ENABLED_KEY, true);
     conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // When both prefetching and Analytics Accelerator are enabled, Analytics Accelerator is used.
+    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
new file mode 100644
index 00000000000..c6ecee95051
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+  private static final String PHYSICAL_IO_PREFIX = "physicalio";
+  private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+  public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOException {
+    describe("Verify S3 connector framework integration");
+
+    Configuration conf = getConfiguration();
+    removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
+
+    String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
+    S3AFileSystem s3AFileSystem =
+        (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf);
+    byte[] buffer = new byte[500];
+
+    try (FSDataInputStream inputStream = s3AFileSystem.open(new Path(testFile))) {
+      inputStream.seek(5);
+      inputStream.read(buffer, 0, 500);
+    }
+
+  }
+
+  @Test
+  public void testConnectorFrameWorkIntegrationWithCrtClient() throws IOException {
+    testConnectorFrameWorkIntegration(true);
+  }
+
+  @Test
+  public void testConnectorFrameWorkIntegrationWithoutCrtClient() throws IOException {
+    testConnectorFrameWorkIntegration(false);
+  }
+
+  public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
+    describe("Verify S3 connector framework reads configuration");
+
+    Configuration conf = getConfiguration();
+    removeBaseAndBucketOverrides(conf);
+
+    // Disable Predictive Prefetching
+    conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+        "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+    // Set Blobstore Capacity
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+        "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
+
+    ConnectorConfiguration connectorConfiguration =
+        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+    S3SeekableInputStreamConfiguration configuration =
+        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
+    assertSame("S3ASeekableStream configuration is not set to expected value",
+        PrefetchMode.ALL, configuration.getLogicalIOConfiguration().getPrefetchingMode());
+
+    assertEquals("S3ASeekableStream configuration is not set to expected 
value",
+        1, configuration.getPhysicalIOConfiguration().getBlobStoreCapacity());
+  }
+
+  @Test
+  public void testConnectorFrameworkConfigurableWithoutCrtClient() throws IOException {
+    testConnectorFrameworkConfigurable(false);
+  }
+
+  @Test
+  public void testConnectorFrameworkConfigurableWithCrtClient() throws IOException {
+    testConnectorFrameworkConfigurable(true);
+  }
+
+  @Test
+  public void testInvalidConfigurationThrows() {
+    describe("Verify S3 connector framework throws with invalid 
configuration");
+
+    Configuration conf = getConfiguration();
+    removeBaseAndBucketOverrides(conf);
+    // Set an invalid blobstore capacity to force a configuration failure
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+        "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+    ConnectorConfiguration connectorConfiguration =
+        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+    assertThrows("S3ASeekableStream illegal configuration does not throw",
+        IllegalArgumentException.class, () ->
+            S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
+  }
+}
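
The configurability test above relies on ConnectorConfiguration stripping the
fs.s3a.analytics.accelerator prefix before keys reach the library; a minimal
sketch of tuning the physical IO layer the same way (the capacity value is
arbitrary):

    import org.apache.hadoop.conf.Configuration;
    import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
    import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

    public class ConnectorConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Seen as "physicalio.blobstore.capacity" inside the library.
        conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 10);
        ConnectorConfiguration cc =
            new ConnectorConfiguration(conf, "fs.s3a.analytics.accelerator");
        S3SeekableInputStreamConfiguration streamConf =
            S3SeekableInputStreamConfiguration.fromConfiguration(cc);
      }
    }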
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 1f779ab7ca3..b4c6462c01d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -574,6 +574,21 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) {
     return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
   }
 
+  /**
+   * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+   * @param configuration configuration to probe
+   * @param message message to use if the test is skipped
+   */
+  public static void skipIfAnalyticsAcceleratorEnabled(
+          Configuration configuration, String message) {
+    assume(message,
+            !isAnalyticsAcceleratorEnabled(configuration));
+  }
+
+  public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
+    return conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
+        ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+  }
+
   /**
    * Skip a test if the filesystem lacks a required capability.
    * @param fs filesystem
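
For suite authors, the new helper acts as a setup-time guard; a sketch in the
style of the suites patched by this commit (the test class itself is
illustrative):

    import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;

    import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;

    public class ITestExampleSuite extends AbstractS3ATestBase {
      @Override
      public void setup() throws Exception {
        super.setup();
        skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
            "Example feature is not yet supported by Analytics Accelerator");
      }
    }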
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index 8132b44cdb4..fac461371e6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -40,6 +40,7 @@
 import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -168,6 +169,10 @@ private void abortActiveStream() throws IOException {
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
     describe("Files created under magic paths skip existence checks");
+
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3ASeekableInputStream does not support InputStreamStatistics");
+
     S3AFileSystem fs = getFileSystem();
     Path destFile = methodSubPath("file.txt");
     fs.delete(destFile.getParent(), true);
@@ -245,6 +250,8 @@ public void testCostOfCreatingMagicFile() throws Throwable {
   public void testCostOfSavingLoadingPendingFile() throws Throwable {
     describe("Verify costs of saving .pending file under a magic path");
 
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3ASeekableInputStream does not support InputStreamStatistics");
     S3AFileSystem fs = getFileSystem();
     Path partDir = methodSubPath("file.pending");
     Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
index 2561a69f60b..a500bfb76a3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -47,6 +47,7 @@
 import org.apache.hadoop.security.UserGroupInformation;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -182,6 +183,8 @@ public void setup() throws Exception {
     // destroy all filesystems from previous runs.
     FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3ASeekableInputStream does not support InputStreamStatistics");
     jobId = randomJobId();
     attempt0 = "attempt_" + jobId + "_m_000000_0";
     taskAttempt0 = TaskAttemptID.forName(attempt0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index cbfc23a2a29..b89740ae312 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -46,6 +46,7 @@
 import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath;
@@ -77,6 +78,8 @@ protected String getCommitterName() {
   @Override
   public void setup() throws Exception {
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3ASeekableInputStream does not support InputStreamStatistics");
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
index b19662c0117..cbd41fd1c93 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS;
@@ -42,6 +43,14 @@
 /** ITest of the low level protocol methods. */
 public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
 
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Skipping because these tests will fail when using CRT client");
+  }
+
   @Override
   protected String suitename() {
     return "ITestDirectoryCommitProtocol";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
index e3bc1500dab..b6438157e4e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
@@ -32,10 +32,19 @@
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 /** ITest of the low level protocol methods. */
 public class ITestPartitionedCommitProtocol extends ITestStagingCommitProtocol {
 
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Skipping because these tests will fail when using CRT client");
+  }
+
   @Override
   protected String suitename() {
     return "ITestPartitionedCommitProtocol";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index 81c3af812ab..d2b4e1c5295 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -41,6 +41,7 @@
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 
 /** Test the staging committer's handling of the base protocol operations. */
@@ -65,6 +66,9 @@ protected Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Skipping because these tests will fail when using CRT client");
+
 
     // identify working dir for staging and delete
     Configuration conf = getConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
index 08b6c21a863..5604aa76369 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
@@ -32,9 +32,7 @@
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -45,6 +43,14 @@
  */
 public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
 
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Skipping because these tests will fail when using CRT client");
+  }
+
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 1724006a831..dc086f7c423 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -30,6 +30,8 @@
 import org.junit.Assert;
 import org.junit.Before;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
 /**
  * S3a implementation of FCStatisticsBaseTest.
  */
@@ -44,6 +46,8 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
   @Before
   public void setUp() throws Exception {
     conf = new Configuration();
+    skipIfAnalyticsAcceleratorEnabled(conf,
+        "S3SeekableStream does not support File Context Statistics");
     fc = S3ATestUtils.createTestFileContext(conf);
     testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
     fc.mkdir(testRootPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
index a4cc5cadc5d..2a6605b9342 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
@@ -60,6 +60,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -146,6 +147,11 @@ public void teardown() throws Exception {
   @Test
   public void testGeneratePoolTimeouts() throws Throwable {
     skipIfClientSideEncryption();
+
+    // Assertions will fail when using CRTClient with Analytics Accelerator.
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Assertions will fail when using CRTClient with Analytics 
Accelerator");
+
     AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
     Configuration conf = timingOutConfiguration();
     Path path = methodPath();
@@ -188,6 +194,11 @@ public void testGeneratePoolTimeouts() throws Throwable {
   @Test
   public void testObjectUploadTimeouts() throws Throwable {
     skipIfClientSideEncryption();
+
+    // Assertions will fail when using CRTClient with Analytics Accelerator.
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Assertions will fail when using CRTClient with Analytics 
Accelerator");
+
     AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
     final Path dir = methodPath();
     Path file = new Path(dir, "file");
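
Both skips above cite the CRT client: the pom change adds the aws-crt artifact, and the accelerator library can run on top of it. A hedged sketch of the opt-in configuration follows; the enable key matches the assumption above, and the CRT key name in particular is a guess at the Constants.java entry, not confirmed by this diff:

    import org.apache.hadoop.conf.Configuration;

    /** Sketch: opting in to the accelerator and its CRT client for a test run. */
    public final class AcceleratorConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed key names; the defaults in Constants.java may differ.
        conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
        conf.setBoolean("fs.s3a.analytics.accelerator.crt.client", true);
        System.out.println("accelerator and CRT enabled in this configuration");
      }
    }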
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index fdafce3c2eb..d2288957f1e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -54,10 +54,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
@@ -110,6 +107,8 @@ public Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Assertions will fail as S3SeekableStream does not support Stream 
Statistics");
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
 
@@ -388,7 +387,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable {
 
     describe("PositionedReadable.read() past the end of the file");
     assumeNoPrefetching();
-
     verifyMetrics(() -> {
       try (FSDataInputStream in =
                openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index be210003da0..548f78d1a28 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -54,6 +54,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
 import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsSeconds;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
@@ -118,6 +119,9 @@ public Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "Skipping because getS3AInputStream will " +
+            "try to cast S3SeekableStream to S3AInputStream");
 
     // now create a new FS with minimal http capacity and recovery
     // a separate one is used to avoid test teardown suffering
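
For context on the reason string above: S3ATestUtils.getS3AInputStream() recovers the classic stream by unwrapping the FSDataInputStream and casting, which cannot succeed once the inner stream is the new S3ASeekableStream. A minimal sketch of that failure mode; the method body is a plausible shape, not the committed implementation:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.s3a.S3AInputStream;

    /** Sketch of the unwrap-and-cast that motivates the setup-time skip. */
    public final class UnwrapSketch {

      /**
       * Plausible shape of getS3AInputStream(): when the wrapped stream is
       * an S3ASeekableStream rather than an S3AInputStream, this cast throws
       * ClassCastException, so the suite skips instead of failing spuriously.
       */
      public static S3AInputStream getS3AInputStream(FSDataInputStream in) {
        return (S3AInputStream) in.getWrappedStream();
      }
    }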
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index 0f6b69cd54d..2b332b2b3ee 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -27,6 +27,7 @@
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
 
 /**
@@ -78,4 +79,11 @@ public List<String> outputStreamStatisticKeys() {
         STREAM_WRITE_EXCEPTIONS);
   }
 
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+        "S3SeekableStream does not support Stream Statistics");
+  }
+
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 0d5d2a789a0..5a3f7bb8fdb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,6 +31,8 @@
 import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
 public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
 
   private static final int ONE_KB = 1024;
@@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
    */
   @Test
   public void testBytesReadWithStream() throws IOException {
+    // Assertions will fail as S3ASeekableStream
+    // does not support S3AFileSystemStatistics yet.
+    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+        "S3SeekableStream does not support File System Statistics");
     S3AFileSystem fs = getFileSystem();
     Path filePath = path(getMethodName());
     byte[] oneKbBuf = new byte[ONE_KB];
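
Taken together, the skips only fence off suites that reach into stream internals or statistics; ordinary reads are unaffected because FSDataInputStream hides which implementation sits underneath. A hedged end-to-end sketch, with a hypothetical bucket and object key and the assumed enable key from above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Sketch: a read through the accelerator; bucket and key are hypothetical. */
    public final class AcceleratedReadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Assumed key from this commit's Constants.java; off by default.
        conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
        Path file = new Path("s3a://example-bucket/data/sample.parquet");
        try (FileSystem fs = file.getFileSystem(conf);
             FSDataInputStream in = fs.open(file)) {
          // Seek-then-read is unchanged for callers; the implementation
          // underneath is now S3ASeekableStream rather than S3AInputStream.
          in.seek(0);
          byte[] buf = new byte[1024];
          int bytesRead = in.read(buf);
          System.out.println("read " + bytesRead + " bytes");
        }
      }
    }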

