[ https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18008781#comment-18008781 ]
ASF GitHub Bot commented on HADOOP-19613: ----------------------------------------- anujmodi2021 commented on code in PR #7801: URL: https://github.com/apache/hadoop/pull/7801#discussion_r2219959703 ########## hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java: ########## @@ -0,0 +1,637 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.azurebfs.services; + +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.Stack; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.classification.VisibleForTesting; + +/** + * The Read Buffer Manager for Rest AbfsClient. + * V1 implementation of ReadBufferManager. 
+ */ +final class ReadBufferManagerV1 implements ReadBufferManager { + private static final Logger LOGGER = LoggerFactory.getLogger( + ReadBufferManagerV1.class); + private static final int ONE_KB = 1024; + private static final int ONE_MB = ONE_KB * ONE_KB; + + private static final int NUM_BUFFERS = 16; + private static final int NUM_THREADS = 8; + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + + private static int blockSize = 4 * ONE_MB; + private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; + private Thread[] threads = new Thread[NUM_THREADS]; + private byte[][] buffers; // array of byte[] buffers, to hold the data that is read + private Stack<Integer> freeList = new Stack<>(); // indices in buffers[] array that are available + + private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet + private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads + private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading + private static ReadBufferManagerV1 bufferManager; // singleton, initialized in static initialization block + private static final ReentrantLock LOCK = new ReentrantLock(); + + static ReadBufferManagerV1 getBufferManager() { + if (bufferManager == null) { + LOCK.lock(); + try { + if (bufferManager == null) { + bufferManager = new ReadBufferManagerV1(); + bufferManager.init(); + } + } finally { + LOCK.unlock(); + } + } + return bufferManager; + } + + static void setReadBufferManagerConfigs(int readAheadBlockSize) { + if (bufferManager == null) { + LOGGER.debug( + "ReadBufferManagerV1 not initialized yet. 
Overriding readAheadBlockSize as {}", + readAheadBlockSize); + blockSize = readAheadBlockSize; + } + } + + private void init() { + buffers = new byte[NUM_BUFFERS][]; + for (int i = 0; i < NUM_BUFFERS; i++) { + buffers[i] = new byte[blockSize]; // same buffers are reused. The byte array never goes back to GC + freeList.add(i); + } + for (int i = 0; i < NUM_THREADS; i++) { + Thread t = new Thread(new ReadBufferWorker(i)); + t.setDaemon(true); + threads[i] = t; + t.setName("ABFS-prefetch-" + i); + t.start(); + } + ReadBufferWorker.UNLEASH_WORKERS.countDown(); + } + + // hide instance constructor + private ReadBufferManagerV1() { + LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); + } + + /** + * {@link AbfsInputStream} calls this method to queue read-aheads. + * + * @param stream The {@link AbfsInputStream} for which to do the read-ahead + * @param requestedOffset The offset in the file which shoukd be read + * @param requestedLength The length to read + */ + @Override + public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength, + TracingContext tracingContext) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", + stream.getPath(), requestedOffset, requestedLength); + } + ReadBuffer buffer; + synchronized (this) { + if (isAlreadyQueued(stream, requestedOffset)) { + return; // already queued, do not queue again + } + if (freeList.isEmpty() && !tryEvict()) { + return; // no buffers available, cannot queue anything + } + + buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setOffset(requestedOffset); + buffer.setLength(0); + buffer.setRequestedLength(requestedLength); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.setTracingContext(tracingContext); + + Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already + + 
buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + readAheadQueue.add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), requestedOffset, buffer.getBufferindex()); + } + } + } + + /** + * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a + * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading + * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead + * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because + * depending on worker thread availability, the read-ahead may take a while - the calling thread can do it's own + * read to get the data faster (copmared to the read waiting in queue for an indeterminate amount of time). + * + * @param stream the file to read bytes for + * @param position the offset in the file to do a read for + * @param length the length to read + * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. 
+ * @return the number of bytes read + */ + @Override + public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws IOException { + // not synchronized, so have to be careful with locking + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("getBlock for file {} position {} thread {}", + stream.getPath(), position, Thread.currentThread().getName()); + } + + waitForProcess(stream, position); + + int bytesRead = 0; + synchronized (this) { + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + } + if (bytesRead > 0) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done read from Cache for {} position {} length {}", + stream.getPath(), position, bytesRead); + } + return bytesRead; + } + + // otherwise, just say we got nothing - calling thread can do its own read + return 0; + } + + /** + * ReadBufferWorker thread calls this to get the next buffer that it should work on. + * + * @return {@link ReadBuffer} + * @throws InterruptedException if thread is interrupted + */ + @Override + public ReadBuffer getNextBlockToRead() throws InterruptedException { + ReadBuffer buffer = null; + synchronized (this) { + //buffer = readAheadQueue.take(); // blocking method + while (readAheadQueue.size() == 0) { + wait(); + } + buffer = readAheadQueue.remove(); + notifyAll(); + if (buffer == null) { + return null; // should never happen + } + buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS); + inProgressList.add(buffer); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker picked file {} for offset {}", + buffer.getStream().getPath(), buffer.getOffset()); + } + return buffer; + } + + /** + * ReadBufferWorker thread calls this method to post completion. 
+ * + * @param buffer the buffer whose read was completed + * @param result the {@link ReadBufferStatus} after the read operation in the worker thread + * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read + */ + @Override + public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + } + synchronized (this) { + // If this buffer has already been purged during + // close of InputStream then we don't update the lists. + if (inProgressList.contains(buffer)) { + inProgressList.remove(buffer); + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setLength(bytesActuallyRead); + } else { + freeList.push(buffer.getBufferindex()); + // buffer will be deleted as per the eviction policy. + } + // completed list also contains FAILED read buffers + // for sending exception message to clients. + buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); + completedReadList.add(buffer); + } + } + + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results + buffer.getLatch().countDown(); // wake up waiting threads (if any) + } + + /** + * Purging the buffers associated with an {@link AbfsInputStream} + * from {@link ReadBufferManagerV1} when stream is closed. + * @param stream input stream. 
+ */ + @Override + public synchronized void purgeBuffersForStream(AbfsInputStream stream) { + LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream); + readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream); + purgeList(stream, completedReadList); + } + + private void waitForProcess(final AbfsInputStream stream, final long position) { + ReadBuffer readBuf; + synchronized (this) { + clearFromReadAheadQueue(stream, position); + readBuf = getFromList(inProgressList, stream, position); + } + if (readBuf != null) { // if in in-progress queue, then block for it + try { + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("got a relevant read buffer for file {} offset {} buffer idx {}", + stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex()); + } + readBuf.getLatch().await(); // blocking wait on the caller stream's thread + // Note on correctness: readBuf gets out of inProgressList only in 1 place: after worker thread + // is done processing it (in doneReading). There, the latch is set after removing the buffer from + // inProgressList. So this latch is safe to be outside the synchronized block. + // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock + // while waiting, so no one will be able to change any state. If this becomes more complex in the future, + // then the latch cane be removed and replaced with wait/notify whenever inProgressList is touched. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("latch done for file {} buffer idx {} length {}", + stream.getPath(), readBuf.getBufferindex(), readBuf.getLength()); + } + } + } + + /** + * If any buffer in the completedlist can be reclaimed then reclaim it and return the buffer to free list. + * The objective is to find just one buffer - there is no advantage to evicting more than one. 
+ * + * @return whether the eviction succeeeded - i.e., were we able to free up one buffer + */ + private synchronized boolean tryEvict() { + ReadBuffer nodeToEvict = null; + if (completedReadList.size() <= 0) { + return false; // there are no evict-able buffers + } + + long currentTimeInMs = currentTimeMillis(); + + // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed) + for (ReadBuffer buf : completedReadList) { + if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) { + nodeToEvict = buf; + break; + } + } + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see) + for (ReadBuffer buf : completedReadList) { + if (buf.isAnyByteConsumed()) { + nodeToEvict = buf; + break; + } + } + + if (nodeToEvict != null) { + return evict(nodeToEvict); + } + + // next, try any old nodes that have not been consumed + // Failed read buffers (with buffer index=-1) that are older than + // thresholdAge should be cleaned up, but at the same time should not + // report successful eviction. + // Queue logic expects that a buffer is freed up for read ahead when + // eviction is successful, whereas a failed ReadBuffer would have released + // its buffer when its status was set to READ_FAILED. 
+ long earliestBirthday = Long.MAX_VALUE; + ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>(); + for (ReadBuffer buf : completedReadList) { + if ((buf.getBufferindex() != -1) + && (buf.getTimeStamp() < earliestBirthday)) { + nodeToEvict = buf; + earliestBirthday = buf.getTimeStamp(); + } else if ((buf.getBufferindex() == -1) + && (currentTimeInMs - buf.getTimeStamp()) > thresholdAgeMilliseconds) { + oldFailedBuffers.add(buf); + } + } + + for (ReadBuffer buf : oldFailedBuffers) { + evict(buf); + } + + if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) { + return evict(nodeToEvict); + } + + LOGGER.trace("No buffer eligible for eviction"); + // nothing can be evicted + return false; + } + + private boolean evict(final ReadBuffer buf) { + // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList, + // avoid adding it to freeList. + if (buf.getBufferindex() != -1) { + freeList.push(buf.getBufferindex()); + } + + completedReadList.remove(buf); + buf.setTracingContext(null); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", + buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), buf.getLength()); + } + return true; + } + + private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) { + // returns true if any part of the buffer is already queued + return (isInList(readAheadQueue, stream, requestedOffset) + || isInList(inProgressList, stream, requestedOffset) + || isInList(completedReadList, stream, requestedOffset)); + } + + private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) { + return (getFromList(list, stream, requestedOffset) != null); + } + + private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : list) { + if (buffer.getStream() == 
stream) { + if (buffer.getStatus() == ReadBufferStatus.AVAILABLE + && requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getLength()) { + return buffer; + } else if (requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + return buffer; + } + } + } + return null; + } + + /** + * Returns buffers that failed or passed from completed queue. + * @param stream + * @param requestedOffset + * @return Review Comment: Added in base class ########## hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java: ########## @@ -921,10 +921,14 @@ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize, } private void resetReadBufferManager(int bufferSize, int threshold) { - ReadBufferManager.getBufferManager() + getBufferManager() .testResetReadBufferManager(bufferSize, threshold); // Trigger GC as aggressive recreation of ReadBufferManager buffers // by successive tests can lead to OOM based on the dev VM/machine capacity. System.gc(); } + + private ReadBufferManager getBufferManager() { + return ReadBufferManagerV1.getBufferManager(); + } } Review Comment: Taken > ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code with the > current working code > ------------------------------------------------------------------------------------------------ > > Key: HADOOP-19613 > URL: https://issues.apache.org/jira/browse/HADOOP-19613 > Project: Hadoop Common > Issue Type: Sub-task > Components: fs/azure > Affects Versions: 3.5.0, 3.4.1 > Reporter: Anuj Modi > Assignee: Anuj Modi > Priority: Major > Labels: pull-request-available > > Read Buffer Manager used today was introduced way back and has been stable > for quite a while. > Read Buffer Manager to be introduced as part of > https://issues.apache.org/jira/browse/HADOOP-19596 will introduce many > changes incrementally over time. 
While development continues, and until we are > able to fully stabilise the optimized version, we need the current flow to remain > functional and undisturbed. > This work item is to isolate the current flow from the new code by refactoring > the ReadBufferManager class to have 2 different implementations with the same public > interfaces: ReadBufferManagerV1 and ReadBufferManagerV2. > This will also introduce new configs that can be used to toggle between the new > and old code. -- This message was sent by Atlassian Jira (v8.20.10#820010) --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org