This is an automated email from the ASF dual-hosted git repository.

anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2e9ef96d8f4 HADOOP-19622: [ABFS][ReadAheadV2] Implement Read Buffer Manager V2 with improved aggressiveness (#7832)
2e9ef96d8f4 is described below

commit 2e9ef96d8f4e91e5861f42b073c2a92bc98bb49f
Author: Anuj Modi <[email protected]>
AuthorDate: Fri Oct 31 12:20:24 2025 +0530

    HADOOP-19622: [ABFS][ReadAheadV2] Implement Read Buffer Manager V2 with improved aggressiveness (#7832)
    
    Contributed by Anuj Modi
---
 .../hadoop/fs/azurebfs/AbfsConfiguration.java      |   81 +-
 .../fs/azurebfs/AzureBlobFileSystemStore.java      |   34 +-
 .../fs/azurebfs/constants/ConfigurationKeys.java   |   38 +
 .../constants/FileSystemConfigurations.java        |   13 +-
 .../fs/azurebfs/services/AbfsInputStream.java      |   36 +-
 .../azurebfs/services/AbfsInputStreamContext.java  |   16 +-
 .../hadoop/fs/azurebfs/services/ReadBuffer.java    |   37 +
 .../fs/azurebfs/services/ReadBufferManager.java    |   19 +-
 .../fs/azurebfs/services/ReadBufferManagerV1.java  |    8 +-
 .../fs/azurebfs/services/ReadBufferManagerV2.java  | 1027 ++++++++++++++++++--
 .../fs/azurebfs/services/ReadBufferWorker.java     |   16 +-
 .../fs/azurebfs/AbstractAbfsIntegrationTest.java   |   27 +
 .../fs/azurebfs/ITestAzureBlobFileSystemFlush.java |   20 +-
 .../services/AbfsInputStreamTestUtils.java         |   61 --
 .../fs/azurebfs/services/ITestAbfsInputStream.java |    8 +-
 .../services/ITestAbfsInputStreamReadFooter.java   |   16 +-
 .../ITestAbfsInputStreamSmallFileReads.java        |   16 +-
 .../azurebfs/services/ITestReadBufferManager.java  |  222 ++---
 .../services/ITestReadBufferManagerV2.java         |  147 +++
 .../fs/azurebfs/services/TestAbfsInputStream.java  |   22 +-
 .../azurebfs/services/TestReadBufferManagerV2.java |  298 ++++++
 21 files changed, 1805 insertions(+), 357 deletions(-)
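
For context on the new tuning surface, a hedged sketch of how a client might
opt in (constant names come from ConfigurationKeys.java in this patch; the
values are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.*;

    Configuration conf = new Configuration();
    // Opt in to the V2 read buffer manager ("fs.azure.enable.readahead.v2").
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
    // Optionally let the thread/buffer pools scale with CPU and memory load.
    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
    // Scaling decisions compare usage against percentage thresholds (default 50).
    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, 50);
    conf.setInt(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, 50);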

diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index c57a0ea2a7f..88e35b890a2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -396,6 +396,11 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
   private boolean isReadAheadV2Enabled;
 
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING,
+      DefaultValue = DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING)
+  private boolean isReadAheadV2DynamicScalingEnabled;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
       FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
       DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
@@ -416,6 +421,26 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
   private int maxReadAheadV2BufferPoolSize;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS,
+      DefaultValue = DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS)
+  private int readAheadV2CpuMonitoringIntervalMillis;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE,
+      DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE)
+  private int readAheadV2ThreadPoolUpscalePercentage;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE,
+      DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE)
+  private int readAheadV2ThreadPoolDownscalePercentage;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS,
+      DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS)
+  private int readAheadV2MemoryMonitoringIntervalMillis;
+
   @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
       FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
       DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
@@ -426,6 +451,16 @@ public class AbfsConfiguration{
       DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
   private int readAheadV2CachedBufferTTLMillis;
 
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE)
+  private int readAheadV2CpuUsageThresholdPercent;
+
+  @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT,
+      DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENTAGE)
+  private int readAheadV2MemoryUsageThresholdPercent;
+
  @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
      MinValue = 0,
      DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -1548,13 +1583,25 @@ public EncryptionContextProvider createEncryptionContextProvider() {
   }
 
   public boolean isReadAheadEnabled() {
-    return this.enabledReadAhead;
+    return enabledReadAhead;
+  }
+
+  /**
+   * Checks if the read-ahead v2 feature is enabled by user.
+   * @return true if read-ahead v2 is enabled, false otherwise.
+   */
+  public boolean isReadAheadV2Enabled() {
+    return isReadAheadV2Enabled;
+  }
+
+  public boolean isReadAheadV2DynamicScalingEnabled() {
+    return isReadAheadV2DynamicScalingEnabled;
   }
 
   public int getMinReadAheadV2ThreadPoolSize() {
     if (minReadAheadV2ThreadPoolSize <= 0) {
       // If the minReadAheadV2ThreadPoolSize is not set, use the default value
-      return 2 * Runtime.getRuntime().availableProcessors();
+      return DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
     }
     return minReadAheadV2ThreadPoolSize;
   }
@@ -1570,7 +1617,7 @@ public int getMaxReadAheadV2ThreadPoolSize() {
   public int getMinReadAheadV2BufferPoolSize() {
     if (minReadAheadV2BufferPoolSize <= 0) {
       // If the minReadAheadV2BufferPoolSize is not set, use the default value
-      return 2 * Runtime.getRuntime().availableProcessors();
+      return DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE;
     }
     return minReadAheadV2BufferPoolSize;
   }
@@ -1583,6 +1630,22 @@ public int getMaxReadAheadV2BufferPoolSize() {
     return maxReadAheadV2BufferPoolSize;
   }
 
+  public int getReadAheadV2CpuMonitoringIntervalMillis() {
+    return readAheadV2CpuMonitoringIntervalMillis;
+  }
+
+  public int getReadAheadV2ThreadPoolUpscalePercentage() {
+    return readAheadV2ThreadPoolUpscalePercentage;
+  }
+
+  public int getReadAheadV2ThreadPoolDownscalePercentage() {
+    return readAheadV2ThreadPoolDownscalePercentage;
+  }
+
+  public int getReadAheadV2MemoryMonitoringIntervalMillis() {
+    return readAheadV2MemoryMonitoringIntervalMillis;
+  }
+
   public int getReadAheadExecutorServiceTTLInMillis() {
     return readAheadExecutorServiceTTLMillis;
   }
@@ -1591,12 +1654,12 @@ public int getReadAheadV2CachedBufferTTLMillis() {
     return readAheadV2CachedBufferTTLMillis;
   }
 
-  /**
-   * Checks if the read-ahead v2 feature is enabled by user.
-   * @return true if read-ahead v2 is enabled, false otherwise.
-   */
-  public boolean isReadAheadV2Enabled() {
-    return this.isReadAheadV2Enabled;
+  public int getReadAheadV2CpuUsageThresholdPercent() {
+    return readAheadV2CpuUsageThresholdPercent;
+  }
+
+  public int getReadAheadV2MemoryUsageThresholdPercent() {
+    return readAheadV2MemoryUsageThresholdPercent;
   }
 
   @VisibleForTesting
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 486e4b3cc9b..6d06580726e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -963,24 +963,24 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
     int footerReadBufferSize = options.map(c -> c.getInt(
         AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()))
         .orElse(getAbfsConfiguration().getFooterReadBufferSize());
+
     return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
-            .withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
-            .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
-            .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
-            .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
-            .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
-            .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
-            .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
-            .withFooterReadBufferSize(footerReadBufferSize)
-            .withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
-            .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
-            .withShouldReadBufferSizeAlways(
-                getAbfsConfiguration().shouldReadBufferSizeAlways())
-            .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
-            .withBufferedPreadDisabled(bufferedPreadDisabled)
-            .withEncryptionAdapter(contextEncryptionAdapter)
-            .withAbfsBackRef(fsBackRef)
-            .build();
+        .withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
+        .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
+        .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
+        .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
+        .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
+        .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
+        .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
+        .withFooterReadBufferSize(footerReadBufferSize)
+        .withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
+        .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
+        .withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways())
+        .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
+        .withBufferedPreadDisabled(bufferedPreadDisabled)
+        .withEncryptionAdapter(contextEncryptionAdapter)
+        .withAbfsBackRef(fsBackRef)
+        .build();
   }
 
   public OutputStream openFileForWrite(final Path path,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 899e96dadc1..b365217323f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -276,6 +276,12 @@ public final class ConfigurationKeys {
    */
  public static final String FS_AZURE_ENABLE_READAHEAD_V2 = "fs.azure.enable.readahead.v2";
 
+  /**
+   * Enable or disable dynamic scaling of thread pool and buffer pool of readahead V2.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = "fs.azure.enable.readahead.v2.dynamic.scaling";
+
   /**
    * Minimum number of prefetch threads in the thread pool for readahead V2.
    * {@value }
@@ -297,6 +303,28 @@ public final class ConfigurationKeys {
    */
  public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = "fs.azure.readahead.v2.max.buffer.pool.size";
 
+  /**
+   * Interval in milliseconds for periodic monitoring of CPU usage and up/down scaling thread pool size accordingly.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.cpu.monitoring.interval.millis";
+
+  /**
+   * Percentage by which the thread pool size should be upscaled when CPU usage is low.
+   */
+  public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.upscale.percentage";
+
+  /**
+   * Percentage by which the thread pool size should be downscaled when CPU usage is high.
+   */
+  public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = "fs.azure.readahead.v2.thread.pool.downscale.percentage";
+
+  /**
+   * Interval in milliseconds for periodic monitoring of memory usage and up/down scaling buffer pool size accordingly.
+   * {@value }
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = "fs.azure.readahead.v2.memory.monitoring.interval.millis";
+
   /**
   * TTL in milliseconds for the idle threads in executor service used by read ahead v2.
    */
@@ -307,6 +335,16 @@ public final class ConfigurationKeys {
    */
  public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = "fs.azure.readahead.v2.cached.buffer.ttl.millis";
 
+  /**
+   * Threshold percentage for CPU usage to scale up/down the thread pool size in read ahead v2.
+   */
+  public static final String FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.cpu.usage.threshold.percent";
+
+  /**
+   * Threshold percentage for memory usage to scale up/down the buffer pool size in read ahead v2.
+   */
+  public static final String FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT = "fs.azure.readahead.v2.memory.usage.threshold.percent";
+
  /** Setting this true will make the driver use it's own RemoteIterator implementation */
  public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
   /** Server side encryption key encoded in Base6format {@value}.*/
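
The monitoring and scaling keys above pair up: each monitoring interval drives
a periodic check, and the upscale/downscale percentages decide how far the
thread pool moves on each check (up when CPU usage is low, down when it is
high). A hedged sketch of tuning them, with values mirroring the defaults
added in FileSystemConfigurations below:

    Configuration conf = new Configuration();
    // Sample CPU and memory usage every 6 seconds.
    conf.setInt("fs.azure.readahead.v2.cpu.monitoring.interval.millis", 6_000);
    conf.setInt("fs.azure.readahead.v2.memory.monitoring.interval.millis", 6_000);
    // Grow the worker pool by 20% while CPU usage stays below the threshold,
    // shrink it by 30% once CPU usage crosses the threshold.
    conf.setInt("fs.azure.readahead.v2.thread.pool.upscale.percentage", 20);
    conf.setInt("fs.azure.readahead.v2.thread.pool.downscale.percentage", 30);
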
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index ea77f9d874a..e95ea8b1dde 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -153,12 +153,19 @@ public final class FileSystemConfigurations {
 
   public static final boolean DEFAULT_ENABLE_READAHEAD = true;
   public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
-  public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
+  public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = false;
+  public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = 8;
   public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
-  public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
+  public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = 16;
   public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
-  public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000;
+  public static final int DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = 6_000;
+  public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = 20;
+  public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = 30;
+  public static final int DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = 6_000;
+  public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 6_000;
   public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000;
+  public static final int DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE = 50;
+  public static final int DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENTAGE = 50;
 
   public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
   public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
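
In the V2 manager further below, the two *_USAGE_THRESHOLD_PERCENTAGE defaults
are divided by HUNDRED_D (assumed to be 100.0) before use, so the default of
50 means scaling decisions compare observed usage against a fraction of 0.5:

    // Hedged sketch of the conversion done in setReadBufferManagerConfigs().
    double cpuThreshold = DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE / 100.0; // 0.5
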
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index ba2bb61802a..9d29614a044 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -69,7 +69,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   public static final int FOOTER_SIZE = 16 * ONE_KB;
   public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
 
-  private int readAheadBlockSize;
+  private final int readAheadBlockSize;
   private final AbfsClient client;
   private final Statistics statistics;
   private final String path;
@@ -132,7 +132,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
 
   /** ABFS instance to be held by the input stream to avoid GC close. */
   private final BackReference fsBackRef;
-  private ReadBufferManager readBufferManager;
+  private final ReadBufferManager readBufferManager;
 
   public AbfsInputStream(
           final AbfsClient client,
@@ -184,10 +184,13 @@ public AbfsInputStream(
      * If none of the V1 and V2 are enabled, then no read ahead will be done.
      */
     if (readAheadV2Enabled) {
-      LOG.debug("ReadBufferManagerV2 not yet implemented, defaulting to 
ReadBufferManagerV1");
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          readAheadBlockSize, client.getAbfsConfiguration());
+      readBufferManager = ReadBufferManagerV2.getBufferManager();
+    } else {
+      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
+      readBufferManager = ReadBufferManagerV1.getBufferManager();
     }
-    ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
-    readBufferManager = ReadBufferManagerV1.getBufferManager();
 
     if (streamStatistics != null) {
       ioStatistics = streamStatistics.getIOStatistics();
@@ -530,7 +533,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
       while (numReadAheads > 0 && nextOffset < contentLength) {
         LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
             nextOffset, nextSize);
-        readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
+        getReadBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
                 new TracingContext(readAheadTracingContext));
         nextOffset = nextOffset + nextSize;
         numReadAheads--;
@@ -539,7 +542,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
       }
 
       // try reading from buffers first
-      receivedBytes = readBufferManager.getBlock(this, position, length, b);
+      receivedBytes = getReadBufferManager().getBlock(this, position, length, b);
       bytesFromReadAhead += receivedBytes;
       if (receivedBytes > 0) {
         incrementReadOps();
@@ -743,8 +746,8 @@ public boolean seekToNewSource(long l) throws IOException {
   public synchronized void close() throws IOException {
     LOG.debug("Closing {}", this);
     closed = true;
-    if (readBufferManager != null) {
-      readBufferManager.purgeBuffersForStream(this);
+    if (getReadBufferManager() != null) {
+      getReadBufferManager().purgeBuffersForStream(this);
     }
     buffer = null; // de-reference the buffer so it can be GC'ed sooner
     if (contextEncryptionAdapter != null) {
@@ -805,7 +808,7 @@ byte[] getBuffer() {
    */
   @VisibleForTesting
   public boolean isReadAheadEnabled() {
-    return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null;
+    return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null;
   }
 
   @VisibleForTesting
@@ -823,6 +826,10 @@ public String getStreamID() {
     return inputStreamId;
   }
 
+  public String getETag() {
+    return eTag;
+  }
+
   /**
    * Getter for AbfsInputStreamStatistics.
    *
@@ -920,11 +927,20 @@ long getLimit() {
     return this.limit;
   }
 
+  boolean isFirstRead() {
+    return this.firstRead;
+  }
+
   @VisibleForTesting
   BackReference getFsBackRef() {
     return fsBackRef;
   }
 
+  @VisibleForTesting
+  ReadBufferManager getReadBufferManager() {
+    return readBufferManager;
+  }
+
   @Override
   public int minSeekForVectorReads() {
     return S_128K;
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index 15ee4809911..cb51fa22900 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -248,11 +248,11 @@ public AbfsInputStreamContext withAbfsBackRef(
    * @param contextEncryptionAdapter encryption adapter.
    * @return this instance.
    */
-    public AbfsInputStreamContext withEncryptionAdapter(
-        ContextEncryptionAdapter contextEncryptionAdapter){
-      this.contextEncryptionAdapter = contextEncryptionAdapter;
-      return this;
-    }
+  public AbfsInputStreamContext withEncryptionAdapter(
+      ContextEncryptionAdapter contextEncryptionAdapter){
+    this.contextEncryptionAdapter = contextEncryptionAdapter;
+    return this;
+  }
 
   /**
    * Finalizes and validates the context configuration.
@@ -348,7 +348,7 @@ public BackReference getFsBackRef() {
   }
 
   /** @return context encryption adapter. */
-    public ContextEncryptionAdapter getEncryptionAdapter() {
-      return contextEncryptionAdapter;
-    }
+  public ContextEncryptionAdapter getEncryptionAdapter() {
+    return contextEncryptionAdapter;
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index 9ce926d841c..a6aa75f59d2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -20,6 +20,7 @@
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -29,6 +30,8 @@
 class ReadBuffer {
 
   private AbfsInputStream stream;
+  private String eTag;
+  private String path;                   // path of the file this buffer is for
  private long offset;                   // offset within the file for the buffer
  private int length;                    // actual length, set after the buffer is filles
   private int requestedLength;           // requested length of the read
@@ -44,6 +47,7 @@ class ReadBuffer {
   private boolean isFirstByteConsumed = false;
   private boolean isLastByteConsumed = false;
   private boolean isAnyByteConsumed = false;
+  private AtomicInteger refCount = new AtomicInteger(0);
 
   private IOException errException = null;
 
@@ -51,10 +55,26 @@ public AbfsInputStream getStream() {
     return stream;
   }
 
+  public String getETag() {
+    return eTag;
+  }
+
+  public String getPath() {
+    return path;
+  }
+
   public void setStream(AbfsInputStream stream) {
     this.stream = stream;
   }
 
+  public void setETag(String eTag) {
+    this.eTag = eTag;
+  }
+
+  public void setPath(String path) {
+    this.path = path;
+  }
+
   public void setTracingContext(TracingContext tracingContext) {
     this.tracingContext = tracingContext;
   }
@@ -122,6 +142,20 @@ public void setStatus(ReadBufferStatus status) {
     }
   }
 
+  public void startReading() {
+    refCount.getAndIncrement();
+  }
+
+  public void endReading() {
+    if (refCount.decrementAndGet() < 0) {
+      throw new IllegalStateException("ReadBuffer refCount cannot be 
negative");
+    }
+  }
+
+  public int getRefCount() {
+    return refCount.get();
+  }
+
   public CountDownLatch getLatch() {
     return latch;
   }
@@ -162,4 +196,7 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
     this.isAnyByteConsumed = isAnyByteConsumed;
   }
 
+  public boolean isFullyConsumed() {
+    return isFirstByteConsumed() && isLastByteConsumed();
+  }
 }
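
The new refCount supports a pin/unpin protocol: a reader pins a completed
buffer before copying from it, and eviction in ReadBufferManagerV2 skips any
buffer whose count is non-zero (see evict() below). A hedged sketch of the
intended call pattern (the copy step is hypothetical):

    ReadBuffer buf = ...;    // a completed buffer handed out by the manager
    buf.startReading();      // pin: getRefCount() > 0 blocks eviction
    try {
      // copy the requested range out of buf.getBuffer()
    } finally {
      buf.endReading();      // unpin; throws IllegalStateException if the
                             // count would drop below zero
    }
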
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 9ee128fbc32..712b04fb499 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -119,7 +119,6 @@ abstract void doneReading(ReadBuffer buffer,
    */
   abstract void purgeBuffersForStream(AbfsInputStream stream);
 
-
  // Following Methods are for testing purposes only and should not be used in production code.
 
   /**
@@ -206,7 +205,7 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) {
    *
    * @return the stack of free buffer indices
    */
-  public Stack<Integer> getFreeList() {
+  Stack<Integer> getFreeList() {
     return freeList;
   }
 
@@ -215,7 +214,7 @@ public Stack<Integer> getFreeList() {
    *
    * @return the queue of {@link ReadBuffer} objects in the read-ahead queue
    */
-  public Queue<ReadBuffer> getReadAheadQueue() {
+  Queue<ReadBuffer> getReadAheadQueue() {
     return readAheadQueue;
   }
 
@@ -224,7 +223,7 @@ public Queue<ReadBuffer> getReadAheadQueue() {
    *
   * @return the list of {@link ReadBuffer} objects that are currently being processed
    */
-  public LinkedList<ReadBuffer> getInProgressList() {
+  LinkedList<ReadBuffer> getInProgressList() {
     return inProgressList;
   }
 
@@ -233,7 +232,7 @@ public LinkedList<ReadBuffer> getInProgressList() {
    *
   * @return the list of {@link ReadBuffer} objects that have been read and are available for use
    */
-  public LinkedList<ReadBuffer> getCompletedReadList() {
+  LinkedList<ReadBuffer> getCompletedReadList() {
     return completedReadList;
   }
 
@@ -244,7 +243,7 @@ public LinkedList<ReadBuffer> getCompletedReadList() {
    * @return a list of free buffer indices
    */
   @VisibleForTesting
-  protected synchronized List<Integer> getFreeListCopy() {
+  List<Integer> getFreeListCopy() {
     return new ArrayList<>(freeList);
   }
 
@@ -254,7 +253,7 @@ protected synchronized List<Integer> getFreeListCopy() {
    * @return a list of {@link ReadBuffer} objects in the read-ahead queue
    */
   @VisibleForTesting
-  protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
+  synchronized List<ReadBuffer> getReadAheadQueueCopy() {
     return new ArrayList<>(readAheadQueue);
   }
 
@@ -264,7 +263,7 @@ protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
    * @return a list of in-progress {@link ReadBuffer} objects
    */
   @VisibleForTesting
-  protected synchronized List<ReadBuffer> getInProgressCopiedList() {
+  synchronized List<ReadBuffer> getInProgressListCopy() {
     return new ArrayList<>(inProgressList);
   }
 
@@ -274,7 +273,7 @@ protected synchronized List<ReadBuffer> getInProgressCopiedList() {
    * @return a list of completed {@link ReadBuffer} objects
    */
   @VisibleForTesting
-  protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
+  synchronized List<ReadBuffer> getCompletedReadListCopy() {
     return new ArrayList<>(completedReadList);
   }
 
@@ -284,7 +283,7 @@ protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
    * @return the number of completed read buffers
    */
   @VisibleForTesting
-  protected int getCompletedReadListSize() {
+  int getCompletedReadListSize() {
     return completedReadList.size();
   }
 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index fe1ac3fa1f2..e476f6d7446 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -33,7 +33,7 @@
  * The Read Buffer Manager for Rest AbfsClient.
  * V1 implementation of ReadBufferManager.
  */
-final class ReadBufferManagerV1 extends ReadBufferManager {
+public final class ReadBufferManagerV1 extends ReadBufferManager {
 
   private static final int NUM_BUFFERS = 16;
   private static final int NUM_THREADS = 8;
@@ -41,7 +41,7 @@ final class ReadBufferManagerV1 extends ReadBufferManager {
 
   private Thread[] threads = new Thread[NUM_THREADS];
   private byte[][] buffers;
-  private static  ReadBufferManagerV1 bufferManager;
+  private static ReadBufferManagerV1 bufferManager;
 
   // hide instance constructor
   private ReadBufferManagerV1() {
@@ -52,7 +52,7 @@ private ReadBufferManagerV1() {
    * Sets the read buffer manager configurations.
    * @param readAheadBlockSize the size of the read-ahead block in bytes
    */
-  static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+  public static void setReadBufferManagerConfigs(int readAheadBlockSize) {
     if (bufferManager == null) {
       LOGGER.debug(
           "ReadBufferManagerV1 not initialized yet. Overriding 
readAheadBlockSize as {}",
@@ -88,7 +88,7 @@ static ReadBufferManagerV1 getBufferManager() {
   void init() {
     buffers = new byte[NUM_BUFFERS][];
     for (int i = 0; i < NUM_BUFFERS; i++) {
-      buffers[i] = new byte[getReadAheadBlockSize()];  // same buffers are reused. These byte arrays are never garbage collected
+      buffers[i] = new byte[getReadAheadBlockSize()];  // same buffers are reused. The byte array never goes back to GC
       getFreeList().add(i);
     }
     for (int i = 0; i < NUM_THREADS; i++) {
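
Both managers remain process-wide singletons, but V2 now enforces the
configure-before-use ordering at runtime: getBufferManager() throws
IllegalStateException unless setReadBufferManagerConfigs() ran first. A
minimal sketch of that ordering, mirroring the wiring in AbfsInputStream
above (these calls happen inside the services package):

    ReadBufferManagerV2.setReadBufferManagerConfigs(
        readAheadBlockSize, client.getAbfsConfiguration());
    ReadBufferManager manager = ReadBufferManagerV2.getBufferManager();
    manager.queueReadAhead(stream, nextOffset, (int) nextSize, tracingContext);
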
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 9cce860127d..c7e6e4c3d28 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -17,67 +17,112 @@
  */
 package org.apache.hadoop.fs.azurebfs.services;
 
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import com.sun.management.OperatingSystemMXBean;
+
 import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.SynchronousQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
 
-final class ReadBufferManagerV2 extends ReadBufferManager {
+/**
+ * The Improved Read Buffer Manager for Rest AbfsClient.
+ */
+public final class ReadBufferManagerV2 extends ReadBufferManager {
+
+  // Internal constants
+  private static final ReentrantLock LOCK = new ReentrantLock();
 
   // Thread Pool Configurations
   private static int minThreadPoolSize;
+
   private static int maxThreadPoolSize;
+
+  private static int cpuMonitoringIntervalInMilliSec;
+
+  private static double cpuThreshold;
+
+  private static int threadPoolUpscalePercentage;
+
+  private static int threadPoolDownscalePercentage;
+
   private static int executorServiceKeepAliveTimeInMilliSec;
+
+  private static final double THREAD_POOL_REQUIREMENT_BUFFER = 1.2; // 20% more threads than the queue size
+
+  private static boolean isDynamicScalingEnabled;
+
+  private ScheduledExecutorService cpuMonitorThread;
+
   private ThreadPoolExecutor workerPool;
 
+  private final List<ReadBufferWorker> workerRefs = new ArrayList<>();
+
   // Buffer Pool Configurations
   private static int minBufferPoolSize;
+
   private static int maxBufferPoolSize;
-  private int numberOfActiveBuffers = 0;
+
+  private static int memoryMonitoringIntervalInMilliSec;
+
+  private static double memoryThreshold;
+
+  private final AtomicInteger numberOfActiveBuffers = new AtomicInteger(0);
+
   private byte[][] bufferPool;
 
+  private final Stack<Integer> removedBufferList = new Stack<>();
+
+  private ScheduledExecutorService memoryMonitorThread;
+
+  // Buffer Manager Structures
   private static ReadBufferManagerV2 bufferManager;
 
-  // hide instance constructor
-  private ReadBufferManagerV2() {
-    LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
-  }
+  private static AtomicBoolean isConfigured = new AtomicBoolean(false);
 
   /**
-   * Sets the read buffer manager configurations.
-   * @param readAheadBlockSize the size of the read-ahead block in bytes
-   * @param abfsConfiguration the AbfsConfiguration instance for other configurations
+   * Private constructor to prevent instantiation as this needs to be singleton.
    */
-  static void setReadBufferManagerConfigs(int readAheadBlockSize, AbfsConfiguration abfsConfiguration) {
-    if (bufferManager == null) {
-      minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
-      maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
-      executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
-
-      minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
-      maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
-      setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
-      setReadAheadBlockSize(readAheadBlockSize);
-    }
+  private ReadBufferManagerV2() {
+    printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
   }
 
-  /**
-   * Returns the singleton instance of ReadBufferManagerV2.
-   * @return the singleton instance of ReadBufferManagerV2
-   */
   static ReadBufferManagerV2 getBufferManager() {
+    if (!isConfigured.get()) {
+      throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
+          + "Please call setReadBufferManagerConfigs() before calling 
getBufferManager().");
+    }
     if (bufferManager == null) {
       LOCK.lock();
       try {
         if (bufferManager == null) {
           bufferManager = new ReadBufferManagerV2();
           bufferManager.init();
+          LOGGER.trace("ReadBufferManagerV2 singleton initialized");
         }
       } finally {
         LOCK.unlock();
@@ -87,17 +132,73 @@ static ReadBufferManagerV2 getBufferManager() {
   }
 
   /**
-   * {@inheritDoc}
+   * Set the ReadBufferManagerV2 configurations from the provided configuration before singleton initialization.
+   * @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2.
+   * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
+   */
+  public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
+      final AbfsConfiguration abfsConfiguration) {
+    // Set Configs only before initializations.
+    if (bufferManager == null && !isConfigured.get()) {
+      LOCK.lock();
+      try {
+        if (bufferManager == null && !isConfigured.get()) {
+          minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+          maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+          cpuMonitoringIntervalInMilliSec
+              = abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
+          cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()
+              / HUNDRED_D;
+          threadPoolUpscalePercentage
+              = abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
+          threadPoolDownscalePercentage
+              = abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
+          executorServiceKeepAliveTimeInMilliSec
+              = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+          minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
+          maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
+          memoryMonitoringIntervalInMilliSec
+              = abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
+          memoryThreshold =
+              abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent()
+                  / HUNDRED_D;
+          setThresholdAgeMilliseconds(
+              abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
+          isDynamicScalingEnabled
+              = abfsConfiguration.isReadAheadV2DynamicScalingEnabled();
+          setReadAheadBlockSize(readAheadBlockSize);
+          setIsConfigured(true);
+        }
+      } finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * Initialize the singleton ReadBufferManagerV2.
    */
   @Override
   void init() {
-    // Initialize Buffer Pool
+    // Initialize Buffer Pool. Size can never be more than max pool size
     bufferPool = new byte[maxBufferPoolSize][];
     for (int i = 0; i < minBufferPoolSize; i++) {
-      bufferPool[i] = new byte[getReadAheadBlockSize()];  // same buffers are reused. These byte arrays are never garbage collected
+      // Start with just minimum number of buffers.
+      bufferPool[i]
+          = new byte[getReadAheadBlockSize()];  // same buffers are reused. The byte array never goes back to GC
       getFreeList().add(i);
-      numberOfActiveBuffers++;
+      numberOfActiveBuffers.getAndIncrement();
     }
+    memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(
+        runnable -> {
+          Thread t = new Thread(runnable, "ReadAheadV2-Memory-Monitor");
+          t.setDaemon(true);
+          return t;
+        });
+    memoryMonitorThread.scheduleAtFixedRate(this::scheduledEviction,
+        getMemoryMonitoringIntervalInMilliSec(),
+        getMemoryMonitoringIntervalInMilliSec(), TimeUnit.MILLISECONDS);
 
     // Initialize a Fixed Size Thread Pool with minThreadPoolSize threads
     workerPool = new ThreadPoolExecutor(
@@ -106,123 +207,893 @@ void init() {
         executorServiceKeepAliveTimeInMilliSec,
         TimeUnit.MILLISECONDS,
         new SynchronousQueue<>(),
-        namedThreadFactory);
+        workerThreadFactory);
     workerPool.allowCoreThreadTimeOut(true);
     for (int i = 0; i < minThreadPoolSize; i++) {
-      ReadBufferWorker worker = new ReadBufferWorker(i, this);
+      ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+      workerRefs.add(worker);
       workerPool.submit(worker);
     }
     ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+    if (isDynamicScalingEnabled) {
+      cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(
+          runnable -> {
+            Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+            t.setDaemon(true);
+            return t;
+          });
+      cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+          getCpuMonitoringIntervalInMilliSec(),
+          getCpuMonitoringIntervalInMilliSec(),
+          TimeUnit.MILLISECONDS);
+    }
+
+    printTraceLog(
+        "ReadBufferManagerV2 initialized with {} buffers and {} worker 
threads",
+        numberOfActiveBuffers.get(), workerRefs.size());
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to queue read-ahead.
+   * @param stream which read-ahead is requested from.
+   * @param requestedOffset The offset in the file which should be read.
+   * @param requestedLength The length to read.
    */
   @Override
   public void queueReadAhead(final AbfsInputStream stream,
       final long requestedOffset,
       final int requestedLength,
-      final TracingContext tracingContext) {
-    // TODO: To be implemented
+      TracingContext tracingContext) {
+    printTraceLog(
+        "Start Queueing readAhead for file: {}, with eTag: {}, "
+            + "offset: {}, length: {}, triggered by stream: {}",
+        stream.getPath(), stream.getETag(), requestedOffset, requestedLength,
+        stream.hashCode());
+    ReadBuffer buffer;
+    synchronized (this) {
+      if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+        // Already queued for this offset, so skip queuing.
+        printTraceLog(
+            "Skipping queuing readAhead for file: {}, with eTag: {}, "
+                + "offset: {}, triggered by stream: {} as it is already 
queued",
+            stream.getPath(), stream.getETag(), requestedOffset,
+            stream.hashCode());
+        return;
+      }
+      if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+        // No buffers are available and more buffers cannot be created. Skip queuing.
+        printTraceLog(
+            "Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+            stream.getPath(), stream.getETag(), requestedOffset,
+            stream.hashCode());
+        return;
+      }
+
+      // Create a new ReadBuffer to keep the prefetched data and queue.
+      buffer = new ReadBuffer();
+      buffer.setStream(stream); // To map buffer with stream that requested it
+      buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+      buffer.setPath(stream.getPath());
+      buffer.setOffset(requestedOffset);
+      buffer.setLength(0);
+      buffer.setRequestedLength(requestedLength);
+      buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+      buffer.setLatch(new CountDownLatch(1));
+      buffer.setTracingContext(tracingContext);
+
+      if (isFreeListEmpty()) {
+        /*
+         * By now there should be at least one buffer available.
+         * This is to make doubly sure that after upscaling or eviction,
+         * we still have a free buffer available. If not, we skip queueing.
+         */
+        return;
+      }
+      Integer bufferIndex = popFromFreeList();
+      if (bufferIndex > bufferPool.length) {
+        // This should never happen.
+        printTraceLog(
+            "Skipping queuing readAhead for file: {}, with eTag: {}, offset: 
{}, triggered by stream: {} as invalid buffer index popped from free list",
+            stream.getPath(), stream.getETag(), requestedOffset,
+            stream.hashCode());
+        return;
+      }
+      buffer.setBuffer(bufferPool[bufferIndex]);
+      buffer.setBufferindex(bufferIndex);
+      getReadAheadQueue().add(buffer);
+      notifyAll();
+      printTraceLog(
+          "Done q-ing readAhead for file: {}, with eTag:{}, offset: {}, "
+              + "buffer idx: {}, triggered by stream: {}",
+          stream.getPath(), stream.getETag(), requestedOffset,
+          buffer.getBufferindex(), stream.hashCode());
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+   * remote read). This returns the bytes if the data already exists in buffer. If there is a buffer that is reading
+   * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+   * but not picked up by a worker thread yet, then it cancels that read-ahead and reports cache miss. This is because
+   * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+   * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+   *
+   * @param stream of the file to read bytes for
+   * @param position the offset in the file to do a read for
+   * @param length   the length to read
+   * @param buffer   the buffer to read data into. Note that the buffer will be written into from offset 0.
+   * @return the number of bytes read
    */
   @Override
   public int getBlock(final AbfsInputStream stream,
       final long position,
       final int length,
-      final byte[] buffer) throws IOException {
-    // TODO: To be implemented
+      final byte[] buffer)
+      throws IOException {
+    // not synchronized, so have to be careful with locking
+    printTraceLog(
+        "getBlock request for file: {}, with eTag: {}, for position: {} "
+            + "for length: {} received from stream: {}",
+        stream.getPath(), stream.getETag(), position, length,
+        stream.hashCode());
+
+    String requestedETag = stream.getETag();
+    boolean isFirstRead = stream.isFirstRead();
+
+    // Wait for any in-progress read to complete.
+    waitForProcess(requestedETag, position, isFirstRead);
+
+    int bytesRead = 0;
+    synchronized (this) {
+      bytesRead = getBlockFromCompletedQueue(requestedETag, position, length,
+          buffer);
+    }
+    if (bytesRead > 0) {
+      printTraceLog(
+          "Done read from Cache for the file with eTag: {}, position: {}, 
length: {}, requested by stream: {}",
+          requestedETag, position, bytesRead, stream.hashCode());
+      return bytesRead;
+    }
+
+    // otherwise, just say we got nothing - calling thread can do its own read
     return 0;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+   * @return {@link ReadBuffer}
+   * @throws InterruptedException if thread is interrupted
    */
   @Override
   public ReadBuffer getNextBlockToRead() throws InterruptedException {
-    // TODO: To be implemented
-    return null;
+    ReadBuffer buffer = null;
+    synchronized (this) {
+      // Blocking Call to wait for prefetch to be queued.
+      while (getReadAheadQueue().size() == 0) {
+        wait();
+      }
+
+      buffer = getReadAheadQueue().remove();
+      notifyAll();
+      if (buffer == null) {
+        return null;
+      }
+      buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+      getInProgressList().add(buffer);
+    }
+    printTraceLog(
+        "ReadBufferWorker picked file: {}, with eTag: {}, for offset: {}, "
+            + "queued by stream: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(),
+        buffer.getStream().hashCode());
+    return buffer;
   }
 
   /**
-   * {@inheritDoc}
+   * {@link ReadBufferWorker} thread calls this method to post completion.
+   * @param buffer            the buffer whose read was completed
+   * @param result            the {@link ReadBufferStatus} after the read operation in the worker thread
+   * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
    */
   @Override
   public void doneReading(final ReadBuffer buffer,
       final ReadBufferStatus result,
       final int bytesActuallyRead) {
-    // TODO: To be implemented
+    printTraceLog(
+        "ReadBufferWorker completed prefetch for file: {} with eTag: {}, for 
offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+        buffer.getPath(), buffer.getETag(), buffer.getOffset(),
+        buffer.getStream().hashCode(), result, bytesActuallyRead);
+    synchronized (this) {
+      // If this buffer has already been purged during
+      // close of InputStream then we don't update the lists.
+      if (getInProgressList().contains(buffer)) {
+        getInProgressList().remove(buffer);
+        if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+          // Successful read, so update the buffer status and length
+          buffer.setStatus(ReadBufferStatus.AVAILABLE);
+          buffer.setLength(bytesActuallyRead);
+        } else {
+          // Failed read, reuse buffer for next read, this buffer will be
+          // evicted later based on eviction policy.
+          pushToFreeList(buffer.getBufferindex());
+        }
+        // completed list also contains FAILED read buffers
+        // for sending exception message to clients.
+        buffer.setStatus(result);
+        buffer.setTimeStamp(currentTimeMillis());
+        getCompletedReadList().add(buffer);
+      }
+    }
+
+    //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+    buffer.getLatch().countDown(); // wake up waiting threads (if any)
   }
 
   /**
-   * {@inheritDoc}
+   * Purging the buffers associated with an {@link AbfsInputStream}
+   * from {@link ReadBufferManagerV2} when stream is closed.
+   * @param stream input stream.
    */
-  @Override
-  public void purgeBuffersForStream(final AbfsInputStream stream) {
-    // TODO: To be implemented
+  public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+    printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+    getReadAheadQueue().removeIf(
+        readBuffer -> readBuffer.getStream() == stream);
+    purgeList(stream, getCompletedReadList());
   }
 
   /**
-   * {@inheritDoc}
+   * Check if any buffer is already queued for the requested offset.
+   * @param eTag the eTag of the file
+   * @param requestedOffset the requested offset
+   * @return whether any buffer is already queued
    */
-  @VisibleForTesting
-  @Override
-  public int getNumBuffers() {
-    return numberOfActiveBuffers;
+  private boolean isAlreadyQueued(final String eTag,
+      final long requestedOffset) {
+    // returns true if any part of the buffer is already queued
+    return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+        || isInList(getInProgressList(), eTag, requestedOffset)
+        || isInList(getCompletedReadList(), eTag, requestedOffset));
+  }
+
+  /**
+   * Check if any buffer in the list contains the requested offset.
+   * @param list the list to check
+   * @param eTag the eTag of the file
+   * @param requestedOffset the requested offset
+   * @return whether any buffer in the list contains the requested offset
+   */
+  private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
+      final long requestedOffset) {
+    return (getFromList(list, eTag, requestedOffset) != null);
   }
+
   /**
-   * {@inheritDoc}
+   * Get the buffer from the list that contains the requested offset.
+   * @param list the list to check
+   * @param eTag the eTag of the file
+   * @param requestedOffset the requested offset
+   * @return the buffer if found, null otherwise
    */
-  @VisibleForTesting
-  @Override
-  public void callTryEvict() {
-    // TODO: To be implemented
+  private ReadBuffer getFromList(final Collection<ReadBuffer> list,
+      final String eTag,
+      final long requestedOffset) {
+    for (ReadBuffer buffer : list) {
+      if (eTag.equals(buffer.getETag())) {
+        if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+            && requestedOffset >= buffer.getOffset()
+            && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+          return buffer;
+        } else if (requestedOffset >= buffer.getOffset()
+            && requestedOffset
+            < buffer.getOffset() + buffer.getRequestedLength()) {
+          return buffer;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list.
+   * The objective is to find just one buffer - there is no advantage to evicting more than one.
+   * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+   */
+  private synchronized boolean tryEvict() {
+    ReadBuffer nodeToEvict = null;
+    if (getCompletedReadList().size() <= 0) {
+      return false;  // there are no evict-able buffers
+    }
+
+    long currentTimeInMs = currentTimeMillis();
+
+    // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isFullyConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (buf.isAnyByteConsumed()) {
+        nodeToEvict = buf;
+        break;
+      }
+    }
+
+    if (nodeToEvict != null) {
+      return manualEviction(nodeToEvict);
+    }
+
+    // next, try any old nodes that have not been consumed
+    // Failed read buffers (with buffer index=-1) that are older than
+    // thresholdAge should be cleaned up, but at the same time should not
+    // report successful eviction.
+    // Queue logic expects that a buffer is freed up for read ahead when
+    // eviction is successful, whereas a failed ReadBuffer would have released
+    // its buffer when its status was set to READ_FAILED.
+    long earliestBirthday = Long.MAX_VALUE;
+    ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if ((buf.getBufferindex() != -1)
+          && (buf.getTimeStamp() < earliestBirthday)) {
+        nodeToEvict = buf;
+        earliestBirthday = buf.getTimeStamp();
+      } else if ((buf.getBufferindex() == -1)
+          && (currentTimeInMs - buf.getTimeStamp())
+          > getThresholdAgeMilliseconds()) {
+        oldFailedBuffers.add(buf);
+      }
+    }
+
+    for (ReadBuffer buf : oldFailedBuffers) {
+      manualEviction(buf);
+    }
+
+    if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds())
+        && (nodeToEvict != null)) {
+      return manualEviction(nodeToEvict);
+    }
+
+    printTraceLog("No buffer eligible for eviction");
+    // nothing can be evicted
+    return false;
+  }
+
+  /**
+   * Evict the given buffer.
+   * @param buf the buffer to evict
+   * @return whether the eviction succeeded
+   */
+  private boolean evict(final ReadBuffer buf) {
+    if (buf.getRefCount() > 0) {
+      // If the buffer is still being read, then we cannot evict it.
+      printTraceLog(
+          "Cannot evict buffer with index: {}, file: {}, with eTag: {}, 
offset: {} as it is still being read by some input stream",
+          buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+      return false;
+    }
+    // As failed ReadBuffers (bufferIndx = -1) are saved in getCompletedReadList(),
+    // avoid adding it to availableBufferList.
+    if (buf.getBufferindex() != -1) {
+      pushToFreeList(buf.getBufferindex());
+    }
+    getCompletedReadList().remove(buf);
+    buf.setTracingContext(null);
+    printTraceLog(
+        "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with 
eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+        buf.isFullyConsumed(), buf.isAnyByteConsumed());
+    return true;
+  }
+
+  /**
+   * Wait for any in-progress read for the requested offset to complete.
+   * @param eTag the eTag of the file
+   * @param position the requested offset
+   * @param isFirstRead whether this is the first read of the stream
+   */
+  private void waitForProcess(final String eTag,
+      final long position,
+      boolean isFirstRead) {
+    ReadBuffer readBuf;
+    synchronized (this) {
+      readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+      if (readBuf == null) {
+        readBuf = getFromList(getInProgressList(), eTag, position);
+      }
+    }
+    if (readBuf != null) {         // if in in-progress queue, then block for it
+      try {
+        printTraceLog(
+            "A relevant read buffer for file: {}, with eTag: {}, offset: {}, "
+                + "queued by stream: {}, having buffer idx: {} is being 
prefetched, waiting for latch",
+            readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(),
+            readBuf.getStream().hashCode(), readBuf.getBufferindex());
+        readBuf.getLatch()
+            .await();  // blocking wait on the caller stream's thread
+        // Note on correctness: readBuf gets out of getInProgressList() only 
in 1 place: after worker thread
+        // is done processing it (in doneReading). There, the latch is set 
after removing the buffer from
+        // getInProgressList(). So this latch is safe to be outside the 
synchronized block.
+        // Putting it in synchronized would result in a deadlock, since this 
thread would be holding the lock
+        // while waiting, so no one will be able to  change any state. If this 
becomes more complex in the future,
+        // then the latch can be removed and replaced with wait/notify 
whenever getInProgressList() is touched.
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+      printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+              + "buffer index: {} queued by stream: {}", readBuf.getPath(),
+          readBuf.getETag(),
+          readBuf.getOffset(), readBuf.getBufferindex(),
+          readBuf.getStream().hashCode());
+    }
+  }
+
+  /**
+   * Clear the buffer from read-ahead queue if it exists.
+   * @param eTag the eTag of the file
+   * @param requestedOffset the requested offset
+   * @param isFirstRead whether this is the first read of the stream
+   * @return the buffer if found, null otherwise
+   */
+  private ReadBuffer clearFromReadAheadQueue(final String eTag,
+      final long requestedOffset,
+      boolean isFirstRead) {
+    ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
+    /*
+     * If this prefetch was triggered by first read of this input stream,
+     * we should not remove it from the queue; let the backend threads complete it.
+     */
+    if (buffer != null && isFirstRead) {
+      return buffer;
+    }
+    if (buffer != null) {
+      getReadAheadQueue().remove(buffer);
+      notifyAll();   // lock is held in calling method
+      pushToFreeList(buffer.getBufferindex());
+    }
+    return null;
+  }
+
+  /**
+   * Get the block from completed queue if it exists.
+   * @param eTag the eTag of the file
+   * @param position the requested offset
+   * @param length the length to read
+   * @param buffer the buffer to read data into
+   * @return the number of bytes read
+   * @throws IOException if an I/O error occurs
+   */
+  private int getBlockFromCompletedQueue(final String eTag, final long position,
+      final int length, final byte[] buffer) throws IOException {
+    ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+    if (buf == null) {
+      return 0;
+    }
+
+    buf.startReading(); // atomic increment of refCount.
+    try {
+      if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+        // To prevent new read requests from failing due to old read-ahead attempts,
+        // return an exception only from buffers that failed within the last getThresholdAgeMilliseconds()
+        if ((currentTimeMillis() - (buf.getTimeStamp())
+            < getThresholdAgeMilliseconds())) {
+          throw buf.getErrException();
+        } else {
+          return 0;
+        }
+      }
+
+      if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+          || (position >= buf.getOffset() + buf.getLength())) {
+        return 0;
+      }
+
+      int cursor = (int) (position - buf.getOffset());
+      int availableLengthInBuffer = buf.getLength() - cursor;
+      int lengthToCopy = Math.min(length, availableLengthInBuffer);
+      System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+      if (cursor == 0) {
+        buf.setFirstByteConsumed(true);
+      }
+      if (cursor + lengthToCopy == buf.getLength()) {
+        buf.setLastByteConsumed(true);
+      }
+      buf.setAnyByteConsumed(true);
+      return lengthToCopy;
+    } finally {
+      // atomic decrement of refCount; balanced on every exit path so that a
+      // failed or short read cannot pin the buffer against eviction
+      buf.endReading();
+    }
+  }
+
+  /**
+   * Get the buffer from completed queue that contains the requested offset.
+   * @param eTag the eTag of the file
+   * @param requestedOffset the requested offset
+   * @return the buffer if found, null otherwise
+   */
+  private ReadBuffer getBufferFromCompletedQueue(final String eTag,
+      final long requestedOffset) {
+    for (ReadBuffer buffer : getCompletedReadList()) {
+      // Buffer is returned if the requestedOffset is at or above buffer's
+      // offset but less than buffer's length or the actual requestedLength
+      if (eTag.equals(buffer.getETag())
+          && (requestedOffset >= buffer.getOffset())
+          && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+          || (requestedOffset
+          < buffer.getOffset() + buffer.getRequestedLength()))) {
+        return buffer;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Try to upscale memory by adding more buffers to the pool if memory usage is below threshold.
+   * @return whether the upscale succeeded
+   */
+  private synchronized boolean tryMemoryUpscale() {
+    if (!isDynamicScalingEnabled) {
+      printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+      return false; // Dynamic scaling is disabled, so no upscaling.
+    }
+    double memoryLoad = getMemoryLoad();
+    if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
+      // Create and add more buffers to the free list.
+      int nextIndx = getNumBuffers();
+      if (removedBufferList.isEmpty() && nextIndx < bufferPool.length) {
+        bufferPool[nextIndx] = new byte[getReadAheadBlockSize()];
+        pushToFreeList(nextIndx);
+      } else {
+        // Reuse a removed buffer index.
+        int freeIndex = removedBufferList.pop();
+        if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) {
+          printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+              freeIndex, bufferPool.length);
+          return false;
+        }
+        bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+        pushToFreeList(freeIndex);
+      }
+      incrementActiveBufferCount();
+      printTraceLog(
+          "Current Memory Load: {}. Incrementing buffer pool size to {}",
+          memoryLoad, getNumBuffers());
+      return true;
+    }
+    printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load: 
{}",
+        getNumBuffers(), memoryLoad);
+    return false;
+  }
+
+  /**
+   * Scheduled Eviction task that runs periodically to evict old buffers.
+   */
+  private void scheduledEviction() {
+    for (ReadBuffer buf : getCompletedReadList()) {
+      if (currentTimeMillis() - buf.getTimeStamp()
+          > getThresholdAgeMilliseconds()) {
+        // If the buffer is older than thresholdAge, evict it.
+        printTraceLog(
+            "Scheduled Eviction of Buffer Triggered for BufferIndex: {}, "
+                + "file: {}, with eTag: {}, offset: {}, length: {}, queued by 
stream: {}",
+            buf.getBufferindex(), buf.getPath(), buf.getETag(), 
buf.getOffset(),
+            buf.getLength(), buf.getStream().hashCode());
+        evict(buf);
+      }
+    }
+
+    double memoryLoad = getMemoryLoad();
+    if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+      synchronized (this) {
+        if (isFreeListEmpty()) {
+          printTraceLog(
+              "No free buffers available. Skipping downscale of buffer pool");
+          return; // No free buffers available, so cannot downscale.
+        }
+        int freeIndex = popFromFreeList();
+        if (freeIndex >= bufferPool.length || bufferPool[freeIndex] == null) {
+          printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+              freeIndex, bufferPool.length);
+          return;
+        }
+        bufferPool[freeIndex] = null;
+        removedBufferList.add(freeIndex);
+        decrementActiveBufferCount();
+        printTraceLog(
+            "Current Memory Load: {}. Decrementing buffer pool size to {}",
+            memoryLoad, getNumBuffers());
+      }
+    }
+  }
+
+  /**
+   * Manual Eviction of a buffer.
+   * @param buf the buffer to evict
+   * @return whether the eviction succeeded
+   */
+  private boolean manualEviction(final ReadBuffer buf) {
+    printTraceLog(
+        "Manual Eviction of Buffer Triggered for BufferIndex: {}, file: {}, 
with eTag: {}, offset: {}, queued by stream: {}",
+        buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+        buf.getStream().hashCode());
+    return evict(buf);
+  }
+
+  /**
+   * Adjust the thread pool size based on CPU load and queue size.
+   */
+  private void adjustThreadPool() {
+    int currentPoolSize = workerRefs.size();
+    double cpuLoad = getCpuLoad();
+    int requiredPoolSize = getRequiredThreadPoolSize();
+    int newThreadPoolSize;
+    printTraceLog(
+        "Current CPU load: {}, Current worker pool size: {}, Current queue 
size: {}",
+        cpuLoad, currentPoolSize, requiredPoolSize);
+    if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+      // Submit more background tasks.
+      newThreadPoolSize = Math.min(maxThreadPoolSize,
+          (int) Math.ceil(
+              (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
+                  / HUNDRED_D));
+      // Create new Worker Threads
+      for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+        ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+        workerRefs.add(worker);
+        workerPool.submit(worker);
+      }
+      printTraceLog("Increased worker pool size from {} to {}", 
currentPoolSize,
+          newThreadPoolSize);
+    } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
+      newThreadPoolSize = Math.max(minThreadPoolSize,
+          (int) Math.ceil(
+              (currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
+                  / HUNDRED_D));
+      // Signal the extra workers to stop
+      while (workerRefs.size() > newThreadPoolSize) {
+        ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+        worker.stop();
+      }
+      printTraceLog("Decreased worker pool size from {} to {}", 
currentPoolSize,
+          newThreadPoolSize);
+    } else {
+      printTraceLog("No change in worker pool size. CPU load: {} Pool size: 
{}",
+          cpuLoad, currentPoolSize);
+    }
+  }
+
+  /**
+   * Similar to System.currentTimeMillis, except implemented with System.nanoTime().
+   * System.currentTimeMillis can go backwards when the system clock is changed (e.g., with NTP time synchronization),
+   * making it unsuitable for measuring time intervals. System.nanoTime() is strictly monotonically increasing per CPU core.
+   * Note: it is not monotonic across sockets, and even within a CPU, it is only the
+   * more recent parts which share a clock across all cores.
+   *
+   * @return current time in milliseconds
+   */
+  private long currentTimeMillis() {
+    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+  }
+
+  /**
+   * Purge all buffers associated with the given stream from the given list.
+   * @param stream the stream whose buffers are to be purged
+   * @param list the list to purge from
+   */
+  private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+    for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+      ReadBuffer readBuffer = it.next();
+      if (readBuffer.getStream() == stream) {
+        it.remove();
+        // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+        // list in doneReading method, we will skip adding those here again.
+        if (readBuffer.getBufferindex() != -1) {
+          pushToFreeList(readBuffer.getBufferindex());
+        }
+      }
+    }
   }
 
   /**
-   * {@inheritDoc}
+   * Test method that can clean up the current state of readAhead buffers and
+   * the lists. Will also trigger a fresh init.
    */
   @VisibleForTesting
   @Override
   public void testResetReadBufferManager() {
-    // TODO: To be implemented
+    synchronized (this) {
+      ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+      for (ReadBuffer buf : getCompletedReadList()) {
+        if (buf != null) {
+          completedBuffers.add(buf);
+        }
+      }
+
+      for (ReadBuffer buf : completedBuffers) {
+        manualEviction(buf);
+      }
+
+      getReadAheadQueue().clear();
+      getInProgressList().clear();
+      getCompletedReadList().clear();
+      getFreeList().clear();
+      for (int i = 0; i < maxBufferPoolSize; i++) {
+        bufferPool[i] = null;
+      }
+      bufferPool = null;
+      if (cpuMonitorThread != null) {
+        cpuMonitorThread.shutdownNow();
+      }
+      if (memoryMonitorThread != null) {
+        memoryMonitorThread.shutdownNow();
+      }
+      if (workerPool != null) {
+        workerPool.shutdownNow();
+      }
+      resetBufferManager();
+    }
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @VisibleForTesting
   @Override
-  public void testResetReadBufferManager(final int readAheadBlockSize,
-      final int thresholdAgeMilliseconds) {
-    // TODO: To be implemented
+  public void testResetReadBufferManager(int readAheadBlockSize,
+      int thresholdAgeMilliseconds) {
+    setReadAheadBlockSize(readAheadBlockSize);
+    setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+    testResetReadBufferManager();
+  }
+
+  @VisibleForTesting
+  public void callTryEvict() {
+    tryEvict();
+  }
+
+  @VisibleForTesting
+  public int getNumBuffers() {
+    return numberOfActiveBuffers.get();
   }
 
-  /**
-   * {@inheritDoc}
-   */
   @Override
-  public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
-    // TODO: To be implemented
+  void resetBufferManager() {
+    setBufferManager(null); // reset the singleton instance
+    setIsConfigured(false);
   }
 
-  private final ThreadFactory namedThreadFactory = new ThreadFactory() {
+  private static void setBufferManager(ReadBufferManagerV2 manager) {
+    bufferManager = manager;
+  }
+
+  private static void setIsConfigured(boolean configured) {
+    isConfigured.set(configured);
+  }
+
+  private final ThreadFactory workerThreadFactory = new ThreadFactory() {
     private int count = 0;
+
     @Override
     public Thread newThread(Runnable r) {
-      return new Thread(r, "ReadAheadV2-Thread-" + count++);
+      Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count++);
+      t.setDaemon(true);
+      return t;
     }
   };
 
-  @Override
-  void resetBufferManager() {
-    setBufferManager(null); // reset the singleton instance
+  private void printTraceLog(String message, Object... args) {
+    if (LOGGER.isTraceEnabled()) {
+      LOGGER.trace(message, args);
+    }
   }
 
-  private static void setBufferManager(ReadBufferManagerV2 manager) {
-    bufferManager = manager;
+  private void printDebugLog(String message, Object... args) {
+    LOGGER.debug(message, args);
+  }
+
+  /**
+   * Get the current memory load of the JVM.
+   * @return the memory load as a double value between 0.0 and 1.0
+   */
+  @VisibleForTesting
+  double getMemoryLoad() {
+    MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+    MemoryUsage memoryUsage = memoryBean.getHeapMemoryUsage();
+    return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+  }
+
+  /**
+   * Get the current CPU load of the system.
+   * @return the CPU load as a double value between 0.0 and 1.0
+   */
+  @VisibleForTesting
+  public double getCpuLoad() {
+    OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+        OperatingSystemMXBean.class);
+    double cpuLoad = osBean.getSystemCpuLoad();
+    if (cpuLoad < 0) {
+      // If the CPU load is not available, return 0.0
+      return 0.0;
+    }
+    return cpuLoad;
+  }
+
+  @VisibleForTesting
+  synchronized static ReadBufferManagerV2 getInstance() {
+    return bufferManager;
+  }
+
+  @VisibleForTesting
+  public int getMinBufferPoolSize() {
+    return minBufferPoolSize;
+  }
+
+  @VisibleForTesting
+  public int getMaxBufferPoolSize() {
+    return maxBufferPoolSize;
+  }
+
+  @VisibleForTesting
+  public int getCurrentThreadPoolSize() {
+    return workerRefs.size();
+  }
+
+  @VisibleForTesting
+  public int getCpuMonitoringIntervalInMilliSec() {
+    return cpuMonitoringIntervalInMilliSec;
+  }
+
+  @VisibleForTesting
+  public int getMemoryMonitoringIntervalInMilliSec() {
+    return memoryMonitoringIntervalInMilliSec;
+  }
+
+  @VisibleForTesting
+  public ScheduledExecutorService getCpuMonitoringThread() {
+    return cpuMonitorThread;
+  }
+
+  public int getRequiredThreadPoolSize() {
+    return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
+        * (getReadAheadQueue().size()
+        + getInProgressList().size())); // 20% more for buffer
+  }
+
+  private boolean isFreeListEmpty() {
+    LOCK.lock();
+    try {
+      return getFreeList().isEmpty();
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
+  private Integer popFromFreeList() {
+    LOCK.lock();
+    try {
+      return getFreeList().pop();
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
+  private void pushToFreeList(int idx) {
+    LOCK.lock();
+    try {
+      getFreeList().push(idx);
+    } finally {
+      LOCK.unlock();
+    }
+  }
+
+  private void incrementActiveBufferCount() {
+    numberOfActiveBuffers.getAndIncrement();
+  }
+
+  private void decrementActiveBufferCount() {
+    numberOfActiveBuffers.getAndDecrement();
   }
 }
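
To make the pool-resize arithmetic in adjustThreadPool() above concrete, here is a minimal standalone sketch of the same calculation. The sizes and percentages are illustrative sample values, not the committed defaults:

    // Hypothetical walk-through of the resize math used in adjustThreadPool().
    int currentPoolSize = 8;
    int minThreadPoolSize = 2;
    int maxThreadPoolSize = 16;
    double upscalePct = 20.0;
    double downscalePct = 25.0;
    // Upscale: ceil(8 * (100 + 20) / 100) = ceil(9.6) = 10, capped at the max.
    int grown = Math.min(maxThreadPoolSize,
        (int) Math.ceil(currentPoolSize * (100.0 + upscalePct) / 100.0));
    // Downscale: ceil(8 * (100 - 25) / 100) = ceil(6.0) = 6, floored at the min.
    int shrunk = Math.max(minThreadPoolSize,
        (int) Math.ceil(currentPoolSize * (100.0 - downscalePct) / 100.0));

Because of the ceil(), small pools shrink slowly: with a pool of 8 and a 10% downscale percentage, ceil(7.2) = 8 leaves the size unchanged, so the configured percentage effectively sets a minimum step size.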
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index 79d5eef955a..2c6efdc735a 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -20,7 +20,9 @@
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
 
@@ -29,6 +31,7 @@ class ReadBufferWorker implements Runnable {
   protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
   private int id;
   private ReadBufferManager bufferManager;
+  private AtomicBoolean isRunning = new AtomicBoolean(true);
 
   ReadBufferWorker(final int id, final ReadBufferManager bufferManager) {
     this.id = id;
@@ -54,7 +57,7 @@ public void run() {
       Thread.currentThread().interrupt();
     }
     ReadBuffer buffer;
-    while (true) {
+    while (isRunning()) {
       try {
        buffer = bufferManager.getNextBlockToRead();   // blocks, until a buffer is available for this thread
       } catch (InterruptedException ex) {
@@ -72,7 +75,7 @@ public void run() {
               // read-ahead buffer size, make sure a valid length is passed
               // for remote read
               Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
-                  buffer.getTracingContext());
+              buffer.getTracingContext());
 
          bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
         } catch (IOException ex) {
@@ -85,4 +88,13 @@ public void run() {
       }
     }
   }
+
+  public void stop() {
+    isRunning.set(false);
+  }
+
+  @VisibleForTesting
+  public boolean isRunning() {
+    return isRunning.get();
+  }
 }
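
The stop() method added above gives cooperative shutdown: each worker re-checks an AtomicBoolean at the top of its loop and exits once the flag is cleared. A minimal sketch of the same idiom outside the ABFS classes (this Worker class is hypothetical, not part of the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    class Worker implements Runnable {
      private final AtomicBoolean running = new AtomicBoolean(true);

      @Override
      public void run() {
        while (running.get()) {
          // take one unit of work and process it; any blocking take should be
          // interruptible so that a stopped worker does not wait forever
        }
      }

      void stop() {
        running.set(false); // observed at the next loop-condition check
      }
    }

As in the patch, a worker blocked inside getNextBlockToRead() only observes the flag after it wakes up, so a thread-pool downscale takes effect lazily rather than immediately.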
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index c35b76e1a73..798b1943e07 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -38,6 +38,7 @@
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
@@ -711,12 +712,38 @@ protected void assertPathDns(Path path) {
         .contains(expectedDns);
   }
 
+  /**
+   * Return array of random bytes of the given length.
+   *
+   * @param length length of the byte array
+   * @return byte array
+   */
   protected byte[] getRandomBytesArray(int length) {
     final byte[] b = new byte[length];
     new Random().nextBytes(b);
     return b;
   }
 
+  /**
+   * Create a file on the file system with the given file name and content.
+   *
+   * @param fs fileSystem that stores the file
+   * @param fileName name of the file
+   * @param fileContent content of the file
+   *
+   * @return path of the file created
+   * @throws IOException exception in writing file on fileSystem
+   */
+  protected Path createFileWithContent(FileSystem fs, String fileName,
+      byte[] fileContent) throws IOException {
+    Path testFilePath = path(fileName);
+    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+      oStream.write(fileContent);
+      oStream.flush();
+    }
+    return testFilePath;
+  }
+
   /**
    * Checks a list of futures for exceptions.
    *
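
With getRandomBytesArray(int) and createFileWithContent(...) promoted to this shared base class, subclasses can build test fixtures directly; a sketch of the intended call pattern inside a test method (the ONE_MB size is arbitrary):

    byte[] fileContent = getRandomBytesArray(ONE_MB);
    Path testFilePath = createFileWithContent(getFileSystem(),
        methodName.getMethodName(), fileContent);

createFileWithContent(...) resolves the name through the base class's path(...) helper, which is expected to yield a unique per-test path, so parallel tests do not collide.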
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 1e5ba3689f5..33b7acf4658 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -233,7 +233,7 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
         .setDisableOutputStreamFlush(disableOutputStreamFlush);
 
     final Path testFilePath = path(methodName.getMethodName());
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
     // The test case must write "fs.azure.write.request.size" bytes
     // to the stream in order for the data to be uploaded to storage.
     assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize()
@@ -265,7 +265,7 @@ private void testFlush(boolean disableOutputStreamFlush) throws Exception {
   @Test
   public void testHflushWithFlushEnabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
     String fileName = UUID.randomUUID().toString();
     final Path testFilePath = path(fileName);
 
@@ -278,7 +278,7 @@ public void testHflushWithFlushEnabled() throws Exception {
   @Test
   public void testHflushWithFlushDisabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
     final Path testFilePath = path(methodName.getMethodName());
     boolean isAppendBlob = false;
    if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
@@ -295,7 +295,7 @@ public void testHflushWithFlushDisabled() throws Exception {
   @Test
   public void testHsyncWithFlushEnabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
 
     final Path testFilePath = path(methodName.getMethodName());
 
@@ -332,7 +332,7 @@ public void testTracingHeaderForAppendBlob() throws Exception {
   @Test
   public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
 
     final Path testFilePath = path(methodName.getMethodName());
 
@@ -349,7 +349,7 @@ public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
   @Test
   public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
     final Path testFilePath = path(methodName.getMethodName());
    try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath, buffer, true)) {
       assertHasStreamCapabilities(stream,
@@ -365,7 +365,7 @@ public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
   @Test
   public void testHsyncWithFlushDisabled() throws Exception {
     final AzureBlobFileSystem fs = this.getFileSystem();
-    byte[] buffer = getRandomBytesArray();
+    byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
     final Path testFilePath = path(methodName.getMethodName());
     boolean isAppendBlob = false;
    if (fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
@@ -378,12 +378,6 @@ public void testHsyncWithFlushDisabled() throws Exception {
     }
   }
 
-  private byte[] getRandomBytesArray() {
-    final byte[] b = new byte[TEST_FILE_LENGTH];
-    new Random().nextBytes(b);
-    return b;
-  }
-
  private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path path, byte[] buffer, boolean enableFlush) throws IOException {
     fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
     FSDataOutputStream stream = fs.create(path);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
index 388e662115e..ec249e2b040 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
@@ -19,23 +19,17 @@
 package org.apache.hadoop.fs.azurebfs.services;
 
 import java.io.IOException;
-import java.util.Random;
-import java.util.UUID;
 
 import org.assertj.core.api.Assertions;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 
-import static org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest.SHORTENED_GUID_LEN;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
 
@@ -49,29 +43,6 @@ public AbfsInputStreamTestUtils(AbstractAbfsIntegrationTest abstractAbfsIntegrat
     this.abstractAbfsIntegrationTest = abstractAbfsIntegrationTest;
   }
 
-  private Path path(String filepath) throws IOException {
-    return abstractAbfsIntegrationTest.getFileSystem().makeQualified(
-        new Path(getTestPath(), getUniquePath(filepath)));
-  }
-
-  private Path getTestPath() {
-    Path path = new Path(UriUtils.generateUniqueTestPath());
-    return path;
-  }
-
-  /**
-   * Generate a unique path using the given filepath.
-   * @param filepath path string
-   * @return unique path created from filepath and a GUID
-   */
-  private Path getUniquePath(String filepath) {
-    if (filepath.equals("/")) {
-      return new Path(filepath);
-    }
-    return new Path(filepath + StringUtils
-        .right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
-  }
-
   /**
    * Returns AzureBlobFileSystem instance with the required
    * readFullFileOptimization configuration.
@@ -90,38 +61,6 @@ public AzureBlobFileSystem getFileSystem(boolean readSmallFilesCompletely)
     return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
   }
 
-  /**
-   * Return array of random bytes of the given length.
-   *
-   * @param length length of the byte array
-   * @return byte array
-   */
-  public byte[] getRandomBytesArray(int length) {
-    final byte[] b = new byte[length];
-    new Random().nextBytes(b);
-    return b;
-  }
-
-  /**
-   * Create a file on the file system with the given file name and content.
-   *
-   * @param fs fileSystem that stores the file
-   * @param fileName name of the file
-   * @param fileContent content of the file
-   *
-   * @return path of the file created
-   * @throws IOException exception in writing file on fileSystem
-   */
-  public Path createFileWithContent(FileSystem fs, String fileName,
-      byte[] fileContent) throws IOException {
-    Path testFilePath = path(fileName);
-    try (FSDataOutputStream oStream = fs.create(testFilePath)) {
-      oStream.write(fileContent);
-      oStream.flush();
-    }
-    return testFilePath;
-  }
-
   /**
    * Assert that the content read from the subsection of a file is correct.
    *
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index 6bcf31f9e69..938f5f4300c 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -52,8 +52,8 @@ public void testWithNoOptimization() throws Exception {
       int fileSize = i * ONE_MB;
       final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
       String fileName = methodName.getMethodName() + i;
-      byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-      Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
       testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
     }
   }
@@ -97,8 +97,8 @@ public void testExceptionInOptimization() throws Exception {
       int fileSize = i * ONE_MB;
       final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
       String fileName = methodName.getMethodName() + i;
-      byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-      Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
       testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
           fileSize / 4, fileContent);
     }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
index fbafc12490c..9cd3433a88e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
@@ -258,8 +258,8 @@ private void validateSeekAndReadWithConf(boolean optimizeFooterRead,
         try (AzureBlobFileSystem spiedFs = createSpiedFs(
             getRawConfiguration())) {
           String fileName = methodName.getMethodName() + fileId;
-          byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-          Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+          byte[] fileContent = getRandomBytesArray(fileSize);
+          Path testFilePath = createFileWithContent(spiedFs, fileName,
               fileContent);
           for (int readBufferSize : READ_BUFFER_SIZE) {
             validateSeekAndReadWithConf(spiedFs, optimizeFooterRead, seekTo,
@@ -389,8 +389,8 @@ public void testPartialReadWithNoData() throws Exception {
       futureList.add(executorService.submit(() -> {
         try (AzureBlobFileSystem spiedFs = createSpiedFs(
             getRawConfiguration())) {
-          byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-          Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+          byte[] fileContent = getRandomBytesArray(fileSize);
+          Path testFilePath = createFileWithContent(spiedFs, fileName,
               fileContent);
           validatePartialReadWithNoData(spiedFs, fileSize, fileContent,
               testFilePath);
@@ -461,8 +461,8 @@ public void testPartialReadWithSomeData() throws Exception {
         try (AzureBlobFileSystem spiedFs = createSpiedFs(
             getRawConfiguration())) {
           String fileName = methodName.getMethodName() + fileId;
-          byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-          Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+          byte[] fileContent = getRandomBytesArray(fileSize);
+          Path testFilePath = createFileWithContent(spiedFs, fileName,
               fileContent);
           validatePartialReadWithSomeData(spiedFs, fileSize, testFilePath,
               fileContent);
@@ -583,8 +583,8 @@ private void verifyConfigValueInStream(final FSDataInputStream inputStream,
   private Path createPathAndFileWithContent(final AzureBlobFileSystem fs,
       final int fileIdx, final int fileSize) throws Exception {
     String fileName = methodName.getMethodName() + fileIdx;
-    byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-    return abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+    byte[] fileContent = getRandomBytesArray(fileSize);
+    return createFileWithContent(fs, fileName, fileContent);
   }
 
   private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path,
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
index 5e3879a525c..f92d64359a2 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
@@ -74,8 +74,8 @@ private void validateNumBackendCalls(final boolean readSmallFilesCompletely,
     for (int i = 1; i <= 4; i++) {
       String fileName = methodName.getMethodName() + i;
       int fileSize = i * ONE_MB;
-      byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-      Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
       int length = ONE_KB;
       try (FSDataInputStream iStream = fs.open(testFilePath)) {
         byte[] buffer = new byte[length];
@@ -185,8 +185,8 @@ private void validateSeekAndReadWithConf(final SeekTo seekTo,
     for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
       String fileName = methodName.getMethodName() + i;
       int fileSize = i * ONE_MB;
-      byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
-      Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs, fileName, fileContent);
+      byte[] fileContent = getRandomBytesArray(fileSize);
+      Path testFilePath = createFileWithContent(fs, fileName, fileContent);
       int length = ONE_KB;
       int seekPos = seekPos(seekTo, fileSize, length);
       seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
@@ -255,9 +255,9 @@ public void testPartialReadWithNoData() throws Exception {
       try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
           true)) {
         String fileName = methodName.getMethodName() + i;
-        byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+        byte[] fileContent = getRandomBytesArray(
             fileSize);
-        Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+        Path testFilePath = createFileWithContent(fs,
             fileName, fileContent);
         partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
             fileContent);
@@ -304,9 +304,9 @@ public void testPartialReadWithSomeData() throws Exception {
       try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
           true)) {
         String fileName = methodName.getMethodName() + i;
-        byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+        byte[] fileContent = getRandomBytesArray(
             fileSize);
-        Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+        Path testFilePath = createFileWithContent(fs,
             fileName, fileContent);
         partialReadWithSomeData(fs, testFilePath, fileSize / 2,
             fileSize / 4, fileContent);
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index 1ead30e9fa2..84b0fbd5196 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -18,10 +18,8 @@
 
 package org.apache.hadoop.fs.azurebfs.services;
 
-import java.io.IOException;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -29,7 +27,6 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -62,127 +59,122 @@ public class ITestReadBufferManager extends AbstractAbfsIntegrationTest {
    */
   public static final int PROBE_INTERVAL_MILLIS = 1_000;
 
-    public ITestReadBufferManager() throws Exception {
+  public ITestReadBufferManager() throws Exception {
+  }
+
+  @Test
+  public void testPurgeBufferManagerForParallelStreams() throws Exception {
+    describe("Testing purging of buffers from ReadBufferManagerV1 for "
+        + "parallel input streams");
+    final int numBuffers = 16;
+    final LinkedList<Integer> freeList = new LinkedList<>();
+    for (int i = 0; i < numBuffers; i++) {
+      freeList.add(i);
     }
-
-    @Test
-    public void testPurgeBufferManagerForParallelStreams() throws Exception {
-        describe("Testing purging of buffers from ReadBufferManagerV1 for "
-                + "parallel input streams");
-        final int numBuffers = 16;
-        final LinkedList<Integer> freeList = new LinkedList<>();
-        for (int i=0; i < numBuffers; i++) {
-            freeList.add(i);
-        }
-        ExecutorService executorService = Executors.newFixedThreadPool(4);
-        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
-        // verify that the fs has the capability to validate the fix
-        Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
-            .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
-            .isTrue();
-
-        try {
-            for (int i = 0; i < 4; i++) {
-                final String fileName = methodName.getMethodName() + i;
-                executorService.submit((Callable<Void>) () -> {
-                    byte[] fileContent = getRandomBytesArray(ONE_MB);
-                    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
-                    try (FSDataInputStream iStream = fs.open(testFilePath)) {
-                        iStream.read();
-                    }
-                    return null;
-                });
-            }
-        } finally {
-            executorService.shutdown();
-            // wait for all tasks to finish
-            executorService.awaitTermination(1, TimeUnit.MINUTES);
-        }
-
-        ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
-        // readahead queue is empty
-        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
-        // verify the in progress list eventually empties out.
-        eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
-            assertListEmpty("InProgressList", bufferManager.getInProgressCopiedList()));
+    ExecutorService executorService = Executors.newFixedThreadPool(4);
+    AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+    // verify that the fs has the capability to validate the fix
+    Assertions.assertThat(fs.hasPathCapability(new Path("/"), CAPABILITY_SAFE_READAHEAD))
+        .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+        .isTrue();
+
+    try {
+      for (int i = 0; i < 4; i++) {
+        final String fileName = methodName.getMethodName() + i;
+        executorService.submit((Callable<Void>) () -> {
+          byte[] fileContent = getRandomBytesArray(ONE_MB);
+          Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+          try (FSDataInputStream iStream = fs.open(testFilePath)) {
+            iStream.read();
+          }
+          return null;
+        });
+      }
+    } finally {
+      executorService.shutdown();
+      // wait for all tasks to finish
+      executorService.awaitTermination(1, TimeUnit.MINUTES);
     }
 
-    private void assertListEmpty(String listName, List<ReadBuffer> list) {
-        Assertions.assertThat(list)
-                .describedAs("After closing all streams %s should be empty", listName)
-                .hasSize(0);
+    ReadBufferManager bufferManager = getBufferManager(fs);
+    // readahead queue is empty
+    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+    // verify the in progress list eventually empties out.
+    eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS, () ->
+        assertListEmpty("InProgressList", bufferManager.getInProgressListCopy()));
+  }
+
+  private void assertListEmpty(String listName, List<ReadBuffer> list) {
+    Assertions.assertThat(list)
+        .describedAs("After closing all streams %s should be empty", listName)
+        .hasSize(0);
+  }
+
+  @Test
+  public void testPurgeBufferManagerForSequentialStream() throws Exception {
+    describe("Testing purging of buffers in ReadBufferManagerV1 for "
+        + "sequential input streams");
+    AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+    final String fileName = methodName.getMethodName();
+    byte[] fileContent = getRandomBytesArray(ONE_MB);
+    Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+    AbfsInputStream iStream1 =  null;
+    // stream1 will be closed right away.
+    try {
+      iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+      // Just reading one byte will trigger all read ahead calls.
+      iStream1.read();
+    } finally {
+      IOUtils.closeStream(iStream1);
     }
-
-    @Test
-    public void testPurgeBufferManagerForSequentialStream() throws Exception {
-        describe("Testing purging of buffers in ReadBufferManagerV1 for "
-                + "sequential input streams");
-        AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
-        final String fileName = methodName.getMethodName();
-        byte[] fileContent = getRandomBytesArray(ONE_MB);
-        Path testFilePath = createFileWithContent(fs, fileName, fileContent);
-
-        AbfsInputStream iStream1 =  null;
-        // stream1 will be closed right away.
-        try {
-            iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
-            // Just reading one byte will trigger all read ahead calls.
-            iStream1.read();
-        } finally {
-            IOUtils.closeStream(iStream1);
-        }
-        ReadBufferManagerV1 bufferManager = ReadBufferManagerV1.getBufferManager();
-        AbfsInputStream iStream2 = null;
-        try {
-            iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
-            iStream2.read();
-            // After closing stream1, no queued buffers of stream1 should be present
-            // assertions can't be made about the state of the other lists as it is
-            // too prone to race conditions.
-            assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
-        } finally {
-            // closing the stream later.
-            IOUtils.closeStream(iStream2);
-        }
-        // After closing stream2, no queued buffers of stream2 should be present.
-        assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
-
-        // After closing both the streams, read queue should be empty.
-        assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
-
+    ReadBufferManager bufferManager = getBufferManager(fs);
+    AbfsInputStream iStream2 = null;
+    try {
+      iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+      iStream2.read();
+      // After closing stream1, no queued buffers of stream1 should be present
+      // assertions can't be made about the state of the other lists as it is
+      // too prone to race conditions.
+      assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream1);
+    } finally {
+      // closing the stream later.
+      IOUtils.closeStream(iStream2);
     }
+    // After closing stream2, no queued buffers of stream2 should be present.
+    assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(), iStream2);
 
+    // After closing both the streams, read queue should be empty.
+    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
 
-    private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
-                                                           AbfsInputStream inputStream) {
-        for (ReadBuffer buffer : list) {
-            Assertions.assertThat(buffer.getStream())
-                    .describedAs("Buffers associated with closed input streams shouldn't be present")
-                    .isNotEqualTo(inputStream);
-        }
-    }
+  }
 
-    private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
-        Configuration conf = getRawConfiguration();
-        conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
-        conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
-        conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
-        return (AzureBlobFileSystem) FileSystem.newInstance(conf);
-    }
 
-    protected byte[] getRandomBytesArray(int length) {
-        final byte[] b = new byte[length];
-        new Random().nextBytes(b);
-        return b;
+  private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
+      AbfsInputStream inputStream) {
+    for (ReadBuffer buffer : list) {
+      Assertions.assertThat(buffer.getStream())
+          .describedAs("Buffers associated with closed input streams shouldn't be present")
+          .isNotEqualTo(inputStream);
     }
-
-    protected Path createFileWithContent(FileSystem fs, String fileName,
-                                         byte[] fileContent) throws IOException {
-        Path testFilePath = path(fileName);
-        try (FSDataOutputStream oStream = fs.create(testFilePath)) {
-            oStream.write(fileContent);
-            oStream.flush();
-        }
-        return testFilePath;
+  }
+
+  private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
+    Configuration conf = getRawConfiguration();
+    conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
+    conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
+    conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
+    return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+  }
+
+  private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) {
+    int blockSize = fs.getAbfsStore().getAbfsConfiguration().getReadAheadBlockSize();
+    if (getConfiguration().isReadAheadV2Enabled()) {
+      ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize,
+          getConfiguration());
+      return ReadBufferManagerV2.getBufferManager();
     }
+    ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
+    return ReadBufferManagerV1.getBufferManager();
+  }
 }
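
The getBufferManager(fs) helper above lets the same assertions run against either manager generation: it configures and returns ReadBufferManagerV2 when the V2 flag is enabled, and ReadBufferManagerV1 otherwise. Call sites then depend only on the shared ReadBufferManager type:

    ReadBufferManager bufferManager = getBufferManager(fs);
    // Both V1 and V2 expose the same snapshot accessors used by the assertions.
    assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());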
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
new file mode 100644
index 00000000000..c409ee9991d
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ITestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+
+  private static final int LESS_NUM_FILES = 2;
+  private static final int MORE_NUM_FILES = 5;
+  private static final int SMALL_FILE_SIZE = 6 * ONE_MB;
+  private static final int LARGE_FILE_SIZE = 50 * ONE_MB;
+  private static final int BLOCK_SIZE = 4 * ONE_MB;
+
+  public ITestReadBufferManagerV2() throws Exception {
+  }
+
+  @Test
+  public void testReadDifferentFilesInParallel() throws Exception {
+    try (AzureBlobFileSystem fs = getConfiguredFileSystem()) {
+      int fileSize = LARGE_FILE_SIZE;
+      int numFiles = MORE_NUM_FILES;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+
+      Path[] testPaths = new Path[numFiles];
+      for (int i = 0; i < numFiles; i++) {
+        final String fileName = methodName.getMethodName() + i;
+        testPaths[i] = createFileWithContent(fs, fileName, fileContent);
+      }
+      ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+      Map<String, Long> metricMap = getInstrumentationMap(fs);
+      long requestsMadeBeforeTest = metricMap
+          .get(CONNECTIONS_MADE.getStatName());
+      try {
+        for (int i = 0; i < numFiles; i++) {
+          final int fileIdx = i;
+          executorService.submit((Callable<Void>) () -> {
+            try (FSDataInputStream iStream = fs.open(testPaths[fileIdx])) {
+              byte[] buffer = new byte[fileSize];
+              int bytesRead = iStream.read(buffer, 0, fileSize);
+              assertThat(bytesRead).isEqualTo(fileSize);
+              assertThat(buffer).isEqualTo(fileContent);
+            }
+            return null;
+          });
+        }
+      } finally {
+        executorService.shutdown();
+        // wait for all tasks to finish
+        executorService.awaitTermination(1, TimeUnit.MINUTES);
+      }
+      metricMap = getInstrumentationMap(fs);
+      long requestsMadeAfterTest = metricMap
+          .get(CONNECTIONS_MADE.getStatName());
+      int expectedRequests = numFiles // Get Path Status for each file
+          + ((int) Math.ceil((double) fileSize / BLOCK_SIZE))
+          * numFiles; // Read requests for each file
+      assertEquals(expectedRequests,
+          requestsMadeAfterTest - requestsMadeBeforeTest);
+    }
+  }
+
+  @Test
+  public void testReadSameFileInParallel() throws Exception {
+    try (AzureBlobFileSystem fs = getConfiguredFileSystem()) {
+      int fileSize = SMALL_FILE_SIZE;
+      int numFiles = LESS_NUM_FILES;
+      byte[] fileContent = getRandomBytesArray(fileSize);
+
+      final String fileName = methodName.getMethodName();
+      Path testPath = createFileWithContent(fs, fileName, fileContent);
+      ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+      Map<String, Long> metricMap = getInstrumentationMap(fs);
+      long requestsMadeBeforeTest = metricMap
+          .get(CONNECTIONS_MADE.getStatName());
+      try {
+        for (int i = 0; i < numFiles; i++) {
+          executorService.submit((Callable<Void>) () -> {
+            try (FSDataInputStream iStream = fs.open(testPath)) {
+              byte[] buffer = new byte[fileSize];
+              int bytesRead = iStream.read(buffer, 0, fileSize);
+              assertThat(bytesRead).isEqualTo(fileSize);
+              assertThat(buffer).isEqualTo(fileContent);
+            }
+            return null;
+          });
+        }
+      } finally {
+        executorService.shutdown();
+        // wait for all tasks to finish
+        executorService.awaitTermination(1, TimeUnit.MINUTES);
+      }
+      metricMap = getInstrumentationMap(fs);
+      long requestsMadeAfterTest = metricMap
+          .get(CONNECTIONS_MADE.getStatName());
+      int expectedRequests = numReaders // One GetPathStatus call per open
+          + ((int) Math.ceil(
+              (double) fileSize / BLOCK_SIZE)); // Reads served once from cache
+      assertThat(requestsMadeAfterTest - requestsMadeBeforeTest)
+          .describedAs("Unexpected number of requests while reading the same file in parallel")
+          .isEqualTo(expectedRequests);
+    }
+  }
+
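+  /**
+   * Returns a new filesystem instance with read-ahead V2 and its dynamic
+   * scaling enabled.
+   */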
+  private AzureBlobFileSystem getConfiguredFileSystem() throws Exception {
+    Configuration config = new Configuration(getRawConfiguration());
+    config.set(FS_AZURE_ENABLE_READAHEAD_V2, TRUE);
+    config.set(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, TRUE);
+    return (AzureBlobFileSystem) FileSystem.newInstance(config);
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 9b388b57c3e..37d8046203e 100644
--- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -112,7 +112,7 @@ public void teardown() throws Exception {
     getBufferManager().testResetReadBufferManager();
   }
 
-  private AbfsRestOperation getMockRestOp() {
+  AbfsRestOperation getMockRestOp() {
     AbfsRestOperation op = mock(AbfsRestOperation.class);
     AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
     when(httpOp.getBytesReceived()).thenReturn(1024L);
@@ -121,7 +121,7 @@ private AbfsRestOperation getMockRestOp() {
     return op;
   }
 
-  private AbfsClient getMockAbfsClient() throws URISyntaxException {
+  AbfsClient getMockAbfsClient() throws URISyntaxException {
     // Mock failure for client.read()
     AbfsClient client = mock(AbfsClient.class);
     AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd")));
@@ -135,7 +135,7 @@ private AbfsClient getMockAbfsClient() throws URISyntaxException {
     return client;
   }
 
-  private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
+  AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
       String fileName) throws IOException {
     AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
     // Create AbfsInputStream with the client instance
@@ -144,7 +144,10 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
         null,
         FORWARD_SLASH + fileName,
         THREE_KB,
-        inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB),
+        inputStreamContext.withReadBufferSize(ONE_KB)
+            .withReadAheadQueueDepth(10)
+            .withReadAheadBlockSize(ONE_KB)
+            .isReadAheadV2Enabled(getConfiguration().isReadAheadV2Enabled()),
         "eTag",
         getTestTracingContext(null, false));
 
@@ -182,7 +185,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient abfsClient,
     return inputStream;
   }
 
-  private void queueReadAheads(AbfsInputStream inputStream) {
+  void queueReadAheads(AbfsInputStream inputStream) {
     // Mimic AbfsInputStream readAhead queue requests
     getBufferManager()
         .queueReadAhead(inputStream, 0, ONE_KB, inputStream.getTracingContext());
@@ -564,7 +567,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
       //Sleeping to give ReadBufferWorker to pick the readBuffers for processing.
       Thread.sleep(readBufferTransferToInProgressProbableTime);
 
-      assertThat(readBufferManager.getInProgressCopiedList())
+      assertThat(readBufferManager.getInProgressListCopy())
           .describedAs(String.format("InProgressList should have %d elements",
               readBufferQueuedCount))
           .hasSize(readBufferQueuedCount);
@@ -577,7 +580,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting() throws Exception {
           .hasSize(0);
     }
 
-    assertThat(readBufferManager.getInProgressCopiedList())
+    assertThat(readBufferManager.getInProgressListCopy())
         .describedAs(String.format("InProgressList should have %d elements",
             readBufferQueuedCount))
         .hasSize(readBufferQueuedCount);
@@ -1125,6 +1128,11 @@ private void resetReadBufferManager(int bufferSize, int threshold) {
   }
 
   private ReadBufferManager getBufferManager() {
+    if (getConfiguration().isReadAheadV2Enabled()) {
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          getConfiguration().getReadAheadBlockSize(), getConfiguration());
+      return ReadBufferManagerV2.getBufferManager();
+    }
     return ReadBufferManagerV1.getBufferManager();
   }
 }
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
new file mode 100644
index 00000000000..77fbb769042
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests around different components of Read Buffer Manager V2.
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+  private volatile boolean running = true;
+  private final List<byte[]> allocations = new ArrayList<>();
+  private static final double HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 0.8;
+
+  public TestReadBufferManagerV2() throws Exception {
+    super();
+  }
+
+  /**
+   * Test to verify initialization of ReadBufferManagerV2.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testReadBufferManagerV2Init() throws Exception {
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        getConfiguration().getReadAheadBlockSize(), getConfiguration());
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    assertThat(ReadBufferManagerV2.getInstance())
+        .as("ReadBufferManager should be uninitialized").isNull();
+    intercept(IllegalStateException.class,
+        "ReadBufferManagerV2 is not configured.", () -> {
+      ReadBufferManagerV2.getBufferManager();
+    });
+    // Verify that multiple invocations of getBufferManager return the same instance.
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        getConfiguration().getReadAheadBlockSize(), getConfiguration());
+    ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager2 = ReadBufferManagerV2.getBufferManager();
+    ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+    assertThat(bufferManager).isNotNull();
+    assertThat(bufferManager2).isNotNull();
+    assertThat(bufferManager).isSameAs(bufferManager2);
+    assertThat(bufferManager3).isNotNull();
+    assertThat(bufferManager3).isSameAs(bufferManager);
+
+    // Verify that the default pool sizes are valid.
+    assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+    assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+  }
+
+  /**
+   * Test to verify that the CPU monitor thread is active only when dynamic
+   * scaling is enabled.
+   * @throws Exception if test fails
+   */
+  @Test
+  public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should be initialized").isNotNull();
+      bufferManagerV2.resetBufferManager();
+    }
+
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
+    try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+        getFileSystem().getUri(), conf)) {
+      AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+      ReadBufferManagerV2.setReadBufferManagerConfigs(
+          abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+      ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+      assertThat(bufferManagerV2.getCpuMonitoringThread())
+          .as("CPU Monitor thread should not be initialized").isNull();
+      bufferManagerV2.resetBufferManager();
+    }
+  }
+
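+  /**
+   * Test to verify that the worker thread pool scales up while read-aheads
+   * are being queued and scales back down once the load stops.
+   * @throws Exception if test fails
+   */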
+  @Test
+  public void testThreadPoolDynamicScaling() throws Exception {
+    running = true;
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client,
+        "testFailedReadAhead.txt");
+    Configuration configuration = getReadAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+    int[] reqOffset = {0};
+    int reqLength = 1;
+    Thread t = new Thread(() -> {
+      while (running) {
+        bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+            inputStream.getTracingContext());
+        reqOffset[0] += reqLength;
+      }
+    });
+    t.start();
+    Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThan(2);
+    running = false;
+    t.join();
+    Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+  }
+
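+  /**
+   * Test to verify that the thread pool does not scale up when CPU usage is
+   * above the configured threshold.
+   * @throws Exception if test fails
+   */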
+  @Test
+  public void testCpuUpscaleNotAllowedIfCpuAboveThreshold() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client,
+        "testFailedReadAhead.txt");
+    Configuration configuration = getReadAheadV2Configuration();
+    // A threshold of 0% means measured CPU usage is always above it,
+    // so the thread pool must never scale up.
+    configuration.set(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, "0");
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+    int[] reqOffset = {0};
+    int reqLength = 1;
+    running = true;
+    Thread t = new Thread(() -> {
+      while (running) {
+        bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+            inputStream.getTracingContext());
+        reqOffset[0] += reqLength;
+      }
+    });
+    t.start();
+    Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+    running = false;
+    t.join();
+  }
+
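+  /**
+   * Test to verify that completed buffers are evicted by the scheduled
+   * eviction once their TTL expires.
+   * @throws Exception if test fails
+   */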
+  @Test
+  public void testScheduledEviction() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client,
+        "testFailedReadAhead.txt");
+    Configuration configuration = getReadAheadV2Configuration();
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    // Mimic full buffer use and add failed buffers to the completed list,
+    // leaving no free buffers for further read aheads.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    buff.setStream(inputStream);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(2);
+    // Expired buffers should be evicted by the scheduled memory monitor.
+    Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(0);
+  }
+
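+  /**
+   * Test to verify that the buffer pool does not grow beyond its minimum
+   * size when memory usage is above the configured threshold.
+   * @throws Exception if test fails
+   */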
+  @Test
+  public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client,
+        "testFailedReadAhead.txt");
+    Configuration configuration = getReadAheadV2Configuration();
+    // A threshold of 0% means measured memory usage is always above it,
+    // so the buffer pool must never grow beyond its minimum size.
+    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "0");
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    // Mimic full buffer use and add a failed buffer to the completed list,
+    // leaving no free buffers for further read aheads.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    buff.setStream(inputStream);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    assertThat(bufferManagerV2.getNumBuffers())
+        .isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+    bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+        inputStream.getTracingContext());
+    assertThat(bufferManagerV2.getNumBuffers())
+        .isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+  }
+
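+  /**
+   * Test to verify that the buffer pool grows on demand when memory usage is
+   * below the configured threshold.
+   * @throws Exception if test fails
+   */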
+  @Test
+  public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
+    TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+    AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+    AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client,
+        "testFailedReadAhead.txt");
+    Configuration configuration = getReadAheadV2Configuration();
+    // A threshold of 100% means measured memory usage is always below it,
+    // so the buffer pool is free to grow on demand.
+    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "100");
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    // Mimic full buffer use and add a failed buffer to the completed list,
+    // leaving no free buffers for further read aheads.
+    ReadBuffer buff = new ReadBuffer();
+    buff.setStatus(ReadBufferStatus.READ_FAILED);
+    buff.setStream(inputStream);
+    bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+    assertThat(bufferManagerV2.getNumBuffers())
+        .isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+    bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+        inputStream.getTracingContext());
+    assertThat(bufferManagerV2.getNumBuffers())
+        .isGreaterThan(bufferManagerV2.getMinBufferPoolSize());
+  }
+
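+  /**
+   * Test to verify that the buffer pool shrinks when JVM memory usage rises
+   * above the configured threshold.
+   * @throws Exception if test fails
+   */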
+  @Test
+  public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
+    Configuration configuration = getReadAheadV2Configuration();
+    configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2");
+    AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+        getAccountName());
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+    ReadBufferManagerV2.setReadBufferManagerConfigs(
+        abfsConfig.getReadAheadBlockSize(), abfsConfig);
+    ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+    int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
+    assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+    running = true;
+    Thread t = new Thread(() -> {
+      while (running) {
+        long maxMemory = Runtime.getRuntime().maxMemory();
+        long usedMemory = Runtime.getRuntime().totalMemory()
+            - Runtime.getRuntime().freeMemory();
+        double usage = (double) usedMemory / maxMemory;
+
+        if (usage < HIGH_MEMORY_USAGE_THRESHOLD_PERCENT) {
+          // Allocate more memory
+          allocations.add(new byte[10 * 1024 * 1024]); // 10MB
+        }
+      }
+    }, "MemoryLoadThread");
+    t.setDaemon(true);
+    t.start();
+    Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+    assertThat(bufferManagerV2.getNumBuffers()).isLessThan(initialBuffers);
+    running = false;
+    t.join();
+  }
+
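+  /**
+   * Returns a configuration with read-ahead V2 and dynamic scaling enabled,
+   * using small pool sizes and short monitoring intervals to speed up tests.
+   */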
+  private Configuration getReadAheadV2Configuration() {
+    Configuration conf = new Configuration(getRawConfiguration());
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+    conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+    conf.setInt(FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, 2);
+    conf.setInt(FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, 4);
+    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, HUNDRED);
+    conf.setInt(FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
+    conf.setInt(FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS, 1_000);
+    conf.setInt(FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS, 1_000);
+    return conf;
+  }
+}

