This is an automated email from the ASF dual-hosted git repository.
ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/feature-HADOOP-19363-analytics-accelerator-s3 by this push:
     new 3d8f4a4eea0 HADOOP-19348. Add initial support for Analytics Accelerator Library for Amazon S3 (#7192)
3d8f4a4eea0 is described below
commit 3d8f4a4eea0f37a573f7d734d71a5a0c2def6e22
Author: fuatbasik <[email protected]>
AuthorDate: Wed Dec 18 10:06:56 2024 +0000
    HADOOP-19348. Add initial support for Analytics Accelerator Library for Amazon S3 (#7192)
---
hadoop-tools/hadoop-aws/pom.xml | 11 ++
.../java/org/apache/hadoop/fs/s3a/Constants.java | 34 ++++
.../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 74 ++++++++-
.../apache/hadoop/fs/s3a/S3ASeekableStream.java | 183 +++++++++++++++++++++
.../fs/contract/s3a/ITestS3AContractCreate.java | 11 +-
.../fs/contract/s3a/ITestS3AContractDistCp.java | 9 +
.../fs/contract/s3a/ITestS3AContractRename.java | 8 +
.../contract/s3a/ITestS3AContractVectoredRead.java | 12 ++
.../apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java | 3 +
.../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 6 +-
.../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 18 +-
.../hadoop/fs/s3a/ITestS3AFileSystemContract.java | 11 +-
.../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java | 5 +
.../hadoop/fs/s3a/ITestS3AInputStreamLeakage.java | 7 +-
.../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java | 5 +
.../fs/s3a/ITestS3APrefetchingCacheFiles.java | 8 +-
.../fs/s3a/ITestS3APrefetchingInputStream.java | 6 +-
.../fs/s3a/ITestS3APrefetchingLruEviction.java | 6 +-
.../hadoop/fs/s3a/ITestS3AS3SeekableStream.java | 130 +++++++++++++++
.../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 15 ++
.../fs/s3a/commit/ITestCommitOperationCost.java | 6 +
.../fs/s3a/commit/ITestS3ACommitterFactory.java | 3 +
.../s3a/commit/magic/ITestMagicCommitProtocol.java | 3 +
.../integration/ITestDirectoryCommitProtocol.java | 9 +
.../ITestPartitionedCommitProtocol.java | 9 +
.../integration/ITestStagingCommitProtocol.java | 4 +
.../ITestStagingCommitProtocolFailure.java | 12 +-
.../fileContext/ITestS3AFileContextStatistics.java | 4 +
.../fs/s3a/impl/ITestConnectionTimeouts.java | 11 ++
.../fs/s3a/performance/ITestS3AOpenCost.java | 8 +-
.../fs/s3a/performance/ITestUnbufferDraining.java | 4 +
.../ITestS3AContractStreamIOStatistics.java | 8 +
.../statistics/ITestS3AFileSystemStatistic.java | 6 +
33 files changed, 618 insertions(+), 31 deletions(-)
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index d28704b7c33..f047cde9cfb 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -525,6 +525,17 @@
<artifactId>amazon-s3-encryption-client-java</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>software.amazon.s3.analyticsaccelerator</groupId>
+ <artifactId>analyticsaccelerator-s3</artifactId>
+ <version>0.0.2</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk.crt</groupId>
+ <artifactId>aws-crt</artifactId>
+ <version>0.29.10</version>
+ </dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index b03c41c7bb1..d563181bb61 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -1760,4 +1760,38 @@ public final class Constants {
* Value: {@value}.
*/
public static final String S3A_IO_RATE_LIMIT = "fs.s3a.io.rate.limit";
+
+
+ /**
+ * Prefix to configure Analytics Accelerator Library.
+ */
+ public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
+ "fs.s3a.analytics.accelerator";
+
+ /**
+ * Config to enable Analytics Accelerator Library for Amazon S3.
+ * https://github.com/awslabs/analytics-accelerator-s3
+ */
+ public static final String ANALYTICS_ACCELERATOR_ENABLED_KEY =
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + ".enabled";
+
+ /**
+   * Config to enable usage of the CRT client with the Analytics Accelerator Library.
+   * It is true by default.
+ */
+ public static final String ANALYTICS_ACCELERATOR_CRT_ENABLED =
+ "fs.s3a.analytics.accelerator.crt.client";
+
+ /**
+   * Default value for {@link #ANALYTICS_ACCELERATOR_ENABLED_KEY}.
+ * Value {@value}.
+ */
+ public static final boolean ANALYTICS_ACCELERATOR_ENABLED_DEFAULT = false;
+
+ /**
+   * Default value for {@link #ANALYTICS_ACCELERATOR_CRT_ENABLED}.
+ * Value {@value}.
+ */
+ public static final boolean ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT = true;
+
}
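The two keys above gate the feature: fs.s3a.analytics.accelerator.enabled turns the accelerated stream on (off by default), and fs.s3a.analytics.accelerator.crt.client selects the CRT client (on by default). A minimal sketch, not part of this commit, of how a client could enable the feature using only these constants:

import org.apache.hadoop.conf.Configuration;

public class EnableAnalyticsAcceleratorSketch {
  public static Configuration enable(Configuration conf) {
    // ANALYTICS_ACCELERATOR_ENABLED_KEY: off by default
    conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
    // ANALYTICS_ACCELERATOR_CRT_ENABLED: on by default; setting it to false
    // falls back to the store's S3AsyncClient, which (per the change below)
    // also disables multipart upload
    conf.setBoolean("fs.s3a.analytics.accelerator.crt.client", true);
    return conf;
  }
}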
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c0e530cb5ce..e85cae3942b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -54,7 +54,9 @@ import javax.annotation.Nullable;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkException;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadResponse;
import software.amazon.awssdk.services.s3.model.GetBucketLocationRequest;
@@ -87,6 +89,11 @@ import software.amazon.awssdk.transfer.s3.model.CompletedCopy;
import software.amazon.awssdk.transfer.s3.model.Copy;
import software.amazon.awssdk.transfer.s3.model.CopyRequest;
+import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -317,6 +324,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private S3Client s3Client;
+ /**
+   * CRT-based S3 client, created if the analytics accelerator library is enabled,
+   * and managed by the S3AStoreImpl. The analytics accelerator library can be
+   * enabled with {@link Constants#ANALYTICS_ACCELERATOR_ENABLED_KEY}.
+ */
+ private S3AsyncClient s3AsyncClient;
+
// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
@@ -344,6 +358,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// If true, the prefetching input stream is used for reads.
private boolean prefetchEnabled;
+  // If true, S3SeekableInputStream from Analytics Accelerator for Amazon S3 will be used.
+ private boolean analyticsAcceleratorEnabled;
+
+ private boolean analyticsAcceleratorCRTEnabled;
+
// Size in bytes of a single prefetch block.
private int prefetchBlockSize;
@@ -525,6 +544,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean s3AccessGrantsEnabled;
+ /**
+   * Factory to create S3SeekableInputStream if {@link #analyticsAcceleratorEnabled} is true.
+ */
+ private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
+
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
@@ -680,8 +704,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
this.prefetchBlockSize = (int) prefetchBlockSizeLong;
this.prefetchBlockCount =
        intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
+
+ this.analyticsAcceleratorEnabled =
+        conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+ this.analyticsAcceleratorCRTEnabled =
+ conf.getBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED,
+ ANALYTICS_ACCELERATOR_CRT_ENABLED_DEFAULT);
+
this.isMultipartUploadEnabled =
conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
- DEFAULT_MULTIPART_UPLOAD_ENABLED);
+ DEFAULT_MULTIPART_UPLOAD_ENABLED);
+
+    if(this.analyticsAcceleratorEnabled && !analyticsAcceleratorCRTEnabled) {
+      // Temp change: Analytics Accelerator with S3AsyncClient does not support multi-part upload.
+      this.isMultipartUploadEnabled = false;
+    }
+
// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;
@@ -809,6 +846,27 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
// directly through the client manager.
// this is to aid mocking.
s3Client = store.getOrCreateS3Client();
+
+ if (this.analyticsAcceleratorEnabled) {
+ LOG.info("Using S3SeekableInputStream");
+ if(this.analyticsAcceleratorCRTEnabled) {
+ LOG.info("Using S3 CRT client for analytics accelerator S3");
+        this.s3AsyncClient = S3CrtAsyncClient.builder().maxConcurrency(600).build();
+ } else {
+ LOG.info("Using S3 async client for analytics accelerator S3");
+ this.s3AsyncClient = store.getOrCreateAsyncClient();
+ }
+
+ ConnectorConfiguration configuration = new ConnectorConfiguration(conf,
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+ S3SeekableInputStreamConfiguration seekableInputStreamConfiguration =
+          S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
+ this.s3SeekableInputStreamFactory =
+ new S3SeekableInputStreamFactory(
+ new S3SdkObjectClient(this.s3AsyncClient),
+ seekableInputStreamConfiguration);
+ }
+
// The filesystem is now ready to perform operations against
// S3
// This initiates a probe against S3 for the bucket existing.
@@ -1876,6 +1934,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
+
+
// create the input stream statistics before opening
// the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
@@ -1892,6 +1952,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);
+ if (this.analyticsAcceleratorEnabled) {
+ return new FSDataInputStream(
+ new S3ASeekableStream(
+ this.bucket,
+ pathToKey(path),
+ s3SeekableInputStreamFactory));
+ }
+
if (this.prefetchEnabled) {
Configuration configuration = getConf();
initLocalDirAllocatorIfNotInitialized(configuration);
@@ -4421,9 +4489,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
protected synchronized void stopAllServices() {
try {
trackDuration(getDurationTrackerFactory(), FILESYSTEM_CLOSE.getSymbol(),
() -> {
- closeAutocloseables(LOG, store);
+ closeAutocloseables(LOG, store, s3SeekableInputStreamFactory);
store = null;
s3Client = null;
+ s3AsyncClient = null;
+ s3SeekableInputStreamFactory = null;
// At this point the S3A client is shut down,
// now the executor pools are closed
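For reference, the initialization added above hands the Hadoop keys scoped by the fs.s3a.analytics.accelerator prefix to the library via ConnectorConfiguration. A hedged sketch of that plumbing, reusing only types imported in this diff (the capacity value is an arbitrary example, not a recommendation):

import org.apache.hadoop.conf.Configuration;
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;

public class AcceleratorConfigSketch {
  public static S3SeekableInputStreamConfiguration build(Configuration conf) {
    // example setting only; the library reads keys under the
    // "fs.s3a.analytics.accelerator" prefix
    conf.setInt("fs.s3a.analytics.accelerator.physicalio.blobstore.capacity", 16);
    ConnectorConfiguration connectorConfiguration =
        new ConnectorConfiguration(conf, "fs.s3a.analytics.accelerator");
    return S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
  }
}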
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
new file mode 100644
index 00000000000..ef6a2990815
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ASeekableStream.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.StreamCapabilities;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSInputStream;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
+import software.amazon.s3.analyticsaccelerator.util.S3URI;
+
+public class S3ASeekableStream extends FSInputStream implements StreamCapabilities {
+
+ private S3SeekableInputStream inputStream;
+ private long lastReadCurrentPos = 0;
+ private final String key;
+ private volatile boolean closed;
+
+  public static final Logger LOG = LoggerFactory.getLogger(S3ASeekableStream.class);
+
+ public S3ASeekableStream(String bucket, String key,
+      S3SeekableInputStreamFactory s3SeekableInputStreamFactory) {
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(bucket, key));
+ this.key = key;
+ }
+
+ /**
+   * Indicates whether the given {@code capability} is supported by this stream.
+   *
+   * @param capability the capability to check.
+   * @return true if the given {@code capability} is supported by this stream, false otherwise.
+ */
+ @Override
+ public boolean hasCapability(String capability) {
+ return false;
+ }
+
+ @Override
+ public int read() throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read();
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public void seek(long pos) throws IOException {
+ throwIfClosed();
+ if (pos < 0) {
+ throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK
+ + " " + pos);
+ }
+ inputStream.seek(pos);
+ }
+
+
+ @Override
+ public synchronized long getPos() {
+ if (!closed) {
+ lastReadCurrentPos = inputStream.getPos();
+ }
+ return lastReadCurrentPos;
+ }
+
+
+ /**
+   * Reads the last n bytes from the stream into a byte buffer. Blocks until end of stream is
+   * reached. Leaves the position of the stream unaltered.
+   *
+   * @param buf buffer to read data into
+   * @param off start position in buffer at which data is written
+   * @param len the number of bytes to read; the n-th byte should be the last byte of the stream.
+ * @return the total number of bytes read into the buffer
+ * @throws IOException if an I/O error occurs
+ */
+ public int readTail(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.readTail(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws IOException {
+ throwIfClosed();
+ int bytesRead;
+ try {
+ bytesRead = inputStream.read(buf, off, len);
+ } catch (IOException ioe) {
+ onReadFailure(ioe);
+ throw ioe;
+ }
+ return bytesRead;
+ }
+
+
+ @Override
+ public boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
+
+ @Override
+ public int available() throws IOException {
+ throwIfClosed();
+ return super.available();
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ if(!closed) {
+ closed = true;
+ try {
+ inputStream.close();
+ inputStream = null;
+ super.close();
+ } catch (IOException ioe) {
+ LOG.debug("Failure closing stream {}: ", key);
+ throw ioe;
+ }
+ }
+ }
+
+ /**
+ * Close the stream on read failure.
+   * No attempt is made to recover from the failure.
+ *
+ * @param ioe exception caught.
+ */
+ @Retries.OnceTranslated
+ private void onReadFailure(IOException ioe) throws IOException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ key, ioe);
+ } else {
+ LOG.info("Got exception while trying to read from stream {}, " +
+ "not trying to recover:",
+ key, ioe);
+ }
+ this.close();
+ }
+
+
+ protected void throwIfClosed() throws IOException {
+ if (closed) {
+ throw new IOException(key + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
+ }
+ }
+}
\ No newline at end of file
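Callers never construct S3ASeekableStream directly; when the feature is enabled, S3AFileSystem wraps it in an FSDataInputStream on open(), so existing read paths work unchanged. A hedged usage sketch (bucket and object names are placeholders):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AcceleratedReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.analytics.accelerator.enabled", true);
    Path path = new Path("s3a://example-bucket/data/part-00000.parquet");
    try (FileSystem fs = FileSystem.newInstance(path.toUri(), conf);
         FSDataInputStream in = fs.open(path)) {
      byte[] buffer = new byte[1024];
      in.seek(128);                       // delegates to S3ASeekableStream#seek
      int bytesRead = in.read(buffer, 0, buffer.length);
      System.out.println("read " + bytesRead + " bytes");
    }
  }
}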
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index 000caf32883..fd569422b42 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -30,8 +30,7 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/**
* S3A contract tests creating files.
@@ -88,6 +87,14 @@ public class ITestS3AContractCreate extends AbstractContractCreateTest {
return conf;
}
+ @Override
+ public void testOverwriteExistingFile() throws Throwable {
+ // Will remove this when Analytics Accelerator supports overwrites
+ skipIfAnalyticsAcceleratorEnabled(this.createConfiguration(),
+ "Analytics Accelerator does not support overwrites yet");
+ super.testOverwriteExistingFile();
+ }
+
@Override
public void testOverwriteNonEmptyDirectory() throws Throwable {
try {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index e761e0d14bf..f6127700b5f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.contract.s3a;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageStatistics;
@@ -78,6 +79,14 @@ public class ITestS3AContractDistCp extends AbstractContractDistCpTest {
getRenameOperationCount() - renames);
}
+ @Override
+ public void testDistCpUpdateCheckFileSkip() throws Exception {
+ //Will remove this when Analytics Accelerator supports overwrites
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+ "Analytics Accelerator Library does not support update to existing
files");
+ super.testDistCpUpdateCheckFileSkip();
+ }
+
private long getRenameOperationCount() {
return getFileSystem().getStorageStatistics()
.getLong(StorageStatistics.CommonStatisticNames.OP_RENAME);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java
index d3ba7373cc9..9f5246d9dde 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractRename.java
@@ -36,6 +36,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyFileContents;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.S3A_TEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
/**
* S3A contract tests covering rename.
@@ -45,6 +46,13 @@ public class ITestS3AContractRename extends AbstractContractRenameTest {
public static final Logger LOG = LoggerFactory.getLogger(
ITestS3AContractRename.class);
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+ "Analytics Accelerator does not support rename");
+
+ }
@Override
protected int getTestTimeoutMillis() {
return S3A_TEST_TIMEOUT;
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index 8096f55bcd5..41aa90434cd 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -61,6 +61,7 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_RE
import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -84,6 +85,17 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe
return new S3AContract(conf);
}
+ /**
+   * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads.
+   * @throws Exception if test setup fails.
+ */
+ @Override
+ public void setup() throws Exception {
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+ "Analytics Accelerator does not support vectored reads");
+ super.setup();
+ }
+
/**
* Verify response to a vector read request which is beyond the
* real length of the file.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
index ca9d185c3e9..4793092b717 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ADelayedFNF.java
@@ -36,6 +36,7 @@ import static org.apache.hadoop.fs.s3a.Constants.CHANGE_DETECT_SOURCE;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_INTERVAL;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
/**
* Tests behavior of a FileNotFound error that happens after open(), i.e. on
@@ -65,6 +66,8 @@ public class ITestS3ADelayedFNF extends AbstractS3ATestBase {
*/
@Test
public void testNotFoundFirstRead() throws Exception {
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Temporarily disabling to fix Exception handling on Analytics
Accelerator");
S3AFileSystem fs = getFileSystem();
ChangeDetectionPolicy changeDetectionPolicy =
fs.getChangeDetectionPolicy();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index d22de3b06d8..12e5ef3841a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -38,9 +38,7 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM;
import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
@@ -93,6 +91,8 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
@Override
public void setup() throws Exception {
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Analytics Accelerator does not support SSEC");
assumeEnabled();
// although not a root dir test, this confuses paths enough it shouldn't be run in
// parallel with other jobs
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index 0281c57f5cb..1d3806e75e3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -29,9 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
/**
* S3A Test suite for the FSMainOperationsBaseTest tests.
@@ -78,6 +76,20 @@ public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest {
throws Exception {
}
+ @Override
+ public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
+    // Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
+ skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+ "Analytics Accelerator does not support overwrites");
+ }
+
+ @Override
+ public void testWriteReadAndDeleteTwoBlocks() throws Exception {
+    // Skipping this test for AnalyticsAccelerator as it is acting as an overwrite test
+ skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
+ "Analytics Accelerator does not support overwrites");
+ }
+
@Override
public void testOverwrite() throws IOException {
boolean createPerformance = isCreatePerformanceEnabled(fSys);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
index 48081457658..32d9a511a41 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -34,8 +34,7 @@ import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.Path;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.junit.Assume.*;
import static org.junit.Assert.*;
@@ -160,4 +159,12 @@ public class ITestS3AFileSystemContract extends FileSystemContractBaseTest {
}
}
}
+
+ @Override
+ public void testOverWriteAndRead() throws Exception {
+ //Will remove this when Analytics Accelerator supports overwrites
+ skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
+ "Analytics Accelerator does not support overwrites");
+ super.testOverWriteAndRead();
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index 70dc5ee476c..005b2fbf91d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -43,6 +43,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -77,6 +78,10 @@ public class ITestS3AIOStatisticsContext extends AbstractS3ATestBase {
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
+ // TODO: Add IOStatistics Support to S3SeekableStream
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3SeekableStream does not support IOStatisticsContext");
+
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
index 4b871c6a197..4d8956d38e7 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AInputStreamLeakage.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_LEAKS;
import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
@@ -58,7 +59,7 @@ public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
@Override
public void setup() throws Exception {
super.setup();
- assume("Stream leak detection not avaialable",
+ assume("Stream leak detection not available",
getFileSystem().hasCapability(STREAM_LEAKS));
}
@@ -89,6 +90,10 @@ public class ITestS3AInputStreamLeakage extends AbstractS3ATestBase {
@Test
public void testFinalizer() throws Throwable {
Path path = methodPath();
+ // TODO: Add Leak Detection to S3SeekableStream
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3SeekableStream does not support leak detection");
+
final S3AFileSystem fs = getFileSystem();
ContractTestUtils.createFile(fs, path, true, DATASET);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 3bfe69c2bca..61d15999635 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
/**
@@ -51,6 +52,10 @@ public class ITestS3AMetrics extends AbstractS3ATestBase {
@Test
public void testStreamStatistics() throws IOException {
+ // TODO: Add StreamStatistics support to S3SeekableStream
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3SeekableStream does not support stream statistics");
+
S3AFileSystem fs = getFileSystem();
Path file = path("testStreamStatistics");
byte[] data = "abcdefghijklmnopqrstuvwxyz".getBytes();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
index 5e6731ed520..5e37e68d8be 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java
@@ -37,10 +37,7 @@ import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
-import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
-import static org.apache.hadoop.fs.s3a.Constants.ENDPOINT;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -77,7 +74,6 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
super.setup();
// Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
conf = createConfiguration();
-
testFile = getExternalData(conf);
prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
fs = FileSystem.get(testFile.toUri(), conf);
@@ -98,6 +94,8 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
final String bufferDirBase = configuration.get(BUFFER_DIR);
bufferDir = bufferDirBase + "/" + UUID.randomUUID();
configuration.set(BUFFER_DIR, bufferDir);
+    // When both prefetching and Analytics Accelerator are enabled, Analytics Accelerator is used
+ configuration.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
return configuration;
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
index 4998cbc946e..b97be9d18e9 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java
@@ -34,8 +34,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMaximum;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
@@ -75,6 +74,7 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
private static final int INTERVAL_MILLIS = 500;
private static final int BLOCK_SIZE = S_1K * 10;
+
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
@@ -82,6 +82,8 @@ public class ITestS3APrefetchingInputStream extends AbstractS3ACostTest {
S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_BLOCK_SIZE_KEY);
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // When both prefetching and Analytics Accelerator are enabled, Analytics Accelerator is used
+ conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
return conf;
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
index 0fa08f37cf9..8aefcc8535a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
@@ -42,9 +42,7 @@ import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.test.LambdaTestUtils;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
@@ -93,6 +91,8 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
conf.setBoolean(PREFETCH_ENABLED_KEY, true);
conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
conf.setInt(PREFETCH_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // When both prefetching and Analytics Accelerator are enabled, Analytics Accelerator is used
+ conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, false);
return conf;
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
new file mode 100644
index 00000000000..c6ecee95051
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AS3SeekableStream.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CRT_ENABLED;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
+import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
+import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;
+
+public class ITestS3AS3SeekableStream extends AbstractS3ATestBase {
+
+ private static final String PHYSICAL_IO_PREFIX = "physicalio";
+ private static final String LOGICAL_IO_PREFIX = "logicalio";
+
+  public void testConnectorFrameWorkIntegration(boolean useCrtClient) throws IOException {
+ describe("Verify S3 connector framework integration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf, ANALYTICS_ACCELERATOR_ENABLED_KEY);
+ conf.setBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY, true);
+ conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
+
+ String testFile = "s3a://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz";
+ S3AFileSystem s3AFileSystem =
+        (S3AFileSystem) FileSystem.newInstance(new Path(testFile).toUri(), conf);
+ byte[] buffer = new byte[500];
+
+    try (FSDataInputStream inputStream = s3AFileSystem.open(new Path(testFile))) {
+ inputStream.seek(5);
+ inputStream.read(buffer, 0, 500);
+ }
+
+ }
+
+ @Test
+  public void testConnectorFrameWorkIntegrationWithCrtClient() throws IOException {
+ testConnectorFrameWorkIntegration(true);
+ }
+
+ @Test
+  public void testConnectorFrameWorkIntegrationWithoutCrtClient() throws IOException {
+ testConnectorFrameWorkIntegration(false);
+ }
+
+ public void testConnectorFrameworkConfigurable(boolean useCrtClient) {
+ describe("Verify S3 connector framework reads configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+
+ //Disable Predictive Prefetching
+ conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + LOGICAL_IO_PREFIX + ".prefetching.mode", "all");
+
+ //Set Blobstore Capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", 1);
+
+ conf.setBoolean(ANALYTICS_ACCELERATOR_CRT_ENABLED, useCrtClient);
+
+ ConnectorConfiguration connectorConfiguration =
+        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+
+ S3SeekableInputStreamConfiguration configuration =
+        S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration);
+
+ assertSame("S3ASeekableStream configuration is not set to expected value",
+        PrefetchMode.ALL, configuration.getLogicalIOConfiguration().getPrefetchingMode());
+
+ assertEquals("S3ASeekableStream configuration is not set to expected
value",
+ 1, configuration.getPhysicalIOConfiguration().getBlobStoreCapacity());
+ }
+
+ @Test
+  public void testConnectorFrameworkConfigurableWithoutCrtClient() throws IOException {
+ testConnectorFrameworkConfigurable(false);
+ }
+
+ @Test
+  public void testConnectorFrameworkConfigurableWithCrtClient() throws IOException {
+ testConnectorFrameworkConfigurable(true);
+ }
+
+ @Test
+ public void testInvalidConfigurationThrows() {
+ describe("Verify S3 connector framework throws with invalid
configuration");
+
+ Configuration conf = getConfiguration();
+ removeBaseAndBucketOverrides(conf);
+    //Set an invalid (negative) blobstore capacity
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + PHYSICAL_IO_PREFIX + ".blobstore.capacity", -1);
+
+ ConnectorConfiguration connectorConfiguration =
+        new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
+ assertThrows("S3ASeekableStream illegal configuration does not throw",
+ IllegalArgumentException.class, () ->
+            S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
+ }
+}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 308838c2927..e9ebd98169f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -574,6 +574,21 @@ public final class S3ATestUtils {
return S3ExpressStorage.isS3ExpressStore(getTestBucketName(conf), "");
}
+ /**
+ * Skip a test if the Analytics Accelerator Library for Amazon S3 is enabled.
+   * @param configuration configuration to probe
+   * @param message message to use in the skip report
+ */
+ public static void skipIfAnalyticsAcceleratorEnabled(
+ Configuration configuration, String message) {
+ assume(message,
+ !isAnalyticsAcceleratorEnabled(configuration));
+ }
+
+  public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
+ return conf.getBoolean(ANALYTICS_ACCELERATOR_ENABLED_KEY,
+ ANALYTICS_ACCELERATOR_ENABLED_DEFAULT);
+ }
+
/**
* Skip a test if the filesystem lacks a required capability.
* @param fs filesystem
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index fbe1a0a3120..9041a20aeeb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -203,6 +204,9 @@ public class ITestCommitOperationCost extends AbstractS3ACostTest {
@Test
public void testCostOfCreatingMagicFile() throws Throwable {
describe("Files created under magic paths skip existence checks and marker
deletes");
+
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3ASeekableInputStream does not support InputStreamStatistics");
S3AFileSystem fs = getFileSystem();
Path destFile = methodSubPath("file.txt");
fs.delete(destFile.getParent(), true);
@@ -282,6 +286,8 @@ public class ITestCommitOperationCost extends AbstractS3ACostTest {
public void testCostOfSavingLoadingPendingFile() throws Throwable {
describe("Verify costs of saving .pending file under a magic path");
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3ASeekableInputStream does not support InputStreamStatistics");
S3AFileSystem fs = getFileSystem();
Path partDir = methodSubPath("file.pending");
Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
index 2561a69f60b..a500bfb76a3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.hadoop.security.UserGroupInformation;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.COMMITTER_NAME_STAGING;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -182,6 +183,8 @@ public final class ITestS3ACommitterFactory extends AbstractCommitITest {
// destroy all filesystems from previous runs.
FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3ASeekableInputStream does not support InputStreamStatistics");
jobId = randomJobId();
attempt0 = "attempt_" + jobId + "_m_000000_0";
taskAttempt0 = TaskAttemptID.forName(attempt0);
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
index cbfc23a2a29..b89740ae312 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/magic/ITestMagicCommitProtocol.java
@@ -46,6 +46,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
import static org.apache.hadoop.fs.s3a.commit.impl.CommitUtilsWithMR.getMagicJobPath;
@@ -77,6 +78,8 @@ public class ITestMagicCommitProtocol extends AbstractITCommitProtocol {
@Override
public void setup() throws Exception {
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3ASeekableInputStream does not support InputStreamStatistics");
CommitUtils.verifyIsMagicCommitFS(getFileSystem());
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
index b19662c0117..cbd41fd1c93 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestDirectoryCommitProtocol.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.CONFLICT_MODE_APPEND;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_CONFLICT_MODE;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPERIMENTAL_COLLECT_IOSTATISTICS;
@@ -42,6 +43,14 @@ import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_EXPE
/** ITest of the low level protocol methods. */
public class ITestDirectoryCommitProtocol extends ITestStagingCommitProtocol {
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Skipping because these tests will fail when using CRT client");
+ }
+
@Override
protected String suitename() {
return "ITestDirectoryCommitProtocol";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
index e3bc1500dab..b6438157e4e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestPartitionedCommitProtocol.java
@@ -32,10 +32,19 @@ import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
/** ITest of the low level protocol methods. */
public class ITestPartitionedCommitProtocol extends ITestStagingCommitProtocol {
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Skipping because these tests will fail when using CRT client");
+ }
+
@Override
protected String suitename() {
return "ITestPartitionedCommitProtocol";
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
index 81c3af812ab..d2b4e1c5295 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocol.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
/** Test the staging committer's handling of the base protocol operations. */
@@ -65,6 +66,9 @@ public class ITestStagingCommitProtocol extends AbstractITCommitProtocol {
@Override
public void setup() throws Exception {
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Skipping because these tests will fail when using CRT client");
+
// identify working dir for staging and delete
Configuration conf = getConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
index 08b6c21a863..5604aa76369 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/integration/ITestStagingCommitProtocolFailure.java
@@ -32,9 +32,7 @@ import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_NAME;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.S3A_COMMITTER_FACTORY_KEY;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -45,6 +43,14 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
*/
public class ITestStagingCommitProtocolFailure extends AbstractS3ATestBase {
+
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Skipping because these tests will fail when using CRT client");
+ }
+
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 1724006a831..dc086f7c423 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -30,6 +30,8 @@ import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
/**
* S3a implementation of FCStatisticsBaseTest.
*/
@@ -44,6 +46,8 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
@Before
public void setUp() throws Exception {
conf = new Configuration();
+ skipIfAnalyticsAcceleratorEnabled(conf,
+ "S3SeekableStream does not support File Context Statistics");
fc = S3ATestUtils.createTestFileContext(conf);
testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
fc.mkdir(testRootPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
index a4cc5cadc5d..2a6605b9342 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestConnectionTimeouts.java
@@ -60,6 +60,7 @@ import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_PATH_PREFIX;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsMillis;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
@@ -146,6 +147,11 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
@Test
public void testGeneratePoolTimeouts() throws Throwable {
skipIfClientSideEncryption();
+
+ // Assertions will fail when using CRTClient with Analytics Accelerator.
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Assertions will fail when using CRTClient with Analytics
Accelerator");
+
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
Configuration conf = timingOutConfiguration();
Path path = methodPath();
@@ -188,6 +194,11 @@ public class ITestConnectionTimeouts extends AbstractS3ATestBase {
@Test
public void testObjectUploadTimeouts() throws Throwable {
skipIfClientSideEncryption();
+
+ // Assertions will fail when using CRTClient with Analytics Accelerator.
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Assertions will fail when using CRTClient with Analytics
Accelerator");
+
AWSClientConfig.setMinimumOperationDuration(Duration.ZERO);
final Path dir = methodPath();
Path file = new Path(dir, "file");
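
Note the placement: ITestConnectionTimeouts guards individual @Test methods, while most other files in this patch guard setup(). A hedged illustration of the difference, with an invented class name and message strings:

import org.junit.Test;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;

public class ITestExampleGuardPlacement extends AbstractS3ATestBase {

  @Override
  public void setup() throws Exception {
    super.setup();
    // Placed here, the skip applies to every case in the class.
    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
        "whole class is incompatible with the accelerator stream");
  }

  @Test
  public void testOnlyThisCaseIsGuarded() throws Throwable {
    // Placed here, the skip applies to this case alone, as in
    // ITestConnectionTimeouts above.
    skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
        "assertions here assume the classic S3AInputStream");
  }
}
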
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 8e61225e17e..c07c5dd5086 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -54,10 +54,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_DEFAULT;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
@@ -114,6 +111,8 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
@Override
public void setup() throws Exception {
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Assertions will fail as S3SeekableStream does not support Stream
Statistics");
S3AFileSystem fs = getFileSystem();
testFile = methodPath();
@@ -392,7 +391,6 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
describe("PositionedReadable.read() past the end of the file");
assumeNoPrefetching();
-
verifyMetrics(() -> {
try (FSDataInputStream in =
openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index 00bae1519f5..a0dec8d476b 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -54,6 +54,7 @@ import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.impl.ConfigurationHelper.setDurationAsSeconds;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
@@ -125,6 +126,9 @@ public class ITestUnbufferDraining extends AbstractS3ACostTest {
@Override
public void setup() throws Exception {
super.setup();
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "Skipping because getS3AInputStream will " +
+ "try to cast S3SeekableStream to S3AInputStream");
// now create a new FS with minimal http capacity and recovery
// a separate one is used to avoid test teardown suffering
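
The skip message above points at a cast failure: S3ATestUtils.getS3AInputStream unwraps an FSDataInputStream and casts the inner stream to S3AInputStream. A sketch of that assumed shape (the exact helper body may differ):

import java.io.InputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.s3a.S3AInputStream;

public final class StreamUnwrapSketch {

  private StreamUnwrapSketch() {
  }

  // Assumed shape of the helper: unwrap, then cast. With the accelerator
  // enabled the wrapped stream is an S3ASeekableStream, so this cast
  // throws ClassCastException, hence the skip in setup() above.
  public static S3AInputStream getS3AInputStream(FSDataInputStream in) {
    InputStream inner = in.getWrappedStream();
    return (S3AInputStream) inner;
  }
}
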
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index 0f6b69cd54d..2b332b2b3ee 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
/**
@@ -78,4 +79,11 @@ public class ITestS3AContractStreamIOStatistics extends
STREAM_WRITE_EXCEPTIONS);
}
+ @Override
+ public void setup() throws Exception {
+ super.setup();
+ skipIfAnalyticsAcceleratorEnabled(createConfiguration(),
+ "S3SeekableStream does not support Stream Statistics");
+ }
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 0d5d2a789a0..5a3f7bb8fdb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+
public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
private static final int ONE_KB = 1024;
@@ -42,6 +44,10 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
*/
@Test
public void testBytesReadWithStream() throws IOException {
+ // Assertions will fail as {@link S3ASeekableStream}
+ // does not support S3AFileSystemStatistics yet.
+ skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
+ "S3SeekableStream does not support File System Statistics");
S3AFileSystem fs = getFileSystem();
Path filePath = path(getMethodName());
byte[] oneKbBuf = new byte[ONE_KB];
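
For context on this final skip, a condensed and approximate sketch of the kind of check testBytesReadWithStream performs; the helper names come from the hunk above, the rest is illustrative:

// Approximate sketch, not the real test body: read 1 KB back and
// expect the FileSystem.Statistics byte counter to advance.
@Test
public void testBytesReadWithStreamSketch() throws IOException {
  S3AFileSystem fs = getFileSystem();
  Path filePath = path(getMethodName());
  byte[] oneKbBuf = new byte[1024];
  ContractTestUtils.createFile(fs, filePath, true, oneKbBuf);

  FileSystem.Statistics stats =
      FileSystem.getStatistics("s3a", S3AFileSystem.class);
  long before = stats.getBytesRead();
  try (FSDataInputStream in = fs.open(filePath)) {
    in.readFully(0, oneKbBuf);  // classic stream feeds the counters
  }
  // With the accelerator's seekable stream the counter would not
  // advance yet, which is why the real test is skipped.
  assertEquals(oneKbBuf.length, stats.getBytesRead() - before);
}
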
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]