[ https://issues.apache.org/jira/browse/HADOOP-19613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18008781#comment-18008781 ]

ASF GitHub Bot commented on HADOOP-19613:
-----------------------------------------

anujmodi2021 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2219959703


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * The Read Buffer Manager for the REST AbfsClient.
+ * V1 implementation of ReadBufferManager.
+ */
+final class ReadBufferManagerV1 implements ReadBufferManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      ReadBufferManagerV1.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;
+
+  private static final int NUM_BUFFERS = 16;
+  private static final int NUM_THREADS = 8;
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold
+
+  private static int blockSize = 4 * ONE_MB;
+  private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS;
+  private Thread[] threads = new Thread[NUM_THREADS];
+  private byte[][] buffers;    // array of byte[] buffers, to hold the data that is read
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] array that are available
+
+  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of requests that are not picked up by any worker thread yet
+  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // requests being processed by worker threads
+  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // buffers available for reading
+  private static ReadBufferManagerV1 bufferManager; // lazily initialized singleton; see getBufferManager()
+  private static final ReentrantLock LOCK = new ReentrantLock();
+
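+  /**
+   * Returns the singleton instance, creating and initializing it on first use.
+   * Double-checked locking on LOCK ensures init() runs exactly once.
+   */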
+  static ReadBufferManagerV1 getBufferManager() {
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManagerV1();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
+  }
+
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManagerV1 not initialized yet. Overriding readAheadBlockSize as {}",
+          readAheadBlockSize);
+      blockSize = readAheadBlockSize;
+    }
+  }
+
+  private void init() {
+    buffers = new byte[NUM_BUFFERS][];
+    for (int i = 0; i < NUM_BUFFERS; i++) {
+      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte array never goes back to GC
+      freeList.add(i);
+    }
+    for (int i = 0; i < NUM_THREADS; i++) {
+      Thread t = new Thread(new ReadBufferWorker(i));
+      t.setDaemon(true);
+      threads[i] = t;
+      t.setName("ABFS-prefetch-" + i);
+      t.start();
+    }
+    ReadBufferWorker.UNLEASH_WORKERS.countDown();
+  }
+
+  // hide instance constructor
+  private ReadBufferManagerV1() {
+    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+  }
+
+  /**
+   * {@link AbfsInputStream} calls this method to queue read-aheads.
+   *
+   * @param stream          The {@link AbfsInputStream} for which to do the read-ahead
+   * @param requestedOffset The offset in the file which should be read
+   * @param requestedLength The length to read
+   */
+  @Override
+  public void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength,
+      TracingContext tracingContext) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
+          stream.getPath(), requestedOffset, requestedLength);
+    }
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream, requestedOffset)) {
+        return; // already queued, do not queue again
+      }
+      if (freeList.isEmpty() && !tryEvict()) {
+        return; // no buffers available, cannot queue anything
+      }
+
+      buffer = new ReadBuffer();
+      buffer.setStream(stream);
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      Integer bufferIndex = freeList.pop();  // will return a value, since we have checked size > 0 already
+
+      buffer.setBuffer(buffers[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      readAheadQueue.add(buffer);
+      notifyAll();
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx 
{}",
+            stream.getPath(), requestedOffset, buffer.getBufferindex());
+      }
+    }
+  }
+
+  /**
+   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is
+   * because, depending on worker thread availability, the read-ahead may take a while - the calling thread can do its
+   * own read to get the data faster (compared to the read waiting in the queue for an indeterminate amount of time).
+   *
+   * @param stream   the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
+   */
+  @Override
+  public int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("getBlock for file {}  position {}  thread {}",
+          stream.getPath(), position, Thread.currentThread().getName());
+    }
+
+    waitForProcess(stream, position);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
+    }
+    if (bytesRead > 0) {
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("Done read from Cache for {} position {} length {}",
+            stream.getPath(), position, bytesRead);
+      }
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
+    return 0;
+  }
+
+  /**
+   * ReadBufferWorker thread calls this to get the next buffer that it should work on.
+   *
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
+   */
+  @Override
+  public ReadBuffer getNextBlockToRead() throws InterruptedException {
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      //buffer = readAheadQueue.take();  // blocking method
+      while (readAheadQueue.size() == 0) {
+        wait();
+      }
+      buffer = readAheadQueue.remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;            // should never happen
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      inProgressList.add(buffer);
+    }
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+          buffer.getStream().getPath(), buffer.getOffset());
+    }
+    return buffer;
+  }
+
+  /**
+   * ReadBufferWorker thread calls this method to post completion.
+   *
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
+   */
+  @Override
+  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} 
outcome {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), result, 
bytesActuallyRead);
+    }
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (inProgressList.contains(buffer)) {
+        inProgressList.remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          freeList.push(buffer.getBufferindex());
+          // buffer will be deleted as per the eviction policy.
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        completedReadList.add(buffer);
+      }
+    }
+
+    // outside the synchronized block, since anyone receiving a wake-up from the latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
+  }
+
+  /**
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManagerV1} when stream is closed.
+   * @param stream input stream.
+   */
+  @Override
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+    readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, completedReadList);
+  }
+
+  private void waitForProcess(final AbfsInputStream stream, final long position) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      clearFromReadAheadQueue(stream, position);
+      readBuf = getFromList(inProgressList, stream, position);
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for it
+      try {
+        if (LOGGER.isTraceEnabled()) {
+          LOGGER.trace("got a relevant read buffer for file {} offset {} 
buffer idx {}",
+              stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
+        }
+        readBuf.getLatch().await();  // blocking wait on the caller stream's thread
+        // Note on correctness: readBuf leaves inProgressList in only one place: after the worker thread
+        // is done processing it (in doneReading). There, the latch is counted down after the buffer is
+        // removed from inProgressList, so awaiting the latch outside the synchronized block is safe.
+        // Awaiting it inside synchronized would deadlock: this thread would hold the lock while waiting,
+        // so no one else could change any state. If this becomes more complex in the future, the latch
+        // can be removed and replaced with wait/notify whenever inProgressList is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      if (LOGGER.isTraceEnabled()) {
+        LOGGER.trace("latch done for file {} buffer idx {} length {}",
+            stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
+      }
+    }
+  }
+
+  /**
+   * If any buffer in the completed list can be reclaimed, then reclaim it and return the buffer to the free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   *
+   * @return whether the eviction succeeded - i.e., whether we were able to free up one buffer
+   */
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (completedReadList.size() <= 0) {
+      return false;  // there are no evict-able buffers
+    }
+
+    long currentTimeInMs = currentTimeMillis();
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (may be a bad idea? have to experiment and see)
+    for (ReadBuffer buf : completedReadList) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return evict(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
+    long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+    for (ReadBuffer buf : completedReadList) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp()) > 
thresholdAgeMilliseconds) {
+        oldFailedBuffers.add(buf);
+      }
+    }
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      evict(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) {
+      return evict(nodeToEvict);
+    }
+
+    LOGGER.trace("No buffer eligible for eviction");
+    // nothing can be evicted
+    return false;
+  }
+
+  private boolean evict(final ReadBuffer buf) {
+    // As failed ReadBuffers (bufferIndex = -1) are saved in completedReadList,
+    // avoid adding them to freeList.
+    if (buf.getBufferindex() != -1) {
+      freeList.push(buf.getBufferindex());
+    }
+
+    completedReadList.remove(buf);
+    buf.setTracingContext(null);
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} 
length {}",
+          buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), 
buf.getLength());
+    }
+    return true;
+  }
+
+  private boolean isAlreadyQueued(final AbfsInputStream stream, final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(readAheadQueue, stream, requestedOffset)
+        || isInList(inProgressList, stream, requestedOffset)
+        || isInList(completedReadList, stream, requestedOffset));
+  }
+
+  private boolean isInList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+    return (getFromList(list, stream, requestedOffset) != null);
+  }
+
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final AbfsInputStream stream, final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (buffer.getStream() == stream) {
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Returns buffers that failed or succeeded from the completed queue.
+   * @param stream the stream whose buffer is being looked up
+   * @param requestedOffset the offset requested within the stream
+   * @return

Review Comment:
   Added in base class
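
For context, a minimal sketch (not part of the patch) of the caller protocol the getBlock javadoc above describes: queue a read-ahead, then probe the cache; a return of 0 means a cache miss and the caller does its own remote read. The stream, tracingContext, nextOffset, readAheadLength, buffer, and readRemote names are hypothetical placeholders.

    ReadBufferManager manager = ReadBufferManagerV1.getBufferManager();
    // Opportunistically queue the next block while serving the current read.
    manager.queueReadAhead(stream, nextOffset, readAheadLength, tracingContext);

    int bytesRead = manager.getBlock(stream, position, length, buffer);
    if (bytesRead == 0) {
      // Cache miss: fall back to a direct remote read (hypothetical helper).
      bytesRead = readRemote(stream, position, length, buffer, tracingContext);
    }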



##########
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java:
##########
@@ -921,10 +921,14 @@ private AzureBlobFileSystem createTestFile(Path testFilePath, long testFileSize,
   }
 
   private void resetReadBufferManager(int bufferSize, int threshold) {
-    ReadBufferManager.getBufferManager()
+    getBufferManager()
         .testResetReadBufferManager(bufferSize, threshold);
     // Trigger GC as aggressive recreation of ReadBufferManager buffers
    // by successive tests can lead to OOM based on the dev VM/machine capacity.
     System.gc();
   }
+
+  private ReadBufferManager getBufferManager() {
+    return ReadBufferManagerV1.getBufferManager();
+  }
 }

Review Comment:
   Taken





> ABFS: [ReadAheadV2] Refactor ReadBufferManager to isolate new code with the current working code
> ------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19613
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19613
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/azure
>    Affects Versions: 3.5.0, 3.4.1
>            Reporter: Anuj Modi
>            Assignee: Anuj Modi
>            Priority: Major
>              Labels: pull-request-available
>
> The Read Buffer Manager used today was introduced long ago and has been 
> stable for quite a while.
> The Read Buffer Manager to be introduced as part of 
> https://issues.apache.org/jira/browse/HADOOP-19596 will bring many changes 
> incrementally over time. While that development goes on, and until we are 
> able to fully stabilise the optimized version, we need the current flow to 
> remain functional and undisturbed.
> This work item isolates the current code from the new code by refactoring 
> the ReadBufferManager class into two implementations with the same public 
> interface: ReadBufferManagerV1 and ReadBufferManagerV2.
> It will also introduce new configs that can be used to toggle between the 
> new and old code.
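
A hypothetical sketch of that toggle (the config getter and factory shape are assumptions for illustration, not the actual patch):

    // Choose the stable V1 path unless the new V2 path is explicitly enabled.
    static ReadBufferManager getReadBufferManager(AbfsConfiguration conf) {
      return conf.isReadAheadV2Enabled()            // assumed config getter
          ? ReadBufferManagerV2.getBufferManager()  // new, evolving implementation
          : ReadBufferManagerV1.getBufferManager(); // current, stable implementation
    }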



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
