anmolanmol1234 commented on code in PR #8400:
URL: https://github.com/apache/hadoop/pull/8400#discussion_r3117015823
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java:
##########
@@ -338,6 +356,119 @@ public void queueReadAhead(final AbfsInputStream stream,
}
}
+ /**
+ * Queue a vectored read for a buffer-sized physical read unit.
+ *
+ * <p>The method first attempts to attach the logical unit to an already
+ * in-progress physical read for the same file and offset. If that is not
+ * possible, a free read buffer is acquired and a new backend read is
+ * queued.</p>
+ *
+ * @param stream input stream for the file being read
+ * @param unit buffer-sized combined file range to be read
+ * @param tracingContext tracing context used for the backend read request
+ * @param allocator allocator used to create buffers for vectored
fan-out
+ * @return {@code true} if the read was queued or attached to an existing
+ * in-progress buffer; {@code false} if no buffer was available
+ */
+ boolean queueVectoredRead(AbfsInputStream stream,
+ CombinedFileRange unit,
+ TracingContext tracingContext,
+ IntFunction<ByteBuffer> allocator) {
+ /* Create a child tracing context for vectored read-ahead requests */
+ TracingContext readAheadTracingContext =
+ new TracingContext(tracingContext);
+ readAheadTracingContext.setPrimaryRequestID();
+ readAheadTracingContext.setReadType(ReadType.VECTORED_READ);
+ synchronized (this) {
+ /*
+ * Attempt to hitchhike on an existing in-progress physical read if it
+ * covers the requested logical range completely.
+ */
+ if (isAlreadyQueued(stream.getETag(), unit.getOffset())) {
+ ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset());
+ if (existing != null && existing.getStream().getETag() != null &&
stream.getETag()
+ .equals(existing.getStream().getETag())) {
+ long end = existing.getOffset() + (
+ existing.getStatus() == ReadBufferStatus.AVAILABLE
+ ? existing.getLength()
+ : existing.getRequestedLength());
+ if (end >= unit.getOffset() + unit.getLength()) {
+ existing.setBufferType(BufferType.VECTORED);
+ existing.addVectoredUnit(unit);
+ existing.setAllocator(allocator);
+ if (existing.getStatus() == ReadBufferStatus.AVAILABLE) {
+ /*
+ * Buffer is already AVAILABLE. Trigger completion immediately.
+ * Use getLength() (actual bytes) for coverage — redundant here
+ * since the outer check already used getLength() for AVAILABLE,
+ * but kept explicit for clarity.
+ */
+ LOGGER.debug("Hitchhiking onto AVAILABLE buffer {}, length {}",
+ existing, existing.getLength());
+ handleVectoredCompletion(existing,
+ existing.getStatus(),
+ existing.getLength());
+ }
+ /*
+ * For AVAILABLE buffers use the actual bytes read (getLength())
for
+ * coverage check. For all other states (NOT_AVAILABLE or
+ * READING_IN_PROGRESS), use requestedLength as an estimate of the
+ * planned physical read coverage. The short-read guard will later
be
+ * enforced in doneReading() once the actual bytes read are known.
+ */
+ return true;
+ }
+ }
+ }
+ /*
+ * Ensure a free buffer is available, attempting best-effort recovery
+ * through memory upscaling or eviction if necessary.
+ */
+ if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+ return false;
+ }
+ /*
+ * Create a logical ReadBuffer descriptor without binding pooled memory.
+ * This captures metadata required to schedule the physical read.
+ */
+ ReadBuffer buffer = new ReadBuffer();
+ buffer.setStream(stream);
+ buffer.setETag(stream.getETag());
+ buffer.setPath(stream.getPath());
+ buffer.setOffset(unit.getOffset());
+ buffer.setRequestedLength(unit.getLength());
+ buffer.setBufferType(BufferType.VECTORED);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+ buffer.addVectoredUnit(unit);
+ buffer.setAllocator(allocator);
+ buffer.setTracingContext(readAheadTracingContext);
+ /*
+ * Perform a final free-list check before consuming pooled memory to
+ * ensure buffer availability.
+ */
+ if (isFreeListEmpty()) {
+ return false;
+ }
+ Integer bufferIndex = popFromFreeList();
+ if (bufferIndex >= bufferPool.length) {
+ /* Defensive guard; should never occur */
+ return false;
+ }
+ /*
+ * Bind the physical buffer and queue the read for asynchronous
+ * execution.
+ */
+ buffer.setBuffer(bufferPool[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+
+ getReadAheadQueue().add(buffer);
Review Comment:
taken
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -208,8 +350,69 @@ public ReadBuffer getNextBlockToRead() throws
InterruptedException {
public void doneReading(final ReadBuffer buffer, final ReadBufferStatus
result, final int bytesActuallyRead) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("ReadBufferWorker completed read file {} for offset {}
outcome {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), result,
bytesActuallyRead);
+ buffer.getStream().getPath(), buffer.getOffset(), result,
bytesActuallyRead);
+ }
+
+ List<CombinedFileRange> vectoredUnits = buffer.getVectoredUnits();
+ if (result == ReadBufferStatus.AVAILABLE
+ && (buffer.getBufferType() == BufferType.VECTORED &&
!vectoredUnits.isEmpty())) {
+
+ /*
+ * Set length BEFORE handling vectored completion so that any
+ * hitchhiked units that call existing.getLength() see the correct
+ * actual value rather than 0.
+ */
+ buffer.setLength(bytesActuallyRead);
+
+ /*
+ * Guard against short reads: units hitchhiked while buffer was
+ * READING_IN_PROGRESS used requestedLength as coverage estimate.
+ * Now that actual bytes are known, fail any units not fully covered
+ * so their callers are not left hanging on the CompletableFuture.
+ */
+ long actualEnd = buffer.getOffset() + bytesActuallyRead;
+
+ /*
+ * Fast path: check if any unit exceeds actual bytes read before
+ * doing expensive stream/collect. Short reads are rare so this
+ * avoids unnecessary allocations in the common case.
+ */
+ boolean hasUncovered = false;
+ for (CombinedFileRange u : vectoredUnits) {
+ if ((u.getOffset() + u.getLength()) > actualEnd) {
+ hasUncovered = true;
+ break;
+ }
+ }
+
+ if (hasUncovered) {
+ /*
+ * Short read detected — fail uncovered units explicitly so callers
+ * are not left hanging on their CompletableFuture.
+ */
+ Iterator<CombinedFileRange> it = vectoredUnits.iterator();
+ while (it.hasNext()) {
+ CombinedFileRange u = it.next();
+ if ((u.getOffset() + u.getLength()) > actualEnd) {
+ it.remove();
+ LOGGER.debug(
Review Comment:
taken
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]