manika137 commented on code in PR #8400:
URL: https://github.com/apache/hadoop/pull/8400#discussion_r3067601972
##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -146,6 +168,124 @@ public void queueReadAhead(final AbfsInputStream stream,
final long requestedOff
}
}
+ /**
+ * Queue a vectored read for a buffer-sized physical read unit.
+ *
+ * <p>The method first attempts to attach the logical unit to an already
+ * in-progress physical read for the same file and offset. If that is not
+ * possible, a free read buffer is acquired and a new backend read is
+ * queued.</p>
+ *
+ * @param stream input stream for the file being read
+ * @param unit buffer-sized combined file range to be read
+ * @param tracingContext tracing context used for the backend read request
+ * @param allocator allocator used to create buffers for vectored fan-out
+ * @return {@code true} if the read was queued or attached to an existing
+ * in-progress buffer; {@code false} if no buffer was available
+ */
+ boolean queueVectoredRead(AbfsInputStream stream,
+ CombinedFileRange unit,
+ TracingContext tracingContext,
+ IntFunction<ByteBuffer> allocator) {
+ /* Create a child tracing context for vectored read-ahead requests */
+ TracingContext readAheadTracingContext =
+ new TracingContext(tracingContext);
+ readAheadTracingContext.setPrimaryRequestID();
+ readAheadTracingContext.setReadType(ReadType.VECTORED_READ);
+
+ synchronized (this) {
+ if (isAlreadyQueued(stream, unit.getOffset())) {
+ ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset());
+ if (existing != null && existing.getStream().getETag() != null
+ && stream.getETag().equals(existing.getStream().getETag())) {
+ /*
+ * For AVAILABLE buffers use actual bytes read (getLength()) for
+ * coverage check. For READING_IN_PROGRESS buffers use
+ * requestedLength as an estimate — the short-read guard will be
+ * applied later in doneReading before dispatching completion.
+ */
+ long end = existing.getOffset() + (
+ existing.getStatus() == ReadBufferStatus.AVAILABLE
+ ? existing.getLength()
+ : existing.getRequestedLength());
+ if (end >= unit.getOffset() + unit.getLength()) {
+ existing.setBufferType(BufferType.VECTORED);
+ existing.addVectoredUnit(unit);
+ existing.setAllocator(allocator);
+ if (existing.getStatus() == ReadBufferStatus.AVAILABLE) {
+ /*
+ * Buffer is already AVAILABLE. Trigger completion immediately.
+ * Use getLength() (actual bytes) for coverage — redundant here
+ * since the outer check already used getLength() for AVAILABLE,
+ * but kept explicit for clarity.
+ */
+ LOGGER.debug("Hitchhiking onto AVAILABLE buffer {}, length {}",
+ existing, existing.getLength());
+ handleVectoredCompletion(existing,
+ existing.getStatus(),
+ existing.getLength());
+ }
+ /*
+ * For READING_IN_PROGRESS: unit is attached and will be
+ * completed in doneReading once actual bytes are known.
+ * Short-read safety is enforced there via per-unit coverage check.
+ */
+ return true;
Review Comment:
This is not only the READING_IN_PROGRESS case, right? If the buffer is
UNAVAILABLE (queued but not yet picked up by any thread), it would also land
here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]