[ https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18007998#comment-18007998 ]
ASF GitHub Bot commented on HADOOP-19613:
-----------------------------------------

anmolanmol1234 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2215342272


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -15,636 +15,166 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs.azurebfs.services;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 /**
- * The Read Buffer Manager for Rest AbfsClient.
+ * Interface for managing read buffers for Azure Blob File System input streams.
  */
-final class ReadBufferManager {
-  private static final Logger LOGGER = LoggerFactory.getLogger(ReadBufferManager.class);
-  private static final int ONE_KB = 1024;
-  private static final int ONE_MB = ONE_KB * ONE_KB;
-
-  private static final int NUM_BUFFERS = 16;
-  private static final int NUM_THREADS = 8;
-  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
-
-  private static int blockSize = 4 * ONE_MB;
-  private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
-  private Thread[] threads = new Thread[NUM_THREADS];
-  private byte[][] buffers;  // array of byte[] buffers, to hold the data that is read
-  private Stack<Integer> freeList = new Stack<>();  // indices in buffers[] array that are available
-
-  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>();  // queue of requests that are not picked up by any worker thread yet
-  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>();  // requests being processed by worker threads
-  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>();  // buffers available for reading
-  private static ReadBufferManager bufferManager;  // singleton, initialized in static initialization block
-  private static final ReentrantLock LOCK = new ReentrantLock();
-
-  static ReadBufferManager getBufferManager() {
-    if (bufferManager == null) {
-      LOCK.lock();
-      try {
-        if (bufferManager == null) {
-          bufferManager = new ReadBufferManager();
-          bufferManager.init();
-        }
-      } finally {
-        LOCK.unlock();
-      }
-    }
-    return bufferManager;
-  }
-
-  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
-    if (bufferManager == null) {
-      LOGGER.debug(
-          "ReadBufferManager not initialized yet. Overriding readAheadBlockSize as {}",
-          readAheadBlockSize);
-      blockSize = readAheadBlockSize;
-    }
-  }
-
-  private void init() {
-    buffers = new byte[NUM_BUFFERS][];
-    for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte array never goes back to GC
-      freeList.add(i);
-    }
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Thread t = new Thread(new ReadBufferWorker(i));
-      t.setDaemon(true);
-      threads[i] = t;
-      t.setName("ABFS-prefetch-" + i);
-      t.start();
-    }
-    ReadBufferWorker.UNLEASH_WORKERS.countDown();
-  }
-
-  // hide instance constructor
-  private ReadBufferManager() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
-  }
-
-
-  /*
-   *
-   *  AbfsInputStream-facing methods
-   *
-   */
-
+public interface ReadBufferManager {
   /**
-   * {@link AbfsInputStream} calls this method to queue read-aheads.
-   *
-   * @param stream          The {@link AbfsInputStream} for which to do the read-ahead
-   * @param requestedOffset The offset in the file which should be read
-   * @param requestedLength The length to read
+   * Queues a read-ahead request from {@link AbfsInputStream}
+   * for a given offset in file and given length.
+   * @param stream the input stream requesting the read-ahead
+   * @param requestedOffset the offset in the remote file to start reading
+   * @param requestedLength the number of bytes to read from file
+   * @param tracingContext the tracing context for diagnostics
    */
-  void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength,
-      TracingContext tracingContext) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
-          stream.getPath(), requestedOffset, requestedLength);
-    }
-    ReadBuffer buffer;
-    synchronized (this) {
-      if (isAlreadyQueued(stream, requestedOffset)) {
-        return;  // already queued, do not queue again
-      }
-      if (freeList.isEmpty() && !tryEvict()) {
-        return;  // no buffers available, cannot queue anything
-      }
-
-      buffer = new ReadBuffer();
-      buffer.setStream(stream);
-      buffer.setOffset(requestedOffset);
-      buffer.setLength(0);
-      buffer.setRequestedLength(requestedLength);
-      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
-      buffer.setLatch(new CountDownLatch(1));
-      buffer.setTracingContext(tracingContext);
-
-      Integer bufferIndex = freeList.pop();  // will return a value, since we have checked size > 0 already
-
-      buffer.setBuffer(buffers[bufferIndex]);
-      buffer.setBufferindex(bufferIndex);
-      readAheadQueue.add(buffer);
-      notifyAll();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}",
-            stream.getPath(), requestedOffset, buffer.getBufferindex());
-      }
-    }
-  }
+  void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+      final int requestedLength, TracingContext tracingContext);
 
+  /**
+   * Gets a block of data from the data prefetched by the ReadBufferManager.
+   * {@link AbfsInputStream} calls this method to read data.
+   * @param stream the input stream requesting the block
+   * @param position the position in the file to read from
+   * @param length the number of bytes to read
+   * @param buffer the buffer to store the read data
+   * @return the number of bytes actually read
+   * @throws IOException if an I/O error occurs
+   */
+  int getBlock(final AbfsInputStream stream,
+      final long position,
+      final int length,
+      final byte[] buffer)
+      throws IOException;
 
   /**
-   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
-   * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
-   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
-   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
-   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
-   * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+   * {@link ReadBufferWorker} calls this to get the next buffer to read from the read-ahead queue.
+   * The requested read will be performed by a background thread.
    *
-   * @param stream   the file to read bytes for
-   * @param position the offset in the file to do a read for
-   * @param length   the length to read
-   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
-   * @return the number of bytes read
+   * @return the next {@link ReadBuffer} to read
+   * @throws InterruptedException if interrupted while waiting
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
-      throws IOException {
-    // not synchronized, so have to be careful with locking
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("getBlock for file {} position {} thread {}",
-          stream.getPath(), position, Thread.currentThread().getName());
-    }
-
-    waitForProcess(stream, position);
-
-    int bytesRead = 0;
-    synchronized (this) {
-      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
-    }
-    if (bytesRead > 0) {
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done read from Cache for {} position {} length {}",
-            stream.getPath(), position, bytesRead);
-      }
-      return bytesRead;
-    }
+  ReadBuffer getNextBlockToRead() throws InterruptedException;
 
-    // otherwise, just say we got nothing - calling thread can do its own read
-    return 0;
-  }
-
-  /*
-   *
-   *  Internal methods
-   *
+  /**
+   * Marks the specified buffer as done reading and updates its status.
+   * Called by {@link ReadBufferWorker} after reading is complete.
+   * @param buffer the buffer that was read by the worker thread
+   * @param result the status of the read operation
+   * @param bytesActuallyRead the number of bytes actually read
    */
-
-  private void waitForProcess(final AbfsInputStream stream, final long position) {
-    ReadBuffer readBuf;
-    synchronized (this) {
-      clearFromReadAheadQueue(stream, position);
-      readBuf = getFromList(inProgressList, stream, position);
-    }
-    if (readBuf != null) {  // if in in-progress queue, then block for it
-      try {
-        if (LOGGER.isTraceEnabled()) {
-          LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}",
-              stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
-        }
-        readBuf.getLatch().await();  // blocking wait on the caller stream's thread
-        // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread
-        // is done processing it (in doneReading). There, the latch is set after removing the buffer from
-        // inProgressList. So this latch is safe to be outside the synchronized block.
-        // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
-        // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
-        // then the latch can be removed and replaced with wait/notify whenever inProgressList is touched.
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-      }
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("latch done for file {} buffer idx {} length {}",
-            stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
-      }
-    }
-  }
+  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
+      final int bytesActuallyRead);
 
   /**
-   * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to the free list.
-   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   * Purges all buffers associated with the calling {@link AbfsInputStream}.
    *
-   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+   * @param stream the input stream whose buffers should be purged
    */
-  private synchronized boolean tryEvict() {
-    ReadBuffer nodeToEvict = null;
-    if (completedReadList.size() <= 0) {
-      return false;  // there are no evict-able buffers
-    }
-
-    long currentTimeInMs = currentTimeMillis();
-
-    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
-    for (ReadBuffer buf : completedReadList) {
-      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
-        nodeToEvict = buf;
-        break;
-      }
-    }
-    if (nodeToEvict != null) {
-      return evict(nodeToEvict);
-    }
-
-    // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
-    for (ReadBuffer buf : completedReadList) {
-      if (buf.isAnyByteConsumed()) {
-        nodeToEvict = buf;
-        break;
-      }
-    }
-
-    if (nodeToEvict != null) {
-      return evict(nodeToEvict);
-    }
-
-    // next, try any old nodes that have not been consumed
-    // Failed read buffers (with buffer index=-1) that are older than
-    // thresholdAge should be cleaned up, but at the same time should not
-    // report successful eviction.
-    // Queue logic expects that a buffer is freed up for read ahead when
-    // eviction is successful, whereas a failed ReadBuffer would have released
-    // its buffer when its status was set to READ_FAILED.
-    long earliestBirthday = Long.MAX_VALUE;
-    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
-    for (ReadBuffer buf : completedReadList) {
-      if ((buf.getBufferindex() != -1)
-          && (buf.getTimeStamp() < earliestBirthday)) {
-        nodeToEvict = buf;
-        earliestBirthday = buf.getTimeStamp();
-      } else if ((buf.getBufferindex() == -1)
-          && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) {
-        oldFailedBuffers.add(buf);
-      }
-    }
-
-    for (ReadBuffer buf : oldFailedBuffers) {
-      evict(buf);
-    }
-
-    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
-      return evict(nodeToEvict);
-    }
-
-    LOGGER.trace("No buffer eligible for eviction");
-    // nothing can be evicted
-    return false;
-  }
-
-  private boolean evict(final ReadBuffer buf) {
-    // As failed ReadBuffers (bufferIndex = -1) are saved in completedReadList,
-    // avoid adding them to the freeList.
-    if (buf.getBufferindex() != -1) {
-      freeList.push(buf.getBufferindex());
-    }
-
-    completedReadList.remove(buf);
-    buf.setTracingContext(null);
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}",
-          buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength());
-    }
-    return true;
-  }
-
-  private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) {
-    // returns true if any part of the buffer is already queued
-    return (isInList(readAheadQueue, stream, requestedOffset)
-        || isInList(inProgressList, stream, requestedOffset)
-        || isInList(completedReadList, stream, requestedOffset));
-  }
-
-  private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
-    return (getFromList(list, stream, requestedOffset) != null);
-  }
-
-  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
-    for (ReadBuffer buffer : list) {
-      if (buffer.getStream() == stream) {
-        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
-            && requestedOffset >= buffer.getOffset()
-            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
-          return buffer;
-        } else if (requestedOffset >= buffer.getOffset()
-            && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) {
-          return buffer;
-        }
-      }
-    }
-    return null;
-  }
+  void purgeBuffersForStream(AbfsInputStream stream);
 
+  // Following methods are for testing purposes only and should not be used in production code.
 
   /**
-   * Returns buffers that failed or passed from the completed queue.
-   * @param stream
-   * @param requestedOffset
-   * @return
+   * Resets the read buffer manager for testing purposes.
    */
-  private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) {
-    for (ReadBuffer buffer : completedReadList) {
-      // Buffer is returned if the requestedOffset is at or above buffer's
-      // offset but less than buffer's length or the actual requestedLength
-      if ((buffer.getStream() == stream)
-          && (requestedOffset >= buffer.getOffset())
-          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
-          || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) {
-        return buffer;
-      }
-    }
-
-    return null;
-  }
-
-  private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) {
-    ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
-    if (buffer != null) {
-      readAheadQueue.remove(buffer);
-      notifyAll();  // lock is held in calling method
-      freeList.push(buffer.getBufferindex());
-    }
-  }
-
-  private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length,
-      final byte[] buffer) throws IOException {
-    ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
-
-    if (buf == null) {
-      return 0;
-    }
-
-    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
-      // To prevent new read requests from failing due to old read-ahead attempts,
-      // return an exception only from buffers that failed within the last thresholdAgeMilliseconds
-      if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) {
-        throw buf.getErrException();
-      } else {
-        return 0;
-      }
-    }
-
-    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
-        || (position >= buf.getOffset() + buf.getLength())) {
-      return 0;
-    }
-
-    int cursor = (int) (position - buf.getOffset());
-    int availableLengthInBuffer = buf.getLength() - cursor;
-    int lengthToCopy = Math.min(length, availableLengthInBuffer);
-    System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
-    if (cursor == 0) {
-      buf.setFirstByteConsumed(true);
-    }
-    if (cursor + lengthToCopy == buf.getLength()) {
-      buf.setLastByteConsumed(true);
-    }
-    buf.setAnyByteConsumed(true);
-    return lengthToCopy;
-  }
+  @VisibleForTesting
+  void testResetReadBufferManager();

Review Comment:
   Why does the method name in production code have "test" in it?


> ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code from the current working code
> ------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19613
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19613
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>
> The Read Buffer Manager in use today was introduced long ago and has been stable for quite a while.
> The Read Buffer Manager to be introduced as part of HADOOP-19596 (https://issues.apache.org/jira/browse/HADOOP-19596) will bring in many changes incrementally over time. While that development goes on, and until we are able to fully stabilise the optimized version, the current flow needs to stay functional and undisturbed.
> This work item isolates the current code from the new code by refactoring the ReadBufferManager class into two implementations behind the same public interface: ReadBufferManagerV1 and ReadBufferManagerV2.
> This will also introduce new configs that can be used to toggle between the new and the old code.
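For readers following along, the toggle described in the issue could be wired up roughly as below. This is a minimal sketch only: the ReadBufferManager interface and the ReadBufferManagerV1/ReadBufferManagerV2 class names come from the issue and the PR above, while the factory class, the boolean flag plumbing, and the config key mentioned in the comments are illustrative assumptions, not the actual implementation in PR #7801. The double-checked locking mirrors the getBufferManager() pattern visible in the removed code.

    // Illustrative sketch only: the factory class name, flag plumbing and
    // config key are assumptions; only ReadBufferManager, ReadBufferManagerV1
    // and ReadBufferManagerV2 are named in the issue/PR.
    public final class ReadBufferManagerFactory {

      private static volatile ReadBufferManager instance;

      private ReadBufferManagerFactory() {
      }

      // Pick the stable V1 flow or the new V2 flow exactly once, based on a
      // config toggle (e.g. a hypothetical "fs.azure.enable.readahead.v2"
      // setting), using the same double-checked locking as the old
      // getBufferManager() shown in the diff above.
      static ReadBufferManager getBufferManager(boolean isReadAheadV2Enabled) {
        if (instance == null) {
          synchronized (ReadBufferManagerFactory.class) {
            if (instance == null) {
              instance = isReadAheadV2Enabled
                  ? new ReadBufferManagerV2()
                  : new ReadBufferManagerV1();
            }
          }
        }
        return instance;
      }
    }

Callers would then program purely against the ReadBufferManager interface (queueReadAhead, getBlock, doneReading, and so on), so the V1 path stays untouched while V2 evolves behind the toggle.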