anmolanmol1234 commented on code in PR #8400: URL: https://github.com/apache/hadoop/pull/8400#discussion_r3071254102
########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java: ########## @@ -0,0 +1,696 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.EOFException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.CombinedFileRange; + +/** + * Handles vectored read operations by coordinating with a ReadBufferManager + * and applying the configured VectoredReadStrategy. + * This class acts as the orchestration layer that decides how vectored reads + * are executed, while delegating buffer management and read behavior to + * dedicated components. + */ +class VectoredReadHandler { + + private static final Logger LOG = LoggerFactory.getLogger(VectoredReadHandler.class); + + /** + * Manages allocation, lifecycle, and reuse of read buffers + * used during vectored read operations. + */ + private final ReadBufferManager readBufferManager; + + /** + * Strategy defining how vectored reads should be performed. + */ + private final VectoredReadStrategy strategy; + + /** + * Shared reconstruction buffers for ranges that span multiple chunks. + * Keyed by the original FileRange instance. + */ + private final ConcurrentHashMap<RangeKey, ByteBuffer> partialBuffers = + new ConcurrentHashMap<>(); + + /** + * Tracks remaining bytes to be received for each logical range. + * Keyed by the original FileRange instance. + */ + private final ConcurrentHashMap<RangeKey, AtomicInteger> pendingBytes = + new ConcurrentHashMap<>(); + + /** + * Creates a VectoredReadHandler using the provided ReadBufferManager. + * The vectored read strategy is obtained from the manager to ensure + * consistent configuration across the read pipeline. + * + * @param readBufferManager manager responsible for buffer handling + * and providing the vectored read strategy + */ + VectoredReadHandler(ReadBufferManager readBufferManager) { + this.readBufferManager = readBufferManager; + this.strategy = readBufferManager.getVectoredReadStrategy(); + LOG.debug("VectoredReadHandler initialized with strategy={}", strategy); + } + + /** + * Perform a vectored read over multiple logical file ranges. + * + * <p>Logical ranges are first merged using a span-first strategy determined + * by the configured {@link VectoredReadStrategy}. The merged ranges are then + * split into buffer-sized physical read units and queued for asynchronous + * execution. If a pooled buffer is unavailable, the read falls back to a + * direct read path.</p> + * + * @param stream input stream for the file being read + * @param ranges logical file ranges to read; each range will be completed + * with data or failure via its associated future + * @param allocator allocator used to create buffers for direct reads and + * vectored fan-out + */ + public void readVectored( + AbfsInputStream stream, + List<? extends FileRange> ranges, + IntFunction<ByteBuffer> allocator) throws EOFException { + LOG.debug("readVectored invoked: path={}, rangeCount={}", + stream.getPath(), ranges.size()); + + /* Initialize a future for each logical file range */ + long fileLength = stream.getContentLength(); + List<FileRange> validRanges = validateAndPrepareRanges(ranges, fileLength); + + /* Select the maximum allowed merge span based on the configured strategy */ + int maxSpan = + (strategy == VectoredReadStrategy.TPS_OPTIMIZED) + ? readBufferManager.getMaxSeekForVectoredReads() + : readBufferManager.getMaxSeekForVectoredReadsThroughput(); + + LOG.debug("readVectored: path={}, strategy={}, maxSpan={}", + stream.getPath(), strategy, maxSpan); + + /* Merge logical ranges using a span-first coalescing strategy */ + List<CombinedFileRange> merged = mergeBySpanAndGap(validRanges, maxSpan, fileLength); + + LOG.debug("readVectored: path={}, mergedRangeCount={}", + stream.getPath(), merged.size()); + + /* Read buffer size acts as a hard upper bound for physical reads */ + int readBufferSize = ReadBufferManager.getReadAheadBlockSize(); + + /* Split merged ranges into buffer-sized chunks and queue each for read */ + for (CombinedFileRange unit : merged) { + List<CombinedFileRange> chunks = splitByBufferSize(unit, readBufferSize); + + LOG.debug("readVectored: path={}, mergedOffset={}, mergedLength={}, chunkCount={}", + stream.getPath(), unit.getOffset(), unit.getLength(), chunks.size()); + + for (CombinedFileRange chunk : chunks) { + try { + VectoredReadUtils.validateRangeRequest(chunk); + boolean queued = queueVectoredRead(stream, chunk, allocator); + if (!queued) { + LOG.debug("readVectored: buffer pool exhausted, falling back to directRead: path={}, offset={}, length={}", + stream.getPath(), chunk.getOffset(), chunk.getLength()); + directRead(stream, chunk, allocator); + } + } catch (Exception e) { + LOG.warn("readVectored: chunk read failed, failing underlying ranges:" + + " path={}, offset={}, length={}", + stream.getPath(), chunk.getOffset(), chunk.getLength(), e); + failUnit(chunk, e); + } + } + } + } + + /** + * Queues a vectored read request with the buffer manager. + * + * @return true if successfully queued, false if the queue is full and fallback is required. + */ + @VisibleForTesting + boolean queueVectoredRead(AbfsInputStream stream, CombinedFileRange unit, + IntFunction<ByteBuffer> allocator) { + LOG.debug("queueVectoredRead: path={}, offset={}, length={}", + stream.getPath(), unit.getOffset(), unit.getLength()); + TracingContext tracingContext = stream.getTracingContext(); + tracingContext.setReadType(ReadType.VECTORED_READ); + return getReadBufferManager().queueVectoredRead(stream, unit, tracingContext, allocator); + } + + /** + * Accesses the shared manager responsible for coordinating asynchronous read buffers. + * + * @return the {@link ReadBufferManager} instance. + */ + public ReadBufferManager getReadBufferManager() { + return readBufferManager; + } + + /** + * Validates and prepares a list of {@link FileRange} instances for vectored reads. + * + * <p>This method performs the following steps for each input range: + * <ul> + * <li>Validates the range using {@link VectoredReadUtils#validateRangeRequest(FileRange)}</li> + * <li>Initializes a {@link CompletableFuture} to hold the read result</li> + * <li>Checks that the range falls within the bounds of the file</li> + * </ul> + * + * <p>Ranges that are invalid (e.g., negative offset/length or exceeding file bounds) + * are not included in the returned list. Instead, their associated future is + * completed exceptionally with an {@link EOFException}. + * + * @param ranges the input list of logical file ranges to validate and prepare + * @param fileLength the total length of the file, used for bounds checking + * @return a list of valid {@link FileRange} instances ready for vectored read processing + */ + private List<FileRange> validateAndPrepareRanges( + List<? extends FileRange> ranges, long fileLength) throws EOFException { + + List<FileRange> validRanges = new ArrayList<>(); + + for (FileRange r : ranges) { + VectoredReadUtils.validateRangeRequest(r); + r.setData(new CompletableFuture<>()); + + long offset = r.getOffset(); + long length = r.getLength(); + + if (offset < 0 || length < 0 || offset > fileLength || length > fileLength - offset) { + r.getData().completeExceptionally(new EOFException( + "Invalid range: offset=" + offset + ", length=" + length + ", fileLength=" + fileLength)); + continue; + } + validRanges.add(r); + } + return validRanges; + } + + /** + * Splits a merged logical {@link CombinedFileRange} into smaller + * buffer-sized physical read units. + * + * <p>Each resulting unit will have a maximum size equal to the provided + * {@code bufferSize}. Any handling of multi-chunk ranges or reassembly + * of underlying {@link FileRange} data is delegated to the fan-out logic.</p> + * + * @param unit the combined logical range to be split + * @param bufferSize the maximum size (in bytes) of each physical read unit + * @return a list of buffer-sized {@link CombinedFileRange} instances + */ + private List<CombinedFileRange> splitByBufferSize( + CombinedFileRange unit, + int bufferSize) { + LOG.debug("splitByBufferSize: offset={}, length={}, bufferSize={}", + unit.getOffset(), unit.getLength(), bufferSize); + + List<CombinedFileRange> parts = new ArrayList<>(); + long unitStart = unit.getOffset(); + long unitEnd = unitStart + unit.getLength(); + long start = unitStart; + + while (start < unitEnd) { + long partEnd = Math.min(start + bufferSize, unitEnd); + + CombinedFileRange part = + new CombinedFileRange(start, partEnd, unit.getUnderlying().get(0)); + part.getUnderlying().clear(); + + for (FileRange r : unit.getUnderlying()) { + long rStart = r.getOffset(); + long rEnd = rStart + r.getLength(); + if (rEnd > start && rStart < partEnd) { + part.getUnderlying().add(r); + } + } + parts.add(part); + start = partEnd; + } + + LOG.debug("splitByBufferSize: offset={}, produced {} parts", + unit.getOffset(), parts.size()); + return parts; + } + + /** + * Merge logical {@link FileRange}s into {@link CombinedFileRange}s using a + * span-first coalescing strategy. + * + * <p>Ranges are merged as long as the total span from the first offset to the + * end of the last range does not exceed {@code maxSpan}. Gaps between ranges + * are ignored.</p> + * + * @param ranges logical file ranges to merge + * @param maxSpan maximum allowed span (in bytes) for a combined read + * @return merged {@link CombinedFileRange}s covering the input ranges + */ + private List<CombinedFileRange> mergeBySpanAndGap( + List<? extends FileRange> ranges, + int maxSpan, long fileLength) throws EOFException { + + LOG.debug("mergeBySpanAndGap: rangeCount={}, maxSpan={}", ranges.size(), maxSpan); + List<? extends FileRange> sortedRanges = VectoredReadUtils.validateAndSortRanges( + ranges, Optional.of(fileLength)); + + List<CombinedFileRange> out = new ArrayList<>(); + CombinedFileRange current = null; + + for (FileRange r : sortedRanges) { + long rOffset = r.getOffset(); + long rEnd = rOffset + r.getLength(); + + /* Initialize the first combined range */ + if (current == null) { + current = new CombinedFileRange(rOffset, rEnd, r); + continue; + } + + /* Check whether adding this range keeps the total span within the limit */ + long newSpan = rEnd - current.getOffset(); + + if (newSpan <= maxSpan) { + current.setLength((int) newSpan); + current.getUnderlying().add(r); + } else { + /* Span exceeded; finalize current range and start a new one */ + out.add(current); + current = new CombinedFileRange(rOffset, rEnd, r); + } + } + + /* Add the final combined range, if any */ + if (current != null) { + out.add(current); + } + + LOG.debug("mergeBySpanAndGap: produced {} combined ranges", out.size()); + return out; + } + + /** + * Distributes data from a physical read buffer into the corresponding + * logical {@link FileRange}s. + * + * <p>This method performs a "fan-out" operation where a single physical + * read (represented by {@link ReadBuffer}) may contain data for multiple + * logical ranges. The relevant portions are copied into per-range buffers + * and completed once fully populated.</p> + * + * <p>Partial reads are accumulated using {@code partialBuffers} and + * {@code pendingBytes}. A range is only completed when all expected + * bytes have been received.</p> + * + * <p>Thread safety: + * <ul> + * <li>Each logical range buffer is synchronized independently</li> + * <li>Writes use {@code System.arraycopy} directly into the backing array + * to avoid shared {@link ByteBuffer} position mutation</li> + * </ul> + * </p> + * + * @param buffer the physical read buffer containing merged data + * @param bytesRead number of valid bytes in the buffer + */ + void fanOut(ReadBuffer buffer, int bytesRead) { + LOG.debug("fanOut: path={}, bufferOffset={}, bytesRead={}", + buffer.getPath(), buffer.getOffset(), bytesRead); + + List<CombinedFileRange> units = buffer.getVectoredUnits(); + if (units == null) { + LOG.warn("fanOut: no vectored units found for path={}, offset={}", + buffer.getPath(), buffer.getOffset()); + return; + } + + long bufferStart = buffer.getOffset(); + long bufferEnd = bufferStart + bytesRead; + + /* Iterate over all combined logical units mapped to this buffer */ + for (CombinedFileRange unit : units) { + /* Each unit may contain multiple logical FileRanges */ + for (FileRange r : unit.getUnderlying()) { + CompletableFuture<ByteBuffer> future = r.getData(); + RangeKey key = new RangeKey(r); + + /* Skip already completed or cancelled ranges */ + if (future.isCancelled()) { + LOG.debug("fanOut: range cancelled, cleaning up: path={}, rangeOffset={}", + buffer.getPath(), r.getOffset()); + partialBuffers.remove(key); + pendingBytes.remove(key); + continue; + } + if (future.isDone()) { + continue; + } + + try { + long rangeStart = r.getOffset(); + long rangeEnd = rangeStart + r.getLength(); + + /* Compute overlap between buffer and logical range */ + long overlapStart = Math.max(rangeStart, bufferStart); + long overlapEnd = Math.min(rangeEnd, bufferEnd); + + /* No overlap nothing to copy */ + if (overlapStart >= overlapEnd) { + LOG.debug("fanOut: no overlap for path={}, rangeOffset={}, bufferOffset={}", + buffer.getPath(), r.getOffset(), bufferStart); + continue; + } + + int srcOffset = (int) (overlapStart - bufferStart); + int destOffset = (int) (overlapStart - rangeStart); + int length = (int) (overlapEnd - overlapStart); + + LOG.debug("fanOut: copying path={}, rangeOffset={}, rangeLength={}," + + " bufferOffset={}, srcOffset={}, destOffset={}, length={}", + buffer.getPath(), r.getOffset(), r.getLength(), + bufferStart, srcOffset, destOffset, length); + + /* Allocate or reuse the full buffer for this logical range */ + ByteBuffer fullBuf = partialBuffers.computeIfAbsent( + key, k -> buffer.getAllocator().apply(r.getLength())); + + /* Track remaining bytes required to complete this range */ + AtomicInteger pending = pendingBytes.computeIfAbsent( + key, k -> new AtomicInteger(r.getLength())); + + synchronized (fullBuf) { + /* Double-check completion inside lock */ + if (future.isDone()) { + continue; + } + + ByteBuffer dst = fullBuf.duplicate(); + dst.position(destOffset); + dst.put(buffer.getBuffer(), srcOffset, length); + + int left = pending.addAndGet(-length); + + LOG.debug("fanOut: wrote chunk: path={}, rangeOffset={}, destOffset={}," + + " length={}, pendingBytes={}", + buffer.getPath(), r.getOffset(), destOffset, length, left); + + if (left < 0) { + LOG.error("fanOut: pending bytes went negative possible duplicate write:" Review Comment: taken -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
