This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9d5e111237b HADOOP-19613. [ABFS][ReadAheadV2] Refactor 
ReadBufferManager to isolate new code with the current working code (#7801)
9d5e111237b is described below

commit 9d5e111237bdb34723418e56be90b27970c65466
Author: Anuj Modi <[email protected]>
AuthorDate: Mon Jul 28 19:01:28 2025 +0530

    HADOOP-19613. [ABFS][ReadAheadV2] Refactor ReadBufferManager to isolate new 
code with the current working code (#7801)
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |  83 +++
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   1 +
 .../fs/azurebfs/constants/ConfigurationKeys.java   |  38 +-
 .../constants/FileSystemConfigurations.java        |   8 +
 .../fs/azurebfs/services/AbfsInputStream.java      |  36 +-
 .../azurebfs/services/AbfsInputStreamContext.java  |  12 +
 .../fs/azurebfs/services/ReadBufferManager.java    | 693 +++++----------------
 ...BufferManager.java => ReadBufferManagerV1.java} | 504 +++++++--------
 .../fs/azurebfs/services/ReadBufferManagerV2.java  | 228 +++++++
 .../fs/azurebfs/services/ReadBufferWorker.java     |   5 +-
 .../fs/azurebfs/ITestGetNameSpaceEnabled.java      |  22 +-
 .../azurebfs/services/ITestReadBufferManager.java  |   8 +-
 .../fs/azurebfs/services/TestAbfsInputStream.java  |  49 +-
 13 files changed, 848 insertions(+), 839 deletions(-)

diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 9aaf1714472..1242122f030 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -381,6 +381,41 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_READAHEAD)
   private boolean enabledReadAhead;
 
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2,
+      DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
+  private boolean isReadAheadV2Enabled;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
+      DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
+  private int minReadAheadV2ThreadPoolSize;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE,
+      DefaultValue = DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE)
+  private int maxReadAheadV2ThreadPoolSize;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE,
+      DefaultValue = DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE)
+  private int minReadAheadV2BufferPoolSize;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE,
+      DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
+  private int maxReadAheadV2BufferPoolSize;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
+      DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
+  private int readAheadExecutorServiceTTLMillis;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS,
+      DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
+  private int readAheadV2CachedBufferTTLMillis;
+
   @LongConfigurationValidatorAnnotation(ConfigurationKey = 
FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
       MinValue = 0,
       DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -1368,6 +1403,54 @@ public boolean isReadAheadEnabled() {
     return this.enabledReadAhead;
   }
 
+  public int getMinReadAheadV2ThreadPoolSize() {
+    if (minReadAheadV2ThreadPoolSize <= 0) {
+      // If the minReadAheadV2ThreadPoolSize is not set, use the default value
+      return 2 * Runtime.getRuntime().availableProcessors();
+    }
+    return minReadAheadV2ThreadPoolSize;
+  }
+
+  public int getMaxReadAheadV2ThreadPoolSize() {
+    if (maxReadAheadV2ThreadPoolSize <= 0) {
+      // If the maxReadAheadV2ThreadPoolSize is not set, use the default value
+      return 4 * Runtime.getRuntime().availableProcessors();
+    }
+    return maxReadAheadV2ThreadPoolSize;
+  }
+
+  public int getMinReadAheadV2BufferPoolSize() {
+    if (minReadAheadV2BufferPoolSize <= 0) {
+      // If the minReadAheadV2BufferPoolSize is not set, use the default value
+      return 2 * Runtime.getRuntime().availableProcessors();
+    }
+    return minReadAheadV2BufferPoolSize;
+  }
+
+  public int getMaxReadAheadV2BufferPoolSize() {
+    if (maxReadAheadV2BufferPoolSize <= 0) {
+      // If the maxReadAheadV2BufferPoolSize is not set, use the default value
+      return 4 * Runtime.getRuntime().availableProcessors();
+    }
+    return maxReadAheadV2BufferPoolSize;
+  }
+
+  public int getReadAheadExecutorServiceTTLInMillis() {
+    return readAheadExecutorServiceTTLMillis;
+  }
+
+  public int getReadAheadV2CachedBufferTTLMillis() {
+    return readAheadV2CachedBufferTTLMillis;
+  }
+
+  /**
+   * Checks if the read-ahead v2 feature is enabled by user.
+   * @return true if read-ahead v2 is enabled, false otherwise.
+   */
+  public boolean isReadAheadV2Enabled() {
+    return this.isReadAheadV2Enabled;
+  }
+
   @VisibleForTesting
   void setReadAheadEnabled(final boolean enabledReadAhead) {
     this.enabledReadAhead = enabledReadAhead;
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d93c1a3dc65..439b7626a86 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -955,6 +955,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, 
getAbfsConfiguration().getFooterReadBufferSize())
             
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
             
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
             .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
+            
.isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
             
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
             
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
             .withFooterReadBufferSize(footerReadBufferSize)
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 40095c7a802..50a88ab4e45 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -259,10 +259,46 @@ public final class ConfigurationKeys {
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = 
"fs.azure.shellkeyprovider.script";
 
   /**
-   * Enable or disable readahead buffer in AbfsInputStream.
+   * Enable or disable readahead V1 in AbfsInputStream.
    * Value: {@value}.
    */
   public static final String FS_AZURE_ENABLE_READAHEAD = 
"fs.azure.enable.readahead";
+  /**
+   * Enable or disable readahead V2 in AbfsInputStream. This will work 
independent of V1.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_ENABLE_READAHEAD_V2 = 
"fs.azure.enable.readahead.v2";
+
+  /**
+   * Minimum number of prefetch threads in the thread pool for readahead V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE = 
"fs.azure.readahead.v2.min.thread.pool.size";
+  /**
+   * Maximum number of prefetch threads in the thread pool for readahead V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE = 
"fs.azure.readahead.v2.max.thread.pool.size";
+  /**
+   * Minimum size of the buffer pool for caching prefetched data for readahead 
V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = 
"fs.azure.readahead.v2.min.buffer.pool.size";
+  /**
+   * Maximum size of the buffer pool for caching prefetched data for readahead 
V2.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = 
"fs.azure.readahead.v2.max.buffer.pool.size";
+
+  /**
+   * TTL in milliseconds for the idle threads in executor service used by read 
ahead v2.
+   */
+  public static final String FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS 
= "fs.azure.readahead.v2.executor.service.ttl.millis";
+
+  /**
+   * TTL in milliseconds for the cached buffers in buffer pool used by read 
ahead v2.
+   */
+  public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 
"fs.azure.readahead.v2.cached.buffer.ttl.millis";
 
   /** Setting this true will make the driver use it's own RemoteIterator 
implementation */
   public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = 
"fs.azure.enable.abfslistiterator";
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f7ccc0e9175..824a4c9701e 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -127,6 +127,14 @@ public final class FileSystemConfigurations {
   public static final long 
DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
 
   public static final boolean DEFAULT_ENABLE_READAHEAD = true;
+  public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
+  public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
+  public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
+  public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
+  public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
+  public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 
3_000;
+  public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 
6_000;
+
   public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
   public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
 
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 5d770805a51..3dc7f88e529 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -79,6 +79,7 @@ public class AbfsInputStream extends FSInputStream implements 
CanUnbuffer,
   private final String eTag;                  // eTag of the path when 
InputStream are created
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
+  private final boolean readAheadV2Enabled; // whether enable readAhead V2;
   private final String inputStreamId;
   private final boolean alwaysReadBufferSize;
   /*
@@ -130,6 +131,7 @@ public class AbfsInputStream extends FSInputStream 
implements CanUnbuffer,
 
   /** ABFS instance to be held by the input stream to avoid GC close. */
   private final BackReference fsBackRef;
+  private ReadBufferManager readBufferManager;
 
   public AbfsInputStream(
           final AbfsClient client,
@@ -150,6 +152,7 @@ public AbfsInputStream(
     this.eTag = eTag;
     this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
     this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
+    this.readAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled();
     this.alwaysReadBufferSize
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.bufferedPreadDisabled = abfsInputStreamContext
@@ -173,9 +176,19 @@ public AbfsInputStream(
     this.fsBackRef = abfsInputStreamContext.getFsBackRef();
     contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();
 
-    // Propagate the config values to ReadBufferManager so that the first 
instance
-    // to initialize can set the readAheadBlockSize
-    ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
+    /*
+     * Initialize the ReadBufferManager based on whether readAheadV2 is 
enabled or not.
+     * Precedence is given to ReadBufferManagerV2.
+     * If none of the V1 and V2 are enabled, then no read ahead will be done.
+     */
+    if (readAheadV2Enabled) {
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          readAheadBlockSize, client.getAbfsConfiguration());
+      readBufferManager = ReadBufferManagerV2.getBufferManager();
+    } else {
+      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
+      readBufferManager = ReadBufferManagerV1.getBufferManager();
+    }
     if (streamStatistics != null) {
       ioStatistics = streamStatistics.getIOStatistics();
     }
@@ -491,7 +504,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){
 
   private int readInternal(final long position, final byte[] b, final int 
offset, final int length,
                            final boolean bypassReadAhead) throws IOException {
-    if (readAheadEnabled && !bypassReadAhead) {
+    if (isReadAheadEnabled() && !bypassReadAhead) {
       // try reading from read-ahead
       if (offset != 0) {
         throw new IllegalArgumentException("readahead buffers cannot have 
non-zero buffer offsets");
@@ -510,7 +523,7 @@ private int readInternal(final long position, final byte[] 
b, final int offset,
       while (numReadAheads > 0 && nextOffset < contentLength) {
         LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
             nextOffset, nextSize);
-        ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, 
(int) nextSize,
+        readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
                 new TracingContext(readAheadTracingContext));
         nextOffset = nextOffset + nextSize;
         numReadAheads--;
@@ -519,7 +532,7 @@ private int readInternal(final long position, final byte[] 
b, final int offset,
       }
 
       // try reading from buffers first
-      receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, 
position, length, b);
+      receivedBytes = readBufferManager.getBlock(this, position, length, b);
       bytesFromReadAhead += receivedBytes;
       if (receivedBytes > 0) {
         incrementReadOps();
@@ -720,7 +733,9 @@ public boolean seekToNewSource(long l) throws IOException {
   public synchronized void close() throws IOException {
     LOG.debug("Closing {}", this);
     closed = true;
-    ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
+    if (readBufferManager != null) {
+      readBufferManager.purgeBuffersForStream(this);
+    }
     buffer = null; // de-reference the buffer so it can be GC'ed sooner
     if (contextEncryptionAdapter != null) {
       contextEncryptionAdapter.destroy();
@@ -773,9 +788,14 @@ byte[] getBuffer() {
     return buffer;
   }
 
+  /**
+   * Checks if any version of read ahead is enabled.
+   * If both are disabled, then skip read ahead logic.
+   * @return true if read ahead is enabled, false otherwise.
+   */
   @VisibleForTesting
   public boolean isReadAheadEnabled() {
-    return readAheadEnabled;
+    return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != 
null;
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index fdcad5ac3a0..f6272492d60 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext 
{
 
   private boolean isReadAheadEnabled = true;
 
+  private boolean isReadAheadV2Enabled;
+
   private boolean alwaysReadBufferSize;
 
   private int readAheadBlockSize;
@@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled(
     return this;
   }
 
+  public AbfsInputStreamContext isReadAheadV2Enabled(
+      final boolean isReadAheadV2Enabled) {
+    this.isReadAheadV2Enabled = isReadAheadV2Enabled;
+    return this;
+  }
+
   public AbfsInputStreamContext withReadAheadRange(
           final int readAheadRange) {
     this.readAheadRange = readAheadRange;
@@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() {
     return isReadAheadEnabled;
   }
 
+  public boolean isReadAheadV2Enabled() {
+    return isReadAheadV2Enabled;
+  }
+
   public int getReadAheadRange() {
     return readAheadRange;
   }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 031545f57a1..9ee128fbc32 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -15,636 +15,287 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs.azurebfs.services;
 
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
 import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_BLOCK_SIZE;
 
 /**
- * The Read Buffer Manager for Rest AbfsClient.
+ * Abstract class for managing read buffers for Azure Blob File System input 
streams.
  */
-final class ReadBufferManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ReadBufferManager.class);
-  private static final int ONE_KB = 1024;
-  private static final int ONE_MB = ONE_KB * ONE_KB;
-
-  private static final int NUM_BUFFERS = 16;
-  private static final int NUM_THREADS = 8;
-  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have 
to see if 3 seconds is a good threshold
-
-  private static int blockSize = 4 * ONE_MB;
-  private static int thresholdAgeMilliseconds = 
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
-  private Thread[] threads = new Thread[NUM_THREADS];
-  private byte[][] buffers;    // array of byte[] buffers, to hold the data 
that is read
-  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
+public abstract class ReadBufferManager {
+  protected static final Logger LOGGER = LoggerFactory.getLogger(
+      ReadBufferManager.class);
+  protected static final ReentrantLock LOCK = new ReentrantLock();
+  private static int thresholdAgeMilliseconds;
+  private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default 
block size for read-ahead in bytes
 
+  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
   private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of 
requests that are not picked up by any worker thread yet
   private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // 
requests being processed by worker threads
   private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // 
buffers available for reading
-  private static ReadBufferManager bufferManager; // singleton, initialized in 
static initialization block
-  private static final ReentrantLock LOCK = new ReentrantLock();
-
-  static ReadBufferManager getBufferManager() {
-    if (bufferManager == null) {
-      LOCK.lock();
-      try {
-        if (bufferManager == null) {
-          bufferManager = new ReadBufferManager();
-          bufferManager.init();
-        }
-      } finally {
-        LOCK.unlock();
-      }
-    }
-    return bufferManager;
-  }
 
-  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
-    if (bufferManager == null) {
-      LOGGER.debug(
-          "ReadBufferManager not initialized yet. Overriding 
readAheadBlockSize as {}",
-          readAheadBlockSize);
-      blockSize = readAheadBlockSize;
-    }
-  }
-
-  private void init() {
-    buffers = new byte[NUM_BUFFERS][];
-    for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte 
array never goes back to GC
-      freeList.add(i);
-    }
-    for (int i = 0; i < NUM_THREADS; i++) {
-      Thread t = new Thread(new ReadBufferWorker(i));
-      t.setDaemon(true);
-      threads[i] = t;
-      t.setName("ABFS-prefetch-" + i);
-      t.start();
-    }
-    ReadBufferWorker.UNLEASH_WORKERS.countDown();
-  }
-
-  // hide instance constructor
-  private ReadBufferManager() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
-  }
-
-
-  /*
-   *
-   *  AbfsInputStream-facing methods
-   *
+  /**
+   * Initializes the ReadBufferManager singleton instance. Creates the read 
buffers and threads.
+   * This method should be called once to set up the read buffer manager.
    */
-
+  abstract void init();
 
   /**
-   * {@link AbfsInputStream} calls this method to queue read-aheads.
-   *
-   * @param stream          The {@link AbfsInputStream} for which to do the 
read-ahead
-   * @param requestedOffset The offset in the file which shoukd be read
-   * @param requestedLength The length to read
+   * Queues a read-ahead request from {@link AbfsInputStream}
+   * for a given offset in file and given length.
+   * @param stream the input stream requesting the read-ahead
+   * @param requestedOffset the offset in the remote file to start reading
+   * @param requestedLength the number of bytes to read from file
+   * @param tracingContext the tracing context for diagnostics
    */
-  void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset, final int requestedLength,
-                      TracingContext tracingContext) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
-          stream.getPath(), requestedOffset, requestedLength);
-    }
-    ReadBuffer buffer;
-    synchronized (this) {
-      if (isAlreadyQueued(stream, requestedOffset)) {
-        return; // already queued, do not queue again
-      }
-      if (freeList.isEmpty() && !tryEvict()) {
-        return; // no buffers available, cannot queue anything
-      }
-
-      buffer = new ReadBuffer();
-      buffer.setStream(stream);
-      buffer.setOffset(requestedOffset);
-      buffer.setLength(0);
-      buffer.setRequestedLength(requestedLength);
-      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
-      buffer.setLatch(new CountDownLatch(1));
-      buffer.setTracingContext(tracingContext);
-
-      Integer bufferIndex = freeList.pop();  // will return a value, since we 
have checked size > 0 already
-
-      buffer.setBuffer(buffers[bufferIndex]);
-      buffer.setBufferindex(bufferIndex);
-      readAheadQueue.add(buffer);
-      notifyAll();
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx 
{}",
-            stream.getPath(), requestedOffset, buffer.getBufferindex());
-      }
-    }
-  }
-
+  abstract void queueReadAhead(AbfsInputStream stream,
+      long requestedOffset,
+      int requestedLength,
+      TracingContext tracingContext);
 
   /**
+   * Gets a block of data from the prefetched data by ReadBufferManager.
    * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
    * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
    * the requested offset, then this method blocks until that read completes. 
If the data is queued in a read-ahead
    * but not picked up by a worker thread yet, then it cancels that read-ahead 
and reports cache miss. This is because
-   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do it's own
-   * read to get the data faster (copmared to the read waiting in queue for an 
indeterminate amount of time).
+   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in queue for an 
indeterminate amount of time).
    *
-   * @param stream   the file to read bytes for
-   * @param position the offset in the file to do a read for
-   * @param length   the length to read
-   * @param buffer   the buffer to read data into. Note that the buffer will 
be written into from offset 0.
-   * @return the number of bytes read
+   * @param stream the input stream requesting the block
+   * @param position the position in the file to read from
+   * @param length the number of bytes to read
+   * @param buffer the buffer to store the read data
+   * @return the number of bytes actually read
+   * @throws IOException if an I/O error occurs
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int 
length, final byte[] buffer)
-      throws IOException {
-    // not synchronized, so have to be careful with locking
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("getBlock for file {}  position {}  thread {}",
-          stream.getPath(), position, Thread.currentThread().getName());
-    }
-
-    waitForProcess(stream, position);
-
-    int bytesRead = 0;
-    synchronized (this) {
-      bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
-    }
-    if (bytesRead > 0) {
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("Done read from Cache for {} position {} length {}",
-            stream.getPath(), position, bytesRead);
-      }
-      return bytesRead;
-    }
+  abstract int getBlock(AbfsInputStream stream,
+      long position,
+      int length,
+      byte[] buffer) throws IOException;
 
-    // otherwise, just say we got nothing - calling thread can do its own read
-    return 0;
-  }
-
-  /*
-   *
-   *  Internal methods
+  /**
+   * {@link ReadBufferWorker} calls this to get the next buffer to read from 
read-ahead queue.
+   * Requested read will be performed by background thread.
    *
+   * @return the next {@link ReadBuffer} to read
+   * @throws InterruptedException if interrupted while waiting
    */
-
-  private void waitForProcess(final AbfsInputStream stream, final long 
position) {
-    ReadBuffer readBuf;
-    synchronized (this) {
-      clearFromReadAheadQueue(stream, position);
-      readBuf = getFromList(inProgressList, stream, position);
-    }
-    if (readBuf != null) {         // if in in-progress queue, then block for 
it
-      try {
-        if (LOGGER.isTraceEnabled()) {
-          LOGGER.trace("got a relevant read buffer for file {} offset {} 
buffer idx {}",
-              stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
-        }
-        readBuf.getLatch().await();  // blocking wait on the caller stream's 
thread
-        // Note on correctness: readBuf gets out of inProgressList only in 1 
place: after worker thread
-        // is done processing it (in doneReading). There, the latch is set 
after removing the buffer from
-        // inProgressList. So this latch is safe to be outside the 
synchronized block.
-        // Putting it in synchronized would result in a deadlock, since this 
thread would be holding the lock
-        // while waiting, so no one will be able to  change any state. If this 
becomes more complex in the future,
-        // then the latch cane be removed and replaced with wait/notify 
whenever inProgressList is touched.
-      } catch (InterruptedException ex) {
-        Thread.currentThread().interrupt();
-      }
-      if (LOGGER.isTraceEnabled()) {
-        LOGGER.trace("latch done for file {} buffer idx {} length {}",
-            stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
-      }
-    }
-  }
+  abstract ReadBuffer getNextBlockToRead() throws InterruptedException;
 
   /**
-   * If any buffer in the completedlist can be reclaimed then reclaim it and 
return the buffer to free list.
-   * The objective is to find just one buffer - there is no advantage to 
evicting more than one.
+   * Marks the specified buffer as done reading and updates its status.
+   * Called by {@link ReadBufferWorker} after reading is complete.
    *
-   * @return whether the eviction succeeeded - i.e., were we able to free up 
one buffer
+   * @param buffer the buffer that was read by worker thread
+   * @param result the status of the read operation
+   * @param bytesActuallyRead the number of bytes actually read by worker 
thread.
    */
-  private synchronized boolean tryEvict() {
-    ReadBuffer nodeToEvict = null;
-    if (completedReadList.size() <= 0) {
-      return false;  // there are no evict-able buffers
-    }
-
-    long currentTimeInMs = currentTimeMillis();
-
-    // first, try buffers where all bytes have been consumed (approximated as 
first and last bytes consumed)
-    for (ReadBuffer buf : completedReadList) {
-      if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
-        nodeToEvict = buf;
-        break;
-      }
-    }
-    if (nodeToEvict != null) {
-      return evict(nodeToEvict);
-    }
-
-    // next, try buffers where any bytes have been consumed (may be a bad 
idea? have to experiment and see)
-    for (ReadBuffer buf : completedReadList) {
-      if (buf.isAnyByteConsumed()) {
-        nodeToEvict = buf;
-        break;
-      }
-    }
-
-    if (nodeToEvict != null) {
-      return evict(nodeToEvict);
-    }
-
-    // next, try any old nodes that have not been consumed
-    // Failed read buffers (with buffer index=-1) that are older than
-    // thresholdAge should be cleaned up, but at the same time should not
-    // report successful eviction.
-    // Queue logic expects that a buffer is freed up for read ahead when
-    // eviction is successful, whereas a failed ReadBuffer would have released
-    // its buffer when its status was set to READ_FAILED.
-    long earliestBirthday = Long.MAX_VALUE;
-    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
-    for (ReadBuffer buf : completedReadList) {
-      if ((buf.getBufferindex() != -1)
-          && (buf.getTimeStamp() < earliestBirthday)) {
-        nodeToEvict = buf;
-        earliestBirthday = buf.getTimeStamp();
-      } else if ((buf.getBufferindex() == -1)
-          && (currentTimeInMs - buf.getTimeStamp()) > 
thresholdAgeMilliseconds) {
-        oldFailedBuffers.add(buf);
-      }
-    }
-
-    for (ReadBuffer buf : oldFailedBuffers) {
-      evict(buf);
-    }
-
-    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && 
(nodeToEvict != null)) {
-      return evict(nodeToEvict);
-    }
-
-    LOGGER.trace("No buffer eligible for eviction");
-    // nothing can be evicted
-    return false;
-  }
-
-  private boolean evict(final ReadBuffer buf) {
-    // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
-    // avoid adding it to freeList.
-    if (buf.getBufferindex() != -1) {
-      freeList.push(buf.getBufferindex());
-    }
-
-    completedReadList.remove(buf);
-    buf.setTracingContext(null);
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} 
length {}",
-          buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(), 
buf.getLength());
-    }
-    return true;
-  }
-
-  private boolean isAlreadyQueued(final AbfsInputStream stream, final long 
requestedOffset) {
-    // returns true if any part of the buffer is already queued
-    return (isInList(readAheadQueue, stream, requestedOffset)
-        || isInList(inProgressList, stream, requestedOffset)
-        || isInList(completedReadList, stream, requestedOffset));
-  }
-
-  private boolean isInList(final Collection<ReadBuffer> list, final 
AbfsInputStream stream, final long requestedOffset) {
-    return (getFromList(list, stream, requestedOffset) != null);
-  }
-
-  private ReadBuffer getFromList(final Collection<ReadBuffer> list, final 
AbfsInputStream stream, final long requestedOffset) {
-    for (ReadBuffer buffer : list) {
-      if (buffer.getStream() == stream) {
-        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
-            && requestedOffset >= buffer.getOffset()
-            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
-          return buffer;
-        } else if (requestedOffset >= buffer.getOffset()
-            && requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()) {
-          return buffer;
-        }
-      }
-    }
-    return null;
-  }
+  abstract void doneReading(ReadBuffer buffer,
+      ReadBufferStatus result,
+      int bytesActuallyRead);
 
   /**
-   * Returns buffers that failed or passed from completed queue.
-   * @param stream
-   * @param requestedOffset
-   * @return
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManager} when stream is closed.
+   *
+   * @param stream the input stream whose buffers should be purged.
    */
-  private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, 
final long requestedOffset) {
-    for (ReadBuffer buffer : completedReadList) {
-      // Buffer is returned if the requestedOffset is at or above buffer's
-      // offset but less than buffer's length or the actual requestedLength
-      if ((buffer.getStream() == stream)
-          && (requestedOffset >= buffer.getOffset())
-          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
-          || (requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()))) {
-          return buffer;
-        }
-      }
-
-    return null;
-  }
-
-  private void clearFromReadAheadQueue(final AbfsInputStream stream, final 
long requestedOffset) {
-    ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
-    if (buffer != null) {
-      readAheadQueue.remove(buffer);
-      notifyAll();   // lock is held in calling method
-      freeList.push(buffer.getBufferindex());
-    }
-  }
-
-  private int getBlockFromCompletedQueue(final AbfsInputStream stream, final 
long position, final int length,
-                                         final byte[] buffer) throws 
IOException {
-    ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
-
-    if (buf == null) {
-      return 0;
-    }
+  abstract void purgeBuffersForStream(AbfsInputStream stream);
 
-    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
-      // To prevent new read requests to fail due to old read-ahead attempts,
-      // return exception only from buffers that failed within last 
thresholdAgeMilliseconds
-      if ((currentTimeMillis() - (buf.getTimeStamp()) < 
thresholdAgeMilliseconds)) {
-        throw buf.getErrException();
-      } else {
-        return 0;
-      }
-    }
 
-    if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
-        || (position >= buf.getOffset() + buf.getLength())) {
-      return 0;
-    }
+  // Following Methods are for testing purposes only and should not be used in 
production code.
 
-    int cursor = (int) (position - buf.getOffset());
-    int availableLengthInBuffer = buf.getLength() - cursor;
-    int lengthToCopy = Math.min(length, availableLengthInBuffer);
-    System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
-    if (cursor == 0) {
-      buf.setFirstByteConsumed(true);
-    }
-    if (cursor + lengthToCopy == buf.getLength()) {
-      buf.setLastByteConsumed(true);
-    }
-    buf.setAnyByteConsumed(true);
-    return lengthToCopy;
-  }
-
-  /*
-   *
-   *  ReadBufferWorker-thread-facing methods
+  /**
+   * Gets the number of buffers currently managed by the read buffer manager.
    *
+   * @return the number of buffers
    */
+  @VisibleForTesting
+  abstract int getNumBuffers();
 
   /**
-   * ReadBufferWorker thread calls this to get the next buffer that it should 
work on.
-   *
-   * @return {@link ReadBuffer}
-   * @throws InterruptedException if thread is interrupted
+   * Attempts to evict buffers based on the eviction policy.
    */
-  ReadBuffer getNextBlockToRead() throws InterruptedException {
-    ReadBuffer buffer = null;
-    synchronized (this) {
-      //buffer = readAheadQueue.take();  // blocking method
-      while (readAheadQueue.size() == 0) {
-        wait();
-      }
-      buffer = readAheadQueue.remove();
-      notifyAll();
-      if (buffer == null) {
-        return null;            // should never happen
-      }
-      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
-      inProgressList.add(buffer);
-    }
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
-          buffer.getStream().getPath(), buffer.getOffset());
-    }
-    return buffer;
-  }
+  @VisibleForTesting
+  abstract void callTryEvict();
 
   /**
-   * ReadBufferWorker thread calls this method to post completion.
+   * Resets the read buffer manager for testing purposes. Clean up the current
+   * state of readAhead buffers and the lists. Will also trigger a fresh init.
+   */
+  @VisibleForTesting
+  abstract void testResetReadBufferManager();
+
+  /**
+   * Resets the read buffer manager for testing with the specified block size 
and threshold age.
    *
-   * @param buffer            the buffer whose read was completed
-   * @param result            the {@link ReadBufferStatus} after the read 
operation in the worker thread
-   * @param bytesActuallyRead the number of bytes that the worker thread was 
actually able to read
+   * @param readAheadBlockSize the block size for read-ahead
+   * @param thresholdAgeMilliseconds the threshold age in milliseconds
    */
-  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, 
final int bytesActuallyRead) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} 
outcome {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), result, 
bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
-        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
-          buffer.setStatus(ReadBufferStatus.AVAILABLE);
-          buffer.setLength(bytesActuallyRead);
-        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
-        }
-        // completed list also contains FAILED read buffers
-        // for sending exception message to clients.
-        buffer.setStatus(result);
-        buffer.setTimeStamp(currentTimeMillis());
-        completedReadList.add(buffer);
-      }
-    }
+  @VisibleForTesting
+  abstract void testResetReadBufferManager(int readAheadBlockSize, int 
thresholdAgeMilliseconds);
 
-    //outside the synchronized, since anyone receiving a wake-up from the 
latch must see safe-published results
-    buffer.getLatch().countDown(); // wake up waiting threads (if any)
-  }
+  /**
+   * Resets the buffer manager instance to null for testing purposes.
+   * This allows for reinitialization in tests.
+   */
+  abstract void resetBufferManager();
 
   /**
-   * Similar to System.currentTimeMillis, except implemented with 
System.nanoTime().
-   * System.currentTimeMillis can go backwards when system clock is changed 
(e.g., with NTP time synchronization),
-   * making it unsuitable for measuring time intervals. nanotime is strictly 
monotonically increasing per CPU core.
-   * Note: it is not monotonic across Sockets, and even within a CPU, its only 
the
-   * more recent parts which share a clock across all cores.
+   * Gets the threshold age in milliseconds for buffer eviction.
    *
-   * @return current time in milliseconds
+   * @return the threshold age in milliseconds
    */
-  private long currentTimeMillis() {
-    return System.nanoTime() / 1000 / 1000;
-  }
-
   @VisibleForTesting
-  int getThresholdAgeMilliseconds() {
+  protected static int getThresholdAgeMilliseconds() {
     return thresholdAgeMilliseconds;
   }
 
+  /**
+   * Sets the threshold age in milliseconds for buffer eviction.
+   *
+   * @param thresholdAgeMs the threshold age in milliseconds
+   */
   @VisibleForTesting
-  static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
+  protected static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
     thresholdAgeMilliseconds = thresholdAgeMs;
   }
 
+  /**
+   * Gets the block size used for read-ahead operations.
+   *
+   * @return the read-ahead block size in bytes
+   */
   @VisibleForTesting
-  int getCompletedReadListSize() {
-    return completedReadList.size();
-  }
-
-  @VisibleForTesting
-  public synchronized List<ReadBuffer> getCompletedReadListCopy() {
-    return new ArrayList<>(completedReadList);
-  }
-
-  @VisibleForTesting
-  public synchronized List<Integer> getFreeListCopy() {
-    return new ArrayList<>(freeList);
+  protected static int getReadAheadBlockSize() {
+    return blockSize;
   }
 
+  /**
+   * Sets the block size used for read-ahead operations.
+   *
+   * @param readAheadBlockSize the read-ahead block size in bytes
+   */
   @VisibleForTesting
-  public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
-    return new ArrayList<>(readAheadQueue);
+  protected static void setReadAheadBlockSize(int readAheadBlockSize) {
+    if (readAheadBlockSize <= 0) {
+      throw new IllegalArgumentException("Read-ahead block size must be 
positive");
+    }
+    blockSize = readAheadBlockSize;
   }
 
-  @VisibleForTesting
-  public synchronized List<ReadBuffer> getInProgressCopiedList() {
-    return new ArrayList<>(inProgressList);
+  /**
+   * Gets the stack of free buffer indices.
+   *
+   * @return the stack of free buffer indices
+   */
+  public Stack<Integer> getFreeList() {
+    return freeList;
   }
 
-  @VisibleForTesting
-  void callTryEvict() {
-    tryEvict();
+  /**
+   * Gets the queue of read-ahead requests.
+   *
+   * @return the queue of {@link ReadBuffer} objects in the read-ahead queue
+   */
+  public Queue<ReadBuffer> getReadAheadQueue() {
+    return readAheadQueue;
   }
 
-
   /**
-   * Purging the buffers associated with an {@link AbfsInputStream}
-   * from {@link ReadBufferManager} when stream is closed.
-   * @param stream input stream.
+   * Gets the list of in-progress read buffers.
+   *
+   * @return the list of {@link ReadBuffer} objects that are currently being 
processed
    */
-  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
-    LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
-    readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
-    purgeList(stream, completedReadList);
+  public LinkedList<ReadBuffer> getInProgressList() {
+    return inProgressList;
   }
 
   /**
-   * Method to remove buffers associated with a {@link AbfsInputStream}
-   * when its close method is called.
-   * NOTE: This method is not threadsafe and must be called inside a
-   * synchronised block. See caller.
-   * @param stream associated input stream.
-   * @param list list of buffers like {@link this#completedReadList}
-   *             or {@link this#inProgressList}.
+   * Gets the list of completed read buffers.
+   *
+   * @return the list of {@link ReadBuffer} objects that have been read and 
are available for use
    */
-  private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
-    for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
-      ReadBuffer readBuffer = it.next();
-      if (readBuffer.getStream() == stream) {
-        it.remove();
-        // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
-        // list in doneReading method, we will skip adding those here again.
-        if (readBuffer.getBufferindex() != -1) {
-          freeList.push(readBuffer.getBufferindex());
-        }
-      }
-    }
+  public LinkedList<ReadBuffer> getCompletedReadList() {
+    return completedReadList;
   }
 
+
   /**
-   * Test method that can clean up the current state of readAhead buffers and
-   * the lists. Will also trigger a fresh init.
+   * Gets a copy of the list of free buffer indices.
+   *
+   * @return a list of free buffer indices
    */
   @VisibleForTesting
-  void testResetReadBufferManager() {
-    synchronized (this) {
-      ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
-      for (ReadBuffer buf : completedReadList) {
-        if (buf != null) {
-          completedBuffers.add(buf);
-        }
-      }
-
-      for (ReadBuffer buf : completedBuffers) {
-        evict(buf);
-      }
-
-      readAheadQueue.clear();
-      inProgressList.clear();
-      completedReadList.clear();
-      freeList.clear();
-      for (int i = 0; i < NUM_BUFFERS; i++) {
-        buffers[i] = null;
-      }
-      buffers = null;
-      resetBufferManager();
-    }
+  protected synchronized List<Integer> getFreeListCopy() {
+    return new ArrayList<>(freeList);
   }
 
   /**
-   * Reset buffer manager to null.
+   * Gets a copy of the read-ahead queue.
+   *
+   * @return a list of {@link ReadBuffer} objects in the read-ahead queue
    */
   @VisibleForTesting
-  static void resetBufferManager() {
-    bufferManager = null;
+  protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
+    return new ArrayList<>(readAheadQueue);
   }
 
   /**
-   * Reset readAhead buffer to needed readAhead block size and
-   * thresholdAgeMilliseconds.
-   * @param readAheadBlockSize
-   * @param thresholdAgeMilliseconds
+   * Gets a copy of the list of in-progress read buffers.
+   *
+   * @return a list of in-progress {@link ReadBuffer} objects
    */
   @VisibleForTesting
-  void testResetReadBufferManager(int readAheadBlockSize, int 
thresholdAgeMilliseconds) {
-    setBlockSize(readAheadBlockSize);
-    setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
-    testResetReadBufferManager();
+  protected synchronized List<ReadBuffer> getInProgressCopiedList() {
+    return new ArrayList<>(inProgressList);
   }
 
+  /**
+   * Gets a copy of the list of completed read buffers.
+   *
+   * @return a list of completed {@link ReadBuffer} objects
+   */
   @VisibleForTesting
-  static void setBlockSize(int readAheadBlockSize) {
-    blockSize = readAheadBlockSize;
+  protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
+    return new ArrayList<>(completedReadList);
   }
 
+  /**
+   * Gets the size of the completed read list.
+   *
+   * @return the number of completed read buffers
+   */
   @VisibleForTesting
-  int getReadAheadBlockSize() {
-    return blockSize;
+  protected int getCompletedReadListSize() {
+    return completedReadList.size();
   }
 
   /**
-   * Test method that can mimic no free buffers scenario and also add a 
ReadBuffer
-   * into completedReadList. This readBuffer will get picked up by TryEvict()
-   * next time a new queue request comes in.
-   * @param buf that needs to be added to completedReadlist
+   * Simulates full buffer usage and adds a failed buffer for testing.
+   *
+   * @param buf the buffer to add as failed
    */
   @VisibleForTesting
-  void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
+  protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
     freeList.clear();
     completedReadList.add(buf);
   }
-
-  @VisibleForTesting
-  int getNumBuffers() {
-    return NUM_BUFFERS;
-  }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
similarity index 63%
copy from 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
copy to 
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index 031545f57a1..fe1ac3fa1f2 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -18,53 +18,60 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Stack;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.classification.VisibleForTesting;
 
 /**
  * The Read Buffer Manager for Rest AbfsClient.
+ * V1 implementation of ReadBufferManager.
  */
-final class ReadBufferManager {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(ReadBufferManager.class);
-  private static final int ONE_KB = 1024;
-  private static final int ONE_MB = ONE_KB * ONE_KB;
+final class ReadBufferManagerV1 extends ReadBufferManager {
 
   private static final int NUM_BUFFERS = 16;
   private static final int NUM_THREADS = 8;
-  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have 
to see if 3 seconds is a good threshold
+  private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000;
 
-  private static int blockSize = 4 * ONE_MB;
-  private static int thresholdAgeMilliseconds = 
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
   private Thread[] threads = new Thread[NUM_THREADS];
-  private byte[][] buffers;    // array of byte[] buffers, to hold the data 
that is read
-  private Stack<Integer> freeList = new Stack<>();   // indices in buffers[] 
array that are available
+  private byte[][] buffers;
+  private static  ReadBufferManagerV1 bufferManager;
 
-  private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of 
requests that are not picked up by any worker thread yet
-  private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); // 
requests being processed by worker threads
-  private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); // 
buffers available for reading
-  private static ReadBufferManager bufferManager; // singleton, initialized in 
static initialization block
-  private static final ReentrantLock LOCK = new ReentrantLock();
+  // hide instance constructor
+  private ReadBufferManagerV1() {
+    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+  }
+
+  /**
+   * Sets the read buffer manager configurations.
+   * @param readAheadBlockSize the size of the read-ahead block in bytes
+   */
+  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+    if (bufferManager == null) {
+      LOGGER.debug(
+          "ReadBufferManagerV1 not initialized yet. Overriding 
readAheadBlockSize as {}",
+          readAheadBlockSize);
+      setReadAheadBlockSize(readAheadBlockSize);
+      setThresholdAgeMilliseconds(DEFAULT_THRESHOLD_AGE_MILLISECONDS);
+    }
+  }
 
-  static ReadBufferManager getBufferManager() {
+  /**
+   * Returns the singleton instance of ReadBufferManagerV1.
+   * @return the singleton instance of ReadBufferManagerV1
+   */
+  static ReadBufferManagerV1 getBufferManager() {
     if (bufferManager == null) {
       LOCK.lock();
       try {
         if (bufferManager == null) {
-          bufferManager = new ReadBufferManager();
+          bufferManager = new ReadBufferManagerV1();
           bufferManager.init();
         }
       } finally {
@@ -74,23 +81,18 @@ static ReadBufferManager getBufferManager() {
     return bufferManager;
   }
 
-  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
-    if (bufferManager == null) {
-      LOGGER.debug(
-          "ReadBufferManager not initialized yet. Overriding 
readAheadBlockSize as {}",
-          readAheadBlockSize);
-      blockSize = readAheadBlockSize;
-    }
-  }
-
-  private void init() {
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  void init() {
     buffers = new byte[NUM_BUFFERS][];
     for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[blockSize];  // same buffers are reused. The byte 
array never goes back to GC
-      freeList.add(i);
+      buffers[i] = new byte[getReadAheadBlockSize()];  // same buffers are 
reused. These byte arrays are never garbage collected
+      getFreeList().add(i);
     }
     for (int i = 0; i < NUM_THREADS; i++) {
-      Thread t = new Thread(new ReadBufferWorker(i));
+      Thread t = new Thread(new ReadBufferWorker(i, this));
       t.setDaemon(true);
       threads[i] = t;
       t.setName("ABFS-prefetch-" + i);
@@ -99,28 +101,12 @@ private void init() {
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
   }
 
-  // hide instance constructor
-  private ReadBufferManager() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
-  }
-
-
-  /*
-   *
-   *  AbfsInputStream-facing methods
-   *
-   */
-
-
   /**
-   * {@link AbfsInputStream} calls this method to queue read-aheads.
-   *
-   * @param stream          The {@link AbfsInputStream} for which to do the 
read-ahead
-   * @param requestedOffset The offset in the file which shoukd be read
-   * @param requestedLength The length to read
+   * {@inheritDoc}
    */
-  void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset, final int requestedLength,
-                      TracingContext tracingContext) {
+  @Override
+  public void queueReadAhead(final AbfsInputStream stream, final long 
requestedOffset, final int requestedLength,
+      TracingContext tracingContext) {
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
           stream.getPath(), requestedOffset, requestedLength);
@@ -130,7 +116,7 @@ void queueReadAhead(final AbfsInputStream stream, final 
long requestedOffset, fi
       if (isAlreadyQueued(stream, requestedOffset)) {
         return; // already queued, do not queue again
       }
-      if (freeList.isEmpty() && !tryEvict()) {
+      if (getFreeList().isEmpty() && !tryEvict()) {
         return; // no buffers available, cannot queue anything
       }
 
@@ -143,11 +129,11 @@ void queueReadAhead(final AbfsInputStream stream, final 
long requestedOffset, fi
       buffer.setLatch(new CountDownLatch(1));
       buffer.setTracingContext(tracingContext);
 
-      Integer bufferIndex = freeList.pop();  // will return a value, since we 
have checked size > 0 already
+      Integer bufferIndex = getFreeList().pop();  // will return a value, 
since we have checked size > 0 already
 
       buffer.setBuffer(buffers[bufferIndex]);
       buffer.setBufferindex(bufferIndex);
-      readAheadQueue.add(buffer);
+      getReadAheadQueue().add(buffer);
       notifyAll();
       if (LOGGER.isTraceEnabled()) {
         LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx 
{}",
@@ -156,22 +142,11 @@ void queueReadAhead(final AbfsInputStream stream, final 
long requestedOffset, fi
     }
   }
 
-
   /**
-   * {@link AbfsInputStream} calls this method read any bytes already 
available in a buffer (thereby saving a
-   * remote read). This returns the bytes if the data already exists in 
buffer. If there is a buffer that is reading
-   * the requested offset, then this method blocks until that read completes. 
If the data is queued in a read-ahead
-   * but not picked up by a worker thread yet, then it cancels that read-ahead 
and reports cache miss. This is because
-   * depending on worker thread availability, the read-ahead may take a while 
- the calling thread can do it's own
-   * read to get the data faster (copmared to the read waiting in queue for an 
indeterminate amount of time).
-   *
-   * @param stream   the file to read bytes for
-   * @param position the offset in the file to do a read for
-   * @param length   the length to read
-   * @param buffer   the buffer to read data into. Note that the buffer will 
be written into from offset 0.
-   * @return the number of bytes read
+   * {@inheritDoc}
    */
-  int getBlock(final AbfsInputStream stream, final long position, final int 
length, final byte[] buffer)
+  @Override
+  public int getBlock(final AbfsInputStream stream, final long position, final 
int length, final byte[] buffer)
       throws IOException {
     // not synchronized, so have to be careful with locking
     if (LOGGER.isTraceEnabled()) {
@@ -197,17 +172,87 @@ int getBlock(final AbfsInputStream stream, final long 
position, final int length
     return 0;
   }
 
-  /*
-   *
-   *  Internal methods
-   *
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ReadBuffer getNextBlockToRead() throws InterruptedException {
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      while (getReadAheadQueue().isEmpty()) {
+        wait();
+      }
+      buffer = getReadAheadQueue().remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;            // should never happen
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      getInProgressList().add(buffer);
+    }
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+          buffer.getStream().getPath(), buffer.getOffset());
+    }
+    return buffer;
+  }
+
+  /**
+   * {@inheritDoc}
    */
+  @Override
+  public void doneReading(final ReadBuffer buffer, final ReadBufferStatus 
result, final int bytesActuallyRead) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} 
outcome {} bytes {}",
+          buffer.getStream().getPath(),  buffer.getOffset(), result, 
bytesActuallyRead);
+    }
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          getFreeList().push(buffer.getBufferindex());
+          // buffer will be deleted as per the eviction policy.
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
 
+    //outside the synchronized, since anyone receiving a wake-up from the 
latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() == 
stream);
+    purgeList(stream, getCompletedReadList());
+  }
+
+  /**
+   * Waits for the process to complete for the given stream and position.
+   * If the buffer is in progress, it waits for the latch to be released.
+   * If the buffer is not in progress, it clears it from the read-ahead queue.
+   *
+   * @param stream    the AbfsInputStream associated with the read request
+   * @param position  the position in the stream to wait for
+   */
   private void waitForProcess(final AbfsInputStream stream, final long 
position) {
     ReadBuffer readBuf;
     synchronized (this) {
       clearFromReadAheadQueue(stream, position);
-      readBuf = getFromList(inProgressList, stream, position);
+      readBuf = getFromList(getInProgressList(), stream, position);
     }
     if (readBuf != null) {         // if in in-progress queue, then block for 
it
       try {
@@ -240,14 +285,14 @@ private void waitForProcess(final AbfsInputStream stream, 
final long position) {
    */
   private synchronized boolean tryEvict() {
     ReadBuffer nodeToEvict = null;
-    if (completedReadList.size() <= 0) {
+    if (getCompletedReadList().size() <= 0) {
       return false;  // there are no evict-able buffers
     }
 
     long currentTimeInMs = currentTimeMillis();
 
     // first, try buffers where all bytes have been consumed (approximated as 
first and last bytes consumed)
-    for (ReadBuffer buf : completedReadList) {
+    for (ReadBuffer buf : getCompletedReadList()) {
       if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
         nodeToEvict = buf;
         break;
@@ -257,8 +302,8 @@ private synchronized boolean tryEvict() {
       return evict(nodeToEvict);
     }
 
-    // next, try buffers where any bytes have been consumed (may be a bad 
idea? have to experiment and see)
-    for (ReadBuffer buf : completedReadList) {
+    // next, try buffers where any bytes have been consumed (maybe a bad idea? 
have to experiment and see)
+    for (ReadBuffer buf : getCompletedReadList()) {
       if (buf.isAnyByteConsumed()) {
         nodeToEvict = buf;
         break;
@@ -278,13 +323,13 @@ private synchronized boolean tryEvict() {
     // its buffer when its status was set to READ_FAILED.
     long earliestBirthday = Long.MAX_VALUE;
     ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
-    for (ReadBuffer buf : completedReadList) {
+    for (ReadBuffer buf : getCompletedReadList()) {
       if ((buf.getBufferindex() != -1)
           && (buf.getTimeStamp() < earliestBirthday)) {
         nodeToEvict = buf;
         earliestBirthday = buf.getTimeStamp();
       } else if ((buf.getBufferindex() == -1)
-          && (currentTimeInMs - buf.getTimeStamp()) > 
thresholdAgeMilliseconds) {
+          && (currentTimeInMs - buf.getTimeStamp()) > 
getThresholdAgeMilliseconds()) {
         oldFailedBuffers.add(buf);
       }
     }
@@ -293,7 +338,7 @@ private synchronized boolean tryEvict() {
       evict(buf);
     }
 
-    if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) && 
(nodeToEvict != null)) {
+    if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds()) 
&& (nodeToEvict != null)) {
       return evict(nodeToEvict);
     }
 
@@ -302,14 +347,20 @@ private synchronized boolean tryEvict() {
     return false;
   }
 
+  /**
+   * Evicts the given buffer by removing it from the completedReadList and 
adding its index to the freeList.
+   *
+   * @param buf the ReadBuffer to evict
+   * @return true if eviction was successful, false otherwise
+   */
   private boolean evict(final ReadBuffer buf) {
     // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
     // avoid adding it to freeList.
     if (buf.getBufferindex() != -1) {
-      freeList.push(buf.getBufferindex());
+      getFreeList().push(buf.getBufferindex());
     }
 
-    completedReadList.remove(buf);
+    getCompletedReadList().remove(buf);
     buf.setTracingContext(null);
     if (LOGGER.isTraceEnabled()) {
       LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} 
length {}",
@@ -318,17 +369,38 @@ private boolean evict(final ReadBuffer buf) {
     return true;
   }
 
+  /**
+   * Checks if the requested offset is already queued in any of the lists:
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return true if the requested offset is already queued in any of the 
lists,
+   */
   private boolean isAlreadyQueued(final AbfsInputStream stream, final long 
requestedOffset) {
     // returns true if any part of the buffer is already queued
-    return (isInList(readAheadQueue, stream, requestedOffset)
-        || isInList(inProgressList, stream, requestedOffset)
-        || isInList(completedReadList, stream, requestedOffset));
+    return (isInList(getReadAheadQueue(), stream, requestedOffset)
+        || isInList(getInProgressList(), stream, requestedOffset)
+        || isInList(getCompletedReadList(), stream, requestedOffset));
   }
 
+  /**
+   * Checks if the requested offset is in the given list.
+   * @param list the collection of ReadBuffer to check against
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return true if the requested offset is in the list,
+   */
   private boolean isInList(final Collection<ReadBuffer> list, final 
AbfsInputStream stream, final long requestedOffset) {
     return (getFromList(list, stream, requestedOffset) != null);
   }
 
+  /**
+   * Returns the ReadBuffer from the given list that matches the stream and 
requested offset.
+   * If the buffer is found, it checks if the requested offset is within the 
buffer's range.
+   * @param list the collection of ReadBuffer to search in
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return the ReadBuffer if found, null otherwise
+   */
   private ReadBuffer getFromList(final Collection<ReadBuffer> list, final 
AbfsInputStream stream, final long requestedOffset) {
     for (ReadBuffer buffer : list) {
       if (buffer.getStream() == stream) {
@@ -346,37 +418,60 @@ private ReadBuffer getFromList(final 
Collection<ReadBuffer> list, final AbfsInpu
   }
 
   /**
-   * Returns buffers that failed or passed from completed queue.
-   * @param stream
-   * @param requestedOffset
-   * @return
+   * Returns a ReadBuffer from the completedReadList that matches the stream 
and requested offset.
+   * The buffer is returned if the requestedOffset is at or above buffer's 
offset but less than buffer's length
+   * or the actual requestedLength.
+   *
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return the ReadBuffer if found, null otherwise
    */
   private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, 
final long requestedOffset) {
-    for (ReadBuffer buffer : completedReadList) {
+    for (ReadBuffer buffer : getCompletedReadList()) {
       // Buffer is returned if the requestedOffset is at or above buffer's
       // offset but less than buffer's length or the actual requestedLength
       if ((buffer.getStream() == stream)
           && (requestedOffset >= buffer.getOffset())
           && ((requestedOffset < buffer.getOffset() + buffer.getLength())
           || (requestedOffset < buffer.getOffset() + 
buffer.getRequestedLength()))) {
-          return buffer;
-        }
+        return buffer;
       }
+    }
 
     return null;
   }
 
+  /**
+   * Clears the buffer from the read-ahead queue for the given stream and 
requested offset.
+   * This method is called when the stream is waiting for a process to 
complete.
+   * It removes the buffer from the read-ahead queue and adds its index back 
to the free list.
+   *
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   */
   private void clearFromReadAheadQueue(final AbfsInputStream stream, final 
long requestedOffset) {
-    ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+    ReadBuffer buffer = getFromList(getReadAheadQueue(), stream, 
requestedOffset);
     if (buffer != null) {
-      readAheadQueue.remove(buffer);
+      getReadAheadQueue().remove(buffer);
       notifyAll();   // lock is held in calling method
-      freeList.push(buffer.getBufferindex());
+      getFreeList().push(buffer.getBufferindex());
     }
   }
 
+  /**
+   * Gets a block of data from the completed read buffers.
+   * If the buffer is found, it copies the data to the provided buffer and 
updates the status of the ReadBuffer.
+   * If the buffer is not found or not available, it returns 0.
+   *
+   * @param stream the AbfsInputStream associated with the read request
+   * @param position the position in the file to read from
+   * @param length the number of bytes to read
+   * @param buffer the buffer to store the read data
+   * @return the number of bytes actually read
+   * @throws IOException if an I/O error occurs while reading from the buffer
+   */
   private int getBlockFromCompletedQueue(final AbfsInputStream stream, final 
long position, final int length,
-                                         final byte[] buffer) throws 
IOException {
+      final byte[] buffer) throws IOException {
     ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
 
     if (buf == null) {
@@ -386,7 +481,7 @@ private int getBlockFromCompletedQueue(final 
AbfsInputStream stream, final long
     if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
       // To prevent new read requests to fail due to old read-ahead attempts,
       // return exception only from buffers that failed within last 
thresholdAgeMilliseconds
-      if ((currentTimeMillis() - (buf.getTimeStamp()) < 
thresholdAgeMilliseconds)) {
+      if ((currentTimeMillis() - (buf.getTimeStamp()) < 
getThresholdAgeMilliseconds())) {
         throw buf.getErrException();
       } else {
         return 0;
@@ -412,76 +507,6 @@ private int getBlockFromCompletedQueue(final 
AbfsInputStream stream, final long
     return lengthToCopy;
   }
 
-  /*
-   *
-   *  ReadBufferWorker-thread-facing methods
-   *
-   */
-
-  /**
-   * ReadBufferWorker thread calls this to get the next buffer that it should 
work on.
-   *
-   * @return {@link ReadBuffer}
-   * @throws InterruptedException if thread is interrupted
-   */
-  ReadBuffer getNextBlockToRead() throws InterruptedException {
-    ReadBuffer buffer = null;
-    synchronized (this) {
-      //buffer = readAheadQueue.take();  // blocking method
-      while (readAheadQueue.size() == 0) {
-        wait();
-      }
-      buffer = readAheadQueue.remove();
-      notifyAll();
-      if (buffer == null) {
-        return null;            // should never happen
-      }
-      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
-      inProgressList.add(buffer);
-    }
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
-          buffer.getStream().getPath(), buffer.getOffset());
-    }
-    return buffer;
-  }
-
-  /**
-   * ReadBufferWorker thread calls this method to post completion.
-   *
-   * @param buffer            the buffer whose read was completed
-   * @param result            the {@link ReadBufferStatus} after the read 
operation in the worker thread
-   * @param bytesActuallyRead the number of bytes that the worker thread was 
actually able to read
-   */
-  void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, 
final int bytesActuallyRead) {
-    if (LOGGER.isTraceEnabled()) {
-      LOGGER.trace("ReadBufferWorker completed read file {} for offset {} 
outcome {} bytes {}",
-          buffer.getStream().getPath(),  buffer.getOffset(), result, 
bytesActuallyRead);
-    }
-    synchronized (this) {
-      // If this buffer has already been purged during
-      // close of InputStream then we don't update the lists.
-      if (inProgressList.contains(buffer)) {
-        inProgressList.remove(buffer);
-        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
-          buffer.setStatus(ReadBufferStatus.AVAILABLE);
-          buffer.setLength(bytesActuallyRead);
-        } else {
-          freeList.push(buffer.getBufferindex());
-          // buffer will be deleted as per the eviction policy.
-        }
-        // completed list also contains FAILED read buffers
-        // for sending exception message to clients.
-        buffer.setStatus(result);
-        buffer.setTimeStamp(currentTimeMillis());
-        completedReadList.add(buffer);
-      }
-    }
-
-    //outside the synchronized, since anyone receiving a wake-up from the 
latch must see safe-published results
-    buffer.getLatch().countDown(); // wake up waiting threads (if any)
-  }
-
   /**
    * Similar to System.currentTimeMillis, except implemented with 
System.nanoTime().
    * System.currentTimeMillis can go backwards when system clock is changed 
(e.g., with NTP time synchronization),
@@ -495,66 +520,13 @@ private long currentTimeMillis() {
     return System.nanoTime() / 1000 / 1000;
   }
 
-  @VisibleForTesting
-  int getThresholdAgeMilliseconds() {
-    return thresholdAgeMilliseconds;
-  }
-
-  @VisibleForTesting
-  static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
-    thresholdAgeMilliseconds = thresholdAgeMs;
-  }
-
-  @VisibleForTesting
-  int getCompletedReadListSize() {
-    return completedReadList.size();
-  }
-
-  @VisibleForTesting
-  public synchronized List<ReadBuffer> getCompletedReadListCopy() {
-    return new ArrayList<>(completedReadList);
-  }
-
-  @VisibleForTesting
-  public synchronized List<Integer> getFreeListCopy() {
-    return new ArrayList<>(freeList);
-  }
-
-  @VisibleForTesting
-  public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
-    return new ArrayList<>(readAheadQueue);
-  }
-
-  @VisibleForTesting
-  public synchronized List<ReadBuffer> getInProgressCopiedList() {
-    return new ArrayList<>(inProgressList);
-  }
-
-  @VisibleForTesting
-  void callTryEvict() {
-    tryEvict();
-  }
-
-
-  /**
-   * Purging the buffers associated with an {@link AbfsInputStream}
-   * from {@link ReadBufferManager} when stream is closed.
-   * @param stream input stream.
-   */
-  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
-    LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
-    readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
-    purgeList(stream, completedReadList);
-  }
-
   /**
    * Method to remove buffers associated with a {@link AbfsInputStream}
    * when its close method is called.
    * NOTE: This method is not threadsafe and must be called inside a
    * synchronised block. See caller.
    * @param stream associated input stream.
-   * @param list list of buffers like {@link this#completedReadList}
-   *             or {@link this#inProgressList}.
+   * @param list list of buffers like completedReadList or inProgressList
    */
   private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
     for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
@@ -564,21 +536,39 @@ private void purgeList(AbfsInputStream stream, 
LinkedList<ReadBuffer> list) {
         // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
         // list in doneReading method, we will skip adding those here again.
         if (readBuffer.getBufferindex() != -1) {
-          freeList.push(readBuffer.getBufferindex());
+          getFreeList().push(readBuffer.getBufferindex());
         }
       }
     }
   }
 
   /**
-   * Test method that can clean up the current state of readAhead buffers and
-   * the lists. Will also trigger a fresh init.
+   * {@inheritDoc}
    */
   @VisibleForTesting
-  void testResetReadBufferManager() {
+  @Override
+  public int getNumBuffers() {
+    return NUM_BUFFERS;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public void callTryEvict() {
+    tryEvict();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public void testResetReadBufferManager() {
     synchronized (this) {
       ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
-      for (ReadBuffer buf : completedReadList) {
+      for (ReadBuffer buf : getCompletedReadList()) {
         if (buf != null) {
           completedBuffers.add(buf);
         }
@@ -588,10 +578,10 @@ void testResetReadBufferManager() {
         evict(buf);
       }
 
-      readAheadQueue.clear();
-      inProgressList.clear();
-      completedReadList.clear();
-      freeList.clear();
+      getReadAheadQueue().clear();
+      getInProgressList().clear();
+      getCompletedReadList().clear();
+      getFreeList().clear();
       for (int i = 0; i < NUM_BUFFERS; i++) {
         buffers[i] = null;
       }
@@ -601,50 +591,22 @@ void testResetReadBufferManager() {
   }
 
   /**
-   * Reset buffer manager to null.
+   * {@inheritDoc}
    */
   @VisibleForTesting
-  static void resetBufferManager() {
-    bufferManager = null;
-  }
-
-  /**
-   * Reset readAhead buffer to needed readAhead block size and
-   * thresholdAgeMilliseconds.
-   * @param readAheadBlockSize
-   * @param thresholdAgeMilliseconds
-   */
-  @VisibleForTesting
-  void testResetReadBufferManager(int readAheadBlockSize, int 
thresholdAgeMilliseconds) {
-    setBlockSize(readAheadBlockSize);
+  @Override
+  public void testResetReadBufferManager(int readAheadBlockSize, int 
thresholdAgeMilliseconds) {
+    setReadAheadBlockSize(readAheadBlockSize);
     setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
     testResetReadBufferManager();
   }
 
-  @VisibleForTesting
-  static void setBlockSize(int readAheadBlockSize) {
-    blockSize = readAheadBlockSize;
+  @Override
+  void resetBufferManager() {
+    setBufferManager(null); // reset the singleton instance
   }
 
-  @VisibleForTesting
-  int getReadAheadBlockSize() {
-    return blockSize;
-  }
-
-  /**
-   * Test method that can mimic no free buffers scenario and also add a 
ReadBuffer
-   * into completedReadList. This readBuffer will get picked up by TryEvict()
-   * next time a new queue request comes in.
-   * @param buf that needs to be added to completedReadlist
-   */
-  @VisibleForTesting
-  void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
-    freeList.clear();
-    completedReadList.add(buf);
-  }
-
-  @VisibleForTesting
-  int getNumBuffers() {
-    return NUM_BUFFERS;
+  private static void setBufferManager(ReadBufferManagerV1 manager) {
+    bufferManager = manager;
   }
 }
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
new file mode 100644
index 00000000000..9cce860127d
--- /dev/null
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+final class ReadBufferManagerV2 extends ReadBufferManager {
+
+  // Thread Pool Configurations
+  private static int minThreadPoolSize;
+  private static int maxThreadPoolSize;
+  private static int executorServiceKeepAliveTimeInMilliSec;
+  private ThreadPoolExecutor workerPool;
+
+  // Buffer Pool Configurations
+  private static int minBufferPoolSize;
+  private static int maxBufferPoolSize;
+  private int numberOfActiveBuffers = 0;
+  private byte[][] bufferPool;
+
+  private static ReadBufferManagerV2 bufferManager;
+
+  // hide instance constructor
+  private ReadBufferManagerV2() {
+    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+  }
+
+  /**
+   * Sets the read buffer manager configurations.
+   * @param readAheadBlockSize the size of the read-ahead block in bytes
+   * @param abfsConfiguration the AbfsConfiguration instance for other 
configurations
+   */
+  static void setReadBufferManagerConfigs(int readAheadBlockSize, 
AbfsConfiguration abfsConfiguration) {
+    if (bufferManager == null) {
+      minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+      maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+      executorServiceKeepAliveTimeInMilliSec = 
abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+      minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
+      maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
+      
setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
+      setReadAheadBlockSize(readAheadBlockSize);
+    }
+  }
+
+  /**
+   * Returns the singleton instance of ReadBufferManagerV2.
+   * @return the singleton instance of ReadBufferManagerV2
+   */
+  static ReadBufferManagerV2 getBufferManager() {
+    if (bufferManager == null) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null) {
+          bufferManager = new ReadBufferManagerV2();
+          bufferManager.init();
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+    return bufferManager;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  void init() {
+    // Initialize Buffer Pool
+    bufferPool = new byte[maxBufferPoolSize][];
+    for (int i = 0; i < minBufferPoolSize; i++) {
+      bufferPool[i] = new byte[getReadAheadBlockSize()];  // same buffers are 
reused. These byte arrays are never garbage collected
+      getFreeList().add(i);
+      numberOfActiveBuffers++;
+    }
+
+    // Initialize a Fixed Size Thread Pool with minThreadPoolSize threads
+    workerPool = new ThreadPoolExecutor(
+        minThreadPoolSize,
+        maxThreadPoolSize,
+        executorServiceKeepAliveTimeInMilliSec,
+        TimeUnit.MILLISECONDS,
+        new SynchronousQueue<>(),
+        namedThreadFactory);
+    workerPool.allowCoreThreadTimeOut(true);
+    for (int i = 0; i < minThreadPoolSize; i++) {
+      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      workerPool.submit(worker);
+    }
+    ReadBufferWorker.UNLEASH_WORKERS.countDown();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void queueReadAhead(final AbfsInputStream stream,
+      final long requestedOffset,
+      final int requestedLength,
+      final TracingContext tracingContext) {
+    // TODO: To be implemented
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public int getBlock(final AbfsInputStream stream,
+      final long position,
+      final int length,
+      final byte[] buffer) throws IOException {
+    // TODO: To be implemented
+    return 0;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ReadBuffer getNextBlockToRead() throws InterruptedException {
+    // TODO: To be implemented
+    return null;
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void doneReading(final ReadBuffer buffer,
+      final ReadBufferStatus result,
+      final int bytesActuallyRead) {
+    // TODO: To be implemented
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void purgeBuffersForStream(final AbfsInputStream stream) {
+    // TODO: To be implemented
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public int getNumBuffers() {
+    return numberOfActiveBuffers;
+  }
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public void callTryEvict() {
+    // TODO: To be implemented
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public void testResetReadBufferManager() {
+    // TODO: To be implemented
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @VisibleForTesting
+  @Override
+  public void testResetReadBufferManager(final int readAheadBlockSize,
+      final int thresholdAgeMilliseconds) {
+    // TODO: To be implemented
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
+    // TODO: To be implemented
+  }
+
+  private final ThreadFactory namedThreadFactory = new ThreadFactory() {
+    private int count = 0;
+    @Override
+    public Thread newThread(Runnable r) {
+      return new Thread(r, "ReadAheadV2-Thread-" + count++);
+    }
+  };
+
+  @Override
+  void resetBufferManager() {
+    setBufferManager(null); // reset the singleton instance
+  }
+
+  private static void setBufferManager(ReadBufferManagerV2 manager) {
+    bufferManager = manager;
+  }
+}
diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index a30f06261ef..79d5eef955a 100644
--- 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ 
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -28,9 +28,11 @@ class ReadBufferWorker implements Runnable {
 
   protected static final CountDownLatch UNLEASH_WORKERS = new 
CountDownLatch(1);
   private int id;
+  private ReadBufferManager bufferManager;
 
-  ReadBufferWorker(final int id) {
+  ReadBufferWorker(final int id, final ReadBufferManager bufferManager) {
     this.id = id;
+    this.bufferManager = bufferManager;
   }
 
   /**
@@ -51,7 +53,6 @@ public void run() {
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
-    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
     ReadBuffer buffer;
     while (true) {
       try {
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
index dd32643063a..78cd6bd9d6a 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
@@ -26,7 +26,6 @@
 import org.assertj.core.api.Assertions;
 import org.mockito.Mockito;
 
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import 
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -43,8 +42,11 @@
 import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
 import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
 import static 
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
+import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
 import static 
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -139,7 +141,7 @@ private AzureBlobFileSystem getNewFSWithHnsConf(
         this.getAccountName()), isNamespaceEnabledAccount);
     rawConfig
         .setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION, 
true);
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+    rawConfig.set(FS_DEFAULT_NAME_KEY,
         getNonExistingUrl());
     return (AzureBlobFileSystem) FileSystem.get(rawConfig);
   }
@@ -315,29 +317,29 @@ public void testAccountSpecificConfig() throws Exception {
     rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_KEY, testAccountName), 
dummyAcountKey);
     rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_KEY, otherAccountName), 
dummyAcountKey);
     // Assert that account specific config takes precedence
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, 
defaultUri);
+    rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
     assertFileSystemInitWithExpectedHNSSettings(rawConfig, false);
     // Assert that other account still uses account agnostic config
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, otherUri);
+    rawConfig.set(FS_DEFAULT_NAME_KEY, otherUri);
     assertFileSystemInitWithExpectedHNSSettings(rawConfig, true);
 
     // Set only the account specific config for test account
     rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, 
testAccountName), FALSE_STR);
     rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
     // Assert that only account specific config is enough for test account
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, 
defaultUri);
+    rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
     assertFileSystemInitWithExpectedHNSSettings(rawConfig, false);
 
     // Set only account agnostic config
     rawConfig.set(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, FALSE_STR);
     rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, 
testAccountName));
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, 
defaultUri);
+    rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
     assertFileSystemInitWithExpectedHNSSettings(rawConfig, false);
 
     // Unset both account specific and account agnostic config
     rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
     rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, 
testAccountName));
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, 
defaultUri);
+    rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
     rawConfig.set(AZURE_MAX_IO_RETRIES, "0");
     // Assert that file system init fails with UnknownHost exception as 
getAcl() is needed.
     try {
@@ -471,10 +473,10 @@ private Configuration getConfigurationWithoutHnsConfig() {
     rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
     rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
         this.getAccountName()));
-    String testAccountName = "testAccount.dfs.core.windows.net";
-    String defaultUri = this.getTestUrl().replace(this.getAccountName(), 
testAccountName);
+    String defaultUri = getRawConfiguration().get(FS_DEFAULT_NAME_KEY).
+        replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME);
     // Assert that account specific config takes precedence
-    rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, 
defaultUri);
+    rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
     return rawConfig;
   }
 
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index a57430fa808..b70f36de318 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -67,7 +67,7 @@ public ITestReadBufferManager() throws Exception {
 
     @Test
     public void testPurgeBufferManagerForParallelStreams() throws Exception {
-        describe("Testing purging of buffers from ReadBufferManager for "
+        describe("Testing purging of buffers from ReadBufferManagerV1 for "
                 + "parallel input streams");
         final int numBuffers = 16;
         final LinkedList<Integer> freeList = new LinkedList<>();
@@ -99,7 +99,7 @@ public void testPurgeBufferManagerForParallelStreams() throws 
Exception {
             executorService.awaitTermination(1, TimeUnit.MINUTES);
         }
 
-        ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+        ReadBufferManagerV1 bufferManager = 
ReadBufferManagerV1.getBufferManager();
         // readahead queue is empty
         assertListEmpty("ReadAheadQueue", 
bufferManager.getReadAheadQueueCopy());
         // verify the in progress list eventually empties out.
@@ -115,7 +115,7 @@ private void assertListEmpty(String listName, 
List<ReadBuffer> list) {
 
     @Test
     public void testPurgeBufferManagerForSequentialStream() throws Exception {
-        describe("Testing purging of buffers in ReadBufferManager for "
+        describe("Testing purging of buffers in ReadBufferManagerV1 for "
                 + "sequential input streams");
         AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
         final String fileName = methodName.getMethodName();
@@ -131,7 +131,7 @@ public void testPurgeBufferManagerForSequentialStream() 
throws Exception {
         } finally {
             IOUtils.closeStream(iStream1);
         }
-        ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+        ReadBufferManagerV1 bufferManager = 
ReadBufferManagerV1.getBufferManager();
         AbfsInputStream iStream2 = null;
         try {
             iStream2 = (AbfsInputStream) 
fs.open(testFilePath).getWrappedStream();
diff --git 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index e4ed9881ffa..de49da5dc51 100644
--- 
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ 
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -89,7 +89,7 @@ public class TestAbfsInputStream extends
   @Override
   public void teardown() throws Exception {
     super.teardown();
-    ReadBufferManager.getBufferManager().testResetReadBufferManager();
+    getBufferManager().testResetReadBufferManager();
   }
 
   private AbfsRestOperation getMockRestOp() {
@@ -164,12 +164,12 @@ public AbfsInputStream getAbfsInputStream(AbfsClient 
abfsClient,
 
   private void queueReadAheads(AbfsInputStream inputStream) {
     // Mimic AbfsInputStream readAhead queue requests
-    ReadBufferManager.getBufferManager()
+    getBufferManager()
         .queueReadAhead(inputStream, 0, ONE_KB, 
inputStream.getTracingContext());
-    ReadBufferManager.getBufferManager()
+    getBufferManager()
         .queueReadAhead(inputStream, ONE_KB, ONE_KB,
             inputStream.getTracingContext());
-    ReadBufferManager.getBufferManager()
+    getBufferManager()
         .queueReadAhead(inputStream, TWO_KB, TWO_KB,
             inputStream.getTracingContext());
   }
@@ -187,15 +187,15 @@ private void verifyReadCallCount(AbfsClient client, int 
count)
   private void checkEvictedStatus(AbfsInputStream inputStream, int position, 
boolean expectedToThrowException)
       throws Exception {
     // Sleep for the eviction threshold time
-    
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() 
+ 1000);
+    Thread.sleep(getBufferManager().getThresholdAgeMilliseconds() + 1000);
 
     // Eviction is done only when AbfsInputStream tries to queue new items.
     // 1 tryEvict will remove 1 eligible item. To ensure that the current test 
buffer
     // will get evicted (considering there could be other tests running in 
parallel),
     // call tryEvict for the number of items that are there in 
completedReadList.
-    int numOfCompletedReadListItems = 
ReadBufferManager.getBufferManager().getCompletedReadListSize();
+    int numOfCompletedReadListItems = 
getBufferManager().getCompletedReadListSize();
     while (numOfCompletedReadListItems > 0) {
-      ReadBufferManager.getBufferManager().callTryEvict();
+      getBufferManager().callTryEvict();
       numOfCompletedReadListItems--;
     }
 
@@ -210,7 +210,7 @@ private void checkEvictedStatus(AbfsInputStream 
inputStream, int position, boole
   public TestAbfsInputStream() throws Exception {
     super();
     // Reduce thresholdAgeMilliseconds to 3 sec for the tests
-    
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
+    
getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
   }
 
   private void writeBufferToNewFile(Path testFile, byte[] buffer) throws 
IOException {
@@ -364,7 +364,7 @@ public void testFailedReadAhead() throws Exception {
   public void testFailedReadAheadEviction() throws Exception {
     AbfsClient client = getMockAbfsClient();
     AbfsRestOperation successOp = getMockRestOp();
-    
ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
+    
getBufferManager().setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
     // Stub :
     // Read request leads to 3 readahead calls: Fail all 3 
readahead-client.read()
     // Actual read request fails with the failure in readahead thread
@@ -379,7 +379,8 @@ public void testFailedReadAheadEviction() throws Exception {
     // Add a failed buffer to completed queue and set to no free buffers to 
read ahead.
     ReadBuffer buff = new ReadBuffer();
     buff.setStatus(ReadBufferStatus.READ_FAILED);
-    
ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
+    buff.setStream(inputStream);
+    getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
 
     // if read failed buffer eviction is tagged as a valid eviction, it will 
lead to
     // wrong assumption of queue logic that a buffer is freed up and can lead 
to :
@@ -387,7 +388,7 @@ public void testFailedReadAheadEviction() throws Exception {
     // at java.util.Stack.peek(Stack.java:102)
     // at java.util.Stack.pop(Stack.java:84)
     // at 
org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead
-    ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB,
+    getBufferManager().queueReadAhead(inputStream, 0, ONE_KB,
         getTestTracingContext(getFileSystem(), true));
   }
 
@@ -429,7 +430,7 @@ public void testOlderReadAheadFailure() throws Exception {
     verifyReadCallCount(client, 3);
 
     // Sleep for thresholdAgeMs so that the read ahead buffer qualifies for 
being old.
-    
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+    Thread.sleep(getBufferManager().getThresholdAgeMilliseconds());
 
     // Second read request should retry the read (and not issue any new 
readaheads)
     inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
@@ -474,7 +475,7 @@ public void testSuccessfulReadAhead() throws Exception {
             any(String.class), any(), any(TracingContext.class));
 
     AbfsInputStream inputStream = getAbfsInputStream(client, 
"testSuccessfulReadAhead.txt");
-    int beforeReadCompletedListSize = 
ReadBufferManager.getBufferManager().getCompletedReadListSize();
+    int beforeReadCompletedListSize = 
getBufferManager().getCompletedReadListSize();
 
     // First read request that triggers readAheads.
     inputStream.read(new byte[ONE_KB]);
@@ -482,7 +483,7 @@ public void testSuccessfulReadAhead() throws Exception {
     // Only the 3 readAhead threads should have triggered client.read
     verifyReadCallCount(client, 3);
     int newAdditionsToCompletedRead =
-        ReadBufferManager.getBufferManager().getCompletedReadListSize()
+        getBufferManager().getCompletedReadListSize()
             - beforeReadCompletedListSize;
     // read buffer might be dumped if the ReadBufferManager getblock preceded
     // the action of buffer being picked for reading from readaheadqueue, so 
that
@@ -530,7 +531,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() 
throws Exception {
             any(TracingContext.class));
 
     final ReadBufferManager readBufferManager
-        = ReadBufferManager.getBufferManager();
+        = getBufferManager();
 
     final int readBufferTotal = readBufferManager.getNumBuffers();
     final int expectedFreeListBufferCount = readBufferTotal
@@ -607,7 +608,7 @@ public void testReadAheadManagerForFailedReadAhead() throws 
Exception {
     // if readAhead failed for specific offset, getBlock should
     // throw exception from the ReadBuffer that failed within last 
thresholdAgeMilliseconds sec
     intercept(IOException.class,
-        () -> ReadBufferManager.getBufferManager().getBlock(
+        () -> getBufferManager().getBlock(
             inputStream,
             0,
             ONE_KB,
@@ -655,14 +656,14 @@ public void 
testReadAheadManagerForOlderReadAheadFailure() throws Exception {
     // AbfsInputStream Read would have waited for the read-ahead for the 
requested offset
     // as we are testing from ReadAheadManager directly, sleep for 
thresholdAgeMilliseconds so that
     // read buffer qualifies for to be an old buffer
-    
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+    Thread.sleep(getBufferManager().getThresholdAgeMilliseconds());
 
     // Only the 3 readAhead threads should have triggered client.read
     verifyReadCallCount(client, 3);
 
     // getBlock from a new read request should return 0 if there is a failure
     // 30 sec before in read ahead buffer for respective offset.
-    int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+    int bytesRead = getBufferManager().getBlock(
         inputStream,
         ONE_KB,
         ONE_KB,
@@ -715,7 +716,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() 
throws Exception {
     verifyReadCallCount(client, 3);
 
     // getBlock for a new read should return the buffer read-ahead
-    int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+    int bytesRead = getBufferManager().getBlock(
         inputStream,
         ONE_KB,
         ONE_KB,
@@ -853,7 +854,7 @@ public AbfsInputStream testReadAheadConfigs(int 
readRequestSize,
         .describedAs("Unexpected AlwaysReadBufferSize settings")
         .isEqualTo(alwaysReadBufferSizeEnabled);
 
-    
Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize())
+    Assertions.assertThat(getBufferManager().getReadAheadBlockSize())
         .describedAs("Unexpected readAhead block size")
         .isEqualTo(readAheadBlockSize);
 
@@ -921,10 +922,14 @@ private AzureBlobFileSystem createTestFile(Path 
testFilePath, long testFileSize,
   }
 
   private void resetReadBufferManager(int bufferSize, int threshold) {
-    ReadBufferManager.getBufferManager()
+    getBufferManager()
         .testResetReadBufferManager(bufferSize, threshold);
     // Trigger GC as aggressive recreation of ReadBufferManager buffers
     // by successive tests can lead to OOM based on the dev VM/machine 
capacity.
     System.gc();
   }
-}
\ No newline at end of file
+
+  private ReadBufferManager getBufferManager() {
+    return ReadBufferManagerV1.getBufferManager();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to