This is an automated email from the ASF dual-hosted git repository. ahmar pushed a commit to branch feature-HADOOP-19363-analytics-accelerator-s3 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/feature-HADOOP-19363-analytics-accelerator-s3 by this push: new 977a1b05c6d initial review comments 977a1b05c6d is described below commit 977a1b05c6de8f0965f97714ed754608a48c6a1a Author: Ahmar Suhail <ahma...@amazon.co.uk> AuthorDate: Tue Feb 25 14:46:36 2025 +0000 initial review comments --- .../java/org/apache/hadoop/fs/s3a/Constants.java | 1 + .../apache/hadoop/fs/s3a/S3AInstrumentation.java | 11 ++-- .../fs/s3a/impl/streams/AnalyticsStream.java | 13 +---- ...TestS3AContractAnalyticsStreamVectoredRead.java | 37 ++++++++++++ .../fs/contract/s3a/ITestS3AContractCreate.java | 6 +- .../contract/s3a/ITestS3AContractVectoredRead.java | 2 +- ...ITestS3AAnalyticsAcceleratorStreamReading.java} | 65 +++++++++++++--------- .../hadoop/fs/s3a/ITestS3AEncryptionSSEC.java | 6 +- .../hadoop/fs/s3a/ITestS3AFSMainOperations.java | 5 +- .../hadoop/fs/s3a/ITestS3AFileSystemContract.java | 5 +- .../fs/s3a/performance/ITestS3AOpenCost.java | 7 ++- .../statistics/ITestS3AFileSystemStatistic.java | 2 +- 12 files changed, 108 insertions(+), 52 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index b2843d24c8a..2b019e1fe4c 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -1831,6 +1831,7 @@ private Constants() { /** * Prefix to configure Analytics Accelerator Library. + * Value: {@value}. 
*/ public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX = "fs.s3a.analytics.accelerator"; diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 1eef0b404bc..1d26eb62750 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -1038,12 +1038,13 @@ public long streamOpened() { @Override public long streamOpened(InputStreamType type) { - switch (type) { - case Analytics: - return analyticsStreamOpenOperations.getAndIncrement(); - default: - return openOperations.getAndIncrement(); + long count = openOperations.getAndIncrement(); + + if (type == InputStreamType.Analytics) { + count = analyticsStreamOpenOperations.getAndIncrement(); } + + return count; } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java index 77904ad8bf4..6c18b7477ed 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java @@ -58,17 +58,6 @@ public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableIn getS3AStreamStatistics().streamOpened(InputStreamType.Analytics); } - /** - * Indicates whether the given {@code capability} is supported by this stream. - * - * @param capability the capability to check. - * @return true if the given {@code capability} is supported by this stream, false otherwise. 
- */ - @Override - public boolean hasCapability(String capability) { - return false; - } - @Override public int read() throws IOException { throwIfClosed(); @@ -244,4 +233,4 @@ protected void throwIfClosed() throws IOException { throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED); } } -} \ No newline at end of file +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java new file mode 100644 index 00000000000..f532cc77ea2 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java @@ -0,0 +1,37 @@ +package org.apache.hadoop.fs.contract.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; + +/** + * S3A contract tests for vectored reads with the Analytics stream. The analytics stream does + * not explicitly implement the readVectored() method, or currently do any vectored-read specific + * optimisations (such as range coalescing). However, this test ensures that the base implementation + * of readVectored {@link org.apache.hadoop.fs.PositionedReadable} still works. + */ +public class ITestS3AContractAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest { + + public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) { + super(bufferType); + } + + /** + * Create a configuration.
+ * @return a configuration + */ + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + enableAnalyticsAccelerator(conf); + conf.set("fs.contract.vector-io-early-eof-check", "false"); + return conf; + } + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java index 3e87a2f7e5f..9bbc30d14bf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractCreate.java @@ -31,8 +31,10 @@ import static org.apache.hadoop.fs.s3a.S3ATestConstants.KEY_PERFORMANCE_TESTS_ENABLED; import static org.apache.hadoop.fs.s3a.Constants.CONNECTION_EXPECT_CONTINUE; - -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfNotEnabled; /** * S3A contract tests creating files. 
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index 87e3b23cd71..4826811890a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -95,9 +95,9 @@ protected AbstractFSContract createContract(Configuration conf) { */ @Override public void setup() throws Exception { + super.setup(); skipIfAnalyticsAcceleratorEnabled(createConfiguration(), "Analytics Accelerator does not support vectored reads"); - super.setup(); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java similarity index 80% rename from hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java rename to hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java index 02c766b9941..1aa8cb1f23a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java @@ -25,6 +25,7 @@ import org.junit.Before; import org.junit.Test; +import org.assertj.core.api.Assertions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; @@ -35,45 +36,45 @@ import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream; import org.apache.hadoop.fs.statistics.IOStatistics; +import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; +import 
software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; +import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; + import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator; import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData; -import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; - -import org.assertj.core.api.Assertions; - - -import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration; -import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration; -import software.amazon.s3.analyticsaccelerator.util.PrefetchMode; - -public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase { +/** + * Tests integration of the + * <a href="https://github.com/awslabs/analytics-accelerator-s3">analytics accelerator library</a>. + * + * Certain tests in this class rely on reading local parquet files stored in resources. + * These files are copied from local to S3 and then read via the analytics stream. + * This is done to ensure AAL can read the parquet format, and that it handles exceptions from malformed + * parquet files.
+ * + */ +public class ITestS3AAnalyticsAcceleratorStreamReading extends AbstractS3ATestBase { private static final String PHYSICAL_IO_PREFIX = "physicalio"; private static final String LOGICAL_IO_PREFIX = "logicalio"; - private Configuration conf; - private Path testFile; + private Path externalTestFile; @Before public void setUp() throws Exception { super.setup(); - conf = createConfiguration(); - testFile = getExternalData(conf); + externalTestFile = getExternalData(getConfiguration()); } @Override public Configuration createConfiguration() { Configuration configuration = super.createConfiguration(); - if (isUsingDefaultExternalDataFile(configuration)) { - S3ATestUtils.removeBaseAndBucketOverrides(configuration, - ENDPOINT); - } enableAnalyticsAccelerator(configuration); return configuration; } @@ -82,23 +83,25 @@ public Configuration createConfiguration() { public void testConnectorFrameWorkIntegration() throws IOException { describe("Verify S3 connector framework integration"); + Configuration conf = new Configuration(getConfiguration()); removeBaseAndBucketOverrides(conf, INPUT_FADVISE); conf.set(INPUT_FADVISE, "whole-file"); S3AFileSystem fs = - (S3AFileSystem) FileSystem.get(testFile.toUri(), conf); + (S3AFileSystem) FileSystem.get(externalTestFile.toUri(), conf); byte[] buffer = new byte[500]; IOStatistics ioStats; - try (FSDataInputStream inputStream = fs.open(testFile)) { + try (FSDataInputStream inputStream = fs.open(externalTestFile)) { ioStats = inputStream.getIOStatistics(); inputStream.seek(5); inputStream.read(buffer, 0, 500); final InputStream wrappedStream = inputStream.getWrappedStream(); ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream; - assertEquals(objectInputStream.streamType(), InputStreamType.Analytics); - assertEquals(objectInputStream.getInputPolicy(), S3AInputPolicy.Sequential); + + Assertions.assertThat(objectInputStream.streamType()).isEqualTo(InputStreamType.Analytics); + 
Assertions.assertThat(objectInputStream.getInputPolicy()).isEqualTo(S3AInputPolicy.Sequential); } verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); @@ -114,6 +117,7 @@ public void testMalformedParquetFooter() throws IOException { Path dest = path("malformed_footer.parquet"); File file = new File("src/test/resources/malformed_footer.parquet"); + Path sourcePath = new Path(file.toURI().getPath()); getFileSystem().copyFromLocalFile(false, true, sourcePath, dest); @@ -129,6 +133,14 @@ public void testMalformedParquetFooter() throws IOException { verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1); } + /** + * This test reads a multi-row group parquet file. Each parquet file consists of at least one + * row group, which contains the column data for a subset of rows. A single parquet file + * can contain multiple row groups; this allows for further parallelisation, as each row group + * can be processed independently. + * + * @throws IOException any IO problem + */ @Test public void testMultiRowGroupParquet() throws IOException { describe("A parquet file is read successfully"); @@ -156,8 +168,7 @@ public void testMultiRowGroupParquet() throws IOException { public void testConnectorFrameworkConfigurable() { describe("Verify S3 connector framework reads configuration"); - Configuration conf = getConfiguration(); - removeBaseAndBucketOverrides(conf); + Configuration conf = new Configuration(getConfiguration()); //Disable Predictive Prefetching conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX + @@ -194,9 +205,9 @@ public void testInvalidConfigurationThrows() throws Exception { ConnectorConfiguration connectorConfiguration = new ConnectorConfiguration(conf, ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX); - Assertions.assertThatExceptionOfType(IllegalArgumentException.class) - .isThrownBy(() -> - S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); + + intercept(IllegalArgumentException.class, + () ->
S3SeekableInputStreamConfiguration.fromConfiguration(connectorConfiguration)); } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java index 169898df829..8671d962175 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java @@ -38,7 +38,11 @@ import static org.apache.hadoop.fs.s3a.Constants.S3_ENCRYPTION_KEY; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM; import static org.apache.hadoop.fs.s3a.Constants.SERVER_SIDE_ENCRYPTION_KEY; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeSkipRootTests; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java index b056dac5a85..3c405cb7c51 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java @@ -29,7 +29,10 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.contract.s3a.S3AContract; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; +import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; /** * S3A Test suite for the FSMainOperationsBaseTest tests. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java index 2034ea274e7..02d56795890 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java @@ -34,7 +34,10 @@ import org.apache.hadoop.fs.Path; import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.isCreatePerformanceEnabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.setPerformanceFlags; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.junit.Assume.*; import static org.junit.Assert.*; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java index 75689211ba8..febc6bb82c4 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java @@ -53,7 +53,12 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.skip; import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile; import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_VALIDATION; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.*; +import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.assertStreamIsNotChecksummed; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getS3AInputStream; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED; import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED; diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java index 8cb0de1b83f..376dcdf727f 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/statistics/ITestS3AFileSystemStatistic.java @@ -46,7 +46,7 @@ public class ITestS3AFileSystemStatistic extends AbstractS3ATestBase { public void testBytesReadWithStream() throws IOException { // Analytics accelerator currently does not support IOStatistics, this will be added as // part of https://issues.apache.org/jira/browse/HADOOP-19364 - skipIfAnalyticsAcceleratorEnabled(createConfiguration(), + skipIfAnalyticsAcceleratorEnabled(getConfiguration(), "Analytics Accelerator currently does not support stream statistics"); S3AFileSystem fs = getFileSystem(); Path filePath = path(getMethodName()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org