This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1f099fe2db7 HADOOP-19364. S3A: IoStats support for AAL. (#8007)
1f099fe2db7 is described below

commit 1f099fe2db7dbfd1f1bd6e1ef11edcf2d7b59ca8
Author: ahmarsuhail <[email protected]>
AuthorDate: Thu Nov 20 11:22:30 2025 +0000

    HADOOP-19364. S3A: IoStats support for AAL. (#8007)
---
 .../hadoop/fs/statistics/StreamStatisticNames.java |  17 ++
 .../apache/hadoop/fs/s3a/S3AInstrumentation.java   |  36 +++-
 .../java/org/apache/hadoop/fs/s3a/Statistic.java   |  14 +-
 .../s3a/impl/streams/AnalyticsRequestCallback.java |  69 +++++++
 .../fs/s3a/impl/streams/AnalyticsStream.java       |  45 ++++-
 .../s3a/impl/streams/AnalyticsStreamFactory.java   |   2 -
 .../s3a/statistics/S3AInputStreamStatistics.java   |  26 +++
 .../statistics/impl/EmptyS3AStatisticsContext.java |  25 +++
 ...TestS3AContractAnalyticsStreamVectoredRead.java |  71 ++++++-
 .../fs/contract/s3a/ITestS3AContractCreate.java    |   8 -
 .../fs/contract/s3a/ITestS3AContractDistCp.java    |  10 -
 .../s3a/ITestS3AContractMultipartUploader.java     |   7 -
 .../ITestS3AAnalyticsAcceleratorStreamReading.java | 214 ++++++++++++++++++++-
 .../hadoop/fs/s3a/ITestS3AFSMainOperations.java    |  13 --
 .../hadoop/fs/s3a/ITestS3AFileSystemContract.java  |   7 -
 .../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java |   6 +-
 .../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java  |   5 -
 .../org/apache/hadoop/fs/s3a/S3ATestConstants.java |  20 ++
 .../fs/s3a/commit/ITestCommitOperationCost.java    |  11 +-
 .../fileContext/ITestS3AFileContextStatistics.java |   5 -
 .../fs/s3a/performance/ITestS3AOpenCost.java       |  92 ++++++---
 .../ITestS3AContractStreamIOStatistics.java        |   5 -
 .../statistics/ITestS3AFileSystemStatistic.java    |   5 -
 23 files changed, 599 insertions(+), 114 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 09c19ad071a..e8cb9f6469a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -489,6 +489,23 @@ public final class StreamStatisticNames {
   public static final String STREAM_FILE_CACHE_EVICTION
       = "stream_file_cache_eviction";
 
+  /**
+   * Bytes that were prefetched by the stream.
+   */
+  public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes";
+
+  /**
+   * Tracks failures in footer parsing.
+   */
+  public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
+          = "stream_read_parquet_footer_parsing_failed";
+
+  /**
+   * A cache hit occurs when the request range can be satisfied by the data in the cache.
+   */
+  public static final String STREAM_READ_CACHE_HIT = "stream_read_cache_hit";
+
+
   private StreamStatisticNames() {
   }
 
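
Note for readers: the three counters above are published through the standard
IOStatistics API, so they can be read back like any other stream statistic. A minimal
sketch (the bucket and object are hypothetical, and this assumes the Analytics stream
has been selected, e.g. via fs.s3a.input.stream.type = analytics):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.statistics.IOStatistics;
    import org.apache.hadoop.fs.statistics.StreamStatisticNames;

    public class ReadAalCounters {
      public static void main(String[] args) throws Exception {
        Path path = new Path("s3a://example-bucket/data.parquet");  // hypothetical object
        Configuration conf = new Configuration();
        try (FileSystem fs = FileSystem.get(path.toUri(), conf);
             FSDataInputStream in = fs.open(path)) {
          in.read(new byte[1024]);
          // FSDataInputStream is an IOStatisticsSource; counters() returns a Map<String, Long>.
          IOStatistics stats = in.getIOStatistics();
          System.out.println("prefetched bytes: "
              + stats.counters().get(StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES));
          System.out.println("cache hits: "
              + stats.counters().get(StreamStatisticNames.STREAM_READ_CACHE_HIT));
        }
      }
    }
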
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index b3c907428ac..bc2c83a6872 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -81,6 +81,9 @@
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
 
@@ -891,7 +894,12 @@ private InputStreamStatistics(
               StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
               StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
               StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
-              StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
+              StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
+              StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
+              StreamStatisticNames.STREAM_READ_CACHE_HIT,
+              StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
+              StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
               STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
               STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -1128,6 +1136,32 @@ public void readVectoredBytesDiscarded(int discarded) {
       bytesDiscardedInVectoredIO.addAndGet(discarded);
     }
 
+    @Override
+    public void getRequestInitiated() {
+      increment(ACTION_HTTP_GET_REQUEST);
+    }
+
+    @Override
+    public void headRequestInitiated() {
+      increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST);
+    }
+
+    @Override
+    public void bytesPrefetched(long size) {
+      increment(STREAM_READ_PREFETCHED_BYTES, size);
+    }
+
+    @Override
+    public void footerParsingFailed() {
+      increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);
+    }
+
+    @Override
+    public void streamReadCacheHit() {
+      increment(STREAM_READ_CACHE_HIT);
+    }
+
+
     @Override
     public void executorAcquired(Duration timeInQueue) {
       // update the duration fields in the IOStatistics.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6389742167d..4b09530ba9b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -458,9 +458,21 @@ public enum Statistic {
       StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
       "Gauge of active memory in use",
       TYPE_GAUGE),
+  STREAM_READ_PREFETCH_BYTES(
+      StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
+      "Bytes prefetched by AAL stream",
+      TYPE_COUNTER),
+  STREAM_READ_PARQUET_FOOTER_PARSING_FAILED(
+      StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED,
+      "Count of Parquet footer parsing failures encountered by AAL",
+      TYPE_COUNTER),
+  STREAM_READ_CACHE_HIT(
+      StreamStatisticNames.STREAM_READ_CACHE_HIT,
+      "Count of cache hits in AAL stream",
+      TYPE_COUNTER),
 
-  /* Stream Write statistics */
 
+  /* Stream Write statistics */
   STREAM_WRITE_EXCEPTIONS(
       StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
       "Count of stream write failures reported",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
new file mode 100644
index 00000000000..d3940bd86e8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+
+/**
+ * Implementation of AAL's RequestCallback interface that tracks analytics operations.
+ */
+public class AnalyticsRequestCallback implements RequestCallback {
+  private final S3AInputStreamStatistics statistics;
+
+  /**
+   * Create a new callback instance.
+   * @param statistics the statistics to update
+   */
+  public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
+    this.statistics = statistics;
+  }
+
+  @Override
+  public void onGetRequest() {
+    statistics.getRequestInitiated();
+  }
+
+  @Override
+  public void onHeadRequest() {
+    statistics.headRequestInitiated();
+  }
+
+  @Override
+  public void onBlockPrefetch(long start, long end) {
+    statistics.bytesPrefetched(end - start + 1);
+  }
+
+  @Override
+  public void footerParsingFailed() {
+    statistics.footerParsingFailed();
+  }
+
+  @Override
+  public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
+    statistics.readVectoredOperationStarted(numIncomingRanges, numCombinedRanges);
+  }
+
+  @Override
+  public void onCacheHit() {
+    statistics.streamReadCacheHit();
+  }
+
+}
+
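
To illustrate the contract: AAL invokes these callbacks as it issues requests, and each
event maps onto one statistics update. A hedged sketch, using the no-op statistics sink
from EmptyS3AStatisticsContext (extended later in this patch) so the snippet stands
alone; it is not part of the patch itself:

    import org.apache.hadoop.fs.s3a.impl.streams.AnalyticsRequestCallback;
    import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
    import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
    import software.amazon.s3.analyticsaccelerator.util.RequestCallback;

    // A statistics sink that discards all updates, fine for demonstration.
    S3AInputStreamStatistics stats =
        new EmptyS3AStatisticsContext().newInputStreamStatistics();
    RequestCallback callback = new AnalyticsRequestCallback(stats);
    callback.onGetRequest();           // counted as action_http_get_request
    callback.onBlockPrefetch(0, 1023); // records end - start + 1 = 1024 prefetched bytes
    callback.onCacheHit();             // increments stream_read_cache_hit
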
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 8920b5b2dfc..954ee3a0e48 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -40,6 +40,7 @@
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
       final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+
     this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
         s3Attributes.getKey()), buildOpenStreamInformation(parameters));
     getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,6 +82,9 @@ public AnalyticsStream(final ObjectReadParameters parameters,
   @Override
   public int read() throws IOException {
     throwIfClosed();
+
+    getS3AStreamStatistics().readOperationStarted(getPos(), 1);
+
     int bytesRead;
     try {
       bytesRead = inputStream.read();
@@ -87,6 +92,11 @@ public int read() throws IOException {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead != -1) {
+      incrementBytesRead(1);
+    }
+
     return bytesRead;
   }
 
@@ -122,6 +132,8 @@ public synchronized long getPos() {
    */
   public int readTail(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
+    getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
     int bytesRead;
     try {
       bytesRead = inputStream.readTail(buf, off, len);
@@ -129,12 +141,20 @@ public int readTail(byte[] buf, int off, int len) throws IOException {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead > 0) {
+      incrementBytesRead(bytesRead);
+    }
+
     return bytesRead;
   }
 
   @Override
   public int read(byte[] buf, int off, int len) throws IOException {
     throwIfClosed();
+
+    getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
     int bytesRead;
     try {
       bytesRead = inputStream.read(buf, off, len);
@@ -142,6 +162,11 @@ public int read(byte[] buf, int off, int len) throws IOException {
       onReadFailure(ioe);
       throw ioe;
     }
+
+    if (bytesRead > 0) {
+      incrementBytesRead(bytesRead);
+    }
+
     return bytesRead;
   }
 
@@ -177,8 +202,6 @@ public void readVectored(final List<? extends FileRange> ranges,
       range.setData(result);
     }
 
-    // AAL does not do any range coalescing, so input and combined ranges are the same.
-    this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
     inputStream.readVectored(objectRanges, allocate, release);
   }
 
@@ -247,10 +270,14 @@ private void onReadFailure(IOException ioe) throws IOException {
   }
 
   private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+    final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
     OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
         OpenStreamInformation.builder()
             .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
-            .getInputPolicy()));
+            .getInputPolicy()))
+            .requestCallback(requestCallback);
 
     if (parameters.getObjectAttributes().getETag() != null) {
       openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +327,16 @@ protected void throwIfClosed() throws IOException {
       throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
+
+  /**
+   * Increment the bytes read counter if there is a stats instance
+   * and the number of bytes read is more than zero.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    getS3AStreamStatistics().bytesRead(bytesRead);
+    if (getContext().getStats() != null && bytesRead > 0) {
+      getContext().getStats().incrementBytesRead(bytesRead);
+    }
+  }
 }
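
All three read paths above follow the same accounting shape; condensed, the pattern is
(a sketch of the flow rather than additional patch content):

    // Before delegating to AAL: record the intent to read len bytes at the current position.
    getS3AStreamStatistics().readOperationStarted(getPos(), len);
    int bytesRead = inputStream.read(buf, off, len);
    if (bytesRead > 0) {
      // On success: bump the stream-level byte counter, plus the FS-level
      // bytes-read counter when a Statistics instance is present.
      incrementBytesRead(bytesRead);
    }
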
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index 50333c68e0c..cff743242f1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
 
   private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
   private LazyAutoCloseableReference<S3SeekableInputStreamFactory>  s3SeekableInputStreamFactory;
-  private boolean requireCrt;
 
   public AnalyticsStreamFactory() {
     super("AnalyticsStreamFactory");
@@ -61,7 +60,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
                 ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
     this.seekableInputStreamConfiguration =
                 S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
-    this.requireCrt = false;
   }
 
   @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 7ad7cf75367..1e48b3ef850 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -119,6 +119,32 @@ void readVectoredOperationStarted(int numIncomingRanges,
    */
   void readVectoredBytesDiscarded(int discarded);
 
+  /**
+   * An S3 GET request was initiated by the stream.
+   */
+  void getRequestInitiated();
+
+  /**
+   * An S3 HEAD request was initiated by the stream.
+   */
+  void headRequestInitiated();
+
+  /**
+   * Number of bytes prefetched.
+   * @param size number of bytes prefetched.
+   */
+  void bytesPrefetched(long size);
+
+  /**
+   * A failure occurred while parsing a Parquet footer.
+   */
+  void footerParsingFailed();
+
+  /**
+   * The requested data was already in the data cache (a cache hit).
+   */
+  void streamReadCacheHit();
+
   @Override
   void close();
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 26b9f2b1568..cf339269549 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -212,6 +212,31 @@ public void readVectoredBytesDiscarded(int discarded) {
 
     }
 
+    @Override
+    public void getRequestInitiated() {
+
+    }
+
+    @Override
+    public void headRequestInitiated() {
+
+    }
+
+    @Override
+    public void bytesPrefetched(long size) {
+
+    }
+
+    @Override
+    public void footerParsingFailed() {
+
+    }
+
+    @Override
+    public void streamReadCacheHit() {
+
+    }
+
     @Override
     public void close() {
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index 0875851d4d6..3effd5a1ab1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
@@ -26,6 +27,7 @@
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.test.tags.IntegrationTest;
@@ -36,9 +38,20 @@
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_CACHE_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_REQUEST_COALESCE_TOLERANCE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_SMALL_OBJECT_PREFETCH_ENABLED;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.io.Sizes.S_16K;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_32K;
+
 
 /**
  * S3A contract tests for vectored reads with the Analytics stream.
@@ -57,6 +70,18 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
     super(bufferType);
   }
 
+  private static final String REQUEST_COALESCE_TOLERANCE_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_REQUEST_COALESCE_TOLERANCE;
+
+  private static final String READ_BUFFER_SIZE_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_READ_BUFFER_SIZE;
+
+  private static final String SMALL_OBJECT_PREFETCH_ENABLED_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_SMALL_OBJECT_PREFETCH_ENABLED;
+
+  private static final String CACHE_TIMEOUT_KEY =
+          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_CACHE_TIMEOUT;
+
   /**
    * Create a configuration.
    * @return a configuration
@@ -64,6 +89,28 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
   @Override
   protected Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
+
+    S3ATestUtils.disableFilesystemCaching(conf);
+
+    removeBaseAndBucketOverrides(conf,
+            REQUEST_COALESCE_TOLERANCE_KEY,
+            READ_BUFFER_SIZE_KEY,
+            SMALL_OBJECT_PREFETCH_ENABLED_KEY,
+            CACHE_TIMEOUT_KEY);
+
+    // Set the coalesce tolerance to 16KB, default is 1MB.
+    conf.setInt(REQUEST_COALESCE_TOLERANCE_KEY, S_16K);
+
+    // Set the minimum block size to 32KB. AAL uses a default block size of 128KB, which
+    // means the minimum size of an S3 request will be 128KB. Since the file being read is
+    // 128KB, a smaller block size is needed here to demonstrate that separate GET requests
+    // are made for ranges that are not coalesced.
+    conf.setInt(READ_BUFFER_SIZE_KEY, S_32K);
+
+    // Disable small object prefetching, otherwise anything less than 8MB is fetched in a single GET.
+    conf.set(SMALL_OBJECT_PREFETCH_ENABLED_KEY, "false");
+
+    conf.setInt(CACHE_TIMEOUT_KEY, 5000);
+
     enableAnalyticsAccelerator(conf);
     // If encryption is set, some AAL tests will fail.
     // This is because AAL caches the head request response, and uses
@@ -102,21 +149,41 @@ public void testNullReleaseOperation()  {
 
   @Test
   public void testReadVectoredWithAALStatsCollection() throws Exception {
+    List<FileRange> fileRanges = new ArrayList<>();
+    fileRanges.add(FileRange.createFileRange(0, 100));
+    fileRanges.add(FileRange.createFileRange(800, 200));
+    fileRanges.add(FileRange.createFileRange(4 * S_1K, 4 * S_1K));
+    fileRanges.add(FileRange.createFileRange(80 * S_1K, 4 * S_1K));
 
-    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
     try (FSDataInputStream in = openVectorFile()) {
       in.readVectored(fileRanges, getAllocate());
 
       validateVectoredReadResult(fileRanges, DATASET, 0);
       IOStatistics st = in.getIOStatistics();
 
-      // Statistics such as GET requests will be added after IoStats support.
       verifyStatisticCounterValue(st,
               StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
 
       verifyStatisticCounterValue(st,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               1);
+
+      // Verify ranges are coalesced: with a coalescing tolerance of 16KB, the first three
+      // ranges ([0-100), [800-1000), [4K-8K)) get coalesced into a single range.
+      verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, 4);
+      verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, 2);
+
+      verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
+
+      // Read the same ranges again to demonstrate that the data is cached, and no new GETs are made.
+      in.readVectored(fileRanges, getAllocate());
+      verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
+
+      // Because of how AAL is currently written, cache hits that originate from a
+      // readVectored() cannot be tracked accurately, so such cache hits are currently not
+      // tracked at all. For more details see:
+      // https://github.com/awslabs/analytics-accelerator-s3/issues/359
+      verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_CACHE_HIT, 0);
     }
+
   }
 }
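
For reference, the coalescing arithmetic behind those assertions, given the 16KB
tolerance set in createConfiguration() and the four ranges read by the test
([0-100), [800-1000), [4096-8192), [81920-86016)):

    gap between 100 and 800     =    700 bytes  -> within 16K, merged
    gap between 1000 and 4096   =   3096 bytes  -> within 16K, merged
    gap between 8192 and 81920  =  73728 bytes  -> beyond 16K, kept separate

So the 4 incoming ranges coalesce into 2 combined ranges, which is also why exactly
2 GET requests are asserted.
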
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index 65894cc6518..19c483a5de3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -35,7 +35,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
 
 /**
@@ -100,13 +99,6 @@ protected Configuration createConfiguration() {
   @Test
   public void testOverwriteNonEmptyDirectory() throws Throwable {
     try {
-       // Currently analytics accelerator does not support reading of files that have been overwritten.
-       // This is because the analytics accelerator library caches metadata, and when a file is
-       // overwritten, the old metadata continues to be used, until it is removed from the cache over
-       // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-       skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-           "Analytics Accelerator currently does not support reading of over written files");
-
        super.testOverwriteNonEmptyDirectory();
        failWithCreatePerformance();
     } catch (AssertionError e) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index 4e3013e9d43..2ee7ccd2975 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -22,7 +22,6 @@
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageStatistics;
@@ -87,15 +86,6 @@ public void testNonDirectWrite() throws Exception {
   @Test
   @Override
   public void testDistCpUpdateCheckFileSkip() throws Exception {
-    // Currently analytics accelerator does not support reading of files that have been overwritten.
-    // This is because the analytics accelerator library caches metadata and data, and when a
-    // file is overwritten, the old data continues to be used, until it is removed from the
-    // cache over time. This will be fixed in
-    // https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    // In this test case, the remote file is created, read, then deleted, and then created again
-    // with different contents, and read again, which leads to assertions failing.
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator Library does not support update to existing files");
     super.testDistCpUpdateCheckFileSkip();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 6d98388503e..c8679f7e349 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -47,7 +47,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.impl.ChecksumSupport.getChecksumAlgorithm;
 import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
 
@@ -173,12 +172,6 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc
   @Override
   public void testConcurrentUploads() throws Throwable {
     assumeNotS3ExpressFileSystem(getFileSystem());
-    // Currently analytics accelerator does not support reading of files that have been overwritten.
-    // This is because the analytics accelerator library caches metadata and data, and when a file
-    // is overwritten, the old data continues to be used, until it is removed from the cache over
-    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator currently does not support reading of over written files");
     super.testConcurrentUploads();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index be0d2cc5b20..4d3bb5487f6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -43,14 +43,27 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
 import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_KB;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;
 
 /**
  * Tests integration of the
@@ -88,6 +101,16 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
 
     S3AFileSystem fs =
         (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+
+    final long initialAuditCount = fs.getIOStatistics().counters()
+            .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);
+
+    long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+    // Head request for the file length.
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
+            initialAuditCount + 1);
+
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
 
@@ -105,9 +128,21 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
       Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
       Assertions.assertThat(objectInputStream.getInputPolicy())
           .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+              .isEqualTo(500);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    // Since the policy is WHOLE_FILE, the whole file starts getting prefetched as soon as
+    // the stream to it is opened, so the prefetched byte count is fileLength - 5.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
     fs.close();
     verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
 
@@ -115,7 +150,70 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
     // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
     // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
     // [0-8388607, 8388608-16777215, 16777216-21511173].
-    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
+            initialAuditCount + 1 + 4);
+  }
+
+  @Test
+  public void testSequentialPrefetching() throws IOException {
+
+    Configuration conf = getConfiguration();
+
+    // AAL uses a Caffeine cache, and by default expires any prefetched data for a key 1s
+    // after it was last accessed. While this works well when running on EC2, for local
+    // testing it can take more than 1s to download large chunks of data. Set this value
+    // higher for testing to prevent early cache evictions.
+    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+            "." + AAL_CACHE_TIMEOUT, 10000);
+
+    S3AFileSystem fs =
+            (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+    byte[] buffer = new byte[10 * ONE_MB];
+    IOStatistics ioStats;
+
+    long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+    // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE
+    // policy. Instead, we rely on AAL detecting a sequential read pattern, and then
+    // prefetching bytes in a geometric progression. AAL's sequential prefetching fetches
+    // in increments of 4MB, 8MB, 16MB etc., depending on how many sequential reads happen.
+    try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+      ioStats = inputStream.getIOStatistics();
+
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The first sequential read, so prefetch the next 4MB.
+      inputStream.readFully(buffer, 0, ONE_MB);
+
+      // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+      // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+      inputStream.readFully(buffer, 0, 2 * ONE_MB);
+      inputStream.readFully(buffer, 0, ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+      // Two cache hits, as the previous two reads were already prefetched.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+
+      // Another sequential read, GP will now prefetch the next 8MB of data.
+      inputStream.readFully(buffer, 0, ONE_MB);
+      // The cache hit count is still 2, as the previous read required a new GET request,
+      // being outside the previously fetched 4MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+      // A total of 10MB is prefetched - 3MB and then 7MB.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
+      long bytesRemainingForPrefetch = fileLength - (inputStream.getPos() + 10 * ONE_MB);
+      inputStream.readFully(buffer, 0, 10 * ONE_MB);
+
+      // Though the next GP step should prefetch 16MB, since the file is ~21MB, only the
+      // bytes till EoF are prefetched.
+      verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES,
+              10 * ONE_MB + bytesRemainingForPrefetch);
+      inputStream.readFully(buffer, 0, 3 * ONE_MB);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 3);
+    }
+
+    // Verify all AAL stats are passed to the FS.
+    verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
+    verifyStatisticCounterValue(fs.getIOStatistics(),
+            STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 0);
   }
 
   @Test
@@ -134,9 +232,33 @@ public void testMalformedParquetFooter() throws IOException {
     Path sourcePath = new Path(file.toURI().getPath());
     getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
 
+    long fileLength = getFileSystem().getFileStatus(dest).getLen();
+
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
+    int bytesRead;
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      ioStats = inputStream.getIOStatistics();
+      inputStream.seek(5);
+      bytesRead = inputStream.read(buffer, 0, 500);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
+      long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+      Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+              .isEqualTo(bytesRead);
 
+    }
+
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // This file has a content length of 451. Since it's a Parquet file, AAL will prefetch
+    // the footer bytes (the last 32KB) as soon as the file is opened, but because the file
+    // is < 32KB, the whole file is prefetched.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength);
+
+    // Open a stream to the object a second time, verifying that data is cached, and that
+    // streams to the same object do not prefetch the same data twice.
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
@@ -144,6 +266,10 @@ public void testMalformedParquetFooter() throws IOException {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    // No data is prefetched, as it already exists in the cache from the previous factory.
+    verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 0);
   }
 
   /**
@@ -167,6 +293,7 @@ public void testMultiRowGroupParquet() throws Throwable {
     final int size = 3000;
     byte[] buffer = new byte[size];
     int readLimit = Math.min(size, (int) fileStatus.getLen());
+
     IOStatistics ioStats;
 
     final IOStatistics fsIostats = getFileSystem().getIOStatistics();
@@ -179,6 +306,13 @@ public void testMultiRowGroupParquet() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+
+    // S3A makes a HEAD request on the stream open(), and then AAL makes a GET request to
+    // get the object, so the audit operation count increases by two.
+    long currentAuditCount = initialAuditCount + 2;
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
+            AUDIT_REQUEST_EXECUTION, currentAuditCount);
 
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .withFileStatus(fileStatus)
@@ -186,11 +320,17 @@ public void testMultiRowGroupParquet() throws Throwable {
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.readFully(buffer, 0, readLimit);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
 
-    verifyStatisticCounterValue(fsIostats, AUDIT_REQUEST_EXECUTION, initialAuditCount + 2);
+    // S3A passes in the metadata (content length) on file open,
+    // so we expect AAL to make no HEAD requests.
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
   }
 
   @Test
@@ -210,4 +350,72 @@ public void testInvalidConfigurationThrows() throws Exception {
         () -> S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = path("seek-test.txt");
+    byte[] data = dataset(5 * S_1M, 256, 255);
+    writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+    byte[] buffer = new byte[S_1M];
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.read(buffer);
+      inputStream.seek(2 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+      inputStream.seek(3 * S_1M);
+      inputStream.read(new byte[512 * S_1K]);
+
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+    }
+
+    // We did 3 reads, and all of them were served from the cache.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
+  }
+
+
+  @Test
+  public void testSequentialStreamsNoDuplicateGets() throws Throwable {
+    describe("Sequential streams reading same object should not duplicate 
GETs");
+
+    Path dest = path("sequential-test.txt");
+    int fileLen = S_1M;
+
+    byte[] data = dataset(fileLen, 256, 255);
+    writeDataset(getFileSystem(), dest, data, fileLen, 1024, true);
+
+    byte[] buffer = new byte[ONE_MB];
+    try (FSDataInputStream stream1 = getFileSystem().open(dest);
+         FSDataInputStream stream2 = getFileSystem().open(dest)) {
+
+      stream1.read(buffer, 0, 2 * ONE_KB);
+      stream2.read(buffer);
+      stream1.read(buffer, 0, 10 * ONE_KB);
+
+      IOStatistics stats1 = stream1.getIOStatistics();
+      IOStatistics stats2 = stream2.getIOStatistics();
+
+      verifyStatisticCounterValue(stats1, ACTION_HTTP_GET_REQUEST, 1);
+      verifyStatisticCounterValue(stats2, ACTION_HTTP_HEAD_REQUEST, 0);
+
+      // Since it's a small file (AAL will prefetch the whole file for sizes < 8MB), the
+      // whole file is prefetched on the first read.
+      verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, fileLen);
+
+      // The second stream will not prefetch any bytes, as they have already been prefetched
+      // by stream 1.
+      verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0);
+    }
+
+    // Verify the value is passed up to the FS.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
+            STREAM_READ_PREFETCHED_BYTES, fileLen);
+
+    // We did 3 reads, all of them served from the small object cache. In this case, the
+    // whole object was downloaded as soon as the stream to it was opened.
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
+  }
 }
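
As a reading aid, the sequential-prefetch tallies asserted in testSequentialPrefetching
above work out as follows (all figures taken from the test itself):

    1MB read, twice (sequential) -> AAL fetches a 4MB window -> 4MB - 1MB = 3MB prefetched
    next sequential cache miss   -> the window grows to 8MB  -> 8MB - 1MB = 7MB more, 10MB total
    final window                 -> capped at end of file    -> 10MB + bytesRemainingForPrefetch
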
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index f78f97a097a..6456f900a02 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -32,7 +32,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
@@ -76,23 +75,11 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
 
   @Override
   public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
-    // Currently analytics accelerator does not support reading of files that have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and when a file is
-    // overwritten, the old metadata continues to be used, until it is removed from the cache over
-    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
-        "Analytics Accelerator currently does not support reading of over written files");
     super.testWriteReadAndDeleteOneAndAHalfBlocks();
   }
 
   @Override
   public void testWriteReadAndDeleteTwoBlocks() throws Exception {
-    // Currently analytics accelerator does not support reading of files that have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and when a file is
-    // overwritten, the old metadata continues to be used, until it is removed from the cache over
-    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
-        "Analytics Accelerator currently does not support reading of over written files");
     super.testWriteReadAndDeleteTwoBlocks();
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
index 8667d2b646f..3b1b8af9eed 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -37,7 +37,6 @@
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -168,12 +167,6 @@ public void testOverwrite() throws IOException {
 
   @Override
   public void testOverWriteAndRead() throws Exception {
-    // Currently analytics accelerator does not support reading of files that have been overwritten.
-    // This is because the analytics accelerator library caches metadata, and when a file is
-    // overwritten, the old metadata continues to be used, until it is removed from the cache over
-    // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
-    skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
-        "Analytics Accelerator currently does not support reading of over written files");
     super.testOverWriteAndRead();
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index 12a1cd7d8f6..2bc342717a1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -45,7 +45,6 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -81,10 +80,7 @@ protected Configuration createConfiguration() {
   public void setup() throws Exception {
     super.setup();
     executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
-    // Analytics accelerator currently does not support IOStatisticsContext, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support IOStatisticsContext");
+
 
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 2ae28c74fe5..548b30a3b2d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,7 +28,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
 
 /**
@@ -52,10 +51,6 @@ public void testMetricsRegister()
 
   @Test
   public void testStreamStatistics() throws IOException {
-     // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
 
     S3AFileSystem fs = getFileSystem();
     Path file = path("testStreamStatistics");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 259a8a54e0e..01939732d8a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -302,4 +302,24 @@ public interface S3ATestConstants {
    * Default value of {@link #MULTIPART_COMMIT_CONSUMES_UPLOAD_ID}: {@value}.
    */
   boolean DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID = false;
+
+  /**
+   * Ranges within this distance of each other will be coalesced.
+   */
+  String AAL_REQUEST_COALESCE_TOLERANCE = "physicalio.request.coalesce.tolerance";
+
+  /**
+   * The minimum size of a block in AAL.
+   */
+  String AAL_READ_BUFFER_SIZE = "physicalio.readbuffersize";
+
+  /**
+   * Whether objects below the small-object threshold are downloaded in full on open.
+   */
+  String AAL_SMALL_OBJECT_PREFETCH_ENABLED = "physicalio.small.objects.prefetching.enabled";
+
+  /**
+   * Objects in AAL's cache will expire after this duration.
+   */
+  String AAL_CACHE_TIMEOUT = "physicalio.cache.timeout";
 }
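
These keys are always combined with the S3A pass-through prefix before being set; a
minimal sketch of composing a full key, mirroring what the vectored-read test above
does (the 32KB value is the test's own choice, not a recommendation):

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
    import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_READ_BUFFER_SIZE;

    Configuration conf = new Configuration();
    // Composes to "<prefix>.physicalio.readbuffersize".
    conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_READ_BUFFER_SIZE,
        32 * 1024);
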
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index fcc24102cc4..640a88de164 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -43,7 +43,6 @@
 import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
 import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -175,11 +174,7 @@ private void abortActiveStream() throws IOException {
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
     describe("Files created under magic paths skip existence checks and marker 
deletes");
-
-    // Analytics accelerator currently does not support IOStatistics, this 
will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
+    
     S3AFileSystem fs = getFileSystem();
     Path destFile = methodSubPath("file.txt");
     fs.delete(destFile.getParent(), true);
@@ -257,10 +252,6 @@ public void testCostOfCreatingMagicFile() throws Throwable {
   public void testCostOfSavingLoadingPendingFile() throws Throwable {
     describe("Verify costs of saving .pending file under a magic path");
 
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path partDir = methodSubPath("file.pending");
     Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 484d1cc2c31..e958c989420 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -30,7 +30,6 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /**
@@ -48,10 +47,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
   @BeforeEach
   public void setUp() throws Exception {
     conf = new Configuration();
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(conf,
-        "Analytics Accelerator currently does not support stream statistics");
     fc = S3ATestUtils.createTestFileContext(conf);
     testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
     fc.mkdir(testRootPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index c1c03ca6e72..46541c6bb26 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -58,7 +58,6 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
@@ -71,6 +70,7 @@
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 
@@ -95,6 +95,16 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
    */
   private boolean prefetching;
 
+  /**
+   * Is the analytics stream enabled?
+   */
+  private boolean analyticsStream;
+
+  /**
+   * Is the classic input stream enabled?
+   */
+  private boolean classicInputStream;
+
   @Override
   public Configuration createConfiguration() {
     Configuration conf = super.createConfiguration();
@@ -113,17 +123,14 @@ public Configuration createConfiguration() {
   @Override
   public void setup() throws Exception {
     super.setup();
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
-
     writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
     fileLength = (int)testFileStatus.getLen();
     prefetching = prefetching();
+    analyticsStream = isAnalyticsStream();
+    classicInputStream = isClassicInputStream();
   }
 
   /**
@@ -177,6 +184,10 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
     // if prefetching is enabled, skip this test
     assumeNoPrefetching();
+    // If AAL is enabled, skip this test. AAL uses S3A's default S3 client, and if checksumming is disabled on the
+    // client, then AAL will also not enforce it.
+    assumeNotAnalytics();
+
     S3AFileSystem fs = getFileSystem();
 
     // open the file
@@ -193,6 +204,7 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
       // open the stream.
       in.read();
+
       // now examine the innermost stream and make sure it doesn't have a checksum
       assertStreamIsNotChecksummed(getS3AInputStream(in));
     }
@@ -200,6 +212,11 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
   @Test
   public void testOpenFileShorterLength() throws Throwable {
+
+    // For AAL, since it makes a HEAD request to get the file length when the eTag is not supplied,
+    // it cannot use the file length supplied in the open() call, and the test fails.
+    assumeNotAnalytics();
+
     // do a second read with the length declared as short.
     // we now expect the bytes read to be shorter.
     S3AFileSystem fs = getFileSystem();
@@ -244,7 +261,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
     final int extra = 10;
     long longLen = fileLength + extra;
 
-
     // assert behaviors of seeking/reading past the file length.
     // there is no attempt at recovery.
     verifyMetrics(() -> {
@@ -264,7 +280,9 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
         // two GET calls were made, one for readFully,
         // the second on the read() past the EOF
         // the operation has got as far as S3
-        probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
+        probe(classicInputStream, STREAM_READ_OPENED, 1 + 1),
+        // For AAL, the seek past the content length fails before the GET is made.
+        probe(analyticsStream, STREAM_READ_OPENED, 1));
 
     // now on a new stream, try a full read from after the EOF
     verifyMetrics(() -> {
@@ -275,10 +293,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
         return in.toString();
       }
     },
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
-        // the operation has got as far as S3
-
         with(STREAM_READ_OPENED, 1));
   }
 
@@ -348,7 +362,9 @@ public void testReadPastEOF() throws Throwable {
       }
     },
         always(),
-        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
+        probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, extra),
+        // AAL won't make the GET call if trying to read beyond EOF
+        probe(analyticsStream, Statistic.ACTION_HTTP_GET_REQUEST, 0));
   }
 
   /**
@@ -439,18 +455,28 @@ public void testVectorReadPastEOF() throws Throwable {
         byte[] buf = new byte[longLen];
         ByteBuffer bb = ByteBuffer.wrap(buf);
         final FileRange range = FileRange.createFileRange(0, longLen);
-        in.readVectored(Arrays.asList(range), (i) -> bb);
-        interceptFuture(EOFException.class,
-            "",
-            ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
-            TimeUnit.SECONDS,
-            range.getData());
-        assertS3StreamClosed(in);
-        return "vector read past EOF with " + in;
+
+        // For AAL, if there is no eTag, the provided length will not be passed in, and a HEAD request will be made.
+        // AAL requires the eTag to detect changes to the object and then evict cached data if required.
+        if (isAnalyticsStream()) {
+          intercept(EOFException.class, () ->
+                  in.readVectored(Arrays.asList(range), (i) -> bb));
+          verifyStatisticCounterValue(in.getIOStatistics(), ACTION_HTTP_HEAD_REQUEST, 1);
+          return "vector read past EOF with " + in;
+        } else {
+          in.readVectored(Arrays.asList(range), (i) -> bb);
+          interceptFuture(EOFException.class,
+                  "",
+                  ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+                  TimeUnit.SECONDS,
+                  range.getData());
+          assertS3StreamClosed(in);
+          return "vector read past EOF with " + in;
+        }
       }
     },
         always(),
-        probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+        probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, 1));
   }
 
   /**
@@ -461,6 +487,22 @@ private boolean prefetching()  {
     return InputStreamType.Prefetch == streamType(getFileSystem());
   }
 
+  /**
+   * Is the current stream type Analytics?
+   * @return true if Analytics stream is enabled.
+   */
+  private boolean isAnalyticsStream() {
+    return InputStreamType.Analytics == streamType(getFileSystem());
+  }
+
+  /**
+   * Is the current input stream type S3AInputStream?
+   * @return true if the S3AInputStream is being used.
+   */
+  private boolean isClassicInputStream() {
+    return InputStreamType.Classic == streamType(getFileSystem());
+  }
+
   /**
    * Skip the test if prefetching is enabled.
    */
@@ -470,6 +512,12 @@ private void assumeNoPrefetching(){
     }
   }
 
+  /**
+   * Skip the test if the analytics stream is enabled.
+   */
+  private void assumeNotAnalytics() {
+    if (analyticsStream) {
+      skip("Analytics stream is enabled");
+    }
+  }
+
   /**
    * Assert that the inner S3 Stream is closed.
    * @param in input stream
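
The isAnalyticsStream()/isClassicInputStream() helpers above let a single verifyMetrics call carry per-stream expectations: at most one probe matches the configured InputStreamType, so only that count is asserted. A condensed sketch of the pattern (the action body is illustrative; verifyMetrics, always() and probe() are the cost-test helpers already used in the hunks above):

    // Illustrative: per-stream-type GET expectations in one assertion.
    // Exactly one probe is enabled for any given stream configuration.
    verifyMetrics(() -> {
      in.read();    // some read that may or may not trigger a GET
      return "read with " + in;
    },
        always(),
        probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, 1),
        probe(analyticsStream, Statistic.ACTION_HTTP_GET_REQUEST, 0));
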
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index 4165f7a6c9c..7f363d190c5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -30,7 +30,6 @@
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 import org.apache.hadoop.test.tags.IntegrationTest;
 
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
 
 /**
@@ -86,10 +85,6 @@ public List<String> outputStreamStatisticKeys() {
   @Test
   @Override
   public void testInputStreamStatisticRead() throws Throwable {
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator currently does not support stream statistics");
     super.testInputStreamStatisticRead();
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 2f54cab00b1..81a719fbea2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,7 +31,6 @@
 import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 
 public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
 
@@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
    */
   @Test
   public void testBytesReadWithStream() throws IOException {
-    // Analytics accelerator currently does not support IOStatistics, this will be added as
-    // part of https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support stream statistics");
     S3AFileSystem fs = getFileSystem();
     Path filePath = path(getMethodName());
     byte[] oneKbBuf = new byte[ONE_KB];
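
With the guard removed, testBytesReadWithStream now exercises AAL as well. A sketch of the kind of assertion the test makes, under the assumption that it checks a bytes-read counter after a round trip (fs, filePath and oneKbBuf as set up above; STREAM_READ_BYTES is the generic counter from StreamStatisticNames):

    // Illustrative: write ONE_KB bytes, read them back, then verify the
    // stream's IOStatistics counted them.
    try (FSDataOutputStream out = fs.create(filePath)) {
      out.write(oneKbBuf);
    }
    try (FSDataInputStream in = fs.open(filePath)) {
      in.readFully(0, oneKbBuf);
      IOStatisticAssertions.verifyStatisticCounterValue(in.getIOStatistics(),
          StreamStatisticNames.STREAM_READ_BYTES, ONE_KB);
    }
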


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
