anujmodi2021 commented on code in PR #7801:
URL: https://github.com/apache/hadoop/pull/7801#discussion_r2218255246


##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java:
##########
@@ -15,636 +15,166 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs.azurebfs.services;
 
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
-import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 /**
- * The Read Buffer Manager for Rest AbfsClient.
+ * Interface for managing read buffers for Azure Blob File System input 
streams.
  */
-final class ReadBufferManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ReadBufferManager.class);
-  private static final int ONE_KB = 1024;
-  private static final int ONE_MB = ONE_KB * ONE_KB;
-
-  private static final int NUM_BUFFERS = 16;
-  private static final int NUM_THREADS = 8;
-  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have 
to see if 3 seconds is a good threshold
-
-  private static int blockSize = 4 * ONE_MB;
-  private static int thresholdAgeMilliseconds = 
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
-  private Thread[] threads = new Thread[NUM_THREADS];
-  private byte[][] buffers;    // array of byte[] buffers, to hold the data 
that is read
-  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
-
-  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of 
requests that are not picked up by any worker thread yet
-  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // 
requests being processed by worker threads
-  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // 
buffers available for reading
-  private static ReadBufferManager bufferManager; // singleton, initialized in 
static initialization block
-  private static final ReentrantLock LOCK = new ReentrantLock();
-
-  static ReadBufferManager getBufferManager() {
-    if (bufferManager == null) {
-      LOCK.lock();
-      try {
-        if (bufferManager == null) {
-          bufferManager = new ReadBufferManager();
-          bufferManager.init();
-        }
-      } finally {
-        LOCK.unlock();
-      }
-    }
-    return bufferManager;
-  }
-
-  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
-    if (bufferManager == null) {
-      LOGGER.debug(
-          "ReadBufferManager not initialized yet. Overriding 
readAheadBlockSize as {}",
-          readAheadBlockSize);
-      blockSize = readAheadBlockSize;
-    }
-  }
-
-  private void init() {
-    buffers = new byte[NUM_BUFFERS][];
-    for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte 
array never goes back to GC
-      freeList.add(i);
-    }
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Thread t = new Thread(new ReadBufferWorker(i));
-      t.setDaemon(true);
-      threads[i] = t;
-      t.setName("ABFS-prefetch-" + i);
-      t.start();
-    }
-    ReadBufferWorker.UNLEASH_WORKERS.countDown();
-  }
-
-  // hide instance constructor
-  private ReadBufferManager() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
-  }
-
-
-  /*
-   *
-   *  AbfsInputStream-facing methods
-   *
-   */
-
+public interface ReadBufferManager {
 
   /**
-   * {@link AbfsInputStream} calls this method to queue read-aheads.
-   *
-   * @param stream          The {@link AbfsInputStream} for which to do the 
read-ahead
-   * @param requestedOffset The offset in the file which shoukd be read
-   * @param requestedLength The length to read
+   * Queues a read-ahead request from {@link AbfsInputStream}
+   * for a given offset in file and given length.
+   * @param stream the input stream requesting the read-ahead
+   * @param requestedOffset the offset in the remote file to start reading
+   * @param requestedLength the number of bytes to read from file
+   * @param tracingContext the tracing context for diagnostics
    */
-  void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset, final int requestedLength,
-                      TracingContext tracingContext) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
-          stream.getPath(), requestedOffset, requestedLength);
-    }
-    ReadBuffer buffer;
-    synchronized (this) {
-      if (isAlreadyQueued(stream, requestedOffset)) {
-        return; // already queued, do not queue again
-      }
-      if (freeList.isEmpty() && !tryEvict()) {
-        return; // no buffers available, cannot queue anything
-      }
-
-      buffer = new ReadBuffer();
-      buffer.setStream(stream);
-      buffer.setOffset(requestedOffset);
-      buffer.setLength(0);
-      buffer.setRequestedLength(requestedLength);
-      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
-      buffer.setLatch(new CountDownLatch(1));
-      buffer.setTracingContext(tracingContext);
-
-      Integer bufferIndex = freeList.pop();  // will return a value, since we 
have checked size > 0 already
-
-      buffer.setBuffer(buffers[bufferIndex]);
-      buffer.setBufferindex(bufferIndex);
-      readAheadQueue.add(buffer);
-      notifyAll();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx 
{}",
-            stream.getPath(), requestedOffset, buffer.getBufferindex());
-      }
-    }
-  }
+  void queueReadAhead(final AbfsInputStream stream, final long requestedOffset,
+      final int requestedLength, TracingContext tracingContext);
 
+  /**
+   * Gets a block of data from the prefetched data by ReadBufferManager.
+   * {@link AbfsInputStream} calls this method to read data
+   * @param stream the input stream requesting the block
+   * @param position the position in the file to read from
+   * @param length the number of bytes to read
+   * @param buffer the buffer to store the read data
+   * @return the number of bytes actually read
+   * @throws IOException if an I/O error occurs
+   */
+  int getBlock(final AbfsInputStream stream,
+      final long position,
+      final int length,
+      final byte[] buffer)
+      throws IOException;
 
   /**
-   * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
-   * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
-   * the requested offset, then this method blocks until that read completes. 
If the data is queued in a read-ahead
-   * but not picked up by a worker thread yet, then it cancels that read-ahead 
and reports cache miss. This is because
-   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do it's own
-   * read to get the data faster (copmared to the read waiting in queue for an 
indeterminate amount of time).
+   * {@link ReadBufferWorker} calls this to get the next buffer to read from 
read-ahead queue.
+   * Requested read will be performed by background thread.
    *
-   * @param stream   the file to read bytes for
-   * @param position the offset in the file to do a read for
-   * @param length   the length to read
-   * @param buffer   the buffer to read data into. Note that the buffer will 
be written into from offset 0.
-   * @return the number of bytes read
+   * @return the next {@link ReadBuffer} to read
+   * @throws InterruptedException if interrupted while waiting
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int 
length, final byte[] buffer)
-      throws IOException {
-    // not synchronized, so have to be careful with locking
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("getBlock for file {}  position {}  thread {}",
-          stream.getPath(), position, Thread.currentThread().getName());
-    }
-
-    waitForProcess(stream, position);
-
-    int bytesRead = 0;
-    synchronized (this) {
-      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
-    }
-    if (bytesRead > 0) {
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done read from Cache for {} position {} length {}",
-            stream.getPath(), position, bytesRead);
-      }
-      return bytesRead;
-    }
+  ReadBuffer getNextBlockToRead() throws InterruptedException;
 
-    // otherwise, just say we got nothing - calling thread can do its own read
-    return 0;
-  }
-
-  /*
-   *
-   *  Internal methods
-   *
+  /**
+   * Marks the specified buffer as done reading and updates its status.
+   * Called by {@link ReadBufferWorker} after reading is complete.
+   * @param buffer the buffer that was read by worker thread
+   * @param result the status of the read operation
+   * @param bytesActuallyRead the number of bytes actually read
    */
-
-  private void waitForProcess(final AbfsInputStream stream, final long 
position) {
-    ReadBuffer readBuf;
-    synchronized (this) {
-      clearFromReadAheadQueue(stream, position);
-      readBuf = getFromList(inProgressList, stream, position);
-    }
-    if (readBuf != null) {         // if in in-progress queue, then block for 
it
-      try {
-        if (LOGGER.isTraceEnabled()) {
-          LOGGER.trace("got a relevant read buffer for file {} offset {} 
buffer idx {}",
-              stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
-        }
-        readBuf.getLatch().await();  // blocking wait on the caller stream's 
thread
-        // Note on correctness: readBuf gets out of inProgressList only in 1 
place: after worker thread
-        // is done processing it (in doneReading). There, the latch is set 
after removing the buffer from
-        // inProgressList. So this latch is safe to be outside the 
synchronized block.
-        // Putting it in synchronized would result in a deadlock, since this 
thread would be holding the lock
-        // while waiting, so no one will be able to  change any state. If this 
becomes more complex in the future,
-        // then the latch cane be removed and replaced with wait/notify 
whenever inProgressList is touched.
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-      }
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("latch done for file {} buffer idx {} length {}",
-            stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
-      }
-    }
-  }
+  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
+      final int bytesActuallyRead);
 
   /**
-   * If any buffer in the completedlist can be reclaimed then reclaim it and 
return the buffer to free list.
-   * The objective is to find just one buffer - there is no advantage to 
evicting more than one.
+   * Purges all buffers associated with the calling {@link AbfsInputStream}.
    *
-   * @return whether the eviction succeeeded - i.e., were we able to free up 
one buffer
+   * @param stream the input stream whose buffers should be purged
    */
-  private synchronized boolean tryEvict() {
-    ReadBuffer nodeToEvict = null;
-    if (completedReadList.size() <= 0) {
-      return false;  // there are no evict-able buffers
-    }
-
-    long currentTimeInMs = currentTimeMillis();
-
-    // first, try buffers where all bytes have been consumed (approximated as 
first and last bytes consumed)
-    for (ReadBuffer buf : completedReadList) {
-      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
-        nodeToEvict = buf;
-        break;
-      }
-    }
-    if (nodeToEvict != null) {
-      return evict(nodeToEvict);
-    }
-
-    // next, try buffers where any bytes have been consumed (may be a bad 
idea? have to experiment and see)
-    for (ReadBuffer buf : completedReadList) {
-      if (buf.isAnyByteConsumed()) {
-        nodeToEvict = buf;
-        break;
-      }
-    }
+  void purgeBuffersForStream(AbfsInputStream stream);
 
-    if (nodeToEvict != null) {
-      return evict(nodeToEvict);
-    }
-
-    // next, try any old nodes that have not been consumed
-    // Failed read buffers (with buffer index=-1) that are older than
-    // thresholdAge should be cleaned up, but at the same time should not
-    // report successful eviction.
-    // Queue logic expects that a buffer is freed up for read ahead when
-    // eviction is successful, whereas a failed ReadBuffer would have released
-    // its buffer when its status was set to READ_FAILED.
-    long earliestBirthday = Long.MAX_VALUE;
-    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
-    for (ReadBuffer buf : completedReadList) {
-      if ((buf.getBufferindex() != -1)
-          && (buf.getTimeStamp() < earliestBirthday)) {
-        nodeToEvict = buf;
-        earliestBirthday = buf.getTimeStamp();
-      } else if ((buf.getBufferindex() == -1)
-          && (currentTimeInMs - buf.getTimeStamp()) > 
thresholdAgeMilliseconds) {
-        oldFailedBuffers.add(buf);
-      }
-    }
-
-    for (ReadBuffer buf : oldFailedBuffers) {
-      evict(buf);
-    }
-
-    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && 
(nodeToEvict != null)) {
-      return evict(nodeToEvict);
-    }
-
-    LOGGER.trace("No buffer eligible for eviction");
-    // nothing can be evicted
-    return false;
-  }
-
-  private boolean evict(final ReadBuffer buf) {
-    // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
-    // avoid adding it to freeList.
-    if (buf.getBufferindex() != -1) {
-      freeList.push(buf.getBufferindex());
-    }
-
-    completedReadList.remove(buf);
-    buf.setTracingContext(null);
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} 
length {}",
-          buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), 
buf.getLength());
-    }
-    return true;
-  }
-
-  private boolean isAlreadyQueued(final AbfsInputStream stream, final long 
requestedOffset) {
-    // returns true if any part of the buffer is already queued
-    return (isInList(readAheadQueue, stream, requestedOffset)
-        || isInList(inProgressList, stream, requestedOffset)
-        || isInList(completedReadList, stream, requestedOffset));
-  }
-
-  private boolean isInList(final Collection<ReadBuffer> list, final 
AbfsInputStream stream, final long requestedOffset) {
-    return (getFromList(list, stream, requestedOffset) != null);
-  }
-
-  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final 
AbfsInputStream stream, final long requestedOffset) {
-    for (ReadBuffer buffer : list) {
-      if (buffer.getStream() == stream) {
-        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
-            && requestedOffset >= buffer.getOffset()
-            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
-          return buffer;
-        } else if (requestedOffset >= buffer.getOffset()
-            && requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()) {
-          return buffer;
-        }
-      }
-    }
-    return null;
-  }
+  // Following Methods are for testing purposes only and should not be used in 
production code.
 
   /**
-   * Returns buffers that failed or passed from completed queue.
-   * @param stream
-   * @param requestedOffset
-   * @return
+   * Resets the read buffer manager for testing purposes.
    */
-  private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, 
final long requestedOffset) {
-    for (ReadBuffer buffer : completedReadList) {
-      // Buffer is returned if the requestedOffset is at or above buffer's
-      // offset but less than buffer's length or the actual requestedLength
-      if ((buffer.getStream() == stream)
-          && (requestedOffset >= buffer.getOffset())
-          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
-          || (requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()))) {
-          return buffer;
-        }
-      }
-
-    return null;
-  }
-
-  private void clearFromReadAheadQueue(final AbfsInputStream stream, final 
long requestedOffset) {
-    ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
-    if (buffer != null) {
-      readAheadQueue.remove(buffer);
-      notifyAll();   // lock is held in calling method
-      freeList.push(buffer.getBufferindex());
-    }
-  }
-
-  private int getBlockFromCompletedQueue(final AbfsInputStream stream, final 
long position, final int length,
-                                         final byte[] buffer) throws 
IOException {
-    ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
-
-    if (buf == null) {
-      return 0;
-    }
-
-    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
-      // To prevent new read requests to fail due to old read-ahead attempts,
-      // return exception only from buffers that failed within last 
thresholdAgeMilliseconds
-      if ((currentTimeMillis() - (buf.getTimeStamp()) < 
thresholdAgeMilliseconds)) {
-        throw buf.getErrException();
-      } else {
-        return 0;
-      }
-    }
-
-    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
-        || (position >= buf.getOffset() + buf.getLength())) {
-      return 0;
-    }
-
-    int cursor = (int) (position - buf.getOffset());
-    int availableLengthInBuffer = buf.getLength() - cursor;
-    int lengthToCopy = Math.min(length, availableLengthInBuffer);
-    System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
-    if (cursor == 0) {
-      buf.setFirstByteConsumed(true);
-    }
-    if (cursor + lengthToCopy == buf.getLength()) {
-      buf.setLastByteConsumed(true);
-    }
-    buf.setAnyByteConsumed(true);
-    return lengthToCopy;
-  }
+  @VisibleForTesting
+  void testResetReadBufferManager();

Review Comment:
   It was there already in RBMV1. It makes sense to keep it this way because 
this should never be called from production flow and its critical to test some 
really important scenarios so we can't void it.



##########
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java:
##########
@@ -0,0 +1,637 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+/**
+ * The Read Buffer Manager for Rest AbfsClient.
+ * V1 implementation of ReadBufferManager.
+ */
+final class ReadBufferManagerV1 implements ReadBufferManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(
+      ReadBufferManagerV1.class);
+  private static final int ONE_KB = 1024;
+  private static final int ONE_MB = ONE_KB * ONE_KB;
+
+  private static final int NUM_BUFFERS = 16;
+  private static final int NUM_THREADS = 8;
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have 
to see if 3 seconds is a good threshold
+
+  private static int blockSize = 4 * ONE_MB;
+  private static int thresholdAgeMilliseconds = 
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
+  private Thread[] threads = new Thread[NUM_THREADS];
+  private byte[][] buffers;    // array of byte[] buffers, to hold the data 
that is read
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
+
+  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of 
requests that are not picked up by any worker thread yet
+  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // 
requests being processed by worker threads
+  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // 
buffers available for reading
+  private static ReadBufferManagerV1 bufferManager; // singleton, initialized 
in static initialization block
+  private static final ReentrantLock LOCK = new ReentrantLock();
+
+  static ReadBufferManagerV1 getBufferManager() {

Review Comment:
   Makes sense. Will take it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to