This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to 
refs/heads/feature-HADOOP-19363-analytics-accelerator-s3 by this push:
     new 71ff4802507 Integrate analytics-accelerator with factory (#7332)
71ff4802507 is described below

commit 71ff4802507c3390e3f53742b21629238bcc5dbf
Author: rajdchak <rajdc...@amazon.co.uk>
AuthorDate: Tue Jan 28 13:27:17 2025 +0000

    Integrate analytics-accelerator with factory (#7332)
---
 hadoop-tools/hadoop-aws/pom.xml                    |  5 --
 .../java/org/apache/hadoop/fs/s3a/Constants.java   | 26 --------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 62 +++--------------
 .../apache/hadoop/fs/s3a/impl/S3AStoreImpl.java    |  5 +-
 .../streams/AbstractObjectInputStreamFactory.java  |  2 +-
 .../streams/AnalyticsStream.java}                  | 47 ++++++++-----
 .../s3a/impl/streams/AnalyticsStreamFactory.java   | 77 ++++++++++++++++++++++
 .../fs/s3a/impl/streams/InputStreamType.java       |  6 +-
 .../s3a/impl/streams/ObjectInputStreamFactory.java |  3 +-
 .../fs/s3a/impl/streams/StreamIntegration.java     |  9 ++-
 .../fs/s3a/ITestS3APrefetchingCacheFiles.java      |  2 -
 .../fs/s3a/ITestS3APrefetchingInputStream.java     |  5 +-
 .../fs/s3a/ITestS3APrefetchingLruEviction.java     |  2 -
 .../hadoop/fs/s3a/ITestS3AS3SeekableStream.java    | 55 +++++-----------
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  5 +-
 .../fs/s3a/commit/ITestCommitOperationCost.java    |  7 +-
 .../fs/s3a/commit/ITestS3ACommitterFactory.java    |  2 +-
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |  2 +-
 .../statistics/ITestS3AFileSystemStatistic.java    |  2 +-
 19 files changed, 160 insertions(+), 164 deletions(-)
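
Stream selection now flows through the fs.s3a input stream type enum rather
than the two removed boolean toggles. A minimal opt-in sketch from Java,
assuming the INPUT_STREAM_TYPE constant resolves to the key
"fs.s3a.input.stream.type" (the key string itself is not visible in this
patch):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    Configuration conf = new Configuration();
    // assumption: INPUT_STREAM_TYPE == "fs.s3a.input.stream.type"
    conf.set("fs.s3a.input.stream.type", "analytics");
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);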

diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index a6f56d419d1..f9551e7a7c9 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -478,11 +478,6 @@
       <version>0.0.2</version>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk.crt</groupId>
-      <artifactId>aws-crt</artifactId>
-      <version>0.29.10</version>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 8cb9b0417b6..60154f4e753 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1818,30 +1818,4 @@ private Constants() {
   public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
           "fs.s3a.analytics.accelerator";
 
-  /**
-   * Config to enable Analytics Accelerator Library for Amazon S3.
-   * https://github.com/awslabs/analytics-accelerator-s3
-   */
-  public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
-          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";
-
-  /**
-   * Config to enable usage of crt client with Analytics Accelerator Library.
-   * It is by default true.
-   */
-  public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
-          "fs.s3a.analytics.accelerator.crt.client";
-
-  /**
-   * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
-   * Value {@value}.
-   */
-  public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
-
-  /**
-   * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
-   * Value {@value}.
-   */
-  public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
-
 }
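
Only the two boolean toggles are deleted; per-library tuning still flows
through ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX. A hedged sketch of how
prefixed keys reach the library, reusing the physicalio key exercised by the
tests further down:

    import org.apache.hadoop.conf.Configuration;
    import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
    import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

    Configuration conf = new Configuration();
    conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 1);
    // keys under the prefix are handed to the library configuration
    ConnectorConfiguration cc =
        new ConnectorConfiguration(conf, "fs.s3a.analytics.accelerator");
    S3SeekableInputStreamConfiguration streamConf =
        S3SeekableInputStreamConfiguration.fromConfiguration(cc);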
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 6d62dd0bf69..652d5f8aa56 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -51,10 +51,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
@@ -85,11 +84,6 @@
 import software.amazon.awssdk.transfer.s3.model.Copy;
 import software.amazon.awssdk.transfer.s3.model.CopyRequest;
 
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
-import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
-import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
-import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
-
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -316,13 +310,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private S3Client s3Client;
 
-  /**
-   * CRT-Based S3Client created of analytics accelerator library is enabled
-   * and managed by the S3AStoreImpl. Analytics accelerator library can be
-   * enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
-   */
-  private S3AsyncClient s3AsyncClient;
-
   // initial callback policy is fail-once; it's there just to assist
   // some mock tests and other codepaths trying to call the low level
   // APIs on an uninitialized filesystem.
@@ -352,8 +339,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
   private boolean analyticsAcceleratorEnabled;
 
-  private boolean analyticsAcceleratorCRTEnabled;
-
   private int executorCapacity;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@@ -522,11 +507,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean s3AccessGrantsEnabled;
 
-  /**
-   * Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
-   */
-  private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
-
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -673,16 +653,12 @@ public void initialize(URI name, Configuration originalConf)
       dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
           s3ExpressStore);
 
-      this.analyticsAcceleratorEnabled =
-          conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
-      this.analyticsAcceleratorCRTEnabled =
-          conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
-              ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+      this.analyticsAcceleratorEnabled =
+          conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE)
+              == InputStreamType.Analytics;
 
       this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
               DEFAULT_MULTIPART_UPLOAD_ENABLED);
 
-      if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+      if (this.analyticsAcceleratorEnabled) {
         // Temp change: Analytics Accelerator with S3AsyncClient does not support multipart upload.
         this.isMultipartUploadEnabled = false;
       }
@@ -803,27 +779,6 @@ public void initialize(URI name, Configuration originalConf)
       int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
       // now create and initialize the store
       store = createS3AStore(clientManager, rateLimitCapacity);
-
-      if (this.analyticsAcceleratorEnabled) {
-        LOG.info("Using S3SeekableInputStream");
-        if(this.analyticsAcceleratorCRTEnabled) {
-          LOG.info("Using S3 CRT client for analytics accelerator S3");
-          this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
-        } else {
-          LOG.info("Using S3 async client for analytics accelerator S3");
-          this.s3AsyncClient = store.getOrCreateAsyncClient();
-        }
-
-        ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
-            ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
-        S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
-            S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
-        this.s3SeekableInputStreamFactory =
-            new S3SeekableInputStreamFactory(
-                new S3SdkObjectClient(this.s3AsyncClient),
-                seekableInputStreamConfiguration);
-      }
-
       // the s3 client is created through the store, rather than
       // directly through the client manager.
       // this is to aid mocking.
@@ -1909,7 +1864,7 @@ private FSDataInputStream executeOpen(
     final S3AFileStatus fileStatus =
         trackDuration(inputStreamStats,
             ACTION_FILE_OPENED.getSymbol(), () ->
-                extractOrFetchSimpleFileStatus(path, fileInformation));
+            extractOrFetchSimpleFileStatus(path, fileInformation));
     S3AReadOpContext readContext = createReadContext(
         fileStatus,
         auditSpan);
@@ -1933,7 +1888,7 @@ private FSDataInputStream executeOpen(
         true,
         inputStreamStats);
 
-    // do not validate() the parameters as the store
+      // do not validate() the parameters as the store
     // completes this.
     ObjectReadParameters parameters = new ObjectReadParameters()
         .withBoundedThreadPool(pool)
@@ -1941,7 +1896,7 @@ private FSDataInputStream executeOpen(
         .withContext(readContext.build())
         .withObjectAttributes(createObjectAttributes(path, fileStatus))
         .withStreamStatistics(inputStreamStats);
-    return new FSDataInputStream(getStore().readObject(parameters));
+      return new FSDataInputStream(getStore().readObject(parameters));
 
   }
 
@@ -1954,6 +1909,7 @@ private ObjectInputStreamCallbacks createInputStreamCallbacks(
     return new InputStreamCallbacksImpl(auditSpan, getStore(), fsHandler, unboundedThreadPool);
   }
 
+
   /**
    * Callbacks for WriteOperationHelper.
    */
@@ -4238,7 +4194,7 @@ PutObjectResponse executePut(
       throws IOException {
     String key = putObjectRequest.key();
     ProgressableProgressListener listener =
-        new ProgressableProgressListener(store, key, progress);
+        new ProgressableProgressListener(getStore(), key, progress);
     UploadInfo info = putObject(putObjectRequest, file, listener);
     PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response();
     listener.uploadCompleted(info.getFileUpload());
@@ -4338,8 +4294,6 @@ protected synchronized void stopAllServices() {
         closeAutocloseables(LOG, getStore());
         store = null;
         s3Client = null;
-        s3AsyncClient = null;
-        s3SeekableInputStreamFactory = null;
 
         // At this point the S3A client is shut down,
         // now the executor pools are closed
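
With the factory wiring moved behind the store, executeOpen() is now
stream-type agnostic: it assembles one ObjectReadParameters and the bound
factory decides what to build. The shape of that path, restated with only
the names visible in the hunks above:

    ObjectReadParameters parameters = new ObjectReadParameters()
        .withBoundedThreadPool(pool)
        .withCallbacks(createInputStreamCallbacks(auditSpan))
        .withContext(readContext.build())
        .withObjectAttributes(createObjectAttributes(path, fileStatus))
        .withStreamStatistics(inputStreamStats);
    // classic, prefetching or analytics: chosen by the store's stream factory
    return new FSDataInputStream(getStore().readObject(parameters));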
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
index a432c92fdde..f5732c52279 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -89,8 +89,7 @@
 import org.apache.hadoop.util.functional.Tuples;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
-import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.extractException;
 import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength;
 import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
@@ -947,7 +946,7 @@ public File createTemporaryFileForWriting(String pathStr,
    * All stream factory initialization required after {@code Service.init()},
    * after all other services have themselves been initialized.
    */
-  private void finishStreamFactoryInit() {
+  private void finishStreamFactoryInit() throws Exception {
     // must only be invoked during service initialization
     Preconditions.checkState(isInState(STATE.INITED),
         "Store is in wrong state: %s", getServiceState());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
index 7c20f7d66f6..cd955ce7535 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
@@ -48,7 +48,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
    * @param factoryCallbacks callbacks needed by the factories.
    */
   @Override
-  public void bind(final StreamFactoryCallbacks factoryCallbacks) {
+  public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
     // must only be invoked during service initialization
     Preconditions.checkState(isInState(STATE.INITED),
         "Input Stream factory %s is in wrong state: %s",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
similarity index 80%
rename from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
rename to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index ef6a2990815..50b9cde8d23 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -17,35 +17,34 @@
  * under the License.
  */
 
-package org.apache.hadoop.fs.s3a;
+package org.apache.hadoop.fs.s3a.impl.streams;
 
 import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.FSInputStream;
-
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
-import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
 
-public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
 
   private S3SeekableInputStream inputStream;
   private long lastReadCurrentPos = 0;
-  private final String key;
   private volatile boolean closed;
 
-  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+  public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
 
-  public S3ASeekableStream(String bucket, String key,
-                           S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
-    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
-    this.key = key;
+  public AnalyticsStream(final ObjectReadParameters parameters,
+      final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    super(parameters);
+    S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+    this.inputStream = s3SeekableInputStreamFactory.createStream(
+        S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
   }
 
   /**
@@ -139,6 +138,24 @@ public int available() throws IOException {
     return super.available();
   }
 
+  @Override
+  protected boolean isStreamOpen() {
+    return !isClosed();
+  }
+
+  protected boolean isClosed() {
+    return inputStream == null;
+  }
+
+  @Override
+  protected void abortInFinalizer() {
+    try {
+      close();
+    } catch (IOException ignored) {
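+      // ignore: close() is best-effort when aborting from the finalizer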
+
+    }
+  }
+
   @Override
   public synchronized void close() throws IOException {
     if(!closed) {
@@ -148,7 +165,7 @@ public synchronized void close() throws IOException {
         inputStream = null;
         super.close();
       } catch (IOException ioe) {
-        LOG.debug("Failure closing stream {}: ", key);
+        LOG.debug("Failure closing stream {}: ", getKey());
         throw ioe;
       }
     }
@@ -165,11 +182,11 @@ private void onReadFailure(IOException ioe) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got exception while trying to read from stream {}, " +
               "not trying to recover:",
-          key, ioe);
+              getKey(), ioe);
     } else {
       LOG.info("Got exception while trying to read from stream {}, " +
               "not trying to recover:",
-          key, ioe);
+              getKey(), ioe);
     }
     this.close();
   }
@@ -177,7 +194,7 @@ private void onReadFailure(IOException ioe) throws IOException {
 
   protected void throwIfClosed() throws IOException {
     if (closed) {
-      throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+      throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
 }
\ No newline at end of file
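
From the caller's side the rename is invisible: reads still arrive through
FSDataInputStream, with seek() and read() delegated to the wrapped
S3SeekableInputStream. A usage sketch, with placeholder bucket and object
names, and fs configured for the analytics stream as shown earlier:

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.Path;

    try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.bin"))) {
      in.seek(1024L);           // served by the seekable analytics stream
      int firstByte = in.read();
    }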
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
new file mode 100644
index 00000000000..03e10449a92
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.conf.Configuration;
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
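+/**
+ * Stream factory for {@link AnalyticsStream}. Builds the shared
+ * S3SeekableInputStreamFactory when bound to the store callbacks,
+ * then creates one AnalyticsStream per readObject() call.
+ */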
+public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
+
+    private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
+    private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+    private boolean requireCrt;
+
+    public AnalyticsStreamFactory() {
+        super("AnalyticsStreamFactory");
+    }
+
+    @Override
+    protected void serviceInit(final Configuration conf) throws Exception {
+        super.serviceInit(conf);
+        ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+                ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+        this.seekableInputStreamConfiguration =
+                S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+        this.requireCrt = false;
+    }
+
+    @Override
+    public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
+        super.bind(factoryCallbacks);
+        this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory(
+                new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+                seekableInputStreamConfiguration);
+    }
+
+    @Override
+    public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
+        return new AnalyticsStream(
+                parameters,
+                s3SeekableInputStreamFactory);
+    }
+
+    /**
+     * Get the thread requirements of this factory.
+     * @return the stream thread options; this factory needs no extra threads.
+     */
+    @Override
+    public StreamThreadOptions threadRequirements() {
+        return new StreamThreadOptions(0, 0, false, false);
+    }
+}
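
The factory follows the Hadoop service lifecycle, which the serviceInit()
override suggests: init() captures the accelerator configuration, bind()
builds the shared S3SeekableInputStreamFactory from the store's async
client, and readObject() hands out streams. A sketch; "storeCallbacks" and
"parameters" stand in for store-provided objects this patch does not show:

    AnalyticsStreamFactory factory = new AnalyticsStreamFactory();
    factory.init(conf);            // serviceInit: reads the accelerator config
    factory.bind(storeCallbacks);  // wraps getOrCreateAsyncClient(false)
    ObjectInputStream stream = factory.readObject(parameters);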
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
index 4ca9a6305a2..6aab7e531c6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
@@ -40,13 +40,11 @@ public enum InputStreamType {
    */
   Prefetch("prefetch", c ->
       new PrefetchingInputStreamFactory()),
-
   /**
    * The analytics input stream.
    */
-  Analytics("analytics", c -> {
-    throw new IllegalArgumentException("not yet supported");
-  });
+  Analytics("analytics", c ->
+      new AnalyticsStreamFactory());
 
   /**
    * Name.
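
Each constant now carries a working Configuration-to-factory function; the
Analytics entry no longer throws. A lookup sketch, with the accessor name
flagged as an assumption since only the lambda wiring is visible here:

    InputStreamType type = conf.getEnum(INPUT_STREAM_TYPE,
        InputStreamType.DEFAULT_STREAM_TYPE);
    // assumption: the enum exposes its lambda via a factory() accessor
    ObjectInputStreamFactory factory = type.factory().apply(conf);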
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
index d8fe87f9cf7..bc3b52ba31d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
@@ -45,8 +45,9 @@ public interface ObjectInputStreamFactory
    * This MUST ONLY be invoked between {@code init()}
    * and {@code start()}.
    * @param callbacks extra initialization parameters
+   * @throws Exception on any failure to bind the factory.
    */
-  void bind(StreamFactoryCallbacks callbacks);
+  void bind(StreamFactoryCallbacks callbacks) throws Exception;
 
   /**
    * Create a new input stream.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
index dfe2efbb97c..18e2e46856a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
@@ -36,6 +36,8 @@ public final class StreamIntegration {
       LoggerFactory.getLogger(
           "org.apache.hadoop.conf.Configuration.deprecation");
 
+  public static final Logger LOG = LoggerFactory.getLogger(StreamIntegration.class);
+
   /**
    * Warn once on use of prefetch boolean flag rather than enum.
    */
@@ -53,7 +55,12 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
     // work out the default stream; this includes looking at the
     // deprecated prefetch enabled key to see if it is set.
     InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
-    if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
+
+    if (conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE)
+        == InputStreamType.Analytics) {
+      LOG.info("Using AnalyticsStream");
+      defaultStream = InputStreamType.Analytics;
+    } else if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
 
       // prefetch enabled, warn (once) then change it to be the default.
       WARN_PREFETCH_KEY.info("Using {} is deprecated: choose the appropriate stream in {}",
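
The precedence this hunk establishes: an explicit analytics selection wins
over the deprecated prefetch boolean, which only sets the default when no
stream type is configured. A sketch, with the stream-type key string again
an assumption:

    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.prefetch.enabled", true);   // deprecated key
    conf.set("fs.s3a.input.stream.type", "analytics");  // assumed key string
    // createStreamFactory(conf) logs "Using AnalyticsStream" and returns
    // an AnalyticsStreamFactory despite the prefetch flag.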
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
index 0bf7752e438..8d886c752ab 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
@@ -90,8 +90,6 @@ public Configuration createConfiguration() {
     final String bufferDirBase = configuration.get(BUFFER_DIR);
     bufferDir = bufferDirBase + "/" + UUID.randomUUID();
     configuration.set(BUFFER_DIR, bufferDir);
-    // When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
-    configuration.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return configuration;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index fbe7e7d0adb..d894adb66c7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -34,7 +34,7 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
@@ -71,14 +71,11 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
   private static final int INTERVAL_MILLIS = 500;
   private static final int BLOCK_SIZE = S_1K * 10;
 
-
   @Override
   public Configuration createConfiguration() {
     Configuration conf = enablePrefetching(super.createConfiguration());
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    // When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
-    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
index b71cc43d897..d43953dfe82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -90,8 +90,6 @@ public Configuration createConfiguration() {
         PREFETCH_BLOCK_SIZE_KEY);
     conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    // When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
-    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
index c6ecee95051..6ee6d6c6e11 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
@@ -28,11 +28,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
-import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 
+import org.assertj.core.api.Assertions;
+
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
 import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
@@ -42,13 +42,13 @@ public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
   private static final String PHYSICAL_IO_PREFIX = "physicalio";
   private static final String LOGICAL_IO_PREFIX = "logicalio";
 
-  public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOException {
+  @Test
+  public void testConnectorFrameWorkIntegration() throws IOException {
     describe("Verify S3 connector framework integration");
 
     Configuration conf = getConfiguration();
-    removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
-    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
-    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
+    removeBaseAndBucketOverrides(conf, INPUT_STREAM_TYPE);
+    conf.set(INPUT_STREAM_TYPE, "Analytics");
 
     String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
     S3AFileSystem s3AFileSystem =
@@ -63,16 +63,7 @@ public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOExc
   }
 
   @Test
-  public void testConnectorFrameWorkIntegrationWithCrtClient() throws IOException {
-    testConnectorFrameWorkIntegration(true);
-  }
-
-  @Test
-  public void testConnectorFrameWorkIntegrationWithoutCrtClient() throws IOException {
-    testConnectorFrameWorkIntegration(false);
-  }
-
-  public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
+  public void testConnectorFrameworkConfigurable() {
     describe("Verify S3 connector framework reads configuration");
 
     Configuration conf = getConfiguration();
@@ -86,33 +77,23 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
     conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
         "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
 
-    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
-
     ConnectorConfiguration connectorConfiguration =
         new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
 
     S3SeekableInputStreamConfiguration configuration =
         S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
 
-    assertSame("S3ASeekableStream configuration is not set to expected value",
-        PrefetchMode.ALL, configuration.getLogicalIOConfiguration().getPrefetchingMode());
-
-    assertEquals("S3ASeekableStream configuration is not set to expected value",
-        1, configuration.getPhysicalIOConfiguration().getBlobStoreCapacity());
-  }
+    Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
+            .as("AnalyticsStream configuration is not set to expected value")
+            .isSameAs(PrefetchMode.ALL);
 
-  @Test
-  public void testConnectorFrameworkConfigurableWithoutCrtClient() throws IOException {
-    testConnectorFrameworkConfigurable(false);
-  }
-
-  @Test
-  public void testConnectorFrameworkConfigurableWithCrtClient() throws IOException {
-    testConnectorFrameworkConfigurable(true);
+    Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
+            .as("AnalyticsStream configuration is not set to expected value")
+            .isEqualTo(1);
   }
 
   @Test
-  public void testInvalidConfigurationThrows() {
+  public void testInvalidConfigurationThrows() throws Exception {
     describe("Verify S3 connector framework throws with invalid 
configuration");
 
     Configuration conf = getConfiguration();
@@ -123,8 +104,8 @@ public void testInvalidConfigurationThrows() {
 
     ConnectorConfiguration connectorConfiguration =
         new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
-    assertThrows("S3ASeekableStream illegal configuration does not throw",
-        IllegalArgumentException.class, () ->
-            S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+            .isThrownBy(() ->
+                    S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 4159ff2f9b2..28fba0e5ddc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
@@ -587,8 +588,8 @@ public static void skipIfAnalyticsAcceleratorEnabled(
   }
 
   public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
-    return conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
-        ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+    return conf.getEnum(INPUT_STREAM_TYPE,
+            InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index fac461371e6..515313b009d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -168,11 +168,10 @@ private void abortActiveStream() throws IOException {
 
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
-    describe("Files created under magic paths skip existence checks");
+    describe("Files created under magic paths skip existence checks and marker deletes");
 
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
-
+        "AnalyticsStream does not support InputStreamStatistics");
     S3AFileSystem fs = getFileSystem();
     Path destFile = methodSubPath("file.txt");
     fs.delete(destFile.getParent(), true);
@@ -251,7 +250,7 @@ public void testCostOfSavingLoadingPendingFile() throws Throwable {
     describe("Verify costs of saving .pending file under a magic path");
 
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
+        "AnalyticsStream does not support InputStreamStatistics");
     S3AFileSystem fs = getFileSystem();
     Path partDir = methodSubPath("file.pending");
     Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
index a500bfb76a3..9057d1e366c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -184,7 +184,7 @@ public void setup() throws Exception {
     FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     super.setup();
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
+        "AnalyticsStream does not support InputStreamStatistics");
     jobId = randomJobId();
     attempt0 = "attempt_" + jobId + "_m_000000_0";
     taskAttempt0 = TaskAttemptID.forName(attempt0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index b89740ae312..542f6f2b7c7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -79,7 +79,7 @@ protected String getCommitterName() {
   public void setup() throws Exception {
     super.setup();
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
+        "AnalyticsStream does not support InputStreamStatistics");
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 5a3f7bb8fdb..fb1cfb781e7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -44,7 +44,7 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
    */
   @Test
   public void testBytesReadWithStream() throws IOException {
-    // Assertions will fail as {@link S3ASeekableInputStream}
+    // Assertions will fail as {@link AnalyticsStream}
     // does not support S3AFileSystemStatistics yet.
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
         "S3SeekableStream does not support File System Statistics");

