This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.4.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.4.2 by this push:
     new dc727079f42 HADOOP-18296. Memory fragmentation in ChecksumFileSystem 
readVectored() (#7732)
dc727079f42 is described below

commit dc727079f4220f4233a2fa29c366c0f1e319f3d4
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Mon Jul 21 20:12:21 2025 +0100

    HADOOP-18296. Memory fragmentation in ChecksumFileSystem readVectored() 
(#7732)
    
    Option "fs.file.checksum.verify" disables checksum
    verification in local FS, so sliced subsets of larger buffers are
    never returned.
    
    Stream capability  "fs.capability.vectoredio.sliced" is true
    if a filesystem knows that it is returning slices of a larger buffer.
    This is false if a filesystem doesn't, or against the local
    FS in releases which lack this feature.
    
    Contributed by Steve Loughran
---
 .../org/apache/hadoop/fs/ChecksumFileSystem.java   |  60 ++++-
 .../apache/hadoop/fs/CommonConfigurationKeys.java  |   6 +
 .../java/org/apache/hadoop/fs/LocalFileSystem.java |   6 +
 .../org/apache/hadoop/fs/RawLocalFileSystem.java   |  12 +-
 .../org/apache/hadoop/fs/StreamCapabilities.java   |  10 +
 .../org/apache/hadoop/fs/VectoredReadUtils.java    |  17 ++
 .../hadoop/fs/impl/TrackingByteBufferPool.java     | 290 +++++++++++++++++++++
 .../src/main/java/org/apache/hadoop/io/Sizes.java  |   3 +
 .../src/main/resources/core-default.xml            |  18 ++
 .../site/markdown/filesystem/fsdatainputstream.md  |  54 +++-
 .../contract/AbstractContractVectoredReadTest.java |  69 ++++-
 .../localfs/TestLocalFSContractVectoredRead.java   |   7 +
 .../fs/viewfs/TestViewFileSystemDelegation.java    |   3 +-
 .../contract/s3a/ITestS3AContractVectoredRead.java |  15 +-
 .../org/apache/hadoop/fs/s3a/S3ATestUtils.java     |  13 +
 15 files changed, 559 insertions(+), 24 deletions(-)

diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 45b3f90feaa..084b68729be 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -29,6 +29,7 @@
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
@@ -57,6 +58,7 @@
 import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
 import static 
org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
 import static 
org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
+import static org.apache.hadoop.io.Sizes.S_0;
 
 /****************************************************************
  * Abstract Checksumed FileSystem.
@@ -101,6 +103,14 @@ public void setVerifyChecksum(boolean verifyChecksum) {
     this.verifyChecksum = verifyChecksum;
   }
 
+  /**
+   * Is checksum verification enabled?
+   * @return true if files are to be verified through checksums.
+   */
+  public boolean getVerifyChecksum() {
+    return verifyChecksum;
+  }
+
   @Override
   public void setWriteChecksum(boolean writeChecksum) {
     this.writeChecksum = writeChecksum;
@@ -165,7 +175,8 @@ private int getSumBufferSize(int bytesPerSum, int 
bufferSize) {
 
   /*******************************************************
    * For open()'s FSInputStream
-   * It verifies that data matches checksums.
+   * It verifies that data matches checksums iff the data
+   * file has matching checksums.
    *******************************************************/
   private static class ChecksumFSInputChecker extends FSInputChecker implements
       IOStatisticsSource, StreamCapabilities {
@@ -426,8 +437,18 @@ static ByteBuffer checkBytes(ByteBuffer sumsBytes,
       return data;
     }
 
+    /**
+     * Turn off range merging to make buffer recycling more likely (but not 
guaranteed).
+     * @return 0, always
+     */
+    @Override
+    public int maxReadSizeForVectorReads() {
+      return S_0;
+    }
+
     /**
      * Vectored read.
+     * <p>
      * If the file has no checksums: delegate to the underlying stream.
      * If the file is checksummed: calculate the checksum ranges as
      * well as the data ranges, read both, and validate the checksums
@@ -448,10 +469,12 @@ public void readVectored(final List<? extends FileRange> 
ranges,
         final Consumer<ByteBuffer> release) throws IOException {
 
       // If the stream doesn't have checksums, just delegate.
-      if (sums == null) {
+      if (dataStreamToHandleVectorIO()) {
+        LOG.debug("No checksums for vectored read, delegating to inner 
stream");
         datas.readVectored(ranges, allocate);
         return;
       }
+      LOG.debug("Checksum vectored read for {} ranges", ranges.size());
       final long length = getFileLength();
       final List<? extends FileRange> sorted = validateAndSortRanges(ranges,
           Optional.of(length));
@@ -489,9 +512,37 @@ public void readVectored(final List<? extends FileRange> 
ranges,
       }
     }
 
+    /**
+     * Predicate to determine whether vector reads should be directly
+     * handled by the data stream, rather than processing
+     * the ranges in this class, processing which includes checksum validation.
+     * <p>
+     * Vector reading is delegated whenever there are no checksums for
+     * the data file, or when validating checksums has been delegated.
+     * @return true if vector reads are to be directly handled by
+     * the data stream.
+     */
+    private boolean dataStreamToHandleVectorIO() {
+      return sums == null;
+    }
+
+    /**
+     * For this stream, declare that range merging may take place;
+     * otherwise delegate to the inner stream.
+     * @param capability string to query the stream support for.
+     * @return true for sliced vector IO if checksum validation
+     * is taking place. False if no checksums are present for the validation.
+     * For all other probes: pass to the wrapped stream
+     */
     @Override
     public boolean hasCapability(String capability) {
-      return datas.hasCapability(capability);
+      switch (capability.toLowerCase(Locale.ENGLISH)) {
+        // slicing can take place during coalescing and checksumming
+      case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
+        return !dataStreamToHandleVectorIO();
+      default:
+        return datas.hasCapability(capability);
+      }
     }
   }
 
@@ -1142,6 +1193,9 @@ public boolean hasPathCapability(final Path path, final 
String capability)
     case CommonPathCapabilities.FS_APPEND:
     case CommonPathCapabilities.FS_CONCAT:
       return false;
+    case StreamCapabilities.VECTOREDIO_BUFFERS_SLICED:
+      return getVerifyChecksum();
+
     default:
       return super.hasPathCapability(p, capability);
     }
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 31b6654afc5..e0832fc0576 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -505,4 +505,10 @@ public class CommonConfigurationKeys extends 
CommonConfigurationKeysPublic {
   public static final String HADOOP_SECURITY_RESOLVER_IMPL =
       "hadoop.security.resolver.impl";
 
+  /**
+   * Verify checksums on read -default is true.
+   * <p>
+   * {@value}.
+   */
+  public static final String LOCAL_FS_VERIFY_CHECKSUM = 
"fs.file.checksum.verify";
 }
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
index 590cbd9a49e..e912d2245be 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/LocalFileSystem.java
@@ -27,6 +27,8 @@
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 
+import static 
org.apache.hadoop.fs.CommonConfigurationKeys.LOCAL_FS_VERIFY_CHECKSUM;
+
 /****************************************************************
  * Implement the FileSystem API for the checksumed local filesystem.
  *
@@ -50,6 +52,10 @@ public void initialize(URI name, Configuration conf) throws 
IOException {
     if (!scheme.equals(fs.getUri().getScheme())) {
       swapScheme = scheme;
     }
+    final boolean checksum = conf.getBoolean(LOCAL_FS_VERIFY_CHECKSUM, true);
+    setVerifyChecksum(checksum);
+    LOG.debug("Checksum verification enabled={}", checksum);
+
   }
 
   /**
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
index d5f545b460d..3bd93a4f459 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/RawLocalFileSystem.java
@@ -72,6 +72,7 @@
 import org.apache.hadoop.util.StringUtils;
 
 import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
+import static org.apache.hadoop.fs.VectoredReadUtils.hasVectorIOCapability;
 import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
 import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
 import static 
org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
@@ -293,15 +294,12 @@ public FileDescriptor getFileDescriptor() throws 
IOException {
 
     @Override
     public boolean hasCapability(String capability) {
-      // a bit inefficient, but intended to make it easier to add
-      // new capabilities.
       switch (capability.toLowerCase(Locale.ENGLISH)) {
       case StreamCapabilities.IOSTATISTICS:
       case StreamCapabilities.IOSTATISTICS_CONTEXT:
-      case StreamCapabilities.VECTOREDIO:
         return true;
       default:
-        return false;
+        return hasVectorIOCapability(capability);
       }
     }
 
@@ -400,7 +398,9 @@ private void initiateRead() {
       for(int i = 0; i < ranges.size(); ++i) {
         FileRange range = ranges.get(i);
         buffers[i] = allocateRelease.getBuffer(false, range.getLength());
-        channel.read(buffers[i], range.getOffset(), i, this);
+        final ByteBuffer buffer = buffers[i];
+        LOG.debug("Reading file range {} into buffer {}", range, 
System.identityHashCode(buffer));
+        channel.read(buffer, range.getOffset(), i, this);
       }
     }
 
@@ -416,6 +416,8 @@ private void initiateRead() {
     public void completed(Integer result, Integer rangeIndex) {
       FileRange range = ranges.get(rangeIndex);
       ByteBuffer buffer = buffers[rangeIndex];
+      LOG.debug("Completed read range {} into buffer {} outcome={} 
remaining={}",
+          range, System.identityHashCode(buffer), result, buffer.remaining());
       if (result == -1) {
         // no data was read back.
         failed(new EOFException("Read past End of File"), rangeIndex);
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 93ed57ef830..955040d91a3 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -86,6 +86,16 @@ public interface StreamCapabilities {
    */
   String VECTOREDIO = "in:readvectored";
 
+  /**
+   * Probe for vector IO implementation details: {@value}.
+   * When performing vectored IO operations, are the buffers returned by 
readVectored()
+   * potentially sliced subsets of buffers allocated by the allocate() function
+   * passed in the read requests?
+   * If true, this means that the returned buffers may be sliced subsets of the
+   * allocated buffers.
+   */
+  String VECTOREDIO_BUFFERS_SLICED = "fs.capability.vectoredio.sliced";
+
   /**
    * Stream abort() capability implemented by {@link Abortable#abort()}.
    * This matches the Path Capability
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
index 6adcba39a3f..a4c2d69b63b 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/VectoredReadUtils.java
@@ -24,6 +24,7 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
+import java.util.Locale;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Consumer;
@@ -476,6 +477,22 @@ public static ByteBuffer sliceTo(ByteBuffer readData, long 
readOffset,
     return readData;
   }
 
+  /**
+   * Default vector IO probes.
+   * These are capabilities which streams that leave vector IO
+   * to the default methods should return when queried for vector capabilities.
+   * @param capability capability to probe for.
+   * @return true if the given capability holds for vectored IO features.
+   */
+  public static boolean hasVectorIOCapability(String capability) {
+    switch (capability.toLowerCase(Locale.ENGLISH)) {
+    case StreamCapabilities.VECTOREDIO:
+      return true;
+    default:
+      return false;
+    }
+  }
+
   /**
    * private constructor.
    */
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
new file mode 100644
index 00000000000..0abe678ab74
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/TrackingByteBufferPool.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import java.nio.ByteBuffer;
+import java.util.IdentityHashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.io.ByteBufferPool;
+
+import static java.lang.System.identityHashCode;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A wrapper {@link ByteBufferPool} implementation that tracks whether all 
allocated buffers
+ * are released.
+ * <p>
+ * It throws the related exception at {@link #close()} if any buffer remains 
un-released.
+ * It also clears the buffers at release so if they continued being used it'll 
generate errors.
+ * <p>
+ * To be used for testing..
+ * <p>
+ * The stacktraces of the allocation are not stored by default because
+ * it can significantly decrease the unit test performance.
+ * Configuring this class to log at DEBUG will trigger their collection.
+ * @see ByteBufferAllocationStacktraceException
+ * <p>
+ * Adapted from Parquet class {@code 
org.apache.parquet.bytes.TrackingByteBufferAllocator}.
+ */
+@VisibleForTesting
+public final class TrackingByteBufferPool implements ByteBufferPool, 
AutoCloseable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(TrackingByteBufferPool.class);
+
+  /**
+   * Wrap an existing allocator with this tracking allocator.
+   * @param allocator allocator to wrap.
+   * @return a new allocator.
+   */
+  public static TrackingByteBufferPool wrap(ByteBufferPool allocator) {
+    return new TrackingByteBufferPool(allocator);
+  }
+
+  public static class LeakDetectorHeapByteBufferPoolException
+      extends RuntimeException {
+
+    private LeakDetectorHeapByteBufferPoolException(String msg) {
+      super(msg);
+    }
+
+    private LeakDetectorHeapByteBufferPoolException(String msg, Throwable 
cause) {
+      super(msg, cause);
+    }
+
+    private LeakDetectorHeapByteBufferPoolException(
+        String message,
+        Throwable cause,
+        boolean enableSuppression,
+        boolean writableStackTrace) {
+      super(message, cause, enableSuppression, writableStackTrace);
+    }
+  }
+
+  /**
+   * Strack trace of allocation as saved in the tracking map.
+   */
+  public static final class ByteBufferAllocationStacktraceException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    /**
+     * Single stack trace instance to use when DEBUG is not enabled.
+     */
+    private static final ByteBufferAllocationStacktraceException 
WITHOUT_STACKTRACE =
+        new ByteBufferAllocationStacktraceException(false);
+
+    /**
+     * Create a stack trace for the map, either using the shared static one
+     * or a dynamically created one.
+     * @return a stack
+     */
+    private static ByteBufferAllocationStacktraceException create() {
+      return LOG.isDebugEnabled()
+          ? new ByteBufferAllocationStacktraceException()
+          : WITHOUT_STACKTRACE;
+    }
+
+    private ByteBufferAllocationStacktraceException() {
+      super("Allocation stacktrace of the first ByteBuffer:");
+    }
+
+    /**
+     * Private constructor to for the singleton {@link #WITHOUT_STACKTRACE},
+     * telling develoers how to see a trace per buffer.
+     */
+    private ByteBufferAllocationStacktraceException(boolean unused) {
+      super("Log org.apache.hadoop.fs.impl.TrackingByteBufferPool at DEBUG for 
stack traces",
+          null,
+          false,
+          false);
+    }
+  }
+
+  /**
+   * Exception raised in {@link TrackingByteBufferPool#putBuffer(ByteBuffer)} 
if the
+   * buffer to release was not in the hash map.
+   */
+  public static final class ReleasingUnallocatedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    private ReleasingUnallocatedByteBufferException(final ByteBuffer b) {
+      super(String.format("Releasing a ByteBuffer instance that is not 
allocated"
+          + " by this buffer pool or already been released: %s size %d; hash 
code %s",
+          b, b.capacity(), identityHashCode(b)));
+    }
+  }
+
+  /**
+   * Exception raised in {@link TrackingByteBufferPool#close()} if there
+   * was an unreleased buffer.
+   */
+  public static final class LeakedByteBufferException
+      extends LeakDetectorHeapByteBufferPoolException {
+
+    private final int count;
+
+    private LeakedByteBufferException(int count, 
ByteBufferAllocationStacktraceException e) {
+      super(count + " ByteBuffer object(s) is/are remained unreleased"
+          + " after closing this buffer pool.", e);
+      this.count = count;
+    }
+
+    /**
+     * Get the number of unreleased buffers.
+     * @return number of unreleased buffers
+     */
+    public int getCount() {
+      return count;
+    }
+  }
+
+  /**
+   * Tracker of allocations.
+   * <p>
+   * The key maps by the object id of the buffer, and refers to either a 
common stack trace
+   * or one dynamically created for each allocation.
+   */
+  private final Map<ByteBuffer, ByteBufferAllocationStacktraceException> 
allocated =
+      new IdentityHashMap<>();
+
+  /**
+   * Wrapped buffer pool.
+   */
+  private final ByteBufferPool allocator;
+
+  /**
+   * Number of buffer allocations.
+   * <p>
+   * This is incremented in {@link #getBuffer(boolean, int)}.
+   */
+  private final AtomicInteger bufferAllocations = new AtomicInteger();
+
+  /**
+   * Number of buffer releases.
+   * <p>
+   * This is incremented in {@link #putBuffer(ByteBuffer)}.
+   */
+  private final AtomicInteger bufferReleases = new AtomicInteger();
+
+  /**
+   * private constructor.
+   * @param allocator pool allocator.
+   */
+  private TrackingByteBufferPool(ByteBufferPool allocator) {
+    this.allocator = allocator;
+  }
+
+  public int getBufferAllocations() {
+    return bufferAllocations.get();
+  }
+
+  public int getBufferReleases() {
+    return bufferReleases.get();
+  }
+
+  /**
+   * Get a buffer from the pool.
+   * <p>
+   * This increments the {@link #bufferAllocations} counter and stores the
+   * singleron or local allocation stack trace in the {@link #allocated} map.
+   * @param direct whether to allocate a direct buffer or not
+   * @param size size of the buffer to allocate
+   * @return a ByteBuffer instance
+   */
+  @Override
+  public synchronized ByteBuffer getBuffer(final boolean direct, final int 
size) {
+    bufferAllocations.incrementAndGet();
+    ByteBuffer buffer = allocator.getBuffer(direct, size);
+    final ByteBufferAllocationStacktraceException ex =
+        ByteBufferAllocationStacktraceException.create();
+    allocated.put(buffer, ex);
+    LOG.debug("Creating ByteBuffer:{} size {} {}",
+        identityHashCode(buffer), size, buffer, ex);
+    return buffer;
+  }
+
+  /**
+   * Release a buffer back to the pool.
+   * <p>
+   * This increments the {@link #bufferReleases} counter and removes the
+   * buffer from the {@link #allocated} map.
+   * <p>
+   * If the buffer was not allocated by this pool, it throws
+   * {@link ReleasingUnallocatedByteBufferException}.
+   *
+   * @param buffer buffer to release
+   * @throws ReleasingUnallocatedByteBufferException if the buffer was not 
allocated by this pool
+   */
+  @Override
+  public synchronized void putBuffer(ByteBuffer buffer)
+      throws ReleasingUnallocatedByteBufferException {
+
+    bufferReleases.incrementAndGet();
+    requireNonNull(buffer);
+    LOG.debug("Releasing ByteBuffer: {}: {}", identityHashCode(buffer), 
buffer);
+    if (allocated.remove(buffer) == null) {
+      throw new ReleasingUnallocatedByteBufferException(buffer);
+    }
+    allocator.putBuffer(buffer);
+    // Clearing the buffer so subsequent access would probably generate errors
+    buffer.clear();
+  }
+
+  /**
+   * Check if the buffer is in the pool.
+   * @param buffer buffer
+   * @return true if the buffer is in the pool
+   */
+  public boolean containsBuffer(ByteBuffer buffer) {
+    return allocated.containsKey(requireNonNull(buffer));
+  }
+
+  /**
+   * Get the number of allocated buffers.
+   * @return number of allocated buffers
+   */
+  public int size() {
+    return allocated.size();
+  }
+
+  /**
+   * Expect all buffers to be released -if not, log unreleased ones
+   * and then raise an exception with the stack trace of the first
+   * unreleased buffer.
+   * @throws LeakedByteBufferException if at least one buffer was not released
+   */
+  @Override
+  public void close() throws LeakedByteBufferException {
+    if (!allocated.isEmpty()) {
+      allocated.keySet().forEach(buffer ->
+          LOG.warn("Unreleased ByteBuffer {}; {}", identityHashCode(buffer), 
buffer));
+      LeakedByteBufferException ex = new LeakedByteBufferException(
+          allocated.size(),
+          allocated.values().iterator().next());
+      allocated.clear(); // Drop the references to the ByteBuffers, so they 
can be gc'd
+      throw ex;
+    }
+  }
+}
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
index bf2dc78741f..7bfce520910 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/Sizes.java
@@ -31,6 +31,9 @@
 @InterfaceStability.Evolving
 public final class Sizes {
 
+  /** 0 bytes: {@value}. Here to make it easy to find use of zero in 
constants. */
+  public static final int S_0 = 0;
+
   /** 2^8 bytes: {@value}. */
   public static final int S_256 = 256;
 
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 12ccac3ee65..d856a83e366 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1364,6 +1364,24 @@
   <description>File space usage statistics refresh interval in 
msec.</description>
 </property>
 
+<property>
+  <name>fs.file.checksum.verify</name>
+  <value>true</value>
+  <description>
+    Should data read through the local filesystem (file://) URLs be verified 
aginst
+    the checksums stored in the associated checksum files?
+    Setting this to false skips loading the checksum files, reading data in 
checksum-aligned
+    blocks and verifying checksums. This may improve performance
+    when reading data, though it pushes the responsibility of detecting errors
+    into the file formats themselves, or the underlying storage system.
+    Even when verification is enabled, files without associated checksum files
+    .$FILENAME.crc are never verified.
+    When fs.file.checksum.verify is false, vector reads of data will always 
return
+    buffers that are the buffers allocated through the buffer allocator
+    passed in to the API call and not sliced subsets thereof.
+  </description>
+</property>
+
 <property>
   <name>fs.automatic.close</name>
   <value>true</value>
diff --git 
a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
 
b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index 92ef696db7a..1a762d2d077 100644
--- 
a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ 
b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -541,7 +541,7 @@ end of first and start of next range is more than this 
value.
 #### `maxReadSizeForVectorReads()`
 
 Maximum number of bytes which can be read in one go after merging the ranges.
-Two ranges won't be merged if the combined data to be read It's okay we have a 
look at what we do right now for readOkayis more than this value.
+Two ranges won't be merged if the combined data to be read.
 Essentially setting this to 0 will disable the merging of ranges.
 
 #### Concurrency
@@ -647,7 +647,7 @@ For details see 
[HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-1929
 For reliable use with older hadoop releases with the API: sort the list of 
ranges
 and check for overlaps before calling `readVectored()`.
 
-*Direct Buffer Reads*
+#### Direct Buffer Reads
 
 Releases without 
[HADOOP-19101](https://issues.apache.org/jira/browse/HADOOP-19101)
 _Vectored Read into off-heap buffer broken in fallback implementation_ can 
read data
@@ -665,8 +665,54 @@ support through an explicit `hasCapability()` probe:
 Stream.hasCapability("in:readvectored")
 ```
 
-Given the HADOOP-18296 problem with `ChecksumFileSystem` and direct buffers, 
across all releases,
-it is best to avoid using this API in production with direct buffers.
+#### Buffer Slicing
+
+[HADOOP-18296](https://issues.apache.org/jira/browse/HADOOP-18296),
+_Memory fragmentation in ChecksumFileSystem Vectored IO implementation_
+highlights that `ChecksumFileSystem` (which the default implementation of 
`file://`
+subclasses), may return buffers which are sliced subsets of buffers allocated
+through the `allocate()` function passed in.
+
+This will happen during reads with and without range coalescing.
+
+Checksum verification may be disabled by setting the option
+`fs.file.checksum.verify` to false (Hadoop 3.4.2 and later).
+
+```xml
+<property>
+  <name>fs.file.checksum.verify</name>
+  <value>false</value>
+</property>
+```
+
+(As you would expect, disabling checksum verification means that errors
+reading data may not be detected during the read operation.
+Use with care in production.)
+
+Filesystem instances which split buffers during vector read operations
+MUST declare this by returning `true`
+to the path capabilities probe `fs.capability.vectoredio.sliced`,
+and for the open stream in its `hasCapability()` method.
+
+
+The local filesystem will not slice buffers if the checksum file
+of `filename + ".crc"` is not found. This is not declared in the
+filesystem `hasPathCapability(filename, "fs.capability.vectoredio.sliced")`
+call, as no checks for the checksum file are made then.
+This cannot be relied on in production, but it may be useful when
+testing for buffer recycling with Hadoop releases 3.4.1 and earlier.
+
+*Implementors Notes*
+
+* Don't slice buffers. `ChecksumFileSystem` has to be considered an outlier 
which
+  needs to be addressed in future.
+* Always free buffers in error handling code paths.
+* When handling errors in coalesced ranges, don't release buffers for any 
sub-ranges
+  which have already completed.
+
+Handling failures in coalesced ranges is complicated. Recent implementations, 
such as
+`org.apache.hadoop.fs.s3a.impl.streams.AnalyticsStream` omit range coalescing,
+relying solely on parallel HTTP for performance.
 
 
 ## `void readVectored(List<? extends FileRange> ranges, 
IntFunction<ByteBuffer> allocate, Consumer<ByteBuffer> release)`
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
index e32107be656..a9c39ef0d68 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractVectoredReadTest.java
@@ -44,6 +44,7 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.impl.TrackingByteBufferPool;
 import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -52,13 +53,16 @@
 import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
 import static 
org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_VECTOR;
+import static 
org.apache.hadoop.fs.StreamCapabilities.VECTOREDIO_BUFFERS_SLICED;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.VECTORED_READ_OPERATION_TEST_TIMEOUT_SECONDS;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.assertDatasetEquals;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.range;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.returnBuffersToPoolPostRead;
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
-  import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.apache.hadoop.io.Sizes.S_128K;
+import static org.apache.hadoop.io.Sizes.S_4K;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.apache.hadoop.test.LambdaTestUtils.interceptFuture;
 import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;
 
@@ -74,7 +78,7 @@ public abstract class AbstractContractVectoredReadTest 
extends AbstractFSContrac
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractContractVectoredReadTest.class);
 
-  public static final int DATASET_LEN = 64 * 1024;
+  public static final int DATASET_LEN = S_128K;
   protected static final byte[] DATASET = 
ContractTestUtils.dataset(DATASET_LEN, 'a', 32);
   protected static final String VECTORED_READ_FILE_NAME = "vectored_file.txt";
 
@@ -91,6 +95,8 @@ public abstract class AbstractContractVectoredReadTest 
extends AbstractFSContrac
 
   private final String bufferType;
 
+  private final boolean isDirect;
+
   /**
    * Path to the vector file.
    */
@@ -110,7 +116,7 @@ public static List<String> params() {
 
   protected AbstractContractVectoredReadTest(String bufferType) {
     this.bufferType = bufferType;
-    final boolean isDirect = !"array".equals(bufferType);
+    this.isDirect = !"array".equals(bufferType);
     this.allocate = size -> pool.getBuffer(isDirect, size);
   }
 
@@ -619,4 +625,61 @@ protected <T extends Throwable> void 
verifyExceptionalVectoredRead(
       });
     }
   }
+
+  @Test
+  public void testBufferSlicing() throws Throwable {
+    describe("Test buffer slicing behavior in vectored IO");
+
+    final int numBuffers = 8;
+    final int bufferSize = S_4K;
+    long offset = 0;
+    final List<FileRange> fileRanges = new ArrayList<>();
+    for (int i = 0; i < numBuffers; i++) {
+      fileRanges.add(FileRange.createFileRange(offset, bufferSize));
+      // increment and add a non-binary-aligned gap, so as to force
+      // offsets to be misaligned with possible page sizes.
+      offset += bufferSize + 4000;
+    }
+    TrackingByteBufferPool trackerPool = 
TrackingByteBufferPool.wrap(getPool());
+    int unknownBuffers = 0;
+    boolean slicing;
+    try (FSDataInputStream in = openVectorFile()) {
+      slicing = in.hasCapability(VECTOREDIO_BUFFERS_SLICED);
+      LOG.info("Slicing is {} for vectored IO with stream {}", slicing, in);
+      in.readVectored(fileRanges, s -> trackerPool.getBuffer(isDirect, s), 
trackerPool::putBuffer);
+
+      // check that all buffers are from the the pool, unless they are sliced.
+      for (FileRange res : fileRanges) {
+        CompletableFuture<ByteBuffer> data = res.getData();
+        ByteBuffer buffer = awaitFuture(data);
+        Assertions.assertThat(buffer)
+            .describedAs("Buffer must not be null")
+            .isNotNull();
+        Assertions.assertThat(slicing || trackerPool.containsBuffer(buffer))
+            .describedAs("Buffer must be from the pool")
+            .isTrue();
+        try {
+          trackerPool.putBuffer(buffer);
+        } catch 
(TrackingByteBufferPool.ReleasingUnallocatedByteBufferException e) {
+          // this can happen if the buffer was sliced, as it is not in the 
pool.
+          if (!slicing) {
+            throw e;
+          }
+          LOG.info("Sliced buffer detected: {}", buffer);
+          unknownBuffers++;
+        }
+      }
+    }
+    try {
+      trackerPool.close();
+    } catch (TrackingByteBufferPool.LeakedByteBufferException e) {
+      if (!slicing) {
+        throw e;
+      }
+      LOG.info("Slicing is enabled; we saw leaked buffers: {} after {}"
+              + " releases of unknown buffers",
+          e.getCount(), unknownBuffers);
+    }
+
+  }
 }
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
index 23cfcce75a2..9e0867597b3 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractVectoredRead.java
@@ -74,6 +74,13 @@ public void 
testChecksumValidationDuringVectoredReadSmallFile() throws Exception
     validateCheckReadException(testPath, length, smallFileRanges);
   }
 
+  /**
+   * Verify that checksum validation works through vectored reads.
+   * @param testPath path to the file to be tested
+   * @param length length of the file to be created
+   * @param ranges ranges to be read from the file
+   * @throws Exception any exception other than ChecksumException
+   */
   private void validateCheckReadException(Path testPath,
                                           int length,
                                           List<FileRange> ranges) throws 
Exception {
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
index 3a60d6ecdda..fc9b1281786 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemDelegation.java
@@ -157,7 +157,8 @@ public URI getUri() {
     public void setVerifyChecksum(boolean verifyChecksum) {
       this.verifyChecksum = verifyChecksum;
     }
-    
+
+    @Override
     public boolean getVerifyChecksum(){
       return verifyChecksum;
     }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
index d41974dab91..48b76b162aa 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractVectoredRead.java
@@ -62,7 +62,7 @@
 import static 
org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
 import static 
org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE;
 import static 
org.apache.hadoop.fs.s3a.Constants.AWS_S3_VECTOR_READS_MIN_SEEK_SIZE;
-import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
+import static 
org.apache.hadoop.fs.s3a.S3ATestUtils.disableAnalyticsAccelerator;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static 
org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.io.Sizes.S_1M;
@@ -71,7 +71,8 @@
 import static org.apache.hadoop.test.MoreAsserts.assertEqual;
 
 /**
- * S3A contract tests for vectored reads.
+ * S3A contract tests for vectored reads through the classic input stream.
+ * <p>
  * This is a complex suite as it really is testing the store, so measurements 
of
  * what IO took place is also performed if the input stream is suitable for 
this.
  */
@@ -89,14 +90,12 @@ protected AbstractFSContract createContract(Configuration 
conf) {
   }
 
   /**
-   * Analytics Accelerator Library for Amazon S3 does not support Vectored 
Reads.
-   * @throws Exception
+   * Create a configuration.
+   * @return a configuration
    */
   @Override
-  public void setup() throws Exception {
-    super.setup();
-    skipIfAnalyticsAcceleratorEnabled(getContract().getConf(),
-        "Analytics Accelerator does not support vectored reads");
+  protected Configuration createConfiguration() {
+    return disableAnalyticsAccelerator(super.createConfiguration());
   }
 
   /**
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
index 269e3a14190..d74c8446325 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
@@ -108,6 +108,7 @@
 import static org.apache.hadoop.fs.impl.FlagSet.createFlagSet;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_S3;
 import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Analytics;
+import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Classic;
 import static org.apache.hadoop.fs.s3a.impl.streams.InputStreamType.Prefetch;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletion;
@@ -1876,6 +1877,18 @@ public static Configuration 
enableAnalyticsAccelerator(Configuration conf) {
     return conf;
   }
 
+  /**
+   * Disable analytics stream for S3A S3AFileSystem in tests.
+   * @param conf Configuration to update
+   * @return patched config
+   */
+  public static Configuration disableAnalyticsAccelerator(Configuration conf) {
+    removeBaseAndBucketOverrides(conf,
+        INPUT_STREAM_TYPE);
+    conf.setEnum(INPUT_STREAM_TYPE, Classic);
+    return conf;
+  }
+
   /**
    * Probe for a filesystem having a specific stream type;
    * this is done through filesystem capabilities.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org


Reply via email to