This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 3828ad54469c869448718d1ea02de6eb36112d28 Author: Ahmar Suhail <ahma...@amazon.co.uk> AuthorDate: Wed Feb 12 11:41:08 2025 +0000 lazy eval of stream factory + test fixes --- .../s3a/impl/streams/AnalyticsStreamFactory.java | 27 +++++++++++++++------- .../hadoop/fs/s3a/ITestS3ARequesterPays.java | 7 +----- .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 4 ++-- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java index 0ec6713d9ca..611a791f9d7 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -21,7 +21,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.s3a.VectoredIOContext; +import org.apache.hadoop.util.functional.CallableRaisingIOE; +import org.apache.hadoop.util.functional.LazyAutoCloseableReference; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; @@ -39,7 +43,7 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory { private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration; - private S3SeekableInputStreamFactory s3SeekableInputStreamFactory; + private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory; private boolean requireCrt; public AnalyticsStreamFactory() { @@ -59,20 +63,17 @@ protected void serviceInit(final Configuration conf) throws Exception { @Override public 
void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { super.bind(factoryBindingParameters); - this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory( - new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)), - seekableInputStreamConfiguration); + this.s3SeekableInputStreamFactory = new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory()); + } @Override public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException { return new AnalyticsStream( parameters, - s3SeekableInputStreamFactory); + getOrCreateS3SeekableInputStreamFactory()); } - - - + @Override public InputStreamType streamType() { return InputStreamType.Analytics; @@ -95,5 +96,15 @@ public StreamFactoryRequirements factoryRequirements() { 0, vectorContext); } + private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory() + throws IOException { + return s3SeekableInputStreamFactory.eval(); + } + + private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() { + return () -> new S3SeekableInputStreamFactory( + new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)), + seekableInputStreamConfiguration); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java index 725e54c7d85..eadc398e61a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java @@ -33,7 +33,6 @@ import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS; import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE; import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import 
static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -60,11 +59,7 @@ protected Configuration createConfiguration() { public void testRequesterPaysOptionSuccess() throws Throwable { describe("Test requester pays enabled case by reading last then first byte"); skipIfClientSideEncryption(); - // Analytics accelerator currently does not support IOStatistics which leads to the - // STREAM_READ_OPENED assertion to fail, this will be added as part of - // https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(getConfiguration(), - "Analytics Accelerator currently does not support IOStatistics"); + Configuration conf = this.createConfiguration(); conf.setBoolean(ALLOW_REQUESTER_PAYS, true); // Enable bucket exists check, the first failure point people may encounter diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index bf804415133..f1823330e7a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -590,8 +590,8 @@ public static void skipIfAnalyticsAcceleratorEnabled( } public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) { - return conf.getEnum(INPUT_STREAM_TYPE, - InputStreamType.Classic) == InputStreamType.Analytics; + return conf.get(INPUT_STREAM_TYPE, + INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org