vaibhav5140 commented on code in PR #7763:
URL: https://github.com/apache/hadoop/pull/7763#discussion_r2180287554


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -67,7 +70,7 @@ public class ITestS3AAnalyticsAcceleratorStreamReading 
extends AbstractS3ATestBa
 
   private Path externalTestFile;
 
-  @Before
+  @BeforeEach

Review Comment:
   Addressed



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -96,14 +99,20 @@ public void testConnectorFrameWorkIntegration() throws 
Throwable {
             .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
-      inputStream.read(buffer, 0, 500);
+      int bytesRead = inputStream.read(buffer, 0, 500);
 
       final InputStream wrappedStream = inputStream.getWrappedStream();
       ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
 
       
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
       Assertions.assertThat(objectInputStream.getInputPolicy())
           .isEqualTo(S3AInputPolicy.Sequential);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, bytesRead);

Review Comment:
   Addressed in new revision



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java:
##########
@@ -132,6 +132,12 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_OPERATIONS =
       "stream_read_operations";
 
+  /** Analytics GET requests made by stream. */
+  public static final String STREAM_READ_ANALYTICS_GET_REQUESTS = 
"stream_read_analytics_get_requests";

Review Comment:
   Actually, the new analytics-specific statistics provide isolated tracking. 
If both S3A and Analytics streams are used simultaneously, separate metrics 
provide precise tracking.



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java:
##########
@@ -0,0 +1,30 @@
+package org.apache.hadoop.fs.s3a.impl.streams;

Review Comment:
   The license error is fixed, but the javadoc error occurs because the 
AnalyticsRequestCallback interface is currently not available in the released 
version of AAL.
   
   



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
2);
+    }
+  }
+
+  @Test
+  public void testSmallFileSingleGet() throws Throwable {
+    describe("Small file should trigger only one GET request");
+
+    Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 
1KB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+  }
+
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = writeThenReadFile("seek-test.txt", 100 * 1024);
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.seek(1000);
+      inputStream.read(new byte[100]);
+
+      inputStream.seek(50000);
+      inputStream.read(new byte[100]);
+
+      inputStream.seek(90000);
+      inputStream.read(new byte[100]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+}
+
+  @Test
+  public void testAALNeverMakesHeadRequests() throws Throwable {
+    describe("Prove AAL never makes HEAD requests - S3A provides all 
metadata");
+
+    Path dest = writeThenReadFile("no-head-test.txt", 1024 * 1024); // 1MB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.read(new byte[1024]);
+
+      verifyStatisticCounterValue(ioStats, 
STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) 
inputStream.getWrappedStream();
+      
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
+
+    }
+  }
+
+
+  @Test
+  public void testParquetReadingNoHeadRequests() throws Throwable {

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java:
##########
@@ -78,13 +77,4 @@ public List<String> outputStreamStatisticKeys() {
         STREAM_WRITE_BLOCK_UPLOADS,
         STREAM_WRITE_EXCEPTIONS);
   }
-
-  @Override

Review Comment:
   Yes, they are passing now
   



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java:
##########
@@ -48,13 +49,17 @@ public class AnalyticsStream extends ObjectInputStream 
implements StreamCapabili
   private S3SeekableInputStream inputStream;
   private long lastReadCurrentPos = 0;
   private volatile boolean closed;
+  private final long contentLength;
+  private final long lengthLimit;
 
   public static final Logger LOG = 
LoggerFactory.getLogger(AnalyticsStream.class);
 
   public AnalyticsStream(final ObjectReadParameters parameters,
       final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws 
IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+    this.contentLength = s3Attributes.getLen();

Review Comment:
   getLen() is needed for length limiting: it ensures that AnalyticsStream 
respects the declared file length from openFile() options rather than reading 
the entire S3 object.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java:
##########
@@ -177,6 +174,11 @@ public void testStreamIsNotChecksummed() throws Throwable {
 
     // if prefetching is enabled, skip this test
     assumeNoPrefetching();
+    // Skip for Analytics streams - checksum validation only exists in 
S3AInputStream.
+    // AnalyticsStream handles data integrity through AWS Analytics 
Accelerator internally.
+    if (isAnalyticsStream()) {
+      skip("Analytics stream doesn't use checksums");

Review Comment:
   The test is skipped because S3AInputStream uses raw HTTP with optional 
checksum validation that can be tested, whereas the analytics stream uses AAL, 
which has built-in data integrity and exposes no checksum controls that can be 
verified.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
2);
+    }
+  }
+
+  @Test
+  public void testSmallFileSingleGet() throws Throwable {
+    describe("Small file should trigger only one GET request");
+
+    Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 
1KB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+  }
+
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = writeThenReadFile("seek-test.txt", 100 * 1024);
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.seek(1000);
+      inputStream.read(new byte[100]);
+
+      inputStream.seek(50000);
+      inputStream.read(new byte[100]);
+
+      inputStream.seek(90000);
+      inputStream.read(new byte[100]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+}
+
+  @Test
+  public void testAALNeverMakesHeadRequests() throws Throwable {

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -129,11 +140,17 @@ public void testMalformedParquetFooter() throws 
IOException {
 
     byte[] buffer = new byte[500];
     IOStatistics ioStats;
+    int bytesRead;
 
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
-      inputStream.read(buffer, 0, 500);
+      bytesRead = inputStream.read(buffer, 0, 500);

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
2);

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
2);
+    }
+  }
+
+  @Test
+  public void testSmallFileSingleGet() throws Throwable {
+    describe("Small file should trigger only one GET request");
+
+    Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 
1KB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+  }
+
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = writeThenReadFile("seek-test.txt", 100 * 1024);
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.seek(1000);
+      inputStream.read(new byte[100]);

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -166,15 +183,19 @@ public void testMultiRowGroupParquet() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
-
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
     try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
         .must(FS_OPTION_OPENFILE_READ_POLICY, 
FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
         .build().get()) {
       ioStats = inputStream.getIOStatistics();
       inputStream.readFully(buffer, 0, (int) fileStatus.getLen());
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) 
fileStatus.getLen());
+      verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+    verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 
0);

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
+
+/**
+ * Implementation of AAL's RequestCallback interface that tracks analytics 
operations.
+ */
+public class AnalyticsRequestCallback implements RequestCallback {
+    private final S3AInputStreamStatistics statistics;
+
+    /**
+     * Create a new callback instance.
+     * @param statistics the statistics to update
+     */
+    public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
+        this.statistics = statistics;
+    }
+
+    @Override
+    public void onGetRequest() {
+        statistics.incrementAnalyticsGetRequests();
+        // Update ACTION_HTTP_GET_REQUEST statistic
+        DurationTracker tracker = statistics.initiateGetRequest();

Review Comment:
   This is the way to increment the ACTION_HTTP_GET_REQUEST statistic. The 
statistics.initiateGetRequest() call increments the counter, and 
tracker.close() completes the measurement.



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
2);
+    }
+  }
+
+  @Test
+  public void testSmallFileSingleGet() throws Throwable {
+    describe("Small file should trigger only one GET request");
+
+    Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 
1KB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+  }
+
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = writeThenReadFile("seek-test.txt", 100 * 1024);

Review Comment:
   Addressed in new revision



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java:
##########
@@ -194,4 +215,127 @@ public void testInvalidConfigurationThrows() throws 
Exception {
         () -> 
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 
+  @Test
+  public void testLargeFileMultipleGets() throws Throwable {
+    describe("Large file should trigger multiple GET requests");
+
+    Path dest = writeThenReadFile("large-test-file.txt", 10 * 1024 * 1024); // 
10MB
+
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
2);
+    }
+  }
+
+  @Test
+  public void testSmallFileSingleGet() throws Throwable {
+    describe("Small file should trigger only one GET request");
+
+    Path dest = writeThenReadFile("small-test-file.txt", 1 * 1024 * 1024); // 
1KB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.readFully(new byte[(int) 
getFileSystem().getFileStatus(dest).getLen()]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+  }
+
+
+  @Test
+  public void testRandomSeekPatternGets() throws Throwable {
+    describe("Random seek pattern should optimize GET requests");
+
+    Path dest = writeThenReadFile("seek-test.txt", 100 * 1024);
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+
+      inputStream.seek(1000);
+      inputStream.read(new byte[100]);
+
+      inputStream.seek(50000);
+      inputStream.read(new byte[100]);
+
+      inputStream.seek(90000);
+      inputStream.read(new byte[100]);
+
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+}
+
+  @Test
+  public void testAALNeverMakesHeadRequests() throws Throwable {
+    describe("Prove AAL never makes HEAD requests - S3A provides all 
metadata");
+
+    Path dest = writeThenReadFile("no-head-test.txt", 1024 * 1024); // 1MB
+
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      IOStatistics ioStats = inputStream.getIOStatistics();
+      inputStream.read(new byte[1024]);
+
+      verifyStatisticCounterValue(ioStats, 
STREAM_READ_ANALYTICS_HEAD_REQUESTS, 0);
+      verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+      ObjectInputStream objectInputStream = (ObjectInputStream) 
inputStream.getWrappedStream();
+      
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
+
+    }
+  }
+
+
+  @Test
+  public void testParquetReadingNoHeadRequests() throws Throwable {
+    describe("Parquet-optimized reading should not trigger AAL HEAD requests");
+
+    Path dest = path("parquet-head-test.parquet");
+    File file = new File("src/test/resources/multi_row_group.parquet");
+    Path sourcePath = new Path(file.toURI().getPath());
+    getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+
+    try (FSDataInputStream stream = getFileSystem().openFile(dest)
+            .must(FS_OPTION_OPENFILE_READ_POLICY, 
FS_OPTION_OPENFILE_READ_POLICY_PARQUET)
+            .build().get()) {
+
+      FileStatus fileStatus = getFileSystem().getFileStatus(dest);
+      stream.readFully(new byte[(int) fileStatus.getLen()]);
+
+      IOStatistics stats = stream.getIOStatistics();
+
+      verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_HEAD_REQUESTS, 
0);
+      verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+      verifyStatisticCounterValue(stats, STREAM_READ_ANALYTICS_GET_REQUESTS, 
1);
+    }
+  }
+
+
+  @Test
+  public void testConcurrentStreamsNoDuplicateGets() throws Throwable {
+    describe("Concurrent streams reading same object should not duplicate 
GETs");
+
+    Path dest = writeThenReadFile("concurrent-test.txt", 1 * 1024 * 1024);
+
+    try (FSDataInputStream stream1 = getFileSystem().open(dest);

Review Comment:
   Addressed in new revision



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to