This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/feature-HADOOP-19363-analytics-accelerator-s3 by this push:
     new 71ff4802507  Integrate analytics-accelerator with factory (#7332)
71ff4802507 is described below

commit 71ff4802507c3390e3f53742b21629238bcc5dbf
Author: rajdchak <rajdc...@amazon.co.uk>
AuthorDate: Tue Jan 28 13:27:17 2025 +0000

    Integrate analytics-accelerator with factory (#7332)
---
 hadoop-tools/hadoop-aws/pom.xml                    |  5 --
 .../java/org/apache/hadoop/fs/s3a/Constants.java   | 26 --------
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java    | 62 +++--------
 .../apache/hadoop/fs/s3a/impl/S3AStoreImpl.java    |  5 +-
 .../streams/AbstractObjectInputStreamFactory.java  |  2 +-
 .../streams/AnalyticsStream.java}                  | 47 ++++++++-----
 .../s3a/impl/streams/AnalyticsStreamFactory.java   | 77 ++++++++++++++++++++
 .../fs/s3a/impl/streams/InputStreamType.java       |  6 +-
 .../s3a/impl/streams/ObjectInputStreamFactory.java |  3 +-
 .../fs/s3a/impl/streams/StreamIntegration.java     |  9 ++-
 .../fs/s3a/ITestS3APrefetchingCacheFiles.java      |  2 -
 .../fs/s3a/ITestS3APrefetchingInputStream.java     |  5 +-
 .../fs/s3a/ITestS3APrefetchingLruEviction.java     |  2 -
 .../hadoop/fs/s3a/ITestS3AS3SeekableStream.java    | 55 +++++----------
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  5 +-
 .../fs/s3a/commit/ITestCommitOperationCost.java    |  7 +-
 .../fs/s3a/commit/ITestS3ACommitterFactory.java    |  2 +-
 .../s3a/commit/magic/ITestMagicCommitProtocol.java |  2 +-
 .../statistics/ITestS3AFileSystemStatistic.java    |  2 +-
 19 files changed, 160 insertions(+), 164 deletions(-)
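Context for reviewers: after this change the Analytics Accelerator stream is opted into through the generic stream-type enum rather than the removed boolean keys. A minimal usage sketch, assuming INPUT_STREAM_TYPE resolves to "fs.s3a.input.stream.type" on this feature branch, with placeholder bucket and object names:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AnalyticsStreamUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // "Analytics" matches the InputStreamType.Analytics enum constant,
        // mirroring the setup in ITestS3AS3SeekableStream below; the key
        // string is an assumption about what INPUT_STREAM_TYPE resolves to.
        conf.set("fs.s3a.input.stream.type", "Analytics");
        // Placeholder location, not part of this commit.
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
        try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.bin"))) {
          in.seek(1024);  // the new AnalyticsStream supports seek()
          System.out.println("byte at offset 1024: " + in.read());
        }
      }
    }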
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index a6f56d419d1..f9551e7a7c9 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -478,11 +478,6 @@
       <version>0.0.2</version>
       <scope>compile</scope>
     </dependency>
-    <dependency>
-      <groupId>software.amazon.awssdk.crt</groupId>
-      <artifactId>aws-crt</artifactId>
-      <version>0.29.10</version>
-    </dependency>
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 8cb9b0417b6..60154f4e753 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1818,30 +1818,4 @@ private Constants() {
   public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
       "fs.s3a.analytics.accelerator";
 
-  /**
-   * Config to enable Analytics Accelerator Library for Amazon S3.
-   * https://github.com/awslabs/analytics-accelerator-s3
-   */
-  public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
-      ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";
-
-  /**
-   * Config to enable usage of crt client with Analytics Accelerator Library.
-   * It is by default true.
-   */
-  public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
-      "fs.s3a.analytics.accelerator.crt.client";
-
-  /**
-   * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY }
-   * Value {@value}.
-   */
-  public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
-
-  /**
-   * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED }
-   * Value {@value}.
-   */
-  public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
-
 }
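Of the keys above, only ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX survives: library tuning now flows through that prefix into ConnectorConfiguration instead of dedicated s3a constants. A hedged sketch of that pass-through, using the logicalio/physicalio option names exercised by ITestS3AS3SeekableStream later in this patch (the full option set and value casing are defined by the analytics-accelerator-s3 library, not by this commit):

    import org.apache.hadoop.conf.Configuration;

    public class AcceleratorTuningSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Options under fs.s3a.analytics.accelerator are handed to the
        // library via ConnectorConfiguration (see AnalyticsStreamFactory
        // later in this diff).
        conf.set("fs.s3a.analytics.accelerator.logicalio.prefetching.mode", "all");
        conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 1);
        // Invalid values (e.g. a negative capacity) surface as
        // IllegalArgumentException from
        // S3SeekableInputStreamConfiguration.fromConfiguration(), as
        // testInvalidConfigurationThrows verifies.
      }
    }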
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 6d62dd0bf69..652d5f8aa56 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -51,10 +51,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import software.amazon.awssdk.core.exception.SdkException;
-import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
 import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
@@ -85,11 +84,6 @@
 import software.amazon.awssdk.transfer.s3.model.Copy;
 import software.amazon.awssdk.transfer.s3.model.CopyRequest;
 
-import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
-import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
-import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
-import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
-
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -316,13 +310,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private S3Client s3Client;
 
-  /**
-   * CRT-Based S3Client created of analytics accelerator library is enabled
-   * and managed by the S3AStoreImpl. Analytics accelerator library can be
-   * enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}
-   */
-  private S3AsyncClient s3AsyncClient;
-
   // initial callback policy is fail-once; it's there just to assist
   // some mock tests and other codepaths trying to call the low level
   // APIs on an uninitialized filesystem.
@@ -352,8 +339,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
   private boolean analyticsAcceleratorEnabled;
 
-  private boolean analyticsAcceleratorCRTEnabled;
-
   private int executorCapacity;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@@ -522,11 +507,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   private boolean s3AccessGrantsEnabled;
 
-  /**
-   * Factory to create S3SeekableInputStream if {@link this#analyticsAcceleratorEnabled} is true.
-   */
-  private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
-
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
@@ -673,16 +653,12 @@ public void initialize(URI name, Configuration originalConf)
     dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
         s3ExpressStore);
 
-    this.analyticsAcceleratorEnabled =
-        conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
-    this.analyticsAcceleratorCRTEnabled =
-        conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
-            ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+    this.analyticsAcceleratorEnabled = conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics;
 
     this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
         DEFAULT_MULTIPART_UPLOAD_ENABLED);
 
-    if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+    if(this.analyticsAcceleratorEnabled) {
       // Temp change: Analytics Accelerator with S3AsyncClient do not support Multi-part upload.
       this.isMultipartUploadEnabled = false;
     }
@@ -803,27 +779,6 @@ public void initialize(URI name, Configuration originalConf)
     int rateLimitCapacity = intOption(conf, S3A_IO_RATE_LIMIT, DEFAULT_S3A_IO_RATE_LIMIT, 0);
     // now create and initialize the store
     store = createS3AStore(clientManager, rateLimitCapacity);
-
-    if (this.analyticsAcceleratorEnabled) {
-      LOG.info("Using S3SeekableInputStream");
-      if(this.analyticsAcceleratorCRTEnabled) {
-        LOG.info("Using S3 CRT client for analytics accelerator S3");
-        this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
-      } else {
-        LOG.info("Using S3 async client for analytics accelerator S3");
-        this.s3AsyncClient = store.getOrCreateAsyncClient();
-      }
-
-      ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
-          ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
-      S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
-          S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
-      this.s3SeekableInputStreamFactory =
-          new S3SeekableInputStreamFactory(
-              new S3SdkObjectClient(this.s3AsyncClient),
-              seekableInputStreamConfiguration);
-    }
-
     // the s3 client is created through the store, rather than
     // directly through the client manager.
     // this is to aid mocking.
@@ -1909,7 +1864,7 @@ private FSDataInputStream executeOpen(
     final S3AFileStatus fileStatus = trackDuration(inputStreamStats,
         ACTION_FILE_OPENED.getSymbol(), () ->
-            extractOrFetchSimpleFileStatus(path, fileInformation));
+        extractOrFetchSimpleFileStatus(path, fileInformation));
     S3AReadOpContext readContext = createReadContext(
         fileStatus,
         auditSpan);
@@ -1933,7 +1888,7 @@ private FSDataInputStream executeOpen(
         true,
         inputStreamStats);
 
-    // do not validate() the parameters as the store 
+    // do not validate() the parameters as the store
     // completes this.
     ObjectReadParameters parameters = new ObjectReadParameters()
         .withBoundedThreadPool(pool)
@@ -1941,7 +1896,7 @@ private FSDataInputStream executeOpen(
         .withCallbacks(createInputStreamCallbacks(auditSpan))
         .withContext(readContext.build())
         .withObjectAttributes(createObjectAttributes(path, fileStatus))
         .withStreamStatistics(inputStreamStats);
-    return new FSDataInputStream(getStore().readObject(parameters));
+    return new FSDataInputStream(getStore().readObject(parameters));
   }
@@ -1954,6 +1909,7 @@ private ObjectInputStreamCallbacks createInputStreamCallbacks(
     return new InputStreamCallbacksImpl(auditSpan, getStore(), fsHandler, unboundedThreadPool);
   }
 
+
   /**
    * Callbacks for WriteOperationHelper.
    */
@@ -4238,7 +4194,7 @@ PutObjectResponse executePut(
       throws IOException {
     String key = putObjectRequest.key();
     ProgressableProgressListener listener =
-        new ProgressableProgressListener(store, key, progress);
+        new ProgressableProgressListener(getStore(), key, progress);
     UploadInfo info = putObject(putObjectRequest, file, listener);
     PutObjectResponse result = getStore().waitForUploadCompletion(key, info).response();
     listener.uploadCompleted(info.getFileUpload());
@@ -4338,8 +4294,6 @@ protected synchronized void stopAllServices() {
       closeAutocloseables(LOG, getStore());
       store = null;
       s3Client = null;
-      s3AsyncClient = null;
-      s3SeekableInputStreamFactory = null;
 
       // At this point the S3A client is shut down,
       // now the executor pools are closed
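Worth flagging from the initialize() hunk above: selecting the Analytics stream now force-disables multipart upload regardless of the user's setting, where previously that only happened when the CRT client was switched off. A small sketch of the interaction (both key strings are assumptions for what INPUT_STREAM_TYPE and MULTIPART_UPLOADS_ENABLED resolve to):

    import org.apache.hadoop.conf.Configuration;

    public class MultipartInteractionSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Assumed key for INPUT_STREAM_TYPE.
        conf.set("fs.s3a.input.stream.type", "Analytics");
        // Assumed key for MULTIPART_UPLOADS_ENABLED; explicitly enabled here...
        conf.setBoolean("fs.s3a.multipart.uploads.enabled", true);
        // ...yet S3AFileSystem.initialize() still sets
        // isMultipartUploadEnabled = false while the Analytics stream is
        // active ("Temp change": the S3AsyncClient path has no MPU support).
      }
    }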
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
index a432c92fdde..f5732c52279 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java
@@ -89,8 +89,7 @@
 import org.apache.hadoop.util.functional.Tuples;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
-import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.extractException;
 import static org.apache.hadoop.fs.s3a.S3AUtils.getPutRequestLength;
 import static org.apache.hadoop.fs.s3a.S3AUtils.isThrottleException;
@@ -947,7 +946,7 @@ public File createTemporaryFileForWriting(String pathStr,
    * All stream factory initialization required after {@code Service.init()},
    * after all other services have themselves been initialized.
    */
-  private void finishStreamFactoryInit() {
+  private void finishStreamFactoryInit() throws Exception {
     // must be on be invoked during service initialization
     Preconditions.checkState(isInState(STATE.INITED),
         "Store is in wrong state: %s", getServiceState());
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
index 7c20f7d66f6..cd955ce7535 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java
@@ -48,7 +48,7 @@ protected AbstractObjectInputStreamFactory(final String name) {
    * @param factoryCallbacks callbacks needed by the factories.
    */
   @Override
-  public void bind(final StreamFactoryCallbacks factoryCallbacks) {
+  public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
     // must be on be invoked during service initialization
     Preconditions.checkState(isInState(STATE.INITED),
         "Input Stream factory %s is in wrong state: %s",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
similarity index 80%
rename from hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
rename to hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index ef6a2990815..50b9cde8d23 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -17,35 +17,34 @@
  * under the License.
  */
 
-package org.apache.hadoop.fs.s3a;
+package org.apache.hadoop.fs.s3a.impl.streams;
 
 import java.io.EOFException;
 import java.io.IOException;
 
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
+import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.fs.FSInputStream;
-
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
-import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;
 
-public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities {
 
   private S3SeekableInputStream inputStream;
   private long lastReadCurrentPos = 0;
-  private final String key;
   private volatile boolean closed;
 
-  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+  public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class);
 
-  public S3ASeekableStream(String bucket, String key,
-      S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
-    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
-    this.key = key;
+  public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    super(parameters);
+    S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
   }
 
   /**
@@ -139,6 +138,24 @@ public int available() throws IOException {
     return super.available();
   }
 
+  @Override
+  protected boolean isStreamOpen() {
+    return !isClosed();
+  }
+
+  protected boolean isClosed() {
+    return inputStream == null;
+  }
+
+  @Override
+  protected void abortInFinalizer() {
+    try {
+      close();
+    } catch (IOException ignored) {
+
+    }
+  }
+
   @Override
   public synchronized void close() throws IOException {
     if(!closed) {
@@ -148,7 +165,7 @@ public synchronized void close() throws IOException {
         inputStream = null;
         super.close();
       } catch (IOException ioe) {
-        LOG.debug("Failure closing stream {}: ", key);
+        LOG.debug("Failure closing stream {}: ", getKey());
         throw ioe;
       }
     }
@@ -165,11 +182,11 @@ private void onReadFailure(IOException ioe) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got exception while trying to read from stream {}, " +
           "not trying to recover:",
-          key, ioe);
+          getKey(), ioe);
     } else {
       LOG.info("Got exception while trying to read from stream {}, " +
           "not trying to recover:",
-          key, ioe);
+          getKey(), ioe);
     }
     this.close();
   }
@@ -177,7 +194,7 @@ private void onReadFailure(IOException ioe) throws IOException {
 
   protected void throwIfClosed() throws IOException {
     if (closed) {
-      throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+      throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
 }
\ No newline at end of file
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
new file mode 100644
index 00000000000..03e10449a92
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import org.apache.hadoop.conf.Configuration;
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
+
+  private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
+  private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+  private boolean requireCrt;
+
+  public AnalyticsStreamFactory() {
+    super("AnalyticsStreamFactory");
+  }
+
+  @Override
+  protected void serviceInit(final Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+        ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+    this.seekableInputStreamConfiguration =
+        S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+    this.requireCrt = false;
+  }
+
+  @Override
+  public void bind(final StreamFactoryCallbacks factoryCallbacks) throws Exception {
+    super.bind(factoryCallbacks);
+    this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory(
+        new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
+        seekableInputStreamConfiguration);
+  }
+
+  @Override
+  public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
+    return new AnalyticsStream(
+        parameters,
+        s3SeekableInputStreamFactory);
+  }
+
+  /**
+   * Get the number of background threads required for this factory.
+   * @return the count of background threads.
+   */
+  @Override
+  public StreamThreadOptions threadRequirements() {
+    return new StreamThreadOptions(0, 0, false, false);
+  }
+
+
+}
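A note on this factory's lifecycle may help reviewers: configuration is captured in serviceInit(), but the S3SeekableInputStreamFactory is only constructed in bind(), once the store can hand out its async client. A hypothetical driver, for illustration only; in the real path S3AStoreImpl.finishStreamFactoryInit() drives the equivalent sequence, and StreamFactoryCallbacks is assumed to be the nested callback interface of ObjectInputStreamFactory:

    package org.apache.hadoop.fs.s3a.impl.streams;

    import org.apache.hadoop.conf.Configuration;

    public final class FactoryLifecycleSketch {
      private FactoryLifecycleSketch() {
      }

      // Hypothetical driver: mirrors what the store does during initialization.
      static ObjectInputStream openViaFactory(
          Configuration conf,
          ObjectInputStreamFactory.StreamFactoryCallbacks storeCallbacks,
          ObjectReadParameters parameters) throws Exception {
        AnalyticsStreamFactory factory = new AnalyticsStreamFactory();
        // Service.init() -> serviceInit(): parses fs.s3a.analytics.accelerator.*
        // into an S3SeekableInputStreamConfiguration.
        factory.init(conf);
        // bind(): builds the library factory over the store's async client
        // (callbacks().getOrCreateAsyncClient(false), i.e. no CRT requirement).
        factory.bind(storeCallbacks);
        // readObject(): wraps a library stream in an AnalyticsStream.
        return factory.readObject(parameters);
      }
    }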
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
index 4ca9a6305a2..6aab7e531c6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java
@@ -40,13 +40,11 @@ public enum InputStreamType {
    */
   Prefetch("prefetch", c ->
       new PrefetchingInputStreamFactory()),
-
   /**
    * The analytics input stream.
    */
-  Analytics("analytics", c -> {
-    throw new IllegalArgumentException("not yet supported");
-  });
+  Analytics("analytics", c ->
+      new AnalyticsStreamFactory());
 
   /**
    * Name.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
index d8fe87f9cf7..bc3b52ba31d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java
@@ -45,8 +45,9 @@ public interface ObjectInputStreamFactory
    * This MUST ONLY be invoked between {@code init()}
    * and {@code start()}.
    * @param callbacks extra initialization parameters
+   * @throws Exception on encountering exception
    */
-  void bind(StreamFactoryCallbacks callbacks);
+  void bind(StreamFactoryCallbacks callbacks) throws Exception;
 
   /**
    * Create a new input stream.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
index dfe2efbb97c..18e2e46856a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java
@@ -36,6 +36,8 @@ public final class StreamIntegration {
       LoggerFactory.getLogger(
           "org.apache.hadoop.conf.Configuration.deprecation");
 
+  public static final Logger LOG = LoggerFactory.getLogger(StreamIntegration.class);
+
   /**
    * Warn once on use of prefetch boolean flag rather than enum.
    */
@@ -53,7 +55,12 @@ public static ObjectInputStreamFactory createStreamFactory(final Configuration c
     // work out the default stream; this includes looking at the
     // deprecated prefetch enabled key to see if it is set.
     InputStreamType defaultStream = InputStreamType.DEFAULT_STREAM_TYPE;
-    if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
+
+    if(conf.getEnum(INPUT_STREAM_TYPE, InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics) {
+      LOG.info("Using AnalyticsStream");
+      defaultStream = InputStreamType.Analytics;
+
+    } else if (conf.getBoolean(PREFETCH_ENABLED_KEY, false)) {
       // prefetch enabled, warn (once) then change it to be the default.
       WARN_PREFETCH_KEY.info("Using {} is deprecated: choose the appropriate stream in {}",
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
index 0bf7752e438..8d886c752ab 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
@@ -90,8 +90,6 @@ public Configuration createConfiguration() {
     final String bufferDirBase = configuration.get(BUFFER_DIR);
     bufferDir = bufferDirBase + "/" + UUID.randomUUID();
     configuration.set(BUFFER_DIR, bufferDir);
-    // When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
-    configuration.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return configuration;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index fbe7e7d0adb..d894adb66c7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -34,7 +34,7 @@
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.test.LambdaTestUtils;
 
-import static org.apache.hadoop.fs.s3a.Constants.*;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
@@ -71,14 +71,11 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
   private static final int INTERVAL_MILLIS = 500;
   private static final int BLOCK_SIZE = S_1K * 10;
 
-
   @Override
   public Configuration createConfiguration() {
     Configuration conf = enablePrefetching(super.createConfiguration());
     S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    // When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
-    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
index b71cc43d897..d43953dfe82 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -90,8 +90,6 @@ public Configuration createConfiguration() {
         PREFETCH_BLOCK_SIZE_KEY);
     conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
     conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
-    // When both Prefetching and Analytics Accelerator enabled Analytics Accelerator is used
-    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
     return conf;
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
index c6ecee95051..6ee6d6c6e11 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
@@ -28,11 +28,11 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
-import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
+import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 
+import org.assertj.core.api.Assertions;
+
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
 import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
@@ -42,13 +42,13 @@ public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
   private static final String PHYSICAL_IO_PREFIX = "physicalio";
   private static final String LOGICAL_IO_PREFIX = "logicalio";
 
-  public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOException {
+  @Test
+  public void testConnectorFrameWorkIntegration() throws IOException {
    describe("Verify S3 connector framework integration");
 
     Configuration conf = getConfiguration();
-    removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
-    conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
-    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
+    removeBaseAndBucketOverrides(conf, INPUT_STREAM_TYPE);
+    conf.set(INPUT_STREAM_TYPE, "Analytics");
 
     String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
     S3AFileSystem s3AFileSystem =
@@ -63,16 +63,7 @@ public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOExc
   }
 
   @Test
-  public void testConnectorFrameWorkIntegrationWithCrtClient() throws IOException {
-    testConnectorFrameWorkIntegration(true);
-  }
-
-  @Test
-  public void testConnectorFrameWorkIntegrationWithoutCrtClient() throws IOException {
-    testConnectorFrameWorkIntegration(false);
-  }
-
-  public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
+  public void testConnectorFrameworkConfigurable() {
     describe("Verify S3 connector framework reads configuration");
 
     Configuration conf = getConfiguration();
@@ -86,33 +77,23 @@ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
     conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
         "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
 
-    conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
-
     ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf,
         ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
 
     S3SeekableInputStreamConfiguration configuration =
         S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
 
-    assertSame("S3ASeekableStream configuration is not set to expected value",
-        PrefetchMode.ALL, configuration.getLogicalIOConfiguration().getPrefetchingMode());
-
-    assertEquals("S3ASeekableStream configuration is not set to expected value",
-        1, configuration.getPhysicalIOConfiguration().getBlobStoreCapacity());
-  }
+    Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode())
+        .as("AnalyticsStream configuration is not set to expected value")
+        .isSameAs(PrefetchMode.ALL);
 
-  @Test
-  public void testConnectorFrameworkConfigurableWithoutCrtClient() throws IOException {
-    testConnectorFrameworkConfigurable(false);
-  }
-
-  @Test
-  public void testConnectorFrameworkConfigurableWithCrtClient() throws IOException {
-    testConnectorFrameworkConfigurable(true);
+    Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity())
+        .as("AnalyticsStream configuration is not set to expected value")
+        .isEqualTo(1);
   }
 
   @Test
-  public void testInvalidConfigurationThrows() {
+  public void testInvalidConfigurationThrows() throws Exception {
     describe("Verify S3 connector framework throws with invalid configuration");
 
     Configuration conf = getConfiguration();
@@ -123,8 +104,8 @@ public void testInvalidConfigurationThrows() {
     ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf,
         ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
 
-    assertThrows("S3ASeekableStream illegal configuration does not throw",
-        IllegalArgumentException.class, () ->
-        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
+    Assertions.assertThatExceptionOfType(IllegalArgumentException.class)
+        .isThrownBy(() ->
+            S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
   }
 }
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 4159ff2f9b2..28fba0e5ddc 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
 import org.apache.hadoop.fs.s3a.impl.StoreContext;
 import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
 import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
 import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
@@ -587,8 +588,8 @@ public static void skipIfAnalyticsAcceleratorEnabled(
   }
 
   public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
-    return conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
-        ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+    return conf.getEnum(INPUT_STREAM_TYPE,
+        InputStreamType.DEFAULT_STREAM_TYPE) == InputStreamType.Analytics;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index fac461371e6..515313b009d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -168,11 +168,10 @@ private void abortActiveStream() throws IOException {
   @Test
   public void testCostOfCreatingMagicFile() throws Throwable {
-    describe("Files created under magic paths skip existence checks");
+    describe("Files created under magic paths skip existence checks and marker deletes");
 
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
-
+        "AnalyticsStream does not support InputStreamStatistics");
     S3AFileSystem fs = getFileSystem();
     Path destFile = methodSubPath("file.txt");
     fs.delete(destFile.getParent(), true);
@@ -251,7 +250,7 @@ public void testCostOfSavingLoadingPendingFile() throws Throwable {
     describe("Verify costs of saving .pending file under a magic path");
 
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
+        "AnalyticsStream does not support InputStreamStatistics");
     S3AFileSystem fs = getFileSystem();
     Path partDir = methodSubPath("file.pending");
     Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
index a500bfb76a3..9057d1e366c 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -184,7 +184,7 @@ public void setup() throws Exception {
     FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
     super.setup();
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
+        "AnalyticsStream does not support InputStreamStatistics");
     jobId = randomJobId();
     attempt0 = "attempt_" + jobId + "_m_000000_0";
     taskAttempt0 = TaskAttemptID.forName(attempt0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index b89740ae312..542f6f2b7c7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -79,7 +79,7 @@ protected String getCommitterName() {
   public void setup() throws Exception {
     super.setup();
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
-        "S3ASeekableInputStream does not support InputStreamStatistics");
+        "AnalyticsStream does not support InputStreamStatistics");
     CommitUtils.verifyIsMagicCommitFS(getFileSystem());
   }
 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 5a3f7bb8fdb..fb1cfb781e7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -44,7 +44,7 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
    */
   @Test
   public void testBytesReadWithStream() throws IOException {
-    // Assertions will fail as {@link S3ASeekableInputStream}
+    // Assertions will fail as {@link AnalyticsStream}
     // do not support S3AFileSystemStatistics yet.
     skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
         "S3SeekableStream does not support File System Statistics");