This is an automated email from the ASF dual-hosted git repository.

ahmar pushed a commit to branch integrate-analytics-accelerator
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 0d1f291db1b33f9b13f8fb0ee4d935bc97c61dbc Author: Ahmar Suhail <ahma...@amazon.co.uk> AuthorDate: Fri Feb 7 14:45:14 2025 +0000 Adds in integration for AWS analytics accelerator library --- hadoop-project/pom.xml | 6 + hadoop-tools/hadoop-aws/pom.xml | 5 + .../java/org/apache/hadoop/fs/s3a/Constants.java | 8 + .../apache/hadoop/fs/s3a/impl/S3AStoreImpl.java | 2 +- .../streams/AbstractObjectInputStreamFactory.java | 4 +- .../fs/s3a/impl/streams/AnalyticsStream.java | 205 +++++++++++++++++++++ .../s3a/impl/streams/AnalyticsStreamFactory.java | 100 ++++++++++ .../fs/s3a/impl/streams/InputStreamType.java | 6 +- .../s3a/impl/streams/ObjectInputStreamFactory.java | 2 +- .../fs/s3a/impl/streams/StreamIntegration.java | 1 - .../fs/contract/s3a/ITestS3AContractCreate.java | 16 +- .../fs/contract/s3a/ITestS3AContractDistCp.java | 9 + .../contract/s3a/ITestS3AContractVectoredRead.java | 12 ++ .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 6 +- .../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 26 ++- .../hadoop/fs/s3a/ITestS3AFileSystemContract.java | 14 +- .../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java | 6 + .../hadoop/fs/s3a/ITestS3AInputStreamLeakage.java | 6 + .../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java | 6 + .../hadoop/fs/s3a/ITestS3AS3SeekableStream.java | 111 +++++++++++ .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 16 ++ .../fs/s3a/commit/ITestCommitOperationCost.java | 12 +- .../fileContext/ITestS3AFileContextStatistics.java | 6 + .../fs/s3a/performance/ITestS3AOpenCost.java | 13 +- .../fs/s3a/performance/ITestUnbufferDraining.java | 1 - .../s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java | 3 + .../ITestS3AContractStreamIOStatistics.java | 9 + .../statistics/ITestS3AFileSystemStatistic.java | 6 + 28 files changed, 590 insertions(+), 27 deletions(-) diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index b9469bc7523..cf0b4f591af 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -206,6 +206,7 @@ <aws-java-sdk.version>1.12.720</aws-java-sdk.version> <aws-java-sdk-v2.version>2.25.53</aws-java-sdk-v2.version> <amazon-s3-encryption-client-java.version>3.1.1</amazon-s3-encryption-client-java.version> + <amazon-s3-analyticsaccelerator-s3.version>0.0.3</amazon-s3-analyticsaccelerator-s3.version> <aws.eventstream.version>1.0.1</aws.eventstream.version> <hsqldb.version>2.7.1</hsqldb.version> <frontend-maven-plugin.version>1.11.2</frontend-maven-plugin.version> @@ -1192,6 +1193,11 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>software.amazon.s3.analyticsaccelerator</groupId> + <artifactId>analyticsaccelerator-s3</artifactId> + <version>${amazon-s3-analyticsaccelerator-s3.version}</version> + </dependency> <dependency> <groupId>org.apache.mina</groupId> <artifactId>mina-core</artifactId> diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml index 1d04107ff5b..05337c60abe 100644 --- a/hadoop-tools/hadoop-aws/pom.xml +++ b/hadoop-tools/hadoop-aws/pom.xml @@ -472,6 +472,11 @@ <artifactId>amazon-s3-encryption-client-java</artifactId> <scope>provided</scope> </dependency> + <dependency> + <groupId>software.amazon.s3.analyticsaccelerator</groupId> + <artifactId>analyticsaccelerator-s3</artifactId> + <scope>compile</scope> + </dependency> <dependency> <groupId>org.assertj</groupId> <artifactId>assertj-core</artifactId> diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 6796c29d348..b2843d24c8a 
100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1827,4 +1827,12 @@ private Constants() { * Value: {@value}. */ public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit"; + + + /** + * Prefix to configure Analytics Accelerator Library. + */ + public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = + "fs.s3a.analytics.accelerator"; + } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java index d284a098bcd..82dd6743dd4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java @@ -948,7 +948,7 @@ public File createTemporaryFileForWriting(String pathStr, * All stream factory initialization required after {@code Service.init()}, * after all other services have themselves been initialized. */ - private void finishStreamFactoryInit() { + private void finishStreamFactoryInit() throws IOException { // must be on be invoked during service initialization Preconditions.checkState(isInState(STATE.INITED), "Store is in wrong state: %s", getServiceState()); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java index c8f82d2abd6..f0b5c9614fe 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AbstractObjectInputStreamFactory.java @@ -18,6 +18,8 @@ package org.apache.hadoop.fs.s3a.impl.streams; +import java.io.IOException; + import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.service.AbstractService; @@ -54,7 +56,7 @@ protected AbstractObjectInputStreamFactory(final String name) { * @param factoryBindingParameters parameters for the factory binding */ @Override - public void bind(final FactoryBindingParameters factoryBindingParameters) { + public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { // must be on be invoked during service initialization Preconditions.checkState(isInState(STATE.INITED), "Input Stream factory %s is in wrong state: %s", diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java new file mode 100644 index 00000000000..82c68e1cb85 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.s3a.Retries; +import org.apache.hadoop.fs.s3a.S3ObjectAttributes; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream; +import software.amazon.s3.analyticsaccelerator.util.S3URI; + +/** + * Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports + * parquet specific optimisations such as parquet-aware prefetching. For more details, see + * https://github.com/awslabs/analytics-accelerator-s3. + */ +public class AnalyticsStream extends ObjectInputStream implements StreamCapabilities { + + private S3SeekableInputStream inputStream; + private long lastReadCurrentPos = 0; + private volatile boolean closed; + + public static final Logger LOG = LoggerFactory.getLogger(AnalyticsStream.class); + + public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException { + super(InputStreamType.Analytics, parameters); + S3ObjectAttributes s3Attributes = parameters.getObjectAttributes(); + this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey())); + } + + /** + * Indicates whether the given {@code capability} is supported by this stream. + * + * @param capability the capability to check. + * @return true if the given {@code capability} is supported by this stream, false otherwise. + */ + @Override + public boolean hasCapability(String capability) { + return false; + } + + @Override + public int read() throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.read(); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + @Override + public void seek(long pos) throws IOException { + throwIfClosed(); + if (pos < 0) { + throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + + " " + pos); + } + inputStream.seek(pos); + } + + + @Override + public synchronized long getPos() { + if (!closed) { + lastReadCurrentPos = inputStream.getPos(); + } + return lastReadCurrentPos; + } + + + /** + * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is + * reached. Leaves the position of the stream unaltered. + * + * @param buf buffer to read data into + * @param off start position in buffer at which data is written + * @param len the number of bytes to read; the n-th byte should be the last byte of the stream. 
+ * @return the total number of bytes read into the buffer + * @throws IOException if an I/O error occurs + */ + public int readTail(byte[] buf, int off, int len) throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.readTail(buf, off, len); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + throwIfClosed(); + int bytesRead; + try { + bytesRead = inputStream.read(buf, off, len); + } catch (IOException ioe) { + onReadFailure(ioe); + throw ioe; + } + return bytesRead; + } + + + @Override + public boolean seekToNewSource(long l) throws IOException { + return false; + } + + @Override + public int available() throws IOException { + throwIfClosed(); + return super.available(); + } + + @Override + protected boolean isStreamOpen() { + return !isClosed(); + } + + protected boolean isClosed() { + return inputStream == null; + } + + @Override + protected void abortInFinalizer() { + try { + close(); + } catch (IOException ignored) { + + } + } + + @Override + public synchronized void close() throws IOException { + if(!closed) { + closed = true; + try { + inputStream.close(); + inputStream = null; + super.close(); + } catch (IOException ioe) { + LOG.debug("Failure closing stream {}: ", getKey()); + throw ioe; + } + } + } + + /** + * Close the stream on read failure. + * No attempt to recover from failure + * + * @param ioe exception caught. + */ + @Retries.OnceTranslated + private void onReadFailure(IOException ioe) throws IOException { + if (LOG.isDebugEnabled()) { + LOG.debug("Got exception while trying to read from stream {}, " + + "not trying to recover:", + getKey(), ioe); + } else { + LOG.info("Got exception while trying to read from stream {}, " + + "not trying to recover:", + getKey(), ioe); + } + this.close(); + } + + + protected void throwIfClosed() throws IOException { + if (closed) { + throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java new file mode 100644 index 00000000000..fd5bdb2e577 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hadoop.fs.s3a.impl.streams; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.s3a.VectoredIOContext; + +import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; + +import java.io.IOException; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.impl.streams.StreamIntegration.populateVectoredIOContext; + +/** + * A factory for {@link AnalyticsStream}. This class is instantiated during initialization of + * {@code S3AStore}, if fs.s3a.input.stream.type is set to Analytics. + */ +public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory { + + private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration; + private S3SeekableInputStreamFactory s3SeekableInputStreamFactory; + private boolean requireCrt; + + public AnalyticsStreamFactory() { + super("AnalyticsStreamFactory"); + } + + @Override + protected void serviceInit(final Configuration conf) throws Exception { + super.serviceInit(conf); + ConnectorConfiguration configuration = new ConnectorConfiguration(conf, + ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + this.seekableInputStreamConfiguration = + S3SeekableInputStreamConfiguration.fromConfiguration(configuration); + this.requireCrt = false; + } + + @Override + public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException { + super.bind(factoryBindingParameters); + this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory( + new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)), + seekableInputStreamConfiguration); + } + + @Override + public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException { + return new AnalyticsStream( + parameters, + s3SeekableInputStreamFactory); + } + + + + @Override + public InputStreamType streamType() { + return InputStreamType.Analytics; + } + + /** + * Calculate the requirements of this factory. + * @return the stream factory requirements. + */ + @Override + public StreamFactoryRequirements factoryRequirements() { + // fill in the vector context + final VectoredIOContext vectorContext = populateVectoredIOContext(getConfig()); + // and then disable range merging. + // this ensures that no reads are made for data which is then discarded... + // so the prefetch and block read code doesn't ever do wasteful fetches. + vectorContext.setMinSeekForVectoredReads(0); + + return new StreamFactoryRequirements(0, + 0, false, false, + vectorContext); + } + + +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java index 1775fa5f05c..a9c33a60fc6 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/InputStreamType.java @@ -45,13 +45,11 @@ public enum InputStreamType { */ Prefetch(StreamIntegration.PREFETCH, 2, c -> new PrefetchingInputStreamFactory()), - /** * The analytics input stream.
*/ - Analytics(StreamIntegration.ANALYTICS, 3, c -> { - throw new IllegalArgumentException("not yet supported"); - }), + Analytics(StreamIntegration.ANALYTICS, 3, c -> + new AnalyticsStreamFactory()), /** * The a custom input stream. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java index d1c96c7411d..69637c89cd1 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/ObjectInputStreamFactory.java @@ -46,7 +46,7 @@ public interface ObjectInputStreamFactory * and {@code start()}. * @param factoryBindingParameters parameters for the factory binding */ - void bind(final FactoryBindingParameters factoryBindingParameters); + void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException; /** * Create a new input stream. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java index 83c498fd0b7..b5f01172287 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/StreamIntegration.java @@ -114,7 +114,6 @@ private StreamIntegration() { * @throws RuntimeException any binding/loading/instantiation problem */ public static ObjectInputStreamFactory factoryFromConfig(final Configuration conf) { - // Construct the factory. return determineInputStreamType(conf) .factory() diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java index 033c2d94c7b..3e87a2f7e5f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java @@ -31,9 +31,8 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; /** * S3A contract tests creating files. @@ -93,6 +92,17 @@ protected Configuration createConfiguration() { return conf; } + @Override + public void testOverwriteExistingFile() throws Throwable { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. 
+ skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testOverwriteExistingFile(); + } + @Override public void testOverwriteNonEmptyDirectory() throws Throwable { try { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index e761e0d14bf..f6127700b5f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageStatistics; @@ -78,6 +79,14 @@ public void testNonDirectWrite() throws Exception { getRenameOperationCount() - renames); } + @Override + public void testDistCpUpdateCheckFileSkip() throws Exception { + //Will remove this when Analytics Accelerator supports overwrites + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator Library does not support updates to existing files"); + super.testDistCpUpdateCheckFileSkip(); + } + private long getRenameOperationCount() { return getFileSystem().getStorageStatistics() .getLong(StorageStatistics.CommonStatisticNames.OP_RENAME); } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index fbb6d5a04d2..87e3b23cd71 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -63,6 +63,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.io.Sizes.S_1M; @@ -88,6 +89,17 @@ protected AbstractFSContract createContract(Configuration conf) { return new S3AContract(conf); } + /** + * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads. + * @throws Exception + */ + @Override + public void setup() throws Exception { + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator does not support vectored reads"); + super.setup(); + } + /** * Verify response to a vector read request which is beyond the * real length of the file.
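A note on the overwrite-related skips above: as the comments in the diff explain, the library caches object metadata per key, so an overwrite leaves stale metadata in the cache until it ages out. A minimal sketch of the sequence those tests exercise, assuming an S3AFileSystem fs opened with the Analytics stream enabled (the path and data here are illustrative, not from the commit):

  // Sketch only: write, read, then overwrite an object.
  Path path = new Path("s3a://example-bucket/test/overwrite.txt");
  ContractTestUtils.writeDataset(fs, path, "version-1".getBytes(), 9, 1024, true);
  try (FSDataInputStream in = fs.open(path)) {
    in.read(); // the stream factory caches this object's metadata
  }
  // Overwrite the object with different, longer contents.
  ContractTestUtils.writeDataset(fs, path, "version-2-longer".getBytes(), 16, 1024, true);
  // A subsequent open() may still be served the cached, stale metadata until
  // it expires; tracked in
  // https://github.com/awslabs/analytics-accelerator-s3/issues/218.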
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java index d22de3b06d8..169898df829 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java @@ -38,9 +38,7 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** @@ -93,6 +91,8 @@ protected Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support SSE-C"); assumeEnabled(); // although not a root dir test, this confuses paths enough it shouldn't be run in // parallel with other jobs diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java index 0281c57f5cb..b056dac5a85 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java @@ -29,9 +29,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.s3a.S3AContract; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; /** * S3A Test suite for the FSMainOperationsBaseTest tests. @@ -78,6 +76,28 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption() throws Exception { } + @Override + public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testWriteReadAndDeleteOneAndAHalfBlocks(); + } + + @Override + public void testWriteReadAndDeleteTwoBlocks() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
+ skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testWriteReadAndDeleteTwoBlocks(); + } + @Override public void testOverwrite() throws IOException { boolean createPerformance = isCreatePerformanceEnabled(fSys); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java index 48081457658..2034ea274e7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java @@ -34,8 +34,7 @@ import org.apache.hadoop.fs.Path; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assume.*; import static org.junit.Assert.*; @@ -160,4 +159,15 @@ public void testOverwrite() throws IOException { } } } + + @Override + public void testOverWriteAndRead() throws Exception { + // Currently analytics accelerator does not support reading of files that have been overwritten. + // This is because the analytics accelerator library caches metadata, and when a file is + // overwritten, the old metadata continues to be used, until it is removed from the cache over + // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218. + skipIfAnalyticsAcceleratorEnabled(fs.getConf(), + "Analytics Accelerator currently does not support reading of overwritten files"); + super.testOverWriteAndRead(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java index 70dc5ee476c..daf5306dc39 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java @@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES; @@ -77,6 +78,11 @@ protected Configuration createConfiguration() { public void setup() throws Exception { super.setup(); executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS); + // Analytics accelerator currently does not support IOStatisticsContext, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support IOStatisticsContext"); + } @Override diff --git
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java index 72c75162c9f..f26a585776a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java @@ -36,6 +36,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS; import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs; @@ -87,6 +88,11 @@ public void setup() throws Exception { @Test public void testFinalizer() throws Throwable { Path path = methodPath(); + // Analytics accelerator currently does not support stream leak detection. This work is tracked + // in https://issues.apache.org/jira/browse/HADOOP-19451 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support leak detection"); + final S3AFileSystem fs = getFileSystem(); ContractTestUtils.createFile(fs, path, true, DATASET); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java index 3bfe69c2bca..4ec579ce4f6 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.io.InputStream; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; /** @@ -51,6 +52,11 @@ public void testMetricsRegister() @Test public void testStreamStatistics() throws IOException { + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); + S3AFileSystem fs = getFileSystem(); Path file = path("testStreamStatistics"); byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes(); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java new file mode 100644 index 00000000000..6ee6d6c6e11 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.io.IOException; + +import org.junit.Test; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; + +import static org.apache.hadoop.fs.s3a.Constants.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; + +import org.assertj.core.api.Assertions; + +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; + +public class ITestS3AS3SeekableStream extends AbstractS3ATestBase { + + private static final String PHYSICAL_IO_PREFIX = "physicalio"; + private static final String LOGICAL_IO_PREFIX = "logicalio"; + + @Test + public void testConnectorFrameWorkIntegration() throws IOException { + describe("Verify S3 connector framework integration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf, INPUT_STREAM_TYPE); + conf.set(INPUT_STREAM_TYPE, "Analytics"); + + String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz"; + S3AFileSystem s3AFileSystem = + (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf); + byte[] buffer = new byte[500]; + + try (FSDataInputStream inputStream = s3AFileSystem.open(new Path(testFile))) { + inputStream.seek(5); + inputStream.read(buffer, 0, 500); + } + + } + + @Test + public void testConnectorFrameworkConfigurable() { + describe("Verify S3 connector framework reads configuration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf); + + //Disable Predictive Prefetching + conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all"); + + //Set Blobstore Capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1); + + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + + S3SeekableInputStreamConfiguration configuration = + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration); + + Assertions.assertThat(configuration.getLogicalIOConfiguration().getPrefetchingMode()) + .as("AnalyticsStream configuration is not set to expected value") + .isSameAs(PrefetchMode.ALL); + + Assertions.assertThat(configuration.getPhysicalIOConfiguration().getBlobStoreCapacity()) + .as("AnalyticsStream configuration is not set to expected value") + .isEqualTo(1); + } + + @Test + public void testInvalidConfigurationThrows() throws Exception { + describe("Verify S3 connector framework throws with invalid configuration"); + + Configuration conf = getConfiguration(); + removeBaseAndBucketOverrides(conf); + //Set an invalid (negative) blobstore capacity + conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + + "."
+ PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1); + + ConnectorConfiguration connectorConfiguration = + new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); + Assertions.assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> + S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index a1b3821f7fa..dd2dc3e91ee 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -45,6 +45,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; +import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType; import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; @@ -576,6 +577,21 @@ public static boolean isS3ExpressTestBucket(final Configuration conf) { return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), ""); } + /** + * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled. + * @param configuration configuration to probe + */ + public static void skipIfAnalyticsAcceleratorEnabled( + Configuration configuration, String message) { + assume(message, + !isAnalyticsAcceleratorEnabled(configuration)); + } + + public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) { + return conf.getEnum(INPUT_STREAM_TYPE, + InputStreamType.Classic) == InputStreamType.Analytics; + } + /** * Skip a test if the filesystem lacks a required capability. 
* @param fs filesystem diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java index 8132b44cdb4..02f0251e8f0 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java @@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest; import org.apache.hadoop.fs.statistics.IOStatisticsLogging; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED; import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT; @@ -167,7 +168,12 @@ private void abortActiveStream() throws IOException { @Test public void testCostOfCreatingMagicFile() throws Throwable { - describe("Files created under magic paths skip existence checks"); + describe("Files created under magic paths skip existence checks and marker deletes"); + + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path destFile = methodSubPath("file.txt"); fs.delete(destFile.getParent(), true); @@ -245,6 +251,10 @@ public void testCostOfCreatingMagicFile() throws Throwable { public void testCostOfSavingLoadingPendingFile() throws Throwable { describe("Verify costs of saving .pending file under a magic path"); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path partDir = methodSubPath("file.pending"); Path destFile = new Path(partDir, "file.pending"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java index 1724006a831..5b489c1c39c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java @@ -30,6 +30,8 @@ import org.junit.Assert; import org.junit.Before; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + /** * S3a implementation of FCStatisticsBaseTest. 
*/ @@ -44,6 +46,10 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest { @Before public void setUp() throws Exception { conf = new Configuration(); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(conf, + "Analytics Accelerator currently does not support stream statistics"); fc = S3ATestUtils.createTestFileContext(conf); testRootPath = fileContextTestHelper.getTestRootPath(fc, "test"); fc.mkdir(testRootPath, diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 37b1413ed0f..f790b3e539b 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -52,11 +52,9 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.isPrefetchingEnabled; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; @@ -109,6 +107,10 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); testFile = methodPath(); @@ -387,7 +389,6 @@ public void testPositionedReadableReadPastEOF() throws Throwable { describe("PositionedReadable.read() past the end of the file"); assumeNoPrefetching(); - verifyMetrics(() -> { try (FSDataInputStream in = openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java index caf723b95de..e68ea9a0315 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java @@ -117,7 +117,6 @@ public Configuration createConfiguration() { @Override public void setup() throws Exception { super.setup(); - // now create a new FS with minimal http capacity and recovery // a separate one is used to avoid test teardown suffering // from the lack of http connections and short timeouts. 
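Returning to the configuration tests in ITestS3AS3SeekableStream above: every key under the fs.s3a.analytics.accelerator prefix is handed to the library through ConnectorConfiguration, so Hadoop-side and library-side key names compose mechanically. A standalone sketch of that mapping, using the same keys and values the tests exercise:

  // Sketch: Hadoop keys are mapped into the accelerator's configuration.
  Configuration conf = new Configuration();
  conf.set("fs.s3a.analytics.accelerator.logicalio.prefetching.mode", "all");
  conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 1);

  ConnectorConfiguration connectorConfiguration =
      new ConnectorConfiguration(conf, "fs.s3a.analytics.accelerator");
  S3SeekableInputStreamConfiguration streamConfiguration =
      S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
  // streamConfiguration.getLogicalIOConfiguration().getPrefetchingMode() now
  // returns PrefetchMode.ALL; an invalid value such as a capacity of -1 makes
  // fromConfiguration() throw IllegalArgumentException instead.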
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java index 6f19ba15c1c..a6ae2ad2972 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AHugeFilesSSECDiskBlocks.java @@ -31,6 +31,7 @@ import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfEncryptionTestsDisabled; /** @@ -53,6 +54,8 @@ public class ITestS3AHugeFilesSSECDiskBlocks @Override public void setup() throws Exception { try { + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), + "Analytics Accelerator currently does not support SSE-C"); super.setup(); } catch (AccessDeniedException | AWSUnsupportedFeatureException e) { skip("Bucket does not allow " + S3AEncryptionMethods.SSE_C + " encryption method"); diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java index 0f6b69cd54d..3c7644f2323 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*; /** @@ -78,4 +79,12 @@ public List<String> outputStreamStatisticKeys() { STREAM_WRITE_EXCEPTIONS); } + @Override + public void testInputStreamStatisticRead() throws Throwable { + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); + super.testInputStreamStatisticRead(); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 0d5d2a789a0..8cb0de1b83f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -31,6 +31,8 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions; import org.apache.hadoop.fs.statistics.StreamStatisticNames; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; + public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { private static final int ONE_KB = 1024; @@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends 
AbstractS3ATestBase { */ @Test public void testBytesReadWithStream() throws IOException { + // Analytics accelerator currently does not support IOStatistics, this will be added as + // part of https://issues.apache.org/jira/browse/HADOOP-19364 + skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); byte[] oneKbBuf = new byte[ONE_KB];
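For anyone trying the branch out: per the AnalyticsStreamFactory javadoc above, a single setting, fs.s3a.input.stream.type, selects the new stream. A minimal usage sketch (bucket and object names are illustrative; everything else follows the test code in this commit):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;

  public class AnalyticsStreamDemo {
    public static void main(String[] args) throws Exception {
      Configuration conf = new Configuration();
      // Select the analytics input stream; the default remains the classic stream.
      conf.set("fs.s3a.input.stream.type", "Analytics");

      // Illustrative object; any readable s3a:// path works.
      Path path = new Path("s3a://example-bucket/data/part-00000.parquet");
      try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
           FSDataInputStream in = fs.open(path)) {
        byte[] buffer = new byte[512];
        in.seek(5); // delegated to the library's S3SeekableInputStream
        int bytesRead = in.read(buffer, 0, buffer.length);
        System.out.println("Read " + bytesRead + " bytes via AnalyticsStream");
      }
    }
  }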