This is an automated email from the ASF dual-hosted git repository. mthakur pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 147a466c6d200db171554058ed93656f03b40334 Author: Mukund Thakur <mtha...@cloudera.com> AuthorDate: Thu Jul 28 21:57:37 2022 +0530 HADOOP-18227. Add input stream IOStats for vectored IO api in S3A. (#4636) part of HADOOP-18103. Contributed By: Mukund Thakur --- .../hadoop/fs/statistics/StreamStatisticNames.java | 30 +++- .../contract/AbstractContractVectoredReadTest.java | 11 ++ .../org/apache/hadoop/fs/s3a/S3AInputStream.java | 6 +- .../apache/hadoop/fs/s3a/S3AInstrumentation.java | 29 ++++ .../s3a/statistics/S3AInputStreamStatistics.java | 14 ++ .../statistics/impl/EmptyS3AStatisticsContext.java | 11 ++ .../contract/s3a/ITestS3AContractVectoredRead.java | 171 +++++++++++++++++++++ .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 31 ++++ .../s3a/scale/ITestS3AInputStreamPerformance.java | 2 + .../hadoop/fs/s3a/scale/S3AScaleTestBase.java | 33 ---- 10 files changed, 303 insertions(+), 35 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java index ca755f08419..bb697ad8ccf 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java @@ -47,7 +47,7 @@ public final class StreamStatisticNames { public static final String STREAM_READ_ABORTED = "stream_aborted"; /** - * Bytes read from an input stream in read() calls. + * Bytes read from an input stream in read()/readVectored() calls. * Does not include bytes read and then discarded in seek/close etc. * These are the bytes returned to the caller. * Value: {@value}. 
@@ -110,6 +110,34 @@ public final class StreamStatisticNames { public static final String STREAM_READ_OPERATIONS = "stream_read_operations"; + /** + * Count of readVectored() operations in an input stream. + * Value: {@value}. + */ + public static final String STREAM_READ_VECTORED_OPERATIONS = + "stream_read_vectored_operations"; + + /** + * Count of bytes discarded during readVectored() operation + * in an input stream. + * Value: {@value}. + */ + public static final String STREAM_READ_VECTORED_READ_BYTES_DISCARDED = + "stream_read_vectored_read_bytes_discarded"; + + /** + * Count of incoming file ranges during readVectored() operation. + * Value: {@value} + */ + public static final String STREAM_READ_VECTORED_INCOMING_RANGES = + "stream_read_vectored_incoming_ranges"; + /** + * Count of combined file ranges during readVectored() operation. + * Value: {@value} + */ + public static final String STREAM_READ_VECTORED_COMBINED_RANGES = + "stream_read_vectored_combined_ranges"; + /** * Count of incomplete read() operations in an input stream, * that is, when the bytes returned were less than that requested. 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index 77bcc496ff4..379b992fba1 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -84,6 +84,10 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac return allocate; } + public WeakReferencedElasticByteBufferPool getPool() { + return pool; + } + @Override public void setup() throws Exception { super.setup(); @@ -382,6 +386,13 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac return fileRanges; } + protected List<FileRange> getConsecutiveRanges() { + List<FileRange> fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(100, 500)); + fileRanges.add(FileRange.createFileRange(600, 500)); + return fileRanges; + } + /** * Validate that exceptions must be thrown during a vectored * read operation with specific input ranges. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 178a807733a..c20c3a04863 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -963,7 +963,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, @Override public void readVectored(List<? 
extends FileRange> ranges, IntFunction<ByteBuffer> allocate) throws IOException { - LOG.debug("Starting vectored read on path {} for ranges {} ", pathStr, ranges); checkNotClosed(); if (stopVectoredIOOperations.getAndSet(false)) { @@ -978,6 +977,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, if (isOrderedDisjoint(sortedRanges, 1, minSeekForVectorReads())) { LOG.debug("Not merging the ranges as they are disjoint"); + streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size()); for (FileRange range: sortedRanges) { ByteBuffer buffer = allocate.apply(range.getLength()); unboundedThreadPool.submit(() -> readSingleRange(range, buffer)); @@ -987,6 +987,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, List<CombinedFileRange> combinedFileRanges = mergeSortedRanges(sortedRanges, 1, minSeekForVectorReads(), maxReadSizeForVectorReads()); + streamStatistics.readVectoredOperationStarted(sortedRanges.size(), combinedFileRanges.size()); LOG.debug("Number of original ranges size {} , Number of combined ranges {} ", ranges.size(), combinedFileRanges.size()); for (CombinedFileRange combinedFileRange: combinedFileRanges) { @@ -1088,6 +1089,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, } drainBytes += readCount; } + streamStatistics.readVectoredBytesDiscarded(drainBytes); LOG.debug("{} bytes drained from stream ", drainBytes); } @@ -1168,6 +1170,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, } else { readByteArray(objectContent, buffer.array(), 0, length); } + // update io stats. 
+ incrementBytesRead(length); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 7c40d2d13c7..b57e0306798 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -803,6 +803,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource, private final AtomicLong readOperations; private final AtomicLong readFullyOperations; private final AtomicLong seekOperations; + private final AtomicLong readVectoredOperations; + private final AtomicLong bytesDiscardedInVectoredIO; + private final AtomicLong readVectoredIncomingRanges; + private final AtomicLong readVectoredCombinedRanges; /** Bytes read by the application and any when draining streams . */ private final AtomicLong totalBytesRead; @@ -836,6 +840,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource, StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED, StreamStatisticNames.STREAM_READ_TOTAL_BYTES, StreamStatisticNames.STREAM_READ_UNBUFFERED, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES) .withGauges(STREAM_READ_GAUGE_INPUT_POLICY) .withDurationTracking(ACTION_HTTP_GET_REQUEST, @@ -872,6 +880,14 @@ public class S3AInstrumentation implements Closeable, MetricsSource, StreamStatisticNames.STREAM_READ_OPERATIONS_INCOMPLETE); readOperations = st.getCounterReference( StreamStatisticNames.STREAM_READ_OPERATIONS); + readVectoredOperations = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS); + bytesDiscardedInVectoredIO = 
st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED); + readVectoredIncomingRanges = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES); + readVectoredCombinedRanges = st.getCounterReference( + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES); readFullyOperations = st.getCounterReference( StreamStatisticNames.STREAM_READ_FULLY_OPERATIONS); seekOperations = st.getCounterReference( @@ -1017,6 +1033,19 @@ public class S3AInstrumentation implements Closeable, MetricsSource, } } + @Override + public void readVectoredOperationStarted(int numIncomingRanges, + int numCombinedRanges) { + readVectoredIncomingRanges.addAndGet(numIncomingRanges); + readVectoredCombinedRanges.addAndGet(numCombinedRanges); + readVectoredOperations.incrementAndGet(); + } + + @Override + public void readVectoredBytesDiscarded(int discarded) { + bytesDiscardedInVectoredIO.addAndGet(discarded); + } + /** * {@code close()} merges the stream statistics into the filesystem's * instrumentation instance. diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java index 539af2bde36..41a8f253159 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java @@ -96,6 +96,20 @@ public interface S3AInputStreamStatistics extends AutoCloseable, */ void readOperationCompleted(int requested, int actual); + /** + * A vectored read operation has started. + * @param numIncomingRanges number of input ranges. + * @param numCombinedRanges number of combined ranges. + */ + void readVectoredOperationStarted(int numIncomingRanges, + int numCombinedRanges); + + /** + * Number of bytes discarded during vectored read. 
+ * @param discarded discarded bytes during vectored read. + */ + void readVectoredBytesDiscarded(int discarded); + @Override void close(); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java index 5c0995e41b3..cea8be7f10e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java @@ -195,6 +195,17 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext { } + @Override + public void readVectoredOperationStarted(int numIncomingRanges, + int numCombinedRanges) { + + } + + @Override + public void readVectoredBytesDiscarded(int discarded) { + + } + @Override public void close() { diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 18a727dcdce..84a90ba441a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -19,28 +19,41 @@ package org.apache.hadoop.fs.contract.s3a; import java.io.EOFException; +import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CompletableFuture; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileStatus; import 
org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.s3a.Constants; import org.apache.hadoop.fs.s3a.S3AFileSystem; import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.statistics.IOStatistics; +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.fs.statistics.StreamStatisticNames; import org.apache.hadoop.test.LambdaTestUtils; +import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; +import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; +import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.test.MoreAsserts.assertEqual; public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTest { + private static final Logger LOG = LoggerFactory.getLogger(ITestS3AContractVectoredRead.class); + public ITestS3AContractVectoredRead(String bufferType) { super(bufferType); } @@ -156,4 +169,162 @@ public class ITestS3AContractVectoredRead extends AbstractContractVectoredReadTe List<FileRange> fileRanges = getSampleSameRanges(); verifyExceptionalVectoredRead(fs, fileRanges, UnsupportedOperationException.class); } + + /** + * As the minimum seek value is 4*1024, the first three ranges will be + * merged into one and the other two will remain as they are. 
+ * */ + @Test + public void testNormalReadVsVectoredReadStatsCollection() throws Exception { + FileSystem fs = getTestFileSystemWithReadAheadDisabled(); + List<FileRange> fileRanges = new ArrayList<>(); + fileRanges.add(FileRange.createFileRange(10 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(8 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(14 * 1024, 100)); + fileRanges.add(FileRange.createFileRange(2 * 1024 - 101, 100)); + fileRanges.add(FileRange.createFileRange(40 * 1024, 1024)); + + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture<FSDataInputStream> builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(fileRanges, getAllocate()); + validateVectoredReadResult(fileRanges, DATASET); + returnBuffersToPoolPostRead(fileRanges, getPool()); + + // audit the io statistics for this stream + IOStatistics st = in.getIOStatistics(); + LOG.info("IOStats after readVectored operation {}", ioStatisticsToPrettyString(st)); + + // the vectored io operation must be tracked + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 1); + + // the vectored io operation is being called with 5 input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + 5); + + // 5 input ranges got combined in 3 as some of them are close. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + 3); + + // number of bytes discarded will be based on the above input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 5944); + + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 3); + + // read bytes should match the sum of requested length for each input ranges. 
+ verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 1424); + + } + + CompletableFuture<FSDataInputStream> builder1 = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + + try (FSDataInputStream in = builder1.get()) { + for (FileRange range : fileRanges) { + byte[] temp = new byte[range.getLength()]; + in.readFully((int) range.getOffset(), temp, 0, range.getLength()); + } + + // audit the statistics for this stream + IOStatistics st = in.getIOStatistics(); + LOG.info("IOStats after read fully operation {}", ioStatisticsToPrettyString(st)); + + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 0); + + // all other counter values consistent. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 0); + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 5); + + // read bytes should match the sum of requested length for each input ranges. 
+ verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 1424); + } + } + + @Test + public void testMultiVectoredReadStatsCollection() throws Exception { + FileSystem fs = getTestFileSystemWithReadAheadDisabled(); + List<FileRange> ranges1 = getConsecutiveRanges(); + List<FileRange> ranges2 = getConsecutiveRanges(); + FileStatus fileStatus = fs.getFileStatus(path(VECTORED_READ_FILE_NAME)); + CompletableFuture<FSDataInputStream> builder = + fs.openFile(path(VECTORED_READ_FILE_NAME)) + .withFileStatus(fileStatus) + .build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(ranges1, getAllocate()); + in.readVectored(ranges2, getAllocate()); + validateVectoredReadResult(ranges1, DATASET); + validateVectoredReadResult(ranges2, DATASET); + returnBuffersToPoolPostRead(ranges1, getPool()); + returnBuffersToPoolPostRead(ranges2, getPool()); + + // audit the io statistics for this stream + IOStatistics st = in.getIOStatistics(); + + // 2 vectored io calls are made above. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS, + 2); + + // 2 vectored io operation is being called with 2 input ranges. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, + 4); + + // 2 ranges are getting merged in 1 during both vectored io operation. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, + 2); + + // number of bytes discarded will be 0 as the ranges are consecutive. + verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED, + 0); + // only 2 http get request will be made because ranges in both range list will be merged + // to 1 because they are consecutive. + verifyStatisticCounterValue(st, + StoreStatisticNames.ACTION_HTTP_GET_REQUEST, + 2); + // read bytes should match the sum of requested length for each input ranges. 
+ verifyStatisticCounterValue(st, + StreamStatisticNames.STREAM_READ_BYTES, + 2000); + } + } + + private FileSystem getTestFileSystemWithReadAheadDisabled() throws IOException { + Configuration conf = getFileSystem().getConf(); + // also resetting the min seek and max size values is important + // as this same test suite has test which overrides these params. + S3ATestUtils.removeBaseAndBucketOverrides(conf, + Constants.READAHEAD_RANGE, + Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, + Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE); + S3ATestUtils.disableFilesystemCaching(conf); + conf.setInt(Constants.READAHEAD_RANGE, 0); + return S3ATestUtils.createTestFileSystem(conf); + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 48cb52c5ac2..6162ed13123 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; import org.apache.hadoop.fs.s3a.impl.StoreContext; import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; +import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext; import org.apache.hadoop.fs.s3native.S3xLoginHelper; import org.apache.hadoop.io.DataInputBuffer; @@ -69,6 +70,7 @@ import org.slf4j.LoggerFactory; import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.net.URI; import java.net.URISyntaxException; import java.text.DateFormat; @@ -1457,4 +1459,33 @@ public final class S3ATestUtils { + " in " + secrets); } } + + + /** + * Get the input stream statistics of an input stream. 
+ * Raises an exception if the inner stream is not an S3A input stream + * @param in wrapper + * @return the statistics for the inner stream + */ + public static S3AInputStreamStatistics getInputStreamStatistics( + FSDataInputStream in) { + return getS3AInputStream(in).getS3AStreamStatistics(); + } + + /** + * Get the inner stream of an input stream. + * Raises an exception if the inner stream is not an S3A input stream + * @param in wrapper + * @return the inner stream + * @throws AssertionError if the inner stream is of the wrong type + */ + public static S3AInputStream getS3AInputStream( + FSDataInputStream in) { + InputStream inner = in.getWrappedStream(); + if (inner instanceof S3AInputStream) { + return (S3AInputStream) inner; + } else { + throw new AssertionError("Not an S3AInputStream: " + inner); + } + } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java index d73a938bcce..b8195cb9964 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java @@ -56,6 +56,8 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_RE import static org.apache.hadoop.fs.contract.ContractTestUtils.*; import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getInputStreamStatistics; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticMinimum; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMaximumStatistic; import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupMeanStatistic; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java index d95b46b10d7..514c6cf8869 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java @@ -19,19 +19,14 @@ package org.apache.hadoop.fs.s3a.scale; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; -import org.apache.hadoop.fs.s3a.S3AInputStream; import org.apache.hadoop.fs.s3a.S3ATestConstants; import org.apache.hadoop.fs.s3a.Statistic; -import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.InputStream; - import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.lookupGaugeStatistic; @@ -154,34 +149,6 @@ public class S3AScaleTestBase extends AbstractS3ATestBase { return getTestTimeoutSeconds() * 1000; } - /** - * Get the input stream statistics of an input stream. - * Raises an exception if the inner stream is not an S3A input stream - * @param in wrapper - * @return the statistics for the inner stream - */ - protected S3AInputStreamStatistics getInputStreamStatistics( - FSDataInputStream in) { - return getS3AInputStream(in).getS3AStreamStatistics(); - } - - /** - * Get the inner stream of an input stream. 
- * Raises an exception if the inner stream is not an S3A input stream - * @param in wrapper - * @return the inner stream - * @throws AssertionError if the inner stream is of the wrong type - */ - protected S3AInputStream getS3AInputStream( - FSDataInputStream in) { - InputStream inner = in.getWrappedStream(); - if (inner instanceof S3AInputStream) { - return (S3AInputStream) inner; - } else { - throw new AssertionError("Not an S3AInputStream: " + inner); - } - } - /** * Get the gauge value of a statistic from the * IOStatistics of the filesystem. Raises an assertion if --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org