This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.4.2 by this push: new dc727079f42 HADOOP-18296. Memory fragmentation in ChecksumFileSystem readVectored() (#7732) dc727079f42 is described below commit dc727079f4220f4233a2fa29c366c0f1e319f3d4 Author: Steve Loughran <ste...@cloudera.com> AuthorDate: Mon Jul 21 20:12:21 2025 +0100 HADOOP-18296. Memory fragmentation in ChecksumFileSystem readVectored() (#7732) Option "fs.file.checksum.verify" disables checksum verification in local FS, so sliced subsets of larger buffers are never returned. Stream capability "fs.capability.vectoredio.sliced" is true if a filesystem knows that it is returning slices of a larger buffer. This is false if a filesystem doesn't, or against the local FS in releases which lack this feature. Contributed by Steve Loughran --- .../org/apache/hadoop/fs/ChecksumFileSystem.java | 60 ++++- .../apache/hadoop/fs/CommonConfigurationKeys.java | 6 + .../java/org/apache/hadoop/fs/LocalFileSystem.java | 6 + .../org/apache/hadoop/fs/RawLocalFileSystem.java | 12 +- .../org/apache/hadoop/fs/StreamCapabilities.java | 10 + .../org/apache/hadoop/fs/VectoredReadUtils.java | 17 ++ .../hadoop/fs/impl/TrackingByteBufferPool.java | 290 +++++++++++++++++++++ .../src/main/java/org/apache/hadoop/io/Sizes.java | 3 + .../src/main/resources/core-default.xml | 18 ++ .../site/markdown/filesystem/fsdatainputstream.md | 54 +++- .../contract/AbstractContractVectoredReadTest.java | 69 ++++- .../localfs/TestLocalFSContractVectoredRead.java | 7 + .../fs/viewfs/TestViewFileSystemDelegation.java | 3 +- .../contract/s3a/ITestS3AContractVectoredRead.java | 15 +- .../org/apache/hadoop/fs/s3a/S3ATestUtils.java | 13 + 15 files changed, 559 insertions(+), 24 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java index 45b3f90feaa..084b68729be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -57,6 +58,7 @@ import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable; +import static org.apache.hadoop.io.Sizes.S_0; /**************************************************************** * Abstract Checksumed FileSystem. @@ -101,6 +103,14 @@ public void setVerifyChecksum(boolean verifyChecksum) { this.verifyChecksum = verifyChecksum; } + /** + * Is checksum verification enabled? + * @return true if files are to be verified through checksums. + */ + public boolean getVerifyChecksum() { + return verifyChecksum; + } + @Override public void setWriteChecksum(boolean writeChecksum) { this.writeChecksum = writeChecksum; @@ -165,7 +175,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) { /******************************************************* * For open()'s FSInputStream - * It verifies that data matches checksums. + * It verifies that data matches checksums iff the data + * file has matching checksums. 
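As a sketch of the behaviour the commit message describes, the fragment below disables local checksum verification and reads back the new accessor and path capability. It assumes Hadoop 3.4.2 or later; the `file:///tmp/data.bin` path is purely illustrative.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public final class LocalChecksumProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Option added by this change: skip loading .crc files when reading file:// data.
    conf.setBoolean("fs.file.checksum.verify", false);

    // newInstanceLocal avoids picking up a cached LocalFileSystem instance.
    try (LocalFileSystem fs = FileSystem.newInstanceLocal(conf)) {
      System.out.println("verifying checksums: " + fs.getVerifyChecksum());

      // Path capability added by this change; the path itself is illustrative.
      boolean maySlice = fs.hasPathCapability(
          new Path("file:///tmp/data.bin"), "fs.capability.vectoredio.sliced");
      System.out.println("vectored reads may return sliced buffers: " + maySlice);
    }
  }
}
```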
*******************************************************/ private static class ChecksumFSInputChecker extends FSInputChecker implements IOStatisticsSource, StreamCapabilities { @@ -426,8 +437,18 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes, return data; } + /** + * Turn off range merging to make buffer recycling more likely (but not guaranteed). + * @return 0, always + */ + @Override + public int maxReadSizeForVectorReads() { + return S_0; + } + /** * Vectored read. + * <p> * If the file has no checksums: delegate to the underlying stream. * If the file is checksummed: calculate the checksum ranges as * well as the data ranges, read both, and validate the checksums @@ -448,10 +469,12 @@ public void readVectored(final List<? extends FileRange> ranges, final Consumer<ByteBuffer> release) throws IOException { // If the stream doesn't have checksums, just delegate. - if (sums == null) { + if (dataStreamToHandleVectorIO()) { + LOG.debug("No checksums for vectored read, delegating to inner stream"); datas.readVectored(ranges, allocate); return; } + LOG.debug("Checksum vectored read for {} ranges", ranges.size()); final long length = getFileLength(); final List<? extends FileRange> sorted = validateAndSortRanges(ranges, Optional.of(length)); @@ -489,9 +512,37 @@ public void readVectored(final List<? extends FileRange> ranges, } } + /** + * Predicate to determine whether vector reads should be directly + * handled by the data stream, rather than processing + * the ranges in this class, processing which includes checksum validation. + * <p> + * Vector reading is delegated whenever there are no checksums for + * the data file, or when validating checksums has been delegated. + * @return true if vector reads are to be directly handled by + * the data stream. + */ + private boolean dataStreamToHandleVectorIO() { + return sums == null; + } + + /** + * For this stream, declare that range merging may take place; + * otherwise delegate to the inner stream. + * @param capability string to query the stream support for. + * @return true for sliced vector IO if checksum validation + * is taking place. False if no checksums are present for the validation. 
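Building on the capability logic above, this sketch shows how calling code might probe the new `fs.capability.vectoredio.sliced` stream capability before deciding whether vector-read buffers can be recycled through a pool. The pool choice, path and range sizes are illustrative assumptions, not part of the patch.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.IntFunction;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.util.functional.FutureIO;

public final class SlicingAwareVectorRead {

  public static void readTwoRanges(FileSystem fs, Path path) throws Exception {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    List<FileRange> ranges = Arrays.asList(
        FileRange.createFileRange(0, 4096),
        FileRange.createFileRange(16384, 4096));

    try (FSDataInputStream in = fs.open(path)) {
      // Probe added by this change: true when returned buffers may be slices.
      boolean sliced = in.hasCapability("fs.capability.vectoredio.sliced");

      IntFunction<ByteBuffer> allocate;
      Consumer<ByteBuffer> release;
      if (sliced) {
        // Sliced buffers must never go back into a pool; let GC reclaim them.
        allocate = ByteBuffer::allocate;
        release = buf -> { };
      } else {
        allocate = size -> pool.getBuffer(false, size);
        release = pool::putBuffer;
      }

      in.readVectored(ranges, allocate, release);
      for (FileRange range : ranges) {
        ByteBuffer data = FutureIO.awaitFuture(range.getData());
        // ... consume the buffer here ...
        release.accept(data);
      }
    }
  }
}
```

When the stream reports that buffers may be sliced, plain heap allocation is the safe fallback, which is the situation HADOOP-18296 describes for ChecksumFileSystem.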
+ * For all other probes: pass to the wrapped stream + */ @Override public boolean hasCapability(String capability) { - return datas.hasCapability(capability); + switch (capability.toLowerCase(Locale.ENGLISH)) { + // slicing can take place during coalescing and checksumming + case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED: + return !dataStreamToHandleVectorIO(); + default: + return datas.hasCapability(capability); + } } } @@ -1142,6 +1193,9 @@ public boolean hasPathCapability(final Path path, final String capability) case CommonPathCapabilities.FS_APPEND: case CommonPathCapabilities.FS_CONCAT: return false; + case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED: + return getVerifyChecksum(); + default: return super.hasPathCapability(p, capability); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 31b6654afc5..e0832fc0576 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -505,4 +505,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final String HADOOP_SECURITY_RESOLVER_IMPL = "hadoop.security.resolver.impl"; + /** + * Verify checksums on read -default is true. + * <p> + * {@value}. + */ + public static final String LOCAL_FS_VERIFY_CHECKSUM = "fs.file.checksum.verify"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java index 590cbd9a49e..e912d2245be 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java @@ -27,6 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import static org.apache.hadoop.fs.CommonConfigurationKeys.LOCAL_FS_VERIFY_CHECKSUM; + /**************************************************************** * Implement the FileSystem API for the checksumed local filesystem. 
* @@ -50,6 +52,10 @@ public void initialize(URI name, Configuration conf) throws IOException { if (!scheme.equals(fs.getUri().getScheme())) { swapScheme = scheme; } + final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true); + setVerifyChecksum(checksum); + LOG.debug("Checksum verification enabled={}", checksum); + } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java index d5f545b460d..3bd93a4f459 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java @@ -72,6 +72,7 @@ import org.apache.hadoop.util.StringUtils; import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED; +import static org.apache.hadoop.fs.VectoredReadUtils.hasVectorIOCapability; import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList; import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest; import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; @@ -293,15 +294,12 @@ public FileDescriptor getFileDescriptor() throws IOException { @Override public boolean hasCapability(String capability) { - // a bit inefficient, but intended to make it easier to add - // new capabilities. switch (capability.toLowerCase(Locale.ENGLISH)) { case StreamCapabilities.IOSTATISTICS: case StreamCapabilities.IOSTATISTICS_CONTEXT: - case StreamCapabilities.VECTOREDIO: return true; default: - return false; + return hasVectorIOCapability(capability); } } @@ -400,7 +398,9 @@ private void initiateRead() { for(int i = 0; i < ranges.size(); ++i) { FileRange range = ranges.get(i); buffers[i] = allocateRelease.getBuffer(false, range.getLength()); - channel.read(buffers[i], range.getOffset(), i, this); + final ByteBuffer buffer = buffers[i]; + LOG.debug("Reading file range {} into buffer {}", range, System.identityHashCode(buffer)); + channel.read(buffer, range.getOffset(), i, this); } } @@ -416,6 +416,8 @@ private void initiateRead() { public void completed(Integer result, Integer rangeIndex) { FileRange range = ranges.get(rangeIndex); ByteBuffer buffer = buffers[rangeIndex]; + LOG.debug("Completed read range {} into buffer {} outcome={} remaining={}", + range, System.identityHashCode(buffer), result, buffer.remaining()); if (result == -1) { // no data was read back. failed(new EOFException("Read past End of File"), rangeIndex); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java index 93ed57ef830..955040d91a3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java @@ -86,6 +86,16 @@ public interface StreamCapabilities { */ String VECTOREDIO = "in:readvectored"; + /** + * Probe for vector IO implementation details: {@value}. + * When performing vectored IO operations, are the buffers returned by readVectored() + * potentially sliced subsets of buffers allocated by the allocate() function + * passed in the read requests? + * If true, this means that the returned buffers may be sliced subsets of the + * allocated buffers. 
+ */ + String VECTOREDIO_BUFFERS_SLICED = "fs.capability.vectoredio.sliced"; + /** * Stream abort() capability implemented by {@link Abortable#abort()}. * This matches the Path Capability diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java index 6adcba39a3f..a4c2d69b63b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Comparator; import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; @@ -476,6 +477,22 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long readOffset, return readData; } + /** + * Default vector IO probes. + * These are capabilities which streams that leave vector IO + * to the default methods should return when queried for vector capabilities. + * @param capability capability to probe for. + * @return true if the given capability holds for vectored IO features. + */ + public static boolean hasVectorIOCapability(String capability) { + switch (capability.toLowerCase(Locale.ENGLISH)) { + case StreamCapabilities.VECTOREDIO: + return true; + default: + return false; + } + } + /** * private constructor. */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java new file mode 100644 index 00000000000..0abe678ab74 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.nio.ByteBuffer; +import java.util.IdentityHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.io.ByteBufferPool; + +import static java.lang.System.identityHashCode; +import static java.util.Objects.requireNonNull; + +/** + * A wrapper {@link ByteBufferPool} implementation that tracks whether all allocated buffers + * are released. + * <p> + * It throws the related exception at {@link #close()} if any buffer remains un-released. + * It also clears the buffers at release so if they continued being used it'll generate errors. + * <p> + * To be used for testing.. 
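As a usage note for `hasVectorIOCapability()`, the skeleton below shows how a stream that leaves vectored reads to the default implementation might answer capability probes, mirroring the RawLocalFileSystem change earlier in this diff; the class itself is illustrative.

```java
import java.util.Locale;

import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.VectoredReadUtils;

/** Illustrative stream skeleton; only the capability probe is shown. */
public abstract class ExampleVectoredStream implements StreamCapabilities {

  @Override
  public boolean hasCapability(String capability) {
    switch (capability.toLowerCase(Locale.ENGLISH)) {
      case StreamCapabilities.IOSTATISTICS:
        return true;   // capability this example stream claims directly
      default:
        // Shared helper from this patch: true for "in:readvectored" only,
        // so future vector IO probes are answered in one place.
        return VectoredReadUtils.hasVectorIOCapability(capability);
    }
  }
}
```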
+ * <p> + * The stacktraces of the allocation are not stored by default because + * it can significantly decrease the unit test performance. + * Configuring this class to log at DEBUG will trigger their collection. + * @see ByteBufferAllocationStacktraceException + * <p> + * Adapted from Parquet class {@code org.apache.parquet.bytes.TrackingByteBufferAllocator}. + */ +@VisibleForTesting +public final class TrackingByteBufferPool implements ByteBufferPool, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(TrackingByteBufferPool.class); + + /** + * Wrap an existing allocator with this tracking allocator. + * @param allocator allocator to wrap. + * @return a new allocator. + */ + public static TrackingByteBufferPool wrap(ByteBufferPool allocator) { + return new TrackingByteBufferPool(allocator); + } + + public static class LeakDetectorHeapByteBufferPoolException + extends RuntimeException { + + private LeakDetectorHeapByteBufferPoolException(String msg) { + super(msg); + } + + private LeakDetectorHeapByteBufferPoolException(String msg, Throwable cause) { + super(msg, cause); + } + + private LeakDetectorHeapByteBufferPoolException( + String message, + Throwable cause, + boolean enableSuppression, + boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + } + + /** + * Strack trace of allocation as saved in the tracking map. + */ + public static final class ByteBufferAllocationStacktraceException + extends LeakDetectorHeapByteBufferPoolException { + + /** + * Single stack trace instance to use when DEBUG is not enabled. + */ + private static final ByteBufferAllocationStacktraceException WITHOUT_STACKTRACE = + new ByteBufferAllocationStacktraceException(false); + + /** + * Create a stack trace for the map, either using the shared static one + * or a dynamically created one. + * @return a stack + */ + private static ByteBufferAllocationStacktraceException create() { + return LOG.isDebugEnabled() + ? new ByteBufferAllocationStacktraceException() + : WITHOUT_STACKTRACE; + } + + private ByteBufferAllocationStacktraceException() { + super("Allocation stacktrace of the first ByteBuffer:"); + } + + /** + * Private constructor to for the singleton {@link #WITHOUT_STACKTRACE}, + * telling develoers how to see a trace per buffer. + */ + private ByteBufferAllocationStacktraceException(boolean unused) { + super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for stack traces", + null, + false, + false); + } + } + + /** + * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} if the + * buffer to release was not in the hash map. + */ + public static final class ReleasingUnallocatedByteBufferException + extends LeakDetectorHeapByteBufferPoolException { + + private ReleasingUnallocatedByteBufferException(final ByteBuffer b) { + super(String.format("Releasing a ByteBuffer instance that is not allocated" + + " by this buffer pool or already been released: %s size %d; hash code %s", + b, b.capacity(), identityHashCode(b))); + } + } + + /** + * Exception raised in {@link TrackingByteBufferPool#close()} if there + * was an unreleased buffer. 
+ */ + public static final class LeakedByteBufferException + extends LeakDetectorHeapByteBufferPoolException { + + private final int count; + + private LeakedByteBufferException(int count, ByteBufferAllocationStacktraceException e) { + super(count + " ByteBuffer object(s) is/are remained unreleased" + + " after closing this buffer pool.", e); + this.count = count; + } + + /** + * Get the number of unreleased buffers. + * @return number of unreleased buffers + */ + public int getCount() { + return count; + } + } + + /** + * Tracker of allocations. + * <p> + * The key maps by the object id of the buffer, and refers to either a common stack trace + * or one dynamically created for each allocation. + */ + private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> allocated = + new IdentityHashMap<>(); + + /** + * Wrapped buffer pool. + */ + private final ByteBufferPool allocator; + + /** + * Number of buffer allocations. + * <p> + * This is incremented in {@link #getBuffer(boolean, int)}. + */ + private final AtomicInteger bufferAllocations = new AtomicInteger(); + + /** + * Number of buffer releases. + * <p> + * This is incremented in {@link #putBuffer(ByteBuffer)}. + */ + private final AtomicInteger bufferReleases = new AtomicInteger(); + + /** + * private constructor. + * @param allocator pool allocator. + */ + private TrackingByteBufferPool(ByteBufferPool allocator) { + this.allocator = allocator; + } + + public int getBufferAllocations() { + return bufferAllocations.get(); + } + + public int getBufferReleases() { + return bufferReleases.get(); + } + + /** + * Get a buffer from the pool. + * <p> + * This increments the {@link #bufferAllocations} counter and stores the + * singleron or local allocation stack trace in the {@link #allocated} map. + * @param direct whether to allocate a direct buffer or not + * @param size size of the buffer to allocate + * @return a ByteBuffer instance + */ + @Override + public synchronized ByteBuffer getBuffer(final boolean direct, final int size) { + bufferAllocations.incrementAndGet(); + ByteBuffer buffer = allocator.getBuffer(direct, size); + final ByteBufferAllocationStacktraceException ex = + ByteBufferAllocationStacktraceException.create(); + allocated.put(buffer, ex); + LOG.debug("Creating ByteBuffer:{} size {} {}", + identityHashCode(buffer), size, buffer, ex); + return buffer; + } + + /** + * Release a buffer back to the pool. + * <p> + * This increments the {@link #bufferReleases} counter and removes the + * buffer from the {@link #allocated} map. + * <p> + * If the buffer was not allocated by this pool, it throws + * {@link ReleasingUnallocatedByteBufferException}. + * + * @param buffer buffer to release + * @throws ReleasingUnallocatedByteBufferException if the buffer was not allocated by this pool + */ + @Override + public synchronized void putBuffer(ByteBuffer buffer) + throws ReleasingUnallocatedByteBufferException { + + bufferReleases.incrementAndGet(); + requireNonNull(buffer); + LOG.debug("Releasing ByteBuffer: {}: {}", identityHashCode(buffer), buffer); + if (allocated.remove(buffer) == null) { + throw new ReleasingUnallocatedByteBufferException(buffer); + } + allocator.putBuffer(buffer); + // Clearing the buffer so subsequent access would probably generate errors + buffer.clear(); + } + + /** + * Check if the buffer is in the pool. 
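A sketch of how a test could use this pool wrapper to catch leaked or foreign buffers; buffer sizes are arbitrary and, as the class javadoc above notes, the wrapper is intended for tests only.

```java
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public final class TrackingPoolExample {
  public static void main(String[] args) {
    TrackingByteBufferPool pool =
        TrackingByteBufferPool.wrap(new ElasticByteBufferPool());

    ByteBuffer first = pool.getBuffer(false, 4096);
    ByteBuffer second = pool.getBuffer(false, 4096);

    pool.putBuffer(first);          // released correctly
    // "second" is deliberately never released.

    try {
      pool.close();                 // raises LeakedByteBufferException
    } catch (TrackingByteBufferPool.LeakedByteBufferException e) {
      // Reports the number of unreleased buffers; enable DEBUG logging on
      // TrackingByteBufferPool to capture per-allocation stack traces.
      System.out.println("leaked buffers: " + e.getCount());
    }
  }
}
```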
+ * @param buffer buffer + * @return true if the buffer is in the pool + */ + public boolean containsBuffer(ByteBuffer buffer) { + return allocated.containsKey(requireNonNull(buffer)); + } + + /** + * Get the number of allocated buffers. + * @return number of allocated buffers + */ + public int size() { + return allocated.size(); + } + + /** + * Expect all buffers to be released -if not, log unreleased ones + * and then raise an exception with the stack trace of the first + * unreleased buffer. + * @throws LeakedByteBufferException if at least one buffer was not released + */ + @Override + public void close() throws LeakedByteBufferException { + if (!allocated.isEmpty()) { + allocated.keySet().forEach(buffer -> + LOG.warn("Unreleased ByteBuffer {}; {}", identityHashCode(buffer), buffer)); + LeakedByteBufferException ex = new LeakedByteBufferException( + allocated.size(), + allocated.values().iterator().next()); + allocated.clear(); // Drop the references to the ByteBuffers, so they can be gc'd + throw ex; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java index bf2dc78741f..7bfce520910 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java @@ -31,6 +31,9 @@ @InterfaceStability.Evolving public final class Sizes { + /** 0 bytes: {@value}. Here to make it easy to find use of zero in constants. */ + public static final int S_0 = 0; + /** 2^8 bytes: {@value}. */ public static final int S_256 = 256; diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 12ccac3ee65..d856a83e366 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1364,6 +1364,24 @@ <description>File space usage statistics refresh interval in msec.</description> </property> +<property> + <name>fs.file.checksum.verify</name> + <value>true</value> + <description> + Should data read through the local filesystem (file://) URLs be verified aginst + the checksums stored in the associated checksum files? + Setting this to false skips loading the checksum files, reading data in checksum-aligned + blocks and verifying checksums. This may improve performance + when reading data, though it pushes the responsibility of detecting errors + into the file formats themselves, or the underlying storage system. + Even when verification is enabled, files without associated checksum files + .$FILENAME.crc are never verified. + When fs.file.checksum.verify is false, vector reads of data will always return + buffers that are the buffers allocated through the buffer allocator + passed in to the API call and not sliced subsets thereof. 
+ </description> +</property> + <property> <name>fs.automatic.close</name> <value>true</value> diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md index 92ef696db7a..1a762d2d077 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md @@ -541,7 +541,7 @@ end of first and start of next range is more than this value. #### `maxReadSizeForVectorReads()` Maximum number of bytes which can be read in one go after merging the ranges. -Two ranges won't be merged if the combined data to be read It's okay we have a look at what we do right now for readOkayis more than this value. +Two ranges won't be merged if the combined data to be read. Essentially setting this to 0 will disable the merging of ranges. #### Concurrency @@ -647,7 +647,7 @@ For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-1929 For reliable use with older hadoop releases with the API: sort the list of ranges and check for overlaps before calling `readVectored()`. -*Direct Buffer Reads* +#### Direct Buffer Reads Releases without [HADOOP-19101](https://issues.apache.org/jira/browse/HADOOP-19101) _Vectored Read into off-heap buffer broken in fallback implementation_ can read data @@ -665,8 +665,54 @@ support through an explicit `hasCapability()` probe: Stream.hasCapability("in:readvectored") ``` -Given the HADOOP-18296 problem with `ChecksumFileSystem` and direct buffers, across all releases, -it is best to avoid using this API in production with direct buffers. +#### Buffer Slicing + +[HADOOP-18296](https://issues.apache.org/jira/browse/HADOOP-18296), +_Memory fragmentation in ChecksumFileSystem Vectored IO implementation_ +highlights that `ChecksumFileSystem` (which the default implementation of `file://` +subclasses), may return buffers which are sliced subsets of buffers allocated +through the `allocate()` function passed in. + +This will happen during reads with and without range coalescing. + +Checksum verification may be disabled by setting the option +`fs.file.checksum.verify` to false (Hadoop 3.4.2 and later). + +```xml +<property> + <name>fs.file.checksum.verify</name> + <value>false</value> +</property> +``` + +(As you would expect, disabling checksum verification means that errors +reading data may not be detected during the read operation. +Use with care in production.) + +Filesystem instances which split buffers during vector read operations +MUST declare this by returning `true` +to the path capabilities probe `fs.capability.vectoredio.sliced`, +and for the open stream in its `hasCapability()` method. + + +The local filesystem will not slice buffers if the checksum file +of `filename + ".crc"` is not found. This is not declared in the +filesystem `hasPathCapability(filename, "fs.capability.vectoredio.sliced")` +call, as no checks for the checksum file are made then. +This cannot be relied on in production, but it may be useful when +testing for buffer recycling with Hadoop releases 3.4.1 and earlier. + +*Implementors Notes* + +* Don't slice buffers. `ChecksumFileSystem` has to be considered an outlier which + needs to be addressed in future. +* Always free buffers in error handling code paths. +* When handling errors in coalesced ranges, don't release buffers for any sub-ranges + which have already completed. 
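A sketch of the cleanup guidance in the implementors notes above: after a partially failed vectored read, await every range and return only the buffers that actually arrived. The helper name, pool parameter and processing step are illustrative assumptions.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.functional.FutureIO;

/** Illustrative helper; not part of the patch. */
public final class RangeCleanup {

  /** Await every range; return completed buffers to the pool even if some ranges failed. */
  public static void drainAndRelease(List<? extends FileRange> ranges, ByteBufferPool pool)
      throws IOException {
    IOException firstFailure = null;
    for (FileRange range : ranges) {
      try {
        ByteBuffer buffer = FutureIO.awaitFuture(range.getData());
        // ... process the buffer here ...
        pool.putBuffer(buffer);      // release only buffers actually handed back
      } catch (IOException e) {
        // This range never returned a buffer, so there is nothing to release.
        if (firstFailure == null) {
          firstFailure = e;
        }
      }
    }
    if (firstFailure != null) {
      throw firstFailure;
    }
  }
}
```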
+ +Handling failures in coalesced ranges is complicated. Recent implementations, such as +`org.apache.hadoop.fs.s3a.impl.streams.AnalyticsStream` omit range coalescing, +relying solely on parallel HTTP for performance. ## `void readVectored(List<? extends FileRange> ranges, IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release)` diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java index e32107be656..a9c39ef0d68 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java @@ -44,6 +44,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.impl.TrackingByteBufferPool; import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.util.concurrent.HadoopExecutors; @@ -52,13 +53,16 @@ import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY; import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR; +import static org.apache.hadoop.fs.StreamCapabilities.VECTOREDIO_BUFFERS_SLICED; import static org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS; import static org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals; import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; import static org.apache.hadoop.fs.contract.ContractTestUtils.range; import static org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead; import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; - import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.io.Sizes.S_128K; +import static org.apache.hadoop.io.Sizes.S_4K; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture; import static org.apache.hadoop.util.functional.FutureIO.awaitFuture; @@ -74,7 +78,7 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private static final Logger LOG = LoggerFactory.getLogger(AbstractContractVectoredReadTest.class); - public static final int DATASET_LEN = 64 * 1024; + public static final int DATASET_LEN = S_128K; protected static final byte[] DATASET = ContractTestUtils.dataset(DATASET_LEN, 'a', 32); protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt"; @@ -91,6 +95,8 @@ public abstract class AbstractContractVectoredReadTest extends AbstractFSContrac private final String bufferType; + private final boolean isDirect; + /** * Path to the vector file. 
*/ @@ -110,7 +116,7 @@ public static List<String> params() { protected AbstractContractVectoredReadTest(String bufferType) { this.bufferType = bufferType; - final boolean isDirect = !"array".equals(bufferType); + this.isDirect = !"array".equals(bufferType); this.allocate = size -> pool.getBuffer(isDirect, size); } @@ -619,4 +625,61 @@ protected <T extends Throwable> void verifyExceptionalVectoredRead( }); } } + + @Test + public void testBufferSlicing() throws Throwable { + describe("Test buffer slicing behavior in vectored IO"); + + final int numBuffers = 8; + final int bufferSize = S_4K; + long offset = 0; + final List<FileRange> fileRanges = new ArrayList<>(); + for (int i = 0; i < numBuffers; i++) { + fileRanges.add(FileRange.createFileRange(offset, bufferSize)); + // increment and add a non-binary-aligned gap, so as to force + // offsets to be misaligned with possible page sizes. + offset += bufferSize + 4000; + } + TrackingByteBufferPool trackerPool = TrackingByteBufferPool.wrap(getPool()); + int unknownBuffers = 0; + boolean slicing; + try (FSDataInputStream in = openVectorFile()) { + slicing = in.hasCapability(VECTOREDIO_BUFFERS_SLICED); + LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in); + in.readVectored(fileRanges, s -> trackerPool.getBuffer(isDirect, s), trackerPool::putBuffer); + + // check that all buffers are from the the pool, unless they are sliced. + for (FileRange res : fileRanges) { + CompletableFuture<ByteBuffer> data = res.getData(); + ByteBuffer buffer = awaitFuture(data); + Assertions.assertThat(buffer) + .describedAs("Buffer must not be null") + .isNotNull(); + Assertions.assertThat(slicing || trackerPool.containsBuffer(buffer)) + .describedAs("Buffer must be from the pool") + .isTrue(); + try { + trackerPool.putBuffer(buffer); + } catch (TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) { + // this can happen if the buffer was sliced, as it is not in the pool. + if (!slicing) { + throw e; + } + LOG.info("Sliced buffer detected: {}", buffer); + unknownBuffers++; + } + } + } + try { + trackerPool.close(); + } catch (TrackingByteBufferPool.LeakedByteBufferException e) { + if (!slicing) { + throw e; + } + LOG.info("Slicing is enabled; we saw leaked buffers: {} after {}" + + " releases of unknown buffers", + e.getCount(), unknownBuffers); + } + + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java index 23cfcce75a2..9e0867597b3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java @@ -74,6 +74,13 @@ public void testChecksumValidationDuringVectoredReadSmallFile() throws Exception validateCheckReadException(testPath, length, smallFileRanges); } + /** + * Verify that checksum validation works through vectored reads. 
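An outline of the validation pattern the javadoc above describes, assuming verification is enabled and the target file's data (but not its .crc file) has been corrupted beforehand; file setup is elided and the helper is illustrative.

```java
import java.nio.ByteBuffer;
import java.util.List;

import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.functional.FutureIO;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/** Illustrative fragment; file creation and corruption are elided. */
public final class ChecksumFailureSketch {

  static void expectChecksumFailure(FileSystem fs, Path corruptedPath,
      List<FileRange> ranges) throws Exception {
    try (FSDataInputStream in = fs.open(corruptedPath)) {
      in.readVectored(ranges, ByteBuffer::allocate);
      // With verification enabled, ranges covering corrupted data should have
      // their futures completed exceptionally with ChecksumException rather
      // than returning bad bytes. Assumes every range here hits corruption.
      for (FileRange range : ranges) {
        intercept(ChecksumException.class,
            () -> FutureIO.awaitFuture(range.getData()));
      }
    }
  }
}
```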
+ * @param testPath path to the file to be tested + * @param length length of the file to be created + * @param ranges ranges to be read from the file + * @throws Exception any exception other than ChecksumException + */ private void validateCheckReadException(Path testPath, int length, List<FileRange> ranges) throws Exception { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java index 3a60d6ecdda..fc9b1281786 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java @@ -157,7 +157,8 @@ public URI getUri() { public void setVerifyChecksum(boolean verifyChecksum) { this.verifyChecksum = verifyChecksum; } - + + @Override public boolean getVerifyChecksum(){ return verifyChecksum; } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java index d41974dab91..48b76b162aa 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java @@ -62,7 +62,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE; import static org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE; -import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableAnalyticsAccelerator; import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString; import static org.apache.hadoop.io.Sizes.S_1M; @@ -71,7 +71,8 @@ import static org.apache.hadoop.test.MoreAsserts.assertEqual; /** - * S3A contract tests for vectored reads. + * S3A contract tests for vectored reads through the classic input stream. + * <p> * This is a complex suite as it really is testing the store, so measurements of * what IO took place is also performed if the input stream is suitable for this. */ @@ -89,14 +90,12 @@ protected AbstractFSContract createContract(Configuration conf) { } /** - * Analytics Accelerator Library for Amazon S3 does not support Vectored Reads. - * @throws Exception + * Create a configuration. 
+ * @return a configuration */ @Override - public void setup() throws Exception { - super.setup(); - skipIfAnalyticsAcceleratorEnabled(getContract().getConf(), - "Analytics Accelerator does not support vectored reads"); + protected Configuration createConfiguration() { + return disableAnalyticsAccelerator(super.createConfiguration()); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java index 269e3a14190..d74c8446325 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java @@ -108,6 +108,7 @@ import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet; import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_S3; import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics; +import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic; import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion; @@ -1876,6 +1877,18 @@ public static Configuration enableAnalyticsAccelerator(Configuration conf) { return conf; } + /** + * Disable analytics stream for S3A S3AFileSystem in tests. + * @param conf Configuration to update + * @return patched config + */ + public static Configuration disableAnalyticsAccelerator(Configuration conf) { + removeBaseAndBucketOverrides(conf, + INPUT_STREAM_TYPE); + conf.setEnum(INPUT_STREAM_TYPE, Classic); + return conf; + } + /** * Probe for a filesystem having a specific stream type; * this is done through filesystem capabilities. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org
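For completeness, a sketch of the test-configuration pattern the S3A change above adopts: pinning the classic input stream so the vectored read path is exercised even where the Analytics stream is the default. The subclass shown is illustrative and mirrors ITestS3AContractVectoredRead.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;

import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableAnalyticsAccelerator;

/** Illustrative contract-test subclass; not part of the patch. */
public class ITestExampleVectoredRead extends AbstractContractVectoredReadTest {

  public ITestExampleVectoredRead(String bufferType) {
    super(bufferType);
  }

  @Override
  protected Configuration createConfiguration() {
    // Force the classic S3A input stream for this test, since the Analytics
    // stream does not go through this vectored read path.
    return disableAnalyticsAccelerator(super.createConfiguration());
  }

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new S3AContract(conf);
  }
}
```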