This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 38f9c1cceee HADOOP-19559. S3A: Prerequisite features for AAL default ON. (#8067)
38f9c1cceee is described below

commit 38f9c1cceee80fd23d4e9e10bc4036d2f4656703
Author: ahmarsuhail <[email protected]>
AuthorDate: Fri Nov 7 15:45:47 2025 +0000

    HADOOP-19559. S3A: Prerequisite features for AAL default ON. (#8067)
    
    Backports:
    
    * HADOOP-19394. S3A: Integrate with AAL's readVectored().  (#7720)
    
    * HADOOP-19664. S3A: Analytics stream to use Java sync client. (#7909)
    
    * HADOOP-19698. S3A: Add AAL dependency to LICENSE-binary.
    
    * HADOOP-19587. S3A: Adds in support for SSE-C to AAL (#7906)
    
    * HADOOP-19365. S3A: Adds in support for auditing for AAL. (#7723)
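    
    For anyone trying this out: the analytics stream is selected through the
    S3A input stream type option. A minimal Java sketch, assuming the
    fs.s3a.input.stream.type key from HADOOP-19131 (imports elided, bucket
    URI is a placeholder):
    
        // Hedged sketch: select the AAL-backed "analytics" input stream.
        Configuration conf = new Configuration();
        conf.set("fs.s3a.input.stream.type", "analytics");
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);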
---
 LICENSE-binary                                     |  1 +
 hadoop-project/pom.xml                             |  2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    |  5 +-
 .../hadoop/fs/s3a/audit/AWSRequestAnalyzer.java    | 15 +++++
 .../hadoop/fs/s3a/audit/impl/LoggingAuditor.java   | 52 +++++++++++++----
 .../apache/hadoop/fs/s3a/impl/S3AStoreImpl.java    |  7 +--
 .../fs/s3a/impl/streams/AnalyticsStream.java       | 65 ++++++++++++++++++++++
 .../s3a/impl/streams/AnalyticsStreamFactory.java   |  8 +--
 .../s3a/impl/streams/ObjectInputStreamFactory.java |  7 +--
 .../fs/s3a/impl/streams/ObjectReadParameters.java  | 50 +++++++++++++++++
 ...TestS3AContractAnalyticsStreamVectoredRead.java | 50 ++++++++++++++++-
 .../ITestS3AAnalyticsAcceleratorStreamReading.java |  9 +++
 .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java      | 28 +++++++++-
 .../fs/s3a/impl/streams/TestStreamFactories.java   |  5 +-
 .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java |  3 -
 15 files changed, 273 insertions(+), 34 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 120ff15f818..661c654f02f 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -361,6 +361,7 @@ org.xerial.snappy:snappy-java:1.1.10.4
 org.yaml:snakeyaml:2.0
 org.wildfly.openssl:wildfly-openssl:2.2.5.Final
 software.amazon.awssdk:bundle:2.29.52
+software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.0
 
 
--------------------------------------------------------------------------------
 This product bundles various third-party components under other open source
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 08a78805c45..ca58ce5fecc 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -204,7 +204,7 @@
     <aws-java-sdk.version>1.12.720</aws-java-sdk.version>
     <aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
     <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-    <amazon-s3-analyticsaccelerator-s3.version>1.2.1</amazon-s3-analyticsaccelerator-s3.version>
+    <amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
     <aws.eventstream.version>1.0.1</aws.eventstream.version>
     <hsqldb.version>2.7.1</hsqldb.version>
     <frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 7046ed9f110..b2774190018 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1928,7 +1928,10 @@ private FSDataInputStream executeOpen(
         .withCallbacks(createInputStreamCallbacks(auditSpan))
         .withContext(readContext.build())
         .withObjectAttributes(createObjectAttributes(path, fileStatus))
-        .withStreamStatistics(inputStreamStats);
+        .withStreamStatistics(inputStreamStats)
+        .withEncryptionSecrets(getEncryptionSecrets())
+        .withAuditSpan(auditSpan);
+
     return new FSDataInputStream(getStore().readObject(parameters));
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
index e91710a0af3..d9a39d5d7db 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
@@ -21,6 +21,7 @@
 import java.util.List;
 
 import software.amazon.awssdk.core.SdkRequest;
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -50,6 +51,8 @@
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
 
 /**
  * Extract information from a request.
@@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
         || request instanceof CreateSessionRequest;
   }
 
+  /**
+   * Returns true if the spanId and operation name are set by a dependency
+   * such as AAL. Allows for auditing of requests which are made outside
+   * S3A's requestFactory.
+   *
+   * @param executionAttributes request execution attributes
+   * @return true if request is audited outside of current span
+   */
+  public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
+   return executionAttributes.getAttribute(SPAN_ID) != null
+            && executionAttributes.getAttribute(OPERATION_NAME) != null;
+  }
+
   /**
    * Predicate which returns true if the request is part of the
    * multipart upload API -and which therefore must be rejected
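
The predicate above keys off two execution attributes. A hedged sketch of how
a dependency would mark its requests, using the AWS SDK's public
ExecutionAttributes API (the call site is illustrative, not AAL's actual code;
"span" is an assumed in-scope AuditSpan):

    // Illustrative only: mark a request as audited outside the current span.
    ExecutionAttributes attrs = new ExecutionAttributes();
    attrs.putAttribute(SPAN_ID, span.getSpanId());
    attrs.putAttribute(OPERATION_NAME, span.getOperationName());
    // Both attributes present => the predicate returns true.
    boolean audited = AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan(attrs);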
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
index 840ce5ffd30..19af82350b1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
@@ -61,6 +61,7 @@
 import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
 import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
 import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
+import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
 import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
@@ -69,6 +70,8 @@
 import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
 import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
 import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
 
 /**
  * The LoggingAuditor logs operations at DEBUG (in SDK Request) and
@@ -85,7 +88,6 @@ public class LoggingAuditor
   private static final Logger LOG =
       LoggerFactory.getLogger(LoggingAuditor.class);
 
-
   /**
    * Some basic analysis for the logs.
    */
@@ -267,7 +269,14 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
    */
   private class LoggingAuditSpan extends AbstractAuditSpanImpl {
 
-    private final HttpReferrerAuditHeader referrer;
+    private HttpReferrerAuditHeader referrer;
+
+    /**
+     * Builder for the referrer header. Requests that execute outside S3A,
+     * such as in AAL, initially carry the spanId of the outside-span
+     * operation. For such requests, the spanId and operation name in this
+     * builder are overwritten in the modifyHttpRequest execution interceptor.
+     */
+    private final HttpReferrerAuditHeader.Builder headerBuilder;
 
     /**
      * Attach Range of data for GetObject Request.
@@ -300,7 +309,7 @@ private LoggingAuditSpan(
         final String path2) {
       super(spanId, operationName);
 
-      this.referrer = HttpReferrerAuditHeader.builder()
+      this.headerBuilder = HttpReferrerAuditHeader.builder()
           .withContextId(getAuditorId())
           .withSpanId(spanId)
           .withOperationName(operationName)
@@ -312,8 +321,9 @@ private LoggingAuditSpan(
               currentThreadID())
           .withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
           .withEvaluated(context.getEvaluatedEntries())
-          .withFilter(filters)
-          .build();
+          .withFilter(filters);
+
+      this.referrer = this.headerBuilder.build();
 
       this.description = referrer.buildHttpReferrer();
     }
@@ -384,6 +394,26 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
       SdkHttpRequest httpRequest = context.httpRequest();
       SdkRequest sdkRequest = context.request();
 
+      // If spanId and operationName are set in execution attributes, use these values
+      // instead of the ones in the current span. This is useful when requests happen in
+      // dependencies such as the analytics accelerator library (AAL), where they cannot
+      // be attached to the correct span. In that case, AAL attaches the current spanId
+      // and operationName via execution attributes during its request creation. These
+      // can then be used to update the values in the logger and referrer header. Without
+      // this overwriting, the operation name and corresponding span will be whichever is
+      // active on the thread the request is executed on.
+      boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);
+
+      String spanId = isRequestAuditedOutsideCurrentSpan ?
+              executionAttributes.getAttribute(SPAN_ID) : getSpanId();
+
+      String operationName = isRequestAuditedOutsideCurrentSpan ?
+              executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();
+
+      if (isRequestAuditedOutsideCurrentSpan) {
+        this.headerBuilder.withSpanId(spanId);
+        this.headerBuilder.withOperationName(operationName);
+        this.referrer = this.headerBuilder.build();
+      }
+
       // attach range for GetObject requests
       attachRangeFromRequest(httpRequest, executionAttributes);
 
@@ -400,11 +430,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
             .appendHeader(HEADER_REFERRER, header)
             .build();
       }
+
       if (LOG.isDebugEnabled()) {
         LOG.debug("[{}] {} Executing {} with {}; {}",
             currentThreadID(),
-            getSpanId(),
-            getOperationName(),
+            spanId,
+            operationName,
             analyzer.analyze(context.request()),
             header);
       }
@@ -533,10 +564,12 @@ public void beforeExecution(Context.BeforeExecution context,
           + analyzer.analyze(context.request());
       final String unaudited = getSpanId() + " "
           + UNAUDITED_OPERATION + " " + error;
+      // If the request is attached to a span in modifyHttpRequest, as is the
+      // case for requests made by AAL, treat it as an audited request.
       if (isRequestNotAlwaysInSpan(context.request())) {
-        // can get by auditing during a copy, so don't overreact
+        // can get by auditing during a copy, so don't overreact.
         LOG.debug(unaudited);
-      } else {
+      } else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
         final RuntimeException ex = new AuditFailureException(unaudited);
         LOG.debug(unaudited, ex);
         if (isRejectOutOfSpan()) {
@@ -547,5 +580,4 @@ public void beforeExecution(Context.BeforeExecution context,
       super.beforeExecution(context, executionAttributes);
     }
   }
-
 }
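
Net effect on out-of-span handling, summarized as a hedged sketch (the config
key is REJECT_OUT_OF_SPAN_OPERATIONS from S3AAuditConstants):

    // Sketch of beforeExecution() outcomes after this change:
    //  - request inside a span                  -> audited as before
    //  - outside span, AAL attributes present   -> treated as audited (new)
    //  - outside span, no AAL attributes        -> AuditFailureException when
    //    fs.s3a.audit.reject.out.of.span.operations is true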
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
index 96ab44a8597..240d1958653 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -993,10 +993,9 @@ public InputStreamType streamType() {
   private class FactoryCallbacks implements StreamFactoryCallbacks {
 
     @Override
-    public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
-      // Needs support of the CRT before the requireCRT can be used
-      LOG.debug("Stream factory requested async client");
-      return clientManager().getOrCreateAsyncClient();
+    public S3Client getOrCreateSyncClient() throws IOException {
+      LOG.debug("Stream factory requested sync client");
+      return clientManager().getOrCreateS3Client();
     }
 
     @Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 6b910c65380..8920b5b2dfc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -21,10 +21,22 @@
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.Optional;
 
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
+import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
 import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
 import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
 import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -37,6 +49,11 @@
 import org.apache.hadoop.fs.s3a.Retries;
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.VectoredReadUtils;
+
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
+
 
 /**
  * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
@@ -128,6 +145,42 @@ public int read(byte[] buf, int off, int len) throws IOException {
     return bytesRead;
   }
 
+  /**
+   * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+   * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+   * {@inheritDoc}
+   */
+  @Override
+  public void readVectored(List<? extends FileRange> ranges,
+                                        IntFunction<ByteBuffer> allocate) throws IOException {
+    readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+  }
+
+  /**
+   * Vectored read: maps each {@link FileRange} to an AAL {@code ObjectRange}
+   * and delegates to the underlying stream.
+   * {@inheritDoc}
+   */
+  @Override
+  public void readVectored(final List<? extends FileRange> ranges,
+                           final IntFunction<ByteBuffer> allocate,
+                           final Consumer<ByteBuffer> release) throws IOException {
+    LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
+    throwIfClosed();
+
+    List<ObjectRange> objectRanges = new ArrayList<>();
+
+    for (FileRange range : ranges) {
+      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+      ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
+      objectRanges.add(objectRange);
+      range.setData(result);
+    }
+
+    // AAL does not do any range coalescing, so input and combined ranges are the same.
+    this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
+    inputStream.readVectored(objectRanges, allocate, release);
+  }
 
   @Override
   public boolean seekToNewSource(long l) throws IOException {
@@ -205,6 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
           .etag(parameters.getObjectAttributes().getETag()).build());
     }
 
+
+    if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
+      EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
+              .ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
+              EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
+    }
+
+    openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
+                    .operationName(parameters.getAuditSpan().getOperationName())
+                    .spanId(parameters.getAuditSpan().getSpanId())
+                    .build());
+
     return openStreamInformationBuilder.build();
   }
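
The new vectored-read path is driven through the standard Hadoop vectored IO
API; a hedged caller-side sketch (imports and throws elided, bucket and path
are placeholders):

    // Illustrative only: a vectored read against an S3A input stream.
    List<FileRange> ranges = new ArrayList<>();
    ranges.add(FileRange.createFileRange(0, 4096));
    ranges.add(FileRange.createFileRange(1 << 20, 4096));
    try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.parquet"))) {
      in.readVectored(ranges, ByteBuffer::allocate);
      for (FileRange r : ranges) {
        ByteBuffer buf = r.getData().get(); // completes once AAL fills the range
      }
    }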
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index c67c08be7b9..50333c68e0c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -23,9 +23,10 @@
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() {
     vectorContext.setMinSeekForVectoredReads(0);
 
     return new StreamFactoryRequirements(0,
-            0, vectorContext,
-            StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
+            0, vectorContext);
   }
 
   @Override
@@ -118,7 +118,7 @@ private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
 
   private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
     return () -> new S3SeekableInputStreamFactory(
-            new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+            new S3SyncSdkObjectClient(callbacks().getOrCreateSyncClient()),
             seekableInputStreamConfiguration);
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
index 9b2b54c48e5..caab0572ff2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
@@ -20,7 +20,7 @@
 
 import java.io.IOException;
 
-import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
 
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.StreamCapabilities;
@@ -80,12 +80,11 @@ ObjectInputStream readObject(ObjectReadParameters parameters)
   interface StreamFactoryCallbacks {
 
     /**
-     * Get the Async S3Client, raising a failure to create as an IOException.
-     * @param requireCRT is the CRT required.
+     * Get the Sync S3Client, raising a failure to create as an IOException.
      * @return the Async S3 client
      * @throws IOException failure to create the client.
      */
-    S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
+    S3Client getOrCreateSyncClient() throws IOException;
 
     void incrementFactoryStatistic(Statistic statistic);
   }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java
index e784dadcb65..459e7cc93f5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java
@@ -23,7 +23,9 @@
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
 
 import static java.util.Objects.requireNonNull;
 
@@ -69,6 +71,34 @@ public final class ObjectReadParameters {
    */
   private LocalDirAllocator directoryAllocator;
 
+  /**
+   * Encryption secrets for this stream.
+   */
+  private EncryptionSecrets encryptionSecrets;
+
+  /**
+   * Span for which this stream is being created.
+   */
+  private AuditSpan auditSpan;
+
+  /**
+   * Getter.
+   * @return Encryption secrets.
+   */
+  public EncryptionSecrets getEncryptionSecrets() {
+    return encryptionSecrets;
+  }
+
+  /**
+   * Set encryption secrets.
+   * @param value new value
+   * @return the builder
+   */
+  public ObjectReadParameters withEncryptionSecrets(final EncryptionSecrets value) {
+    encryptionSecrets = value;
+    return this;
+  }
+
   /**
    * @return Read operation context.
    */
@@ -172,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
     return this;
   }
 
+  /**
+   * Getter.
+   * @return Audit span.
+   */
+  public AuditSpan getAuditSpan() {
+    return auditSpan;
+  }
+
+  /**
+   * Set audit span.
+   * @param value new value
+   * @return the builder
+   */
+  public ObjectReadParameters withAuditSpan(final AuditSpan value) {
+    auditSpan = value;
+    return this;
+  }
+
   /**
    * Validate that all attributes are as expected.
    * Mock tests can skip this if required.
@@ -185,6 +233,8 @@ public ObjectReadParameters validate() {
     requireNonNull(directoryAllocator, "directoryAllocator");
     requireNonNull(objectAttributes, "objectAttributes");
     requireNonNull(streamStatistics, "streamStatistics");
+    requireNonNull(encryptionSecrets, "encryptionSecrets");
+    requireNonNull(auditSpan, "auditSpan");
     return this;
   }
 }
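
Putting the new accessors together: stream factories now receive both the
encryption secrets and the audit span. A hedged sketch of the wiring,
mirroring the executeOpen() change above (other with* calls elided):

    // Illustrative only: the two new builder fields and the tightened validate().
    ObjectReadParameters parameters = new ObjectReadParameters()
        .withEncryptionSecrets(getEncryptionSecrets()) // consumed for SSE-C keys
        .withAuditSpan(auditSpan);                     // consumed for span/operation propagation
    parameters.validate(); // now also rejects null encryptionSecrets/auditSpan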
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index 8cf182680c3..f3a10b209c2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -18,12 +18,24 @@
 
 package org.apache.hadoop.fs.contract.s3a;
 
+import java.util.List;
+
+import org.junit.Test;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
 import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 
 /**
  * S3A contract tests for vectored reads with the Analytics stream.
@@ -57,7 +69,6 @@ protected Configuration createConfiguration() {
     // This issue is tracked in:
     // https://github.com/awslabs/analytics-accelerator-s3/issues/218
     skipForAnyEncryptionExceptSSES3(conf);
-    conf.set("fs.contract.vector-io-early-eof-check", "false");
     return conf;
   }
 
@@ -65,4 +76,41 @@ protected Configuration createConfiguration() {
   protected AbstractFSContract createContract(Configuration conf) {
     return new S3AContract(conf);
   }
+
+  /**
+   * When the offset is negative, AAL throws an IllegalArgumentException,
+   * whereas the base implementation raises an EOFException.
+   */
+  @Override
+  public void testNegativeOffsetRange()  throws Exception {
+    verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class);
+  }
+
+  /**
+   * Currently there is no null check on the release operation; this will be
+   * fixed in the next AAL version.
+   */
+  @Override
+  public void testNullReleaseOperation()  {
+    skip("AAL current does not do a null check on the release operation");
+  }
+
+  @Test
+  public void testReadVectoredWithAALStatsCollection() throws Exception {
+
+    List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+    try (FSDataInputStream in = openVectorFile()) {
+      in.readVectored(fileRanges, getAllocate());
+
+      validateVectoredReadResult(fileRanges, DATASET, 0);
+      IOStatistics st = in.getIOStatistics();
+
+      // Statistics such as GET requests will be added after IoStats support.
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
+
+      verifyStatisticCounterValue(st,
+              StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+              1);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index 35d8f48ee2f..76afa6faaeb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -42,6 +42,7 @@
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
 import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
     fs.close();
     verifyStatisticCounterValue(fs.getIOStatistics(), ANALYTICS_STREAM_FACTORY_CLOSED, 1);
+
+    // Expect 4 audited requests: one HEAD and 3 GETs. The 3 GETs are because the read
+    // policy is WHOLE_FILE, in which case AAL starts prefetching to EOF on file open in
+    // 8MB chunks. The file read here, s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz,
+    // has a size of ~21MB, resulting in 3 GETs:
+    // [0-8388607, 8388608-16777215, 16777216-21511173].
+    verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
   }
 
   @Test
@@ -175,6 +182,8 @@ public void testMultiRowGroupParquet() throws Throwable {
     }
 
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+    verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
   }
 
   @Test
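
As a cross-check on the request count asserted in testConnectorFrameWorkIntegration
above: with AAL's 8 MiB prefetch chunks and the ~21.5 MB test object, the GET
count follows from ceiling division (sizes taken from the ranges in the comment):

    // Hedged arithmetic sketch for the 3-GET claim.
    long size = 21_511_174L;                // last byte is 21511173
    long chunk = 8L * 1024 * 1024;          // 8388608
    long gets = (size + chunk - 1) / chunk; // = 3; plus one HEAD = 4 requests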
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index 0f79881466f..362fae50b2b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -20,6 +20,8 @@
 
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
+import java.util.Collection;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
@@ -30,6 +32,8 @@
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.io.IOUtils;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
@@ -40,10 +44,10 @@
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -54,6 +58,7 @@
  * Equally "vexing" has been the optimizations of getFileStatus(), wherein
  * LIST comes before HEAD path + /
  */
+@RunWith(Parameterized.class)
 public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
 
   private static final String SERVICE_AMAZON_S3_STATUS_CODE_403
@@ -73,6 +78,20 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
    */
   private S3AFileSystem fsKeyB;
 
+  private final boolean analyticsAcceleratorEnabled;
+
+  @Parameterized.Parameters(name = "analyticsAcceleratorEnabled={0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+            {true},
+            {false}
+    });
+  }
+
+  public ITestS3AEncryptionSSEC(final boolean analyticsAcceleratorEnabled) {
+    this.analyticsAcceleratorEnabled = analyticsAcceleratorEnabled;
+  }
+
 
   @SuppressWarnings("deprecation")
   @Override
@@ -90,14 +109,17 @@ protected Configuration createConfiguration() {
         getSSEAlgorithm().getMethod());
     conf.set(S3_ENCRYPTION_KEY, KEY_1);
     conf.setBoolean(ETAG_CHECKSUM_ENABLED, true);
+
+    if (analyticsAcceleratorEnabled) {
+      enableAnalyticsAccelerator(conf);
+    }
+
     return conf;
   }
 
   @Override
   public void setup() throws Exception {
     super.setup();
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support SSE-C");
     assumeEnabled();
     // although not a root dir test, this confuses paths enough it shouldn't be run in
     // parallel with other jobs
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java
index 05f15d46238..fd27d4e63d9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java
@@ -24,12 +24,12 @@
 import org.apache.hadoop.fs.s3a.Statistic;
 import org.assertj.core.api.Assertions;
 import org.junit.Test;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.VectoredIOContext;
 import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
+import software.amazon.awssdk.services.s3.S3Client;
 
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_CUSTOM_FACTORY;
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
@@ -330,9 +330,8 @@ public FactoryFailsToInstantiate() {
    * Callbacks from {@link ObjectInputStreamFactory} instances.
    */
   private static final class Callbacks implements ObjectInputStreamFactory.StreamFactoryCallbacks {
-
     @Override
-    public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
+    public S3Client getOrCreateSyncClient() throws IOException {
       throw new UnsupportedOperationException("not implemented");
     }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
index 0203b00caab..6f19ba15c1c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
@@ -31,7 +31,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
 
 /**
@@ -55,8 +54,6 @@ public class ITestS3AHugeFilesSSECDiskBlocks
   public void setup() throws Exception {
     try {
       super.setup();
-      skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-          "Analytics Accelerator currently does not support SSE-C");
     } catch (AccessDeniedException | AWSUnsupportedFeatureException e) {
       skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " 
encryption method");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

