This is an automated email from the ASF dual-hosted git repository.
ahmar pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1f099fe2db7 HADOOP-19364. S3A: IoStats support for AAL. (#8007)
1f099fe2db7 is described below
commit 1f099fe2db7dbfd1f1bd6e1ef11edcf2d7b59ca8
Author: ahmarsuhail <[email protected]>
AuthorDate: Thu Nov 20 11:22:30 2025 +0000
HADOOP-19364. S3A: IoStats support for AAL. (#8007)
---
.../hadoop/fs/statistics/StreamStatisticNames.java | 17 ++
.../apache/hadoop/fs/s3a/S3AInstrumentation.java | 36 +++-
.../java/org/apache/hadoop/fs/s3a/Statistic.java | 14 +-
.../s3a/impl/streams/AnalyticsRequestCallback.java | 69 +++++++
.../fs/s3a/impl/streams/AnalyticsStream.java | 45 ++++-
.../s3a/impl/streams/AnalyticsStreamFactory.java | 2 -
.../s3a/statistics/S3AInputStreamStatistics.java | 26 +++
.../statistics/impl/EmptyS3AStatisticsContext.java | 25 +++
...TestS3AContractAnalyticsStreamVectoredRead.java | 71 ++++++-
.../fs/contract/s3a/ITestS3AContractCreate.java | 8 -
.../fs/contract/s3a/ITestS3AContractDistCp.java | 10 -
.../s3a/ITestS3AContractMultipartUploader.java | 7 -
.../ITestS3AAnalyticsAcceleratorStreamReading.java | 214 ++++++++++++++++++++-
.../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 13 --
.../hadoop/fs/s3a/ITestS3AFileSystemContract.java | 7 -
.../hadoop/fs/s3a/ITestS3AIOStatisticsContext.java | 6 +-
.../org/apache/hadoop/fs/s3a/ITestS3AMetrics.java | 5 -
.../org/apache/hadoop/fs/s3a/S3ATestConstants.java | 20 ++
.../fs/s3a/commit/ITestCommitOperationCost.java | 11 +-
.../fileContext/ITestS3AFileContextStatistics.java | 5 -
.../fs/s3a/performance/ITestS3AOpenCost.java | 92 ++++++---
.../ITestS3AContractStreamIOStatistics.java | 5 -
.../statistics/ITestS3AFileSystemStatistic.java | 5 -
23 files changed, 599 insertions(+), 114 deletions(-)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
index 09c19ad071a..e8cb9f6469a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -489,6 +489,23 @@ public final class StreamStatisticNames {
public static final String STREAM_FILE_CACHE_EVICTION
= "stream_file_cache_eviction";
+ /**
+ * Bytes that were prefetched by the stream.
+ */
+ public static final String STREAM_READ_PREFETCHED_BYTES =
+ "stream_read_prefetched_bytes";
+
+ /**
+ * Tracks failures in footer parsing.
+ */
+ public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
+ = "stream_read_parquet_footer_parsing_failed";
+
+ /**
+ * A cache hit occurs when the request range can be satisfied by the data in the cache.
+ */
+ public static final String STREAM_READ_CACHE_HIT = "stream_read_cache_hit";
+
+
private StreamStatisticNames() {
}
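
These counters surface through the standard IOStatistics API. A minimal sketch of reading them from an open stream (assuming an S3A filesystem "fs" and a "path"; FSDataInputStream implements IOStatisticsSource):

    // Sketch: look up the new AAL counters on a stream's IOStatistics.
    try (FSDataInputStream in = fs.open(path)) {
      in.read(new byte[1024]);
      IOStatistics stats = in.getIOStatistics();
      long prefetched = stats.counters()
          .getOrDefault(StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES, 0L);
      long cacheHits = stats.counters()
          .getOrDefault(StreamStatisticNames.STREAM_READ_CACHE_HIT, 0L);
    }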
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index b3c907428ac..bc2c83a6872 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -81,6 +81,9 @@
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -891,7 +894,12 @@ private InputStreamStatistics(
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
- StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
+ StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
+ StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
+ StreamStatisticNames.STREAM_READ_CACHE_HIT,
+ StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
+ StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
+ )
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -1128,6 +1136,32 @@ public void readVectoredBytesDiscarded(int discarded) {
bytesDiscardedInVectoredIO.addAndGet(discarded);
}
+ @Override
+ public void getRequestInitiated() {
+ increment(ACTION_HTTP_GET_REQUEST);
+ }
+
+ @Override
+ public void headRequestInitiated() {
+ increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST);
+ }
+
+ @Override
+ public void bytesPrefetched(long size) {
+ increment(STREAM_READ_PREFETCHED_BYTES, size);
+ }
+
+ @Override
+ public void footerParsingFailed() {
+ increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);
+ }
+
+ @Override
+ public void streamReadCacheHit() {
+ increment(STREAM_READ_CACHE_HIT);
+ }
+
+
@Override
public void executorAcquired(Duration timeInQueue) {
// update the duration fields in the IOStatistics.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index 6389742167d..4b09530ba9b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -458,9 +458,21 @@ public enum Statistic {
StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
"Gauge of active memory in use",
TYPE_GAUGE),
+ STREAM_READ_PREFETCH_BYTES(
+ StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
+ "Bytes prefetched by AAL stream",
+ TYPE_COUNTER),
+ STREAM_READ_PARQUET_FOOTER_PARSING_FAILED(
+ StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED,
+ "Count of Parquet footer parsing failures encountered by AAL",
+ TYPE_COUNTER),
+ STREAM_READ_CACHE_HIT(
+ StreamStatisticNames.STREAM_READ_CACHE_HIT,
+ "Count of cache hits in AAL stream",
+ TYPE_COUNTER),
- /* Stream Write statistics */
+ /* Stream Write statistics */
STREAM_WRITE_EXCEPTIONS(
StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
"Count of stream write failures reported",
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
new file mode 100644
index 00000000000..d3940bd86e8
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl.streams;
+
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+
+/**
+ * Implementation of AAL's RequestCallback interface that tracks analytics operations.
+ */
+public class AnalyticsRequestCallback implements RequestCallback {
+ private final S3AInputStreamStatistics statistics;
+
+ /**
+ * Create a new callback instance.
+ * @param statistics the statistics to update
+ */
+ public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
+ this.statistics = statistics;
+ }
+
+ @Override
+ public void onGetRequest() {
+ statistics.getRequestInitiated();
+ }
+
+ @Override
+ public void onHeadRequest() {
+ statistics.headRequestInitiated();
+ }
+
+ @Override
+ public void onBlockPrefetch(long start, long end) {
+ statistics.bytesPrefetched(end - start + 1);
+ }
+
+ @Override
+ public void footerParsingFailed() {
+ statistics.footerParsingFailed();
+ }
+
+ @Override
+ public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
+ statistics.readVectoredOperationStarted(numIncomingRanges, numCombinedRanges);
+ }
+
+ @Override
+ public void onCacheHit() {
+ statistics.streamReadCacheHit();
+ }
+
+}
+
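
A minimal sketch of how this callback is wired up (the actual wiring is in AnalyticsStream#buildOpenStreamInformation in the next file; "streamStatistics" stands for the stream's S3AInputStreamStatistics, and build() is assumed from the AAL builder API):

    // Sketch: attach the callback so AAL reports its requests into S3A statistics.
    RequestCallback callback = new AnalyticsRequestCallback(streamStatistics);
    OpenStreamInformation info = OpenStreamInformation.builder()
        .requestCallback(callback)
        .build();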
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
index 8920b5b2dfc..954ee3a0e48 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java
@@ -40,6 +40,7 @@
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
import software.amazon.s3.analyticsaccelerator.util.S3URI;
+import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
super(InputStreamType.Analytics, parameters);
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
+
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenStreamInformation(parameters));
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,6 +82,9 @@ public AnalyticsStream(final ObjectReadParameters parameters,
@Override
public int read() throws IOException {
throwIfClosed();
+
+ getS3AStreamStatistics().readOperationStarted(getPos(), 1);
+
int bytesRead;
try {
bytesRead = inputStream.read();
@@ -87,6 +92,11 @@ public int read() throws IOException {
onReadFailure(ioe);
throw ioe;
}
+
+ if (bytesRead != -1) {
+ incrementBytesRead(1);
+ }
+
return bytesRead;
}
@@ -122,6 +132,8 @@ public synchronized long getPos() {
*/
public int readTail(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
+ getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
int bytesRead;
try {
bytesRead = inputStream.readTail(buf, off, len);
@@ -129,12 +141,20 @@ public int readTail(byte[] buf, int off, int len) throws IOException {
onReadFailure(ioe);
throw ioe;
}
+
+ if (bytesRead > 0) {
+ incrementBytesRead(bytesRead);
+ }
+
return bytesRead;
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
throwIfClosed();
+
+ getS3AStreamStatistics().readOperationStarted(getPos(), len);
+
int bytesRead;
try {
bytesRead = inputStream.read(buf, off, len);
@@ -142,6 +162,11 @@ public int read(byte[] buf, int off, int len) throws IOException {
onReadFailure(ioe);
throw ioe;
}
+
+ if (bytesRead > 0) {
+ incrementBytesRead(bytesRead);
+ }
+
return bytesRead;
}
@@ -177,8 +202,6 @@ public void readVectored(final List<? extends FileRange> ranges,
range.setData(result);
}
- // AAL does not do any range coalescing, so input and combined ranges are the same.
- this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
inputStream.readVectored(objectRanges, allocate, release);
}
@@ -247,10 +270,14 @@ private void onReadFailure(IOException ioe) throws IOException {
}
private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
+
+ final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
+
OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder = OpenStreamInformation.builder()
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
- .getInputPolicy()));
+ .getInputPolicy()))
+ .requestCallback(requestCallback);
if (parameters.getObjectAttributes().getETag() != null) {
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +327,16 @@ protected void throwIfClosed() throws IOException {
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
}
}
+
+ /**
+ * Increment the bytes read counter if there is a stats instance
+ * and the number of bytes read is more than zero.
+ * @param bytesRead number of bytes read
+ */
+ private void incrementBytesRead(long bytesRead) {
+ getS3AStreamStatistics().bytesRead(bytesRead);
+ if (getContext().getStats() != null && bytesRead > 0) {
+ getContext().getStats().incrementBytesRead(bytesRead);
+ }
+ }
}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
index 50333c68e0c..cff743242f1 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java
@@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
- private boolean requireCrt;
public AnalyticsStreamFactory() {
super("AnalyticsStreamFactory");
@@ -61,7 +60,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
this.seekableInputStreamConfiguration =
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
- this.requireCrt = false;
}
@Override
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
index 7ad7cf75367..1e48b3ef850 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java
@@ -119,6 +119,32 @@ void readVectoredOperationStarted(int numIncomingRanges,
*/
void readVectoredBytesDiscarded(int discarded);
+ /**
+ * A GET request was initiated by the stream.
+ */
+ void getRequestInitiated();
+
+ /**
+ * A HEAD request was initiated by the stream.
+ */
+ void headRequestInitiated();
+
+ /**
+ * Record bytes prefetched by the stream.
+ * @param size number of bytes prefetched.
+ */
+ void bytesPrefetched(long size);
+
+ /**
+ * Parquet footer parsing failed.
+ */
+ void footerParsingFailed();
+
+ /**
+ * The requested data was already in the cache: a cache hit.
+ */
+ void streamReadCacheHit();
+
@Override
void close();
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
index 26b9f2b1568..cf339269549 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
@@ -212,6 +212,31 @@ public void readVectoredBytesDiscarded(int discarded) {
}
+ @Override
+ public void getRequestInitiated() {
+
+ }
+
+ @Override
+ public void headRequestInitiated() {
+
+ }
+
+ @Override
+ public void bytesPrefetched(long size) {
+
+ }
+
+ @Override
+ public void footerParsingFailed() {
+
+ }
+
+ @Override
+ public void streamReadCacheHit() {
+
+ }
+
@Override
public void close() {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
index 0875851d4d6..3effd5a1ab1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.contract.s3a;
+import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -26,6 +27,7 @@
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.test.tags.IntegrationTest;
@@ -36,9 +38,20 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
+import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_CACHE_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_READ_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_REQUEST_COALESCE_TOLERANCE;
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.AAL_SMALL_OBJECT_PREFETCH_ENABLED;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.io.Sizes.S_16K;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_32K;
+
/**
* S3A contract tests for vectored reads with the Analytics stream.
@@ -57,6 +70,18 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
super(bufferType);
}
+ private static final String REQUEST_COALESCE_TOLERANCE_KEY =
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_REQUEST_COALESCE_TOLERANCE;
+
+ private static final String READ_BUFFER_SIZE_KEY =
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_READ_BUFFER_SIZE;
+
+ private static final String SMALL_OBJECT_PREFETCH_ENABLED_KEY =
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_SMALL_OBJECT_PREFETCH_ENABLED;
+
+ private static final String CACHE_TIMEOUT_KEY =
+ ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_CACHE_TIMEOUT;
+
/**
* Create a configuration.
* @return a configuration
@@ -64,6 +89,28 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
@Override
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
+
+ S3ATestUtils.disableFilesystemCaching(conf);
+
+ removeBaseAndBucketOverrides(conf,
+ REQUEST_COALESCE_TOLERANCE_KEY,
+ READ_BUFFER_SIZE_KEY,
+ SMALL_OBJECT_PREFETCH_ENABLED_KEY,
+ CACHE_TIMEOUT_KEY);
+
+ // Set the coalesce tolerance to 16KB; the default is 1MB.
+ conf.setInt(REQUEST_COALESCE_TOLERANCE_KEY, S_16K);
+
+ // Set the minimum block size to 32KB. AAL uses a default block size of 128KB, which means
+ // the minimum size of an S3 request will be 128KB. Since the file being read is 128KB, we need to
+ // use this here to demonstrate that separate GET requests are made for ranges that are not coalesced.
+ conf.setInt(READ_BUFFER_SIZE_KEY, S_32K);
+
+ // Disable small object prefetching, otherwise anything less than 8MB is fetched in a single GET.
+ conf.set(SMALL_OBJECT_PREFETCH_ENABLED_KEY, "false");
+
+ conf.setInt(CACHE_TIMEOUT_KEY, 5000);
+
enableAnalyticsAccelerator(conf);
// If encryption is set, some AAL tests will fail.
// This is because AAL caches the head request response, and uses
@@ -102,21 +149,41 @@ public void testNullReleaseOperation() {
@Test
public void testReadVectoredWithAALStatsCollection() throws Exception {
+ List<FileRange> fileRanges = new ArrayList<>();
+ fileRanges.add(FileRange.createFileRange(0, 100));
+ fileRanges.add(FileRange.createFileRange(800, 200));
+ fileRanges.add(FileRange.createFileRange(4 * S_1K, 4 * S_1K));
+ fileRanges.add(FileRange.createFileRange(80 * S_1K, 4 * S_1K));
- List<FileRange> fileRanges = createSampleNonOverlappingRanges();
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, getAllocate());
validateVectoredReadResult(fileRanges, DATASET, 0);
IOStatistics st = in.getIOStatistics();
- // Statistics such as GET requests will be added after IoStats support.
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
verifyStatisticCounterValue(st,
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
1);
+
+ // Verify ranges are coalesced: we are using a coalescing tolerance of 16KB,
+ // so [0-100, 800-200, 4KB-8KB] will get coalesced into a single range.
+ verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, 4);
+ verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, 2);
+
+ verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
+
+ // Read the same ranges again to demonstrate that the data is cached, and no new GETs are made.
+ in.readVectored(fileRanges, getAllocate());
+ verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
+
+ // Because of how AAL is currently written, it is not possible to accurately track cache hits
+ // that originate from a readVectored(). For this reason, cache hits from readVectored are
+ // currently not tracked; for more details see:
+ // https://github.com/awslabs/analytics-accelerator-s3/issues/359
+ verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_CACHE_HIT, 0);
}
+
}
}
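
The coalescing counts asserted above can be checked by hand. A simplified model, assuming AAL merges ranges whose end-to-start gap is within the tolerance (the exact merge rule lives inside AAL):

    // Worked check of the coalescing asserted in the test (16KB tolerance).
    int tolerance = 16 * 1024;
    long[][] ranges = {{0, 100}, {800, 200}, {4096, 4096}, {81920, 4096}};  // {offset, length}
    int combined = 1;
    for (int i = 1; i < ranges.length; i++) {
      long prevEnd = ranges[i - 1][0] + ranges[i - 1][1];
      if (ranges[i][0] - prevEnd > tolerance) {
        combined++;  // gap too large: a separate coalesced range, hence a separate GET
      }
    }
    // combined == 2: the first three ranges merge into one; [80KB, 84KB) stays separate.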
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
index 65894cc6518..19c483a5de3 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java
@@ -35,7 +35,6 @@
import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled;
/**
@@ -100,13 +99,6 @@ protected Configuration createConfiguration() {
@Test
public void testOverwriteNonEmptyDirectory() throws Throwable {
try {
- // Currently analytics accelerator does not support reading of files that have been overwritten.
- // This is because the analytics accelerator library caches metadata, and when a file is
- // overwritten, the old metadata continues to be used, until it is removed from the cache over
- // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
- skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
- "Analytics Accelerator currently does not support reading of over written files");
-
super.testOverwriteNonEmptyDirectory();
failWithCreatePerformance();
} catch (AssertionError e) {
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index 4e3013e9d43..2ee7ccd2975 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -22,7 +22,6 @@
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageStatistics;
@@ -87,15 +86,6 @@ public void testNonDirectWrite() throws Exception {
@Test
@Override
public void testDistCpUpdateCheckFileSkip() throws Exception {
- // Currently analytics accelerator does not support reading of files that have been overwritten.
- // This is because the analytics accelerator library caches metadata and data, and when a
- // file is overwritten, the old data continues to be used, until it is removed from the
- // cache over time. This will be fixed in
- // https://github.com/awslabs/analytics-accelerator-s3/issues/218.
- // In this test case, the remote file is created, read, then deleted, and then created again
- // with different contents, and read again, which leads to assertions failing.
- skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
- "Analytics Accelerator Library does not support update to existing files");
super.testDistCpUpdateCheckFileSkip();
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
index 6d98388503e..c8679f7e349 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java
@@ -47,7 +47,6 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBool;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestPropertyBytes;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.impl.ChecksumSupport.getChecksumAlgorithm;
import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
@@ -173,12 +172,6 @@ public void testMultipartUploadReverseOrderNonContiguousPartNumbers() throws Exc
@Override
public void testConcurrentUploads() throws Throwable {
assumeNotS3ExpressFileSystem(getFileSystem());
- // Currently analytics accelerator does not support reading of files that have been overwritten.
- // This is because the analytics accelerator library caches metadata and data, and when a file
- // is overwritten, the old data continues to be used, until it is removed from the cache over
- // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
- skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
- "Analytics Accelerator currently does not support reading of over written files");
super.testConcurrentUploads();
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
index be0d2cc5b20..4d3bb5487f6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java
@@ -43,14 +43,27 @@
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_PARQUET;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.fs.audit.AuditStatisticNames.AUDIT_REQUEST_EXECUTION;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.ANALYTICS_STREAM_FACTORY_CLOSED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPERATIONS;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
+import static org.apache.hadoop.io.Sizes.S_1K;
+import static org.apache.hadoop.io.Sizes.S_1M;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_KB;
+import static software.amazon.s3.analyticsaccelerator.util.Constants.ONE_MB;
/**
* Tests integration of the
@@ -88,6 +101,16 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
S3AFileSystem fs = (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+
+ final long initialAuditCount = fs.getIOStatistics().counters()
+ .getOrDefault(AUDIT_REQUEST_EXECUTION, 0L);
+
+ long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+ // Head request for the file length.
+ verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
+ initialAuditCount + 1);
+
byte[] buffer = new byte[500];
IOStatistics ioStats;
@@ -105,9 +128,21 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics);
Assertions.assertThat(objectInputStream.getInputPolicy())
.isEqualTo(S3AInputPolicy.Sequential);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, 500);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
+
+ long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+ Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+ .isEqualTo(500);
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+
+ // Since the policy is WHOLE_FILE, the whole file starts getting prefetched as soon as the stream to it is opened.
+ // So the prefetched byte count is fileLength - 5.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength - 5);
+
fs.close();
verifyStatisticCounterValue(fs.getIOStatistics(),
ANALYTICS_STREAM_FACTORY_CLOSED, 1);
@@ -115,7 +150,70 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
// in which case, AAL will start prefetching till EoF on file open in 8MB chunks. The file read here
// s3://noaa-cors-pds/raw/2023/017/ohfh/OHFH017d.23_.gz, has a size of ~21MB, resulting in 3 GETS:
// [0-8388607, 8388608-16777215, 16777216-21511173].
- verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION, 4);
+ verifyStatisticCounterValue(fs.getIOStatistics(), AUDIT_REQUEST_EXECUTION,
+ initialAuditCount + 1 + 4);
+ }
+
+ @Test
+ public void testSequentialPrefetching() throws IOException {
+
+ Configuration conf = getConfiguration();
+
+ // AAL uses a Caffeine cache, and by default expires any prefetched data for a key 1s after it was last accessed.
+ // While this works well when running on EC2, for local testing it can take more than 1s to download large chunks
+ // of data. Set this value higher for testing to prevent early cache evictions.
+ conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
+ "." + AAL_CACHE_TIMEOUT, 10000);
+
+ S3AFileSystem fs =
+ (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
+ byte[] buffer = new byte[10 * ONE_MB];
+ IOStatistics ioStats;
+
+ long fileLength = fs.getFileStatus(externalTestFile).getLen();
+
+ // Here we read through the 21MB external test file, but do not pass in the WHOLE_FILE policy. Instead, we rely
+ // on AAL detecting a sequential read pattern, and then prefetching bytes in a geometric progression.
+ // AAL's sequential prefetching starts prefetching in increments of 4MB, 8MB, 16MB etc., depending on how many
+ // sequential reads happen.
+ try (FSDataInputStream inputStream = fs.open(externalTestFile)) {
+ ioStats = inputStream.getIOStatistics();
+
+ inputStream.readFully(buffer, 0, ONE_MB);
+ // The first sequential read, so prefetch the next 4MB.
+ inputStream.readFully(buffer, 0, ONE_MB);
+
+ // Since ONE_MB was requested by the reader, the prefetched bytes are 3MB.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+
+ // These next two reads are within the last prefetched bytes, so no further bytes are prefetched.
+ inputStream.readFully(buffer, 0, 2 * ONE_MB);
+ inputStream.readFully(buffer, 0, ONE_MB);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
+ // Two cache hits, as the previous two reads were already prefetched.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+
+ // Another sequential read, GP will now prefetch the next 8MB of data.
+ inputStream.readFully(buffer, 0, ONE_MB);
+ // Cache hit count is still 2, as the previous read required a new GET request, since it was
+ // outside the previously fetched 4MB.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
+ // A total of 10MB is prefetched - 3MB and then 7MB.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
+ long bytesRemainingForPrefetch = fileLength - (inputStream.getPos() + 10 * ONE_MB);
+ inputStream.readFully(buffer, 0, 10 * ONE_MB);
+
+
+ // Though the next GP step should prefetch 16MB, since the file is ~21MB, only the bytes till EoF are prefetched.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES,
+ 10 * ONE_MB + bytesRemainingForPrefetch);
+ inputStream.readFully(buffer, 0, 3 * ONE_MB);
+ verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 3);
+ }
+
+ // verify all AAL stats are passed to the FS.
+ verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
+ verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 0);
}
@Test
@@ -134,9 +232,33 @@ public void testMalformedParquetFooter() throws IOException {
Path sourcePath = new Path(file.toURI().getPath());
getFileSystem().copyFromLocalFile(false, true, sourcePath, dest);
+ long fileLength = getFileSystem().getFileStatus(dest).getLen();
+
byte[] buffer = new byte[500];
IOStatistics ioStats;
+ int bytesRead;
+
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ ioStats = inputStream.getIOStatistics();
+ inputStream.seek(5);
+ bytesRead = inputStream.read(buffer, 0, 500);
+
+ ObjectInputStream objectInputStream = (ObjectInputStream) inputStream.getWrappedStream();
+ long streamBytesRead = objectInputStream.getS3AStreamStatistics().getBytesRead();
+ Assertions.assertThat(streamBytesRead).as("Stream statistics should track bytes read")
+ .isEqualTo(bytesRead);
+ }
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+ // This file has a content length of 451. Since it's a parquet file, AAL will prefetch the footer bytes
+ // (last 32KB) as soon as the file is opened, but because the file is < 32KB, the whole file is prefetched.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, fileLength);
+
+ // Open a stream to the object a second time, verifying that data is cached, and that streams to the same
+ // object do not prefetch the same data twice.
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
ioStats = inputStream.getIOStatistics();
inputStream.seek(5);
@@ -144,6 +266,10 @@ public void testMalformedParquetFooter() throws IOException {
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+ // No data is prefetched, as it already exists in the cache from the previous factory.
+ verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 0);
}
/**
@@ -167,6 +293,7 @@ public void testMultiRowGroupParquet() throws Throwable {
final int size = 3000;
byte[] buffer = new byte[size];
int readLimit = Math.min(size, (int) fileStatus.getLen());
+
IOStatistics ioStats;
final IOStatistics fsIostats = getFileSystem().getIOStatistics();
@@ -179,6 +306,13 @@ public void testMultiRowGroupParquet() throws Throwable {
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+
+ // S3A makes a HEAD request on the stream open(), and then AAL makes a GET request to get the object,
+ // so the audit request count increases by 2.
+ long currentAuditCount = initialAuditCount + 2;
+ verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
+ AUDIT_REQUEST_EXECUTION, currentAuditCount);
try (FSDataInputStream inputStream = getFileSystem().openFile(dest)
.withFileStatus(fileStatus)
@@ -186,11 +320,17 @@ public void testMultiRowGroupParquet() throws Throwable {
.build().get()) {
ioStats = inputStream.getIOStatistics();
inputStream.readFully(buffer, 0, readLimit);
+
+ verifyStatisticCounterValue(ioStats, STREAM_READ_BYTES, (int) fileStatus.getLen());
+ verifyStatisticCounterValue(ioStats, STREAM_READ_OPERATIONS, 1);
}
verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
- verifyStatisticCounterValue(fsIostats, AUDIT_REQUEST_EXECUTION, initialAuditCount + 2);
+ // S3A passes in the metadata (content length) on file open,
+ // so we expect AAL to make no HEAD requests.
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 0);
}
@Test
@@ -210,4 +350,72 @@ public void testInvalidConfigurationThrows() throws Exception {
() ->
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration));
}
+
+ @Test
+ public void testRandomSeekPatternGets() throws Throwable {
+ describe("Random seek pattern should optimize GET requests");
+
+ Path dest = path("seek-test.txt");
+ byte[] data = dataset(5 * S_1M, 256, 255);
+ writeDataset(getFileSystem(), dest, data, 5 * S_1M, 1024, true);
+
+ byte[] buffer = new byte[S_1M];
+ try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+ IOStatistics ioStats = inputStream.getIOStatistics();
+
+ inputStream.read(buffer);
+ inputStream.seek(2 * S_1M);
+ inputStream.read(new byte[512 * S_1K]);
+ inputStream.seek(3 * S_1M);
+ inputStream.read(new byte[512 * S_1K]);
+
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
+ verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
+ }
+
+ // We did 3 reads, and all of them were served from the cache
+ verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
+ }
+
+
+ @Test
+ public void testSequentialStreamsNoDuplicateGets() throws Throwable {
+ describe("Sequential streams reading same object should not duplicate
GETs");
+
+ Path dest = path("sequential-test.txt");
+ int fileLen = S_1M;
+
+ byte[] data = dataset(fileLen, 256, 255);
+ writeDataset(getFileSystem(), dest, data, fileLen, 1024, true);
+
+ byte[] buffer = new byte[ONE_MB];
+ try (FSDataInputStream stream1 = getFileSystem().open(dest);
+ FSDataInputStream stream2 = getFileSystem().open(dest)) {
+
+ stream1.read(buffer, 0, 2 * ONE_KB);
+ stream2.read(buffer);
+ stream1.read(buffer, 0, 10 * ONE_KB);
+
+ IOStatistics stats1 = stream1.getIOStatistics();
+ IOStatistics stats2 = stream2.getIOStatistics();
+
+ verifyStatisticCounterValue(stats1, ACTION_HTTP_GET_REQUEST, 1);
+ verifyStatisticCounterValue(stats2, ACTION_HTTP_HEAD_REQUEST, 0);
+
+ // Since it's a small file (AAL will prefetch the whole file for sizes < 8MB), the whole file is prefetched
+ // on the first read.
+ verifyStatisticCounterValue(stats1, STREAM_READ_PREFETCHED_BYTES, fileLen);
+
+ // The second stream will not prefetch any bytes, as they have already been prefetched by stream 1.
+ verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0);
+ }
+
+ // verify value is passed up to the FS
+ verifyStatisticCounterValue(getFileSystem().getIOStatistics(),
+ STREAM_READ_PREFETCHED_BYTES, fileLen);
+
+ // We did 3 reads, all of them served from the small object cache. In this case, the whole object was
+ // downloaded as soon as the stream to it was opened.
+ verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
+ }
}
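
The prefetch totals asserted in testSequentialPrefetching follow a simple pattern. A sketch of the arithmetic, assuming each new window subtracts the 1MB the reader actually requested (the precise trigger points live inside AAL):

    // Simplified model of the sequential-prefetch accounting asserted above.
    long oneMB = 1024 * 1024;
    long prefetched = 0;
    prefetched += 4 * oneMB - oneMB;  // second sequential read opens a 4MB window: 3MB prefetched
    // reads inside the window add nothing; they count as cache hits
    prefetched += 8 * oneMB - oneMB;  // window doubles to 8MB: 7MB more, 10MB total
    // the next 16MB window is truncated at EoF for the ~21MB object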
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
index f78f97a097a..6456f900a02 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java
@@ -32,7 +32,6 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.assertj.core.api.Assertions.assertThat;
/**
@@ -76,23 +75,11 @@ public void testCopyToLocalWithUseRawLocalFileSystemOption()
@Override
public void testWriteReadAndDeleteOneAndAHalfBlocks() throws Exception {
- // Currently analytics accelerator does not support reading of files that have been overwritten.
- // This is because the analytics accelerator library caches metadata, and when a file is
- // overwritten, the old metadata continues to be used, until it is removed from the cache over
- // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
- skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
- "Analytics Accelerator currently does not support reading of over written files");
super.testWriteReadAndDeleteOneAndAHalfBlocks();
}
@Override
public void testWriteReadAndDeleteTwoBlocks() throws Exception {
- // Currently analytics accelerator does not support reading of files that have been overwritten.
- // This is because the analytics accelerator library caches metadata, and when a file is
- // overwritten, the old metadata continues to be used, until it is removed from the cache over
- // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
- skipIfAnalyticsAcceleratorEnabled(this.contract.getConf(),
- "Analytics Accelerator currently does not support reading of over written files");
super.testWriteReadAndDeleteTwoBlocks();
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
index 8667d2b646f..3b1b8af9eed 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
@@ -37,7 +37,6 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -168,12 +167,6 @@ public void testOverwrite() throws IOException {
@Override
public void testOverWriteAndRead() throws Exception {
- // Currently analytics accelerator does not support reading of files that have been overwritten.
- // This is because the analytics accelerator library caches metadata, and when a file is
- // overwritten, the old metadata continues to be used, until it is removed from the cache over
- // time. This will be fixed in https://github.com/awslabs/analytics-accelerator-s3/issues/218.
- skipIfAnalyticsAcceleratorEnabled(fs.getConf(),
- "Analytics Accelerator currently does not support reading of over written files");
super.testOverWriteAndRead();
}
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
index 12a1cd7d8f6..2bc342717a1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AIOStatisticsContext.java
@@ -45,7 +45,6 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disablePrefetching;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
@@ -81,10 +80,7 @@ protected Configuration createConfiguration() {
public void setup() throws Exception {
super.setup();
executor = HadoopExecutors.newFixedThreadPool(SMALL_THREADS);
- // Analytics accelerator currently does not support IOStatisticsContext, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support IOStatisticsContext");
+
}
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
index 2ae28c74fe5..548b30a3b2d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AMetrics.java
@@ -28,7 +28,6 @@
import java.io.IOException;
import java.io.InputStream;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
/**
@@ -52,10 +51,6 @@ public void testMetricsRegister()
@Test
public void testStreamStatistics() throws IOException {
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path file = path("testStreamStatistics");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
index 259a8a54e0e..01939732d8a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java
@@ -302,4 +302,24 @@ public interface S3ATestConstants {
* Default value of {@link #MULTIPART_COMMIT_CONSUMES_UPLOAD_ID}: {@value}.
*/
boolean DEFAULT_MULTIPART_COMMIT_CONSUMES_UPLOAD_ID = false;
+
+ /**
+ * Ranges within this distance of each other will be coalesced.
+ */
+ String AAL_REQUEST_COALESCE_TOLERANCE = "physicalio.request.coalesce.tolerance";
+
+ /**
+ * The minimum size of a block in AAL.
+ */
+ String AAL_READ_BUFFER_SIZE = "physicalio.readbuffersize";
+
+ /**
+ * Whether objects below the small-object threshold are downloaded in a single GET.
+ */
+ String AAL_SMALL_OBJECT_PREFETCH_ENABLED = "physicalio.small.objects.prefetching.enabled";
+
+ /**
+ * Objects in AAL's cache will expire after this duration.
+ */
+ String AAL_CACHE_TIMEOUT = "physicalio.cache.timeout";
}
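
These property names are always composed with the S3A prefix, as the tests above do. A minimal sketch (the prefix constant lives in org.apache.hadoop.fs.s3a.Constants; the 32KB value mirrors the vectored read test):

    // Sketch: build the full configuration key for an AAL property and set it.
    Configuration conf = new Configuration();
    String readBufferKey = ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + "." + AAL_READ_BUFFER_SIZE;
    conf.setInt(readBufferKey, 32 * 1024);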
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
index fcc24102cc4..640a88de164 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperationCost.java
@@ -43,7 +43,6 @@
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.assumeMultipartUploads;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.Statistic.ACTION_HTTP_GET_REQUEST;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_FILES_CREATED;
import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MAGIC_MARKER_PUT;
@@ -175,11 +174,7 @@ private void abortActiveStream() throws IOException {
@Test
public void testCostOfCreatingMagicFile() throws Throwable {
describe("Files created under magic paths skip existence checks and marker
deletes");
-
- // Analytics accelerator currently does not support IOStatistics, this
will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
+
S3AFileSystem fs = getFileSystem();
Path destFile = methodSubPath("file.txt");
fs.delete(destFile.getParent(), true);
@@ -257,10 +252,6 @@ public void testCostOfCreatingMagicFile() throws Throwable {
public void testCostOfSavingLoadingPendingFile() throws Throwable {
describe("Verify costs of saving .pending file under a magic path");
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path partDir = methodSubPath("file.pending");
Path destFile = new Path(partDir, "file.pending");
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
index 484d1cc2c31..e958c989420 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/fileContext/ITestS3AFileContextStatistics.java
@@ -30,7 +30,6 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
@@ -48,10 +47,6 @@ public class ITestS3AFileContextStatistics extends FCStatisticsBaseTest {
@BeforeEach
public void setUp() throws Exception {
conf = new Configuration();
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(conf,
- "Analytics Accelerator currently does not support stream statistics");
fc = S3ATestUtils.createTestFileContext(conf);
testRootPath = fileContextTestHelper.getTestRootPath(fc, "test");
fc.mkdir(testRootPath,
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index c1c03ca6e72..46541c6bb26 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -58,7 +58,6 @@
import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
@@ -71,6 +70,7 @@
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
+import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
@@ -95,6 +95,16 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
*/
private boolean prefetching;
+ /**
+ * Is the analytics stream enabled?
+ */
+ private boolean analyticsStream;
+
+ /**
+ * Is the classic input stream enabled?
+ */
+ private boolean classicInputStream;
+
@Override
public Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
@@ -113,17 +123,14 @@ public Configuration createConfiguration() {
@Override
public void setup() throws Exception {
super.setup();
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
testFile = methodPath();
-
writeTextFile(fs, testFile, TEXT, true);
testFileStatus = fs.getFileStatus(testFile);
fileLength = (int)testFileStatus.getLen();
prefetching = prefetching();
+ analyticsStream = isAnalyticsStream();
+ classicInputStream = isClassicInputStream();
}
/**
@@ -177,6 +184,10 @@ public void testStreamIsNotChecksummed() throws Throwable {
// if prefetching is enabled, skip this test
assumeNoPrefetching();
+ // If AAL is enabled, skip this test. AAL uses S3A's default S3 client, and if checksumming
+ // is disabled on the client, then AAL will also not enforce it.
+ assumeNotAnalytics();
+
S3AFileSystem fs = getFileSystem();
// open the file
@@ -193,6 +204,7 @@ public void testStreamIsNotChecksummed() throws Throwable {
// open the stream.
in.read();
+
// now examine the innermost stream and make sure it doesn't have a checksum
assertStreamIsNotChecksummed(getS3AInputStream(in));
}
@@ -200,6 +212,11 @@ public void testStreamIsNotChecksummed() throws Throwable {
@Test
public void testOpenFileShorterLength() throws Throwable {
+
+ // For AAL, since it makes a HEAD request to get the file length if the eTag is
+ // not supplied, it is not able to use the file length supplied in the open() call,
+ // and the test fails.
+ assumeNotAnalytics();
+
// do a second read with the length declared as short.
// we now expect the bytes read to be shorter.
S3AFileSystem fs = getFileSystem();
@@ -244,7 +261,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
final int extra = 10;
long longLen = fileLength + extra;
-
// assert behaviors of seeking/reading past the file length.
// there is no attempt at recovery.
verifyMetrics(() -> {
@@ -264,7 +280,9 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
// two GET calls were made, one for readFully,
// the second on the read() past the EOF
// the operation has got as far as S3
- probe(!prefetching(), STREAM_READ_OPENED, 1 + 1));
+ probe(classicInputStream, STREAM_READ_OPENED, 1 + 1),
+ // For AAL, the seek past the content length fails before the GET is made.
+ probe(analyticsStream, STREAM_READ_OPENED, 1));
// now on a new stream, try a full read from after the EOF
verifyMetrics(() -> {
@@ -275,10 +293,6 @@ public void testOpenFileLongerLengthReadFully() throws Throwable {
return in.toString();
}
},
- // two GET calls were made, one for readFully,
- // the second on the read() past the EOF
- // the operation has got as far as S3
-
with(STREAM_READ_OPENED, 1));
}
@@ -348,7 +362,9 @@ public void testReadPastEOF() throws Throwable {
}
},
always(),
- probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, extra));
+ probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, extra),
+ // AAL won't make the GET call if trying to read beyond EOF
+ probe(analyticsStream, Statistic.ACTION_HTTP_GET_REQUEST, 0));
}
/**
@@ -439,18 +455,28 @@ public void testVectorReadPastEOF() throws Throwable {
byte[] buf = new byte[longLen];
ByteBuffer bb = ByteBuffer.wrap(buf);
final FileRange range = FileRange.createFileRange(0, longLen);
- in.readVectored(Arrays.asList(range), (i) -> bb);
- interceptFuture(EOFException.class,
- "",
- ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
- TimeUnit.SECONDS,
- range.getData());
- assertS3StreamClosed(in);
- return "vector read past EOF with " + in;
+
+ // For AAL, if there is no eTag, the provided length will not be passed in, and a HEAD request will be made.
+ // AAL requires the eTag to detect changes in the object and then do cache eviction if required.
+ if (isAnalyticsStream()) {
+ intercept(EOFException.class, () ->
+ in.readVectored(Arrays.asList(range), (i) -> bb));
+ verifyStatisticCounterValue(in.getIOStatistics(), ACTION_HTTP_HEAD_REQUEST, 1);
+ return "vector read past EOF with " + in;
+ } else {
+ in.readVectored(Arrays.asList(range), (i) -> bb);
+ interceptFuture(EOFException.class,
+ "",
+ ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS,
+ TimeUnit.SECONDS,
+ range.getData());
+ assertS3StreamClosed(in);
+ return "vector read past EOF with " + in;
+ }
}
},
always(),
- probe(!prefetching, Statistic.ACTION_HTTP_GET_REQUEST, 1));
+ probe(classicInputStream, Statistic.ACTION_HTTP_GET_REQUEST, 1));
}
/**
@@ -461,6 +487,22 @@ private boolean prefetching() {
return InputStreamType.Prefetch == streamType(getFileSystem());
}
+ /**
+ * Is the current stream type Analytics?
+ * @return true if Analytics stream is enabled.
+ */
+ private boolean isAnalyticsStream() {
+ return InputStreamType.Analytics == streamType(getFileSystem());
+ }
+
+ /**
+ * Is the current input stream type S3AInputStream?
+ * @return true if the S3AInputStream is being used.
+ */
+ private boolean isClassicInputStream() {
+ return InputStreamType.Classic == streamType(getFileSystem());
+ }
+
/**
* Skip the test if prefetching is enabled.
*/
@@ -470,6 +512,12 @@ private void assumeNoPrefetching(){
}
}
+ private void assumeNotAnalytics() {
+ if (analyticsStream) {
+ skip("Analytics stream is enabled");
+ }
+ }
+
/**
* Assert that the inner S3 Stream is closed.
* @param in input stream
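For readers following the probes above: the tests now branch on which input stream type is active. Below is a minimal, hypothetical sketch (not part of this patch) of that detect-and-assert pattern in isolation. It assumes only helpers the patch itself uses: streamType(), InputStreamType, verifyStatisticCounterValue() and ACTION_HTTP_HEAD_REQUEST; the class and method names here are illustrative.

// Hypothetical sketch, not part of the patch: branch on the configured
// stream type and check the HEAD count that AAL's IOStatistics now expose.
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST;

final class AnalyticsHeadProbeSketch {

  /** Read one byte; if the Analytics stream is active, expect one HEAD. */
  static void probe(S3AFileSystem fs, Path file) throws Exception {
    try (FSDataInputStream in = fs.open(file)) {
      in.read();
      if (InputStreamType.Analytics == streamType(fs)) {
        // AAL issues a HEAD when open() is not given an eTag, as the
        // comments in testVectorReadPastEOF() above explain.
        verifyStatisticCounterValue(in.getIOStatistics(),
            ACTION_HTTP_HEAD_REQUEST, 1);
      }
    }
  }
}

The same pattern generalizes to the GET-count probes in testReadPastEOF() above, swapping in ACTION_HTTP_GET_REQUEST and the expected count per stream type.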
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
index 4165f7a6c9c..7f363d190c5 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AContractStreamIOStatistics.java
@@ -30,7 +30,6 @@
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.test.tags.IntegrationTest;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.*;
/**
@@ -86,10 +85,6 @@ public List<String> outputStreamStatisticKeys() {
@Test
@Override
public void testInputStreamStatisticRead() throws Throwable {
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
- "Analytics Accelerator currently does not support stream statistics");
super.testInputStreamStatisticRead();
}
}
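With the skip removed, the inherited statistics contract test now runs for the Analytics stream as well. A minimal sketch of the dump-and-inspect pattern such tests depend on, assuming only ioStatisticsToPrettyString() from IOStatisticsLogging (already imported in ITestS3AOpenCost above); the class name is illustrative:

// Minimal sketch: any FSDataInputStream whose stream type publishes
// IOStatistics (Analytics included, after this patch) can be inspected.
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;

final class StreamStatsDumpSketch {
  static String readAndDump(FSDataInputStream in) throws Exception {
    in.read();                                   // trigger at least one read
    IOStatistics stats = in.getIOStatistics();   // stream-level statistics
    return ioStatisticsToPrettyString(stats);    // readable snapshot for asserts/logs
  }
}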
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
index 2f54cab00b1..81a719fbea2 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java
@@ -31,7 +31,6 @@
import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
@@ -44,10 +43,6 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase {
*/
@Test
public void testBytesReadWithStream() throws IOException {
- // Analytics accelerator currently does not support IOStatistics, this will be added as
- // part of https://issues.apache.org/jira/browse/HADOOP-19364
- skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
- "Analytics Accelerator currently does not support stream statistics");
S3AFileSystem fs = getFileSystem();
Path filePath = path(getMethodName());
byte[] oneKbBuf = new byte[ONE_KB];
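For context, a short hypothetical sketch of what testBytesReadWithStream() exercises once it is no longer skipped: write a one-kilobyte file, read it back, and confirm the filesystem-level bytes-read counter moved. The class and method names below are illustrative, not from the patch, and a fully-qualified s3a:// path is assumed.

// Hypothetical sketch, not the test itself: bytes read through any stream
// type should be aggregated into the per-scheme FileSystem statistics.
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

final class BytesReadSketch {
  static final int ONE_KB = 1024;

  static long bytesReadAfter(FileSystem fs, Path file) throws Exception {
    try (FSDataOutputStream out = fs.create(file, true)) {
      out.write(new byte[ONE_KB]);               // write 1 KB of zeros
    }
    try (FSDataInputStream in = fs.open(file)) {
      in.readFully(0, new byte[ONE_KB]);         // read it all back
    }
    // Per-scheme statistics aggregate reads from every open stream.
    FileSystem.Statistics stats =
        FileSystem.getStatistics(file.toUri().getScheme(), fs.getClass());
    return stats.getBytesRead();
  }
}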