This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 3828ad54469c869448718d1ea02de6eb36112d28
Author: Ahmar Suhail <ahma...@amazon.co.uk>
AuthorDate: Wed Feb 12 11:41:08 2025 +0000

    lazy eval of stream factory + test fixes
---
 .../s3a/impl/streams/AnalyticsStreamFactory.java   | 27 +++++++++++++++-------
 .../hadoop/fs/s3a/ITestS3ARequesterPays.java       |  7 +-----
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  4 ++--
 3 files changed, 22 insertions(+), 16 deletions(-)

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index 0ec6713d9ca..611a791f9d7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -21,7 +21,11 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.s3a.VectoredIOContext;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
 
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
@@ -39,7 +43,7 @@
 public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
 
     private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
-    private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+    private LazyAutoCloseableReference<S3SeekableInputStreamFactory>  s3SeekableInputStreamFactory;
     private boolean requireCrt;
 
     public AnalyticsStreamFactory() {
@@ -59,20 +63,17 @@ protected void serviceInit(final Configuration conf) throws Exception {
     @Override
     public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
         super.bind(factoryBindingParameters);
-        this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory(
-                new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
-                seekableInputStreamConfiguration);
+        this.s3SeekableInputStreamFactory = new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory());
+
     }
 
     @Override
     public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
         return new AnalyticsStream(
                 parameters,
-                s3SeekableInputStreamFactory);
+                getOrCreateS3SeekableInputStreamFactory());
     }
-
-
-
+    
     @Override
     public InputStreamType streamType() {
         return InputStreamType.Analytics;
@@ -95,5 +96,15 @@ public StreamFactoryRequirements factoryRequirements() {
             0, vectorContext);
     }
 
+    private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
+        throws IOException {
+       return s3SeekableInputStreamFactory.eval();
+    }
+
+    private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
+        return () -> new S3SeekableInputStreamFactory(
+            new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+            seekableInputStreamConfiguration);
+    }
 
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
index 725e54c7d85..eadc398e61a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java
@@ -33,7 +33,6 @@
 import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -60,11 +59,7 @@ protected Configuration createConfiguration() {
   public void testRequesterPaysOptionSuccess() throws Throwable {
     describe("Test requester pays enabled case by reading last then first byte");
     skipIfClientSideEncryption();
-    // Analytics accelerator currently does not support IOStatistics which leads to the
-    // STREAM_READ_OPENED assertion to fail, this will be added as part of
-    // https://issues.apache.org/jira/browse/HADOOP-19364
-    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "Analytics Accelerator currently does not support IOStatistics");
+
     Configuration conf = this.createConfiguration();
     conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
     // Enable bucket exists check, the first failure point people may encounter
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index bf804415133..f1823330e7a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -590,8 +590,8 @@ public static void skipIfAnalyticsAcceleratorEnabled(
   }
 
   public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
-    return conf.getEnum(INPUT_STREAM_TYPE,
-            InputStreamType.Classic) == InputStreamType.Analytics;
+    return conf.get(INPUT_STREAM_TYPE,
+        INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS);
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to