This is an automated email from the ASF dual-hosted git repository.
ahmar pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
new 38f9c1cceee HADOOP-19559. S3A: Prerequisite features for AAL default ON. (#8067)
38f9c1cceee is described below
commit 38f9c1cceee80fd23d4e9e10bc4036d2f4656703
Author: ahmarsuhail <[email protected]>
AuthorDate: Fri Nov 7 15:45:47 2025 +0000
HADOOP-19559. S3A: Prerequisite features for AAL default ON. (#8067)
Backports:
* HADOOP-19394. S3A: Integrate with AAL's readVectored(). (#7720)
* HADOOP-19664. S3A: Analytics stream to use Java sync client. (#7909)
* HADOOP-19698. S3A: Add AAL dependency to LICENSE-binary.
* HADOOP-19587. S3A: Adds in support for SSE-C to AAL (#7906)
* HADOOP-19365. S3A: Adds in support for auditing for AAL. (#7723)
---
LICENSE-binary | 1 +
hadoop-project/pom.xml | 2 +-
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 5 +-
.../hadoop/fs/s3a/audit/AWSRequestAnalyzer.java | 15 +++++
.../hadoop/fs/s3a/audit/impl/LoggingAuditor.java | 52 +++++++++++++----
.../apache/hadoop/fs/s3a/impl/S3AStoreImpl.java | 7 +--
.../fs/s3a/impl/streams/AnalyticsStream.java | 65 ++++++++++++++++++++++
.../s3a/impl/streams/AnalyticsStreamFactory.java | 8 +--
.../s3a/impl/streams/ObjectInputStreamFactory.java | 7 +--
.../fs/s3a/impl/streams/ObjectReadParameters.java | 50 +++++++++++++++++
...TestS3AContractAnalyticsStreamVectoredRead.java | 50 ++++++++++++++++-
.../ITestS3AAnalyticsAcceleratorStreamReading.java | 9 +++
.../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 28 +++++++++-
.../fs/s3a/impl/streams/TestStreamFactories.java | 5 +-
.../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java | 3 -
15 files changed, 273 insertions(+), 34 deletions(-)
diff --git a/LICENSE-binary b/LICENSE-binary
index 120ff15f818..661c654f02f 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -361,6 +361,7 @@ org.xerial.snappy:snappy-java:1.1.10.4
org.yaml:snakeyaml:2.0
org.wildfly.openssl:wildfly-openssl:2.2.5.Final
software.amazon.awssdk:bundle:2.29.52
+software.amazon.s3.analyticsaccelerator:analyticsaccelerator-s3:1.3.0
--------------------------------------------------------------------------------
This product bundles various third-party components under other open source
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 08a78805c45..ca58ce5fecc 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -204,7 +204,7 @@
<aws-java-sdk.version>1.12.720</aws-java-sdk.version>
<aws-java-sdk-v2.version>2.29.52</aws-java-sdk-v2.version>
<amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version>
-<amazon-s3-analyticsaccelerator-s3.version>1.2.1</amazon-s3-analyticsaccelerator-s3.version>
+<amazon-s3-analyticsaccelerator-s3.version>1.3.0</amazon-s3-analyticsaccelerator-s3.version>
<aws.eventstream.version>1.0.1</aws.eventstream.version>
<hsqldb.version>2.7.1</hsqldb.version>
<frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 7046ed9f110..b2774190018 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -1928,7 +1928,10 @@ private FSDataInputStream executeOpen(
.withCallbacks(createInputStreamCallbacks(auditSpan))
.withContext(readContext.build())
.withObjectAttributes(createObjectAttributes(path, fileStatus))
- .withStreamStatistics(inputStreamStats);
+ .withStreamStatistics(inputStreamStats)
+ .withEncryptionSecrets(getEncryptionSecrets())
+ .withAuditSpan(auditSpan);
+
return new FSDataInputStream(getStore().readObject(parameters));
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
index e91710a0af3..d9a39d5d7db 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AWSRequestAnalyzer.java
@@ -21,6 +21,7 @@
import java.util.List;
import software.amazon.awssdk.core.SdkRequest;
+import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
@@ -50,6 +51,8 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_PUT_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.STORE_EXISTS_PROBE;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
/**
* Extract information from a request.
@@ -193,6 +196,18 @@ private RequestInfo writing(final String verb,
|| request instanceof CreateSessionRequest;
}
+ /**
+ * If spanId and operation name are set by dependencies such as AAL, then this returns true. Allows for auditing
+ * of requests which are made outside S3A's requestFactory.
+ *
+ * @param executionAttributes request execution attributes
+ * @return true if request is audited outside of current span
+ */
+ public static boolean isRequestAuditedOutsideOfCurrentSpan(ExecutionAttributes executionAttributes) {
+ return executionAttributes.getAttribute(SPAN_ID) != null
+ && executionAttributes.getAttribute(OPERATION_NAME) != null;
+ }
+
/**
* Predicate which returns true if the request is part of the
* multipart upload API -and which therefore must be rejected
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
index 840ce5ffd30..19af82350b1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/impl/LoggingAuditor.java
@@ -61,6 +61,7 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestMultipartIO;
import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestNotAlwaysInSpan;
+import static org.apache.hadoop.fs.s3a.audit.AWSRequestAnalyzer.isRequestAuditedOutsideOfCurrentSpan;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.OUTSIDE_SPAN;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REFERRER_HEADER_ENABLED_DEFAULT;
@@ -69,6 +70,8 @@
import static org.apache.hadoop.fs.s3a.commit.CommitUtils.extractJobID;
import static org.apache.hadoop.fs.s3a.impl.HeaderProcessing.HEADER_REFERRER;
import static org.apache.hadoop.fs.s3a.statistics.impl.StatisticsFromAwsSdkImpl.mapErrorStatusCodeToStatisticName;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.OPERATION_NAME;
+import static software.amazon.s3.analyticsaccelerator.request.Constants.SPAN_ID;
/**
* The LoggingAuditor logs operations at DEBUG (in SDK Request) and
@@ -85,7 +88,6 @@ public class LoggingAuditor
private static final Logger LOG =
LoggerFactory.getLogger(LoggingAuditor.class);
-
/**
* Some basic analysis for the logs.
*/
@@ -267,7 +269,14 @@ HttpReferrerAuditHeader getReferrer(AuditSpanS3A span) {
*/
private class LoggingAuditSpan extends AbstractAuditSpanImpl {
- private final HttpReferrerAuditHeader referrer;
+ private HttpReferrerAuditHeader referrer;
+
+ /**
+ * Builder for the referrer header. Requests that execute outside S3A, such as in AAL, will initially have the spanId
+ * of the outside-span operation. For such requests, the spanId and operation name in this builder are overwritten
+ * in the modifyHttpRequest execution interceptor.
+ */
+ private final HttpReferrerAuditHeader.Builder headerBuilder;
/**
* Attach Range of data for GetObject Request.
@@ -300,7 +309,7 @@ private LoggingAuditSpan(
final String path2) {
super(spanId, operationName);
- this.referrer = HttpReferrerAuditHeader.builder()
+ this.headerBuilder = HttpReferrerAuditHeader.builder()
.withContextId(getAuditorId())
.withSpanId(spanId)
.withOperationName(operationName)
@@ -312,8 +321,9 @@ private LoggingAuditSpan(
currentThreadID())
.withAttribute(PARAM_TIMESTAMP, Long.toString(getTimestamp()))
.withEvaluated(context.getEvaluatedEntries())
- .withFilter(filters)
- .build();
+ .withFilter(filters);
+
+ this.referrer = this.headerBuilder.build();
this.description = referrer.buildHttpReferrer();
}
@@ -384,6 +394,26 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
SdkHttpRequest httpRequest = context.httpRequest();
SdkRequest sdkRequest = context.request();
+ // If spanId and operationName are set in execution attributes, then use these values,
+ // instead of the ones in the current span. This is useful when requests are happening in dependencies such as
+ // the analytics accelerator library (AAL), where they cannot be attached to the correct span. In which case, AAL
+ // will attach the current spanId and operationName via execution attributes during its request creation. These
+ // can then be used to update the values in the logger and referrer header. Without this overwriting, the operation
+ // name and corresponding span will be whichever is active on the thread the request is getting executed on.
+ boolean isRequestAuditedOutsideCurrentSpan = isRequestAuditedOutsideOfCurrentSpan(executionAttributes);
+
+ String spanId = isRequestAuditedOutsideCurrentSpan ?
+ executionAttributes.getAttribute(SPAN_ID) : getSpanId();
+
+ String operationName = isRequestAuditedOutsideCurrentSpan ?
+ executionAttributes.getAttribute(OPERATION_NAME) : getOperationName();
+
+ if (isRequestAuditedOutsideCurrentSpan) {
+ this.headerBuilder.withSpanId(spanId);
+ this.headerBuilder.withOperationName(operationName);
+ this.referrer = this.headerBuilder.build();
+ }
+
// attach range for GetObject requests
attachRangeFromRequest(httpRequest, executionAttributes);
@@ -400,11 +430,12 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context,
.appendHeader(HEADER_REFERRER, header)
.build();
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("[{}] {} Executing {} with {}; {}",
currentThreadID(),
- getSpanId(),
- getOperationName(),
+ spanId,
+ operationName,
analyzer.analyze(context.request()),
header);
}
@@ -533,10 +564,12 @@ public void beforeExecution(Context.BeforeExecution context,
+ analyzer.analyze(context.request());
final String unaudited = getSpanId() + " "
+ UNAUDITED_OPERATION + " " + error;
+ // If request is attached to a span in the modifyHttpRequest, as is the case for requests made by AAL, treat it
+ // as an audited request.
if (isRequestNotAlwaysInSpan(context.request())) {
- // can get by auditing during a copy, so don't overreact
+ // can get by auditing during a copy, so don't overreact.
LOG.debug(unaudited);
- } else {
+ } else if (!isRequestAuditedOutsideOfCurrentSpan(executionAttributes)) {
final RuntimeException ex = new AuditFailureException(unaudited);
LOG.debug(unaudited, ex);
if (isRejectOutOfSpan()) {
@@ -547,5 +580,4 @@ public void beforeExecution(Context.BeforeExecution context,
super.beforeExecution(context, executionAttributes);
}
}
-
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
index 96ab44a8597..240d1958653 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -993,10 +993,9 @@ public InputStreamType streamType() {
private class FactoryCallbacks implements StreamFactoryCallbacks {
@Override
- public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
- // Needs support of the CRT before the requireCRT can be used
- LOG.debug("Stream factory requested async client");
- return clientManager().getOrCreateAsyncClient();
+ public S3Client getOrCreateSyncClient() throws IOException {
+ LOG.debug("Stream factory requested sync client");
+ return clientManager().getOrCreateS3Client();
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 6b910c65380..8920b5b2dfc 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -21,10 +21,22 @@
import java.io.EOFException;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.IntFunction;
+import java.util.Optional;
+import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
+import software.amazon.s3.analyticsaccelerator.request.EncryptionSecrets;
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.request.StreamAuditContext;
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
@@ -37,6 +49,11 @@
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.FileRange;
+import org.apache.hadoop.fs.VectoredReadUtils;
+
+import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
+
/**
* Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
@@ -128,6 +145,42 @@ public int read(byte[] buf, int off, int len) throws IOException {
return bytesRead;
}
+ /**
+ * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+ * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+ * {@inheritDoc}
+ */
+ @Override
+ public void readVectored(List<? extends FileRange> ranges,
+ IntFunction<ByteBuffer> allocate) throws IOException {
+ readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
+ }
+
+ /**
+ * Pass to {@link #readVectored(List, IntFunction, Consumer)}
+ * with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
+ * {@inheritDoc}
+ */
+ @Override
+ public void readVectored(final List<? extends FileRange> ranges,
+ final IntFunction<ByteBuffer> allocate,
+ final Consumer<ByteBuffer> release) throws IOException {
+ LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
+ throwIfClosed();
+
+ List<ObjectRange> objectRanges = new ArrayList<>();
+
+ for (FileRange range : ranges) {
+ CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
+ ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
+ objectRanges.add(objectRange);
+ range.setData(result);
+ }
+
+ // AAL does not do any range coalescing, so input and combined ranges are the same.
+ this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
+ inputStream.readVectored(objectRanges, allocate, release);
+ }
@Override
public boolean seekToNewSource(long l) throws IOException {
@@ -205,6 +258,18 @@ private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters pa
.etag(parameters.getObjectAttributes().getETag()).build());
}
+
+ if (parameters.getEncryptionSecrets().getEncryptionMethod() == S3AEncryptionMethods.SSE_C) {
+ EncryptionSecretOperations.getSSECustomerKey(parameters.getEncryptionSecrets())
+ .ifPresent(base64customerKey -> openStreamInformationBuilder.encryptionSecrets(
+ EncryptionSecrets.builder().sseCustomerKey(Optional.of(base64customerKey)).build()));
+ }
+
+ openStreamInformationBuilder.streamAuditContext(StreamAuditContext.builder()
+ .operationName(parameters.getAuditSpan().getOperationName())
+ .spanId(parameters.getAuditSpan().getSpanId())
+ .build());
+
return openStreamInformationBuilder.build();
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index c67c08be7b9..50333c68e0c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -23,9 +23,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.S3SyncSdkObjectClient;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
import org.apache.hadoop.conf.Configuration;
@@ -96,8 +97,7 @@ public StreamFactoryRequirements factoryRequirements() {
vectorContext.setMinSeekForVectoredReads(0);
return new StreamFactoryRequirements(0,
- 0, vectorContext,
- StreamFactoryRequirements.Requirements.ExpectUnauditedGetRequests);
+ 0, vectorContext);
}
@Override
@@ -118,7 +118,7 @@ private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
private CallableRaisingIOE<S3SeekableInputStreamFactory>
createS3SeekableInputStreamFactory() {
return () -> new S3SeekableInputStreamFactory(
- new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+ new S3SyncSdkObjectClient(callbacks().getOrCreateSyncClient()),
seekableInputStreamConfiguration);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
index 9b2b54c48e5..caab0572ff2 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
@@ -20,7 +20,7 @@
import java.io.IOException;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -80,12 +80,11 @@ ObjectInputStream readObject(ObjectReadParameters parameters)
interface StreamFactoryCallbacks {
/**
- * Get the Async S3Client, raising a failure to create as an IOException.
- * @param requireCRT is the CRT required.
+ * Get the Sync S3Client, raising a failure to create as an IOException.
* @return the Async S3 client
* @throws IOException failure to create the client.
*/
- S3AsyncClient getOrCreateAsyncClient(boolean requireCRT) throws IOException;
+ S3Client getOrCreateSyncClient() throws IOException;
void incrementFactoryStatistic(Statistic statistic);
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java
index e784dadcb65..459e7cc93f5 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectReadParameters.java
@@ -23,7 +23,9 @@
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.s3a.S3AReadOpContext;
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.fs.store.audit.AuditSpan;
import static java.util.Objects.requireNonNull;
@@ -69,6 +71,34 @@ public final class ObjectReadParameters {
*/
private LocalDirAllocator directoryAllocator;
+ /**
+ * Encryption secrets for this stream.
+ */
+ private EncryptionSecrets encryptionSecrets;
+
+ /**
+ * Span for which this stream is being created.
+ */
+ private AuditSpan auditSpan;
+
+ /**
+ * Getter.
+ * @return Encryption secrets.
+ */
+ public EncryptionSecrets getEncryptionSecrets() {
+ return encryptionSecrets;
+ }
+
+ /**
+ * Set encryption secrets.
+ * @param value new value
+ * @return the builder
+ */
+ public ObjectReadParameters withEncryptionSecrets(final EncryptionSecrets value) {
+ encryptionSecrets = value;
+ return this;
+ }
+
/**
* @return Read operation context.
*/
@@ -172,6 +202,24 @@ public ObjectReadParameters withDirectoryAllocator(final LocalDirAllocator value
return this;
}
+ /**
+ * Getter.
+ * @return Audit span.
+ */
+ public AuditSpan getAuditSpan() {
+ return auditSpan;
+ }
+
+ /**
+ * Set audit span.
+ * @param value new value
+ * @return the builder
+ */
+ public ObjectReadParameters withAuditSpan(final AuditSpan value) {
+ auditSpan = value;
+ return this;
+ }
+
/**
* Validate that all attributes are as expected.
* Mock tests can skip this if required.
@@ -185,6 +233,8 @@ public ObjectReadParameters validate() {
requireNonNull(directoryAllocator, "directoryAllocator");
requireNonNull(objectAttributes, "objectAttributes");
requireNonNull(streamStatistics, "streamStatistics");
+ requireNonNull(encryptionSecrets, "encryptionSecrets");
+ requireNonNull(auditSpan, "auditSpan");
return this;
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index 8cf182680c3..f3a10b209c2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -18,12 +18,24 @@
package org.apache.hadoop.fs.contract.s3a;
+import java.util.List;
+
+import org.junit.Test;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
/**
* S3A contract tests for vectored reads with the Analytics stream.
@@ -57,7 +69,6 @@ protected Configuration createConfiguration() {
// This issue is tracked in:
// https://github.com/awslabs/analytics-accelerator-s3/issues/218
skipForAnyEncryptionExceptSSES3(conf);
- conf.set("fs.contract.vector-io-early-eof-check", "false");
return conf;
}
@@ -65,4 +76,41 @@ protected Configuration createConfiguration() {
protected AbstractFSContract createContract(Configuration conf) {
return new S3AContract(conf);
}
+
+ /**
+ * When the offset is negative, AAL returns IllegalArgumentException, whereas the base implementation will return
+ * an EoF.
+ */
+ @Override
+ public void testNegativeOffsetRange() throws Exception {
+ verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class);
+ }
+
+ /**
+ * Currently there is no null check on the release operation; this will be fixed in the next AAL version.
+ */
+ @Override
+ public void testNullReleaseOperation() {
+ skip("AAL currently does not do a null check on the release operation");
+ }
+
+ @Test
+ public void testReadVectoredWithAALStatsCollection() throws Exception {
+
+ List<FileRange> fileRanges = createSampleNonOverlappingRanges();
+ try (FSDataInputStream in = openVectorFile()) {
+ in.readVectored(fileRanges, getAllocate());
+
+ validateVectoredReadResult(fileRanges, DATASET, 0);
+ IOStatistics st = in.getIOStatistics();
+
+ // Statistics such as GET requests will be added after IoStats support.
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
+
+ verifyStatisticCounterValue(st,
+ StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
+ 1);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index 35d8f48ee2f..76afa6faaeb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -42,6 +42,7 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
@@ -109,6 +110,12 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
fs.close();
verifyStatisticCounterValue(fs.getIOStatistics(),
ANALYTICS_STREAM_FACTORY_CLOSED, 1);
+
+ // Expect 4 audited requests. One HEAD, and 3 GETs. The 3 GETs are because the read policy is WHOLE_FILE,
+ // in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here,
+ // s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETs:
+ // [0-8388607, 8388608-16777215, 16777216-21511173].
+ verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
}
@Test
@@ -175,6 +182,8 @@ public void testMultiRowGroupParquet() throws Throwable {
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+ verifyStatisticCounterValue(getFileSystem().getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
}
@Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index 0f79881466f..362fae50b2b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -20,6 +20,8 @@
import java.io.IOException;
import java.nio.file.AccessDeniedException;
+import java.util.Arrays;
+import java.util.Collection;
import org.assertj.core.api.Assertions;
import org.junit.Test;
@@ -30,6 +32,8 @@
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.io.IOUtils;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
@@ -40,10 +44,10 @@
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeStoreAwsHosted;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@@ -54,6 +58,7 @@
* Equally "vexing" has been the optimizations of getFileStatus(), wherein
* LIST comes before HEAD path + /
*/
+@RunWith(Parameterized.class)
public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
private static final String SERVICE_AMAZON_S3_STATUS_CODE_403
@@ -73,6 +78,20 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
*/
private S3AFileSystem fsKeyB;
+ private final boolean analyticsAcceleratorEnabled;
+
+ @Parameterized.Parameters(name = "analyticsAcceleratorEnabled={0}")
+ public static Collection<Object[]> params() {
+ return Arrays.asList(new Object[][]{
+ {true},
+ {false}
+ });
+ }
+
+ public ITestS3AEncryptionSSEC (final boolean analyticsAcceleratorEnabled) {
+ this.analyticsAcceleratorEnabled = analyticsAcceleratorEnabled;
+ }
+
@SuppressWarnings("deprecation")
@Override
@@ -90,14 +109,17 @@ protected Configuration createConfiguration() {
getSSEAlgorithm().getMethod());
conf.set(S3_ENCRYPTION_KEY, KEY_1);
conf.setBoolean(ETAG_CHECKSUM_ENABLED, true);
+
+ if (analyticsAcceleratorEnabled) {
+ enableAnalyticsAccelerator(conf);
+ }
+
return conf;
}
@Override
public void setup() throws Exception {
super.setup();
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support SSE-C");
assumeEnabled();
// although not a root dir test, this confuses paths enough it shouldn't be run in
// parallel with other jobs
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java
index 05f15d46238..fd27d4e63d9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/streams/TestStreamFactories.java
@@ -24,12 +24,12 @@
import org.apache.hadoop.fs.s3a.Statistic;
import org.assertj.core.api.Assertions;
import org.junit.Test;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.VectoredIOContext;
import org.apache.hadoop.fs.s3a.prefetch.PrefetchingInputStreamFactory;
import org.apache.hadoop.test.AbstractHadoopTestBase;
+import software.amazon.awssdk.services.s3.S3Client;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_CUSTOM_FACTORY;
import static org.apache.hadoop.fs.s3a.Constants.INPUT_STREAM_TYPE;
@@ -330,9 +330,8 @@ public FactoryFailsToInstantiate() {
* Callbacks from {@link ObjectInputStreamFactory} instances.
*/
private static final class Callbacks implements
ObjectInputStreamFactory.StreamFactoryCallbacks {
-
@Override
- public S3AsyncClient getOrCreateAsyncClient(final boolean requireCRT) throws IOException {
+ public S3Client getOrCreateSyncClient() throws IOException {
throw new UnsupportedOperationException("not implemented");
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
index 0203b00caab..6f19ba15c1c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java
@@ -31,7 +31,6 @@
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled;
/**
@@ -55,8 +54,6 @@ public class ITestS3AHugeFilesSSECDiskBlocks
public void setup() throws Exception {
try {
super.setup();
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support SSE-C");
} catch (AccessDeniedException | AWSUnsupportedFeatureException e) {
skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + "
encryption method");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]