[
https://issues.apache.org/jira/browse/HADOOP-19901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18083043#comment-18083043
]
ASF GitHub Bot commented on HADOOP-19901:
-----------------------------------------
iemejia opened a new pull request, #8511:
URL: https://github.com/apache/hadoop/pull/8511
## Description
`ChecksumFileSystem.readVectored()` allocates buffers for both data ranges
and checksum ranges through the caller's allocator (`IntFunction<ByteBuffer>`).
Before this change, the checksum buffers were never released after checksum
verification completed, because:
1. The caller has no reference to checksum buffers (only data range results
are visible)
2. `ChecksumFileSystem` itself never called the `release` consumer on them
This causes buffer leaks when callers use a tracking or pooled allocator.
This bug was discovered while working on Apache Parquet's
`TrackingByteBufferAllocator`, which detected unreleased buffers accumulating
during vectored reads through `ChecksumFileSystem`.
## Root Cause
In `ChecksumFileSystem.ChecksumFSInputChecker.readVectored()`, checksum
ranges are read via `sums.readVectored(checksumRanges, allocate, release)`. The
checksum buffers are used in `thenCombineAsync` calls for verification, but
once verification completed, no code released them back to the caller's pool.
A single checksum range can cover multiple data ranges, so the checksum
buffer is shared across multiple `thenCombineAsync` calls. It must therefore be
released only once, after ALL verifications using it are complete.
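To illustrate why the release cannot simply live inside each verification callback, here is a minimal stand-alone sketch. It is not Hadoop code: the class name, method name, and buffer sizes are hypothetical, and the "verification" is a placeholder that just returns the data buffer. It counts how often a shared checksum buffer would be handed to `release` if every `thenCombineAsync` callback released it.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical demo class; it only mimics the shape of the futures involved. */
class SharedChecksumBufferSketch {

  /** Counts release calls when every verification callback releases the shared buffer. */
  static int releasesWhenReleasingPerVerification(int dataRangeCount) {
    AtomicInteger releases = new AtomicInteger();
    Consumer<ByteBuffer> release = b -> releases.incrementAndGet();

    // One checksum buffer shared by all data ranges it covers.
    CompletableFuture<ByteBuffer> checksum =
        CompletableFuture.completedFuture(ByteBuffer.allocate(16));

    List<CompletableFuture<ByteBuffer>> results = new ArrayList<>();
    for (int i = 0; i < dataRangeCount; i++) {
      CompletableFuture<ByteBuffer> data =
          CompletableFuture.completedFuture(ByteBuffer.allocate(64));
      results.add(checksum.thenCombineAsync(data, (sumBuffer, dataBuffer) -> {
        // Placeholder for verification, followed by a per-callback release.
        release.accept(sumBuffer);
        return dataBuffer;
      }));
    }
    results.forEach(CompletableFuture::join);
    return releases.get();
  }
}
```

With three data ranges the same buffer is released three times in this sketch, which a pooled allocator would likely treat as a double release.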
## Fix
After collecting all verification futures for a checksum range, use
`CompletableFuture.allOf(verifications).thenRun(...)` to release the checksum
buffer exactly once after all verifications complete:
```java
CompletableFuture.allOf(verifications.toArray(new CompletableFuture[0]))
.thenRun(() -> release.accept(checksumRange.getData().join()));
```
This ensures:
- Exactly-once release (not once per data range that shares the checksum
buffer)
- Release happens only after the buffer is no longer in use
- Works correctly with both the 2-arg API (no-op release) and the 3-arg API
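The exactly-once behaviour can be checked in isolation with plain `CompletableFuture`s. The following is a sketch under stated assumptions, not the patched Hadoop code: the class name, method name, and buffer sizes are hypothetical, and verification is replaced by a placeholder lambda.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical stand-alone demo of the allOf(...).thenRun(...) release pattern. */
class ReleaseOnceSketch {

  /** Returns how many times release ran for one checksum buffer shared by N data ranges. */
  static int releasesForSharedBuffer(int dataRangeCount) {
    AtomicInteger releases = new AtomicInteger();
    Consumer<ByteBuffer> release = b -> releases.incrementAndGet();

    // The checksum read completes asynchronously, as it would in a vectored read.
    CompletableFuture<ByteBuffer> checksum =
        CompletableFuture.supplyAsync(() -> ByteBuffer.allocate(16));

    List<CompletableFuture<ByteBuffer>> verifications = new ArrayList<>();
    for (int i = 0; i < dataRangeCount; i++) {
      CompletableFuture<ByteBuffer> data =
          CompletableFuture.supplyAsync(() -> ByteBuffer.allocate(64));
      // Placeholder verification: a real implementation would compare checksums here.
      verifications.add(
          checksum.thenCombineAsync(data, (sumBuffer, dataBuffer) -> dataBuffer));
    }

    // Release the shared buffer once, after every verification has completed.
    CompletableFuture<Void> released = CompletableFuture
        .allOf(verifications.toArray(new CompletableFuture[0]))
        .thenRun(() -> release.accept(checksum.join()));
    released.join();
    return releases.get();
  }
}
```

In this sketch the counter ends at 1 regardless of how many data ranges share the buffer. Note that `thenRun` only fires on normal completion, so the sketch does not model what happens when a verification fails.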
## Testing
Added `testChecksumBuffersReleasedAfterVectoredRead()` to
`TestLocalFSContractVectoredRead`:
- Uses a counting allocator/release pair to track buffer lifecycle
- Performs a vectored read through `LocalFileSystem` (which uses
`ChecksumFileSystem`)
- Asserts that the system calls the release consumer at least once for
internal buffers
- Confirmed the test **fails without the fix** (0 system-initiated releases)
and **passes with it**
All 54 existing vectored read contract tests continue to pass.
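For readers unfamiliar with the counting approach, a rough sketch of such an allocator/release pair follows. It is an illustration only and not the test added in this PR: `CountingBuffers` and `fakeChecksummedRead` are hypothetical names, and the fake read merely stands in for a file system that allocates one data buffer plus one internal checksum buffer.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.IntFunction;

/** Hypothetical counting allocator/release pair for tracking buffer lifecycle. */
class CountingBuffers {
  final AtomicInteger allocated = new AtomicInteger();
  final AtomicInteger released = new AtomicInteger();

  /** Same shape as the allocate argument of readVectored. */
  final IntFunction<ByteBuffer> allocate = size -> {
    allocated.incrementAndGet();
    return ByteBuffer.allocate(size);
  };

  /** Same shape as the release argument of the 3-arg readVectored. */
  final Consumer<ByteBuffer> release = buffer -> released.incrementAndGet();

  /** Stand-in for a checksummed read: returns the data buffer, releases the internal one. */
  static ByteBuffer fakeChecksummedRead(IntFunction<ByteBuffer> allocate,
                                        Consumer<ByteBuffer> release) {
    ByteBuffer data = allocate.apply(1024);
    ByteBuffer sums = allocate.apply(8);
    // Verification would happen here; afterwards the internal buffer is handed back.
    release.accept(sums);
    return data;
  }
}
```

After one fake read the counters show two allocations and one release: the remaining buffer is the data buffer the caller still owns. A leak of internal buffers would show up as zero system-initiated releases.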
## JIRA
https://issues.apache.org/jira/browse/HADOOP-19901
> ChecksumFileSystem.readVectored leaks buffers allocated through caller's
> IntFunction allocator
> ----------------------------------------------------------------------------------------------
>
> Key: HADOOP-19901
> URL: https://issues.apache.org/jira/browse/HADOOP-19901
> Project: Hadoop Common
> Issue Type: Bug
> Components: fs
> Affects Versions: 3.5.0, 3.4.3
> Reporter: Ismaël Mejía
> Priority: Major
>
> h3. Summary
> When {{ChecksumFileSystem.readVectored()}} is called with checksum
> verification enabled (the default for {{LocalFileSystem}}), it allocates
> buffers for *both* file data ranges and checksum ranges through the
> caller-provided {{IntFunction<ByteBuffer> allocate}} function. However, the
> checksum buffers are only used temporarily for verification and are never
> released back to the caller. The caller has no reference to these buffers and
> no mechanism to release them.
> This was discovered in Apache Parquet Java while upgrading from Hadoop 3.3.0
> to 3.4.3 and testing with {{TrackingByteBufferAllocator}}, which detected
> leaked {{ByteBuffer}} allocations.
> h3. Root cause
> In {{ChecksumFSInputChecker.readVectored()}} ([ChecksumFileSystem.java,
> trunk|https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java]):
> {code:java}
> @Override
> public void readVectored(final List<? extends FileRange> ranges,
> final IntFunction<ByteBuffer> allocate,
> final Consumer<ByteBuffer> release) throws IOException {
> // ...
> sums.readVectored(checksumRanges, allocate, release); // allocates checksum buffers via caller's allocator
> datas.readVectored(dataRanges, allocate, release); // allocates data buffers via caller's allocator
> for (CombinedFileRange checksumRange : checksumRanges) {
> for (FileRange dataRange : checksumRange.getUnderlying()) {
> CompletableFuture<ByteBuffer> result =
> checksumRange.getData().thenCombineAsync(dataRange.getData(),
> (sumBuffer, dataBuffer) ->
> checkBytes(sumBuffer, checksumRange.getOffset(),
> dataBuffer, dataRange.getOffset(), bytesPerSum, file));
> for (FileRange original : ((CombinedFileRange)
> dataRange).getUnderlying()) {
> original.setData(result.thenApply(
> (b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(),
> original)));
> }
> }
> }
> }
> {code}
> Two problems:
> # *Checksum buffers are never released.* {{sums.readVectored(checksumRanges,
> allocate, release)}} allocates buffers through the caller's {{allocate}}
> function to read checksum data. After {{checkBytes()}} verifies the data, the
> checksum buffers ({{sumBuffer}}) are no longer needed, but they are never
> passed to {{release}} and are invisible to the caller. They leak.
> # *The 2-arg API provides no release mechanism.* The 2-arg overload passes a
> no-op release:
> {code:java}
> public void readVectored(List<? extends FileRange> ranges,
> IntFunction<ByteBuffer> allocate) throws
> IOException {
> readVectored(ranges, allocate, (b) -> { });
> }
> {code}
> Even callers using the 3-arg API don't benefit, because
> {{ChecksumFileSystem}} itself never calls {{release}} on the checksum buffers
> -- it only passes {{release}} down to the underlying streams.
> h3. How this was discovered
> Apache Parquet Java uses a {{TrackingByteBufferAllocator}} in tests that
> wraps the real allocator and tracks all allocations. When the allocator is
> closed, it throws {{LeakedByteBufferException}} if any allocated buffers were
> not released. After upgrading Hadoop from 3.3.0 to 3.4.3, the following test
> classes started failing with buffer leak errors in the vectored I/O path:
> * {{TestRecordLevelFilters}} (15 tests)
> * {{TestColumnIndexFiltering}} (24 tests)
> * {{TestParquetReader}} (6+ tests)
> The allocation stacktrace showed:
> {code}
> TrackingByteBufferAllocator.allocate
> -> VectorIOBufferPool.getBuffer
> -> RawLocalFileSystem$AsyncHandler.initiateRead
> {code}
> Parquet's {{readVectored()}} method passes a {{ByteBufferAllocator}} to
> Hadoop, but Hadoop uses it for internal temporary allocations (checksum
> ranges) that are invisible to the caller.
> h3. Workaround in Parquet
> We implemented a "capturing allocator" pattern that wraps the allocator to
> track all buffers allocated during {{readVectored()}}, then registers them
> all for release:
> {code:java}
> List<ByteBuffer> allocatedBuffers = new ArrayList<>();
> ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
> @Override
> public ByteBuffer allocate(int size) {
> ByteBuffer buf = options.getAllocator().allocate(size);
> allocatedBuffers.add(buf);
> return buf;
> }
> // ...
> };
> try {
> f.readVectored(ranges, capturingAllocator);
> // ... process futures ...
> } finally {
> builder.addBuffersToRelease(allocatedBuffers);
> }
> {code}
> This ensures all buffers allocated through the caller's allocator are
> eventually released, regardless of whether they are returned in a future or
> used internally by ChecksumFileSystem. See [parquet-java commit
> fc0586d68|https://github.com/apache/parquet-java/commit/fc0586d68].
> h3. Suggested fixes
> *Option A (minimal): Release checksum buffers after verification.*
> In {{ChecksumFSInputChecker.readVectored()}}, after {{checkBytes()}}
> completes, call {{release}} on the checksum buffer:
> {code:java}
> CompletableFuture<ByteBuffer> result =
> checksumRange.getData().thenCombineAsync(dataRange.getData(),
> (sumBuffer, dataBuffer) -> {
> ByteBuffer verified = checkBytes(sumBuffer,
> checksumRange.getOffset(),
> dataBuffer, dataRange.getOffset(), bytesPerSum, file);
> release.accept(sumBuffer); // release checksum buffer after verification
> return verified;
> });
> {code}
> *Option B (comprehensive): Don't use the caller's allocator for internal
> temporaries.*
> ChecksumFileSystem should allocate its own temporary buffers for checksum
> data instead of using the caller-provided allocator. The caller's allocator
> is intended for buffers that the caller will own and manage. Using it for
> internal temporaries violates that expectation.
> {code:java}
> // Use internal allocation for checksums, not the caller's allocator
> sums.readVectored(checksumRanges, ByteBuffer::allocate, (b) -> { });
> // Only use caller's allocator for data ranges
> datas.readVectored(dataRanges, allocate, release);
> {code}
> *Option C (API improvement): Extend the API to support paired
> allocate/release.*
> The current {{IntFunction<ByteBuffer>}} allocator is one-way -- there's no
> way for Hadoop to release a buffer it allocated through the caller's
> function. HADOOP-19303 added a {{Consumer<ByteBuffer> release}} parameter,
> but it's separate from the allocate function and {{ChecksumFileSystem}}
> doesn't use it for its own intermediate buffers. A paired allocator/releaser
> interface (similar to Parquet's {{ByteBufferAllocator}} with both
> {{allocate}} and {{release}} methods) would make the lifecycle explicit.
> h3. Related issues
> * *HADOOP-19303* (VectorIO API to support releasing buffers on failure) --
> Added the 3-arg {{readVectored}} with {{release}} Consumer, but
> {{ChecksumFileSystem}} doesn't call {{release}} on checksum buffers.
> * *HADOOP-18296* (Memory fragmentation in ChecksumFileSystem Vectored IO) --
> Fixed range merging fragmentation, but did not address checksum buffer leaks.
> * *PARQUET-2171* (Implement vectored IO in parquet file format) -- The
> Parquet side implementation.