This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9d5e111237b HADOOP-19613. [ABFS][ReadAheadV2] Refactor
ReadBufferManager to isolate new code from the current working code (#7801)
9d5e111237b is described below
commit 9d5e111237bdb34723418e56be90b27970c65466
Author: Anuj Modi <[email protected]>
AuthorDate: Mon Jul 28 19:01:28 2025 +0530
HADOOP-19613. [ABFS][ReadAheadV2] Refactor ReadBufferManager to isolate new
code from the current working code (#7801)
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 83 +++
.../fs/azurebfs/AzureBlobFileSystemStore.java | 1 +
.../fs/azurebfs/constants/ConfigurationKeys.java | 38 +-
.../constants/FileSystemConfigurations.java | 8 +
.../fs/azurebfs/services/AbfsInputStream.java | 36 +-
.../azurebfs/services/AbfsInputStreamContext.java | 12 +
.../fs/azurebfs/services/ReadBufferManager.java | 693 +++++----------------
...BufferManager.java => ReadBufferManagerV1.java} | 504 +++++++--------
.../fs/azurebfs/services/ReadBufferManagerV2.java | 228 +++++++
.../fs/azurebfs/services/ReadBufferWorker.java | 5 +-
.../fs/azurebfs/ITestGetNameSpaceEnabled.java | 22 +-
.../azurebfs/services/ITestReadBufferManager.java | 8 +-
.../fs/azurebfs/services/TestAbfsInputStream.java | 49 +-
13 files changed, 848 insertions(+), 839 deletions(-)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 9aaf1714472..1242122f030 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -381,6 +381,41 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_READAHEAD)
private boolean enabledReadAhead;
+ @BooleanConfigurationValidatorAnnotation(
+ ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2,
+ DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
+ private boolean isReadAheadV2Enabled;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
+ DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
+ private int minReadAheadV2ThreadPoolSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE,
+ DefaultValue = DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE)
+ private int maxReadAheadV2ThreadPoolSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE,
+ DefaultValue = DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE)
+ private int minReadAheadV2BufferPoolSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE,
+ DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
+ private int maxReadAheadV2BufferPoolSize;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
+ DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
+ private int readAheadExecutorServiceTTLMillis;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS,
+ DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
+ private int readAheadV2CachedBufferTTLMillis;
+
@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -1368,6 +1403,54 @@ public boolean isReadAheadEnabled() {
return this.enabledReadAhead;
}
+ public int getMinReadAheadV2ThreadPoolSize() {
+ if (minReadAheadV2ThreadPoolSize <= 0) {
+ // If the minReadAheadV2ThreadPoolSize is not set, use the default value
+ return 2 * Runtime.getRuntime().availableProcessors();
+ }
+ return minReadAheadV2ThreadPoolSize;
+ }
+
+ public int getMaxReadAheadV2ThreadPoolSize() {
+ if (maxReadAheadV2ThreadPoolSize <= 0) {
+ // If the maxReadAheadV2ThreadPoolSize is not set, use the default value
+ return 4 * Runtime.getRuntime().availableProcessors();
+ }
+ return maxReadAheadV2ThreadPoolSize;
+ }
+
+ public int getMinReadAheadV2BufferPoolSize() {
+ if (minReadAheadV2BufferPoolSize <= 0) {
+ // If the minReadAheadV2BufferPoolSize is not set, use the default value
+ return 2 * Runtime.getRuntime().availableProcessors();
+ }
+ return minReadAheadV2BufferPoolSize;
+ }
+
+ public int getMaxReadAheadV2BufferPoolSize() {
+ if (maxReadAheadV2BufferPoolSize <= 0) {
+ // If the maxReadAheadV2BufferPoolSize is not set, use the default value
+ return 4 * Runtime.getRuntime().availableProcessors();
+ }
+ return maxReadAheadV2BufferPoolSize;
+ }
+
+ public int getReadAheadExecutorServiceTTLInMillis() {
+ return readAheadExecutorServiceTTLMillis;
+ }
+
+ public int getReadAheadV2CachedBufferTTLMillis() {
+ return readAheadV2CachedBufferTTLMillis;
+ }
+
+ /**
+ * Checks if the read-ahead v2 feature is enabled by the user.
+ * @return true if read-ahead v2 is enabled, false otherwise.
+ */
+ public boolean isReadAheadV2Enabled() {
+ return this.isReadAheadV2Enabled;
+ }
+
@VisibleForTesting
void setReadAheadEnabled(final boolean enabledReadAhead) {
this.enabledReadAhead = enabledReadAhead;
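The pool-size getters above treat any non-positive value (including the -1
defaults introduced later in this patch) as unset and derive the pool size
from the CPU count. A minimal standalone sketch of that fallback, assuming the
2x/4x multipliers stay as written above (the helper class and method names are
hypothetical):

    public final class ReadAheadV2PoolSizing {
      private ReadAheadV2PoolSizing() {
      }

      // Mirrors the getters above: a configured value <= 0 means "derive from
      // the number of available processors" (2x for minimums, 4x for maximums).
      static int effectiveSize(int configured, int multiplier) {
        if (configured <= 0) {
          return multiplier * Runtime.getRuntime().availableProcessors();
        }
        return configured;
      }
    }

With the shipped defaults of -1, an 8-core host gets minimum pools of 16 and
maximum pools of 32.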
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d93c1a3dc65..439b7626a86 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -955,6 +955,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE,
getAbfsConfiguration().getFooterReadBufferSize())
.withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
.withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
.isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
+ .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
.withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
.withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
.withFooterReadBufferSize(footerReadBufferSize)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 40095c7a802..50a88ab4e45 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -259,10 +259,46 @@ public final class ConfigurationKeys {
public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT =
"fs.azure.shellkeyprovider.script";
/**
- * Enable or disable readahead buffer in AbfsInputStream.
+ * Enable or disable readahead V1 in AbfsInputStream.
* Value: {@value}.
*/
public static final String FS_AZURE_ENABLE_READAHEAD =
"fs.azure.enable.readahead";
+ /**
+ * Enable or disable readahead V2 in AbfsInputStream. This works
independently of V1.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_ENABLE_READAHEAD_V2 =
"fs.azure.enable.readahead.v2";
+
+ /**
+ * Minimum number of prefetch threads in the thread pool for readahead V2.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE =
"fs.azure.readahead.v2.min.thread.pool.size";
+ /**
+ * Maximum number of prefetch threads in the thread pool for readahead V2.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE =
"fs.azure.readahead.v2.max.thread.pool.size";
+ /**
+ * Minimum size of the buffer pool for caching prefetched data for readahead
V2.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MIN_BUFFER_POOL_SIZE =
"fs.azure.readahead.v2.min.buffer.pool.size";
+ /**
+ * Maximum size of the buffer pool for caching prefetched data for readahead
V2.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE =
"fs.azure.readahead.v2.max.buffer.pool.size";
+
+ /**
+ * TTL in milliseconds for the idle threads in the executor service used by
read ahead v2.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS
= "fs.azure.readahead.v2.executor.service.ttl.millis";
+
+ /**
+ * TTL in milliseconds for the cached buffers in the buffer pool used by
read ahead v2.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS =
"fs.azure.readahead.v2.cached.buffer.ttl.millis";
/** Setting this true will make the driver use its own RemoteIterator
implementation */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR =
"fs.azure.enable.abfslistiterator";
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index f7ccc0e9175..824a4c9701e 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -127,6 +127,14 @@ public final class FileSystemConfigurations {
public static final long
DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
+ public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
+ public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
+ public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
+ public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
+ public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
+ public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS =
3_000;
+ public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS =
6_000;
+
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index 5d770805a51..3dc7f88e529 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -79,6 +79,7 @@ public class AbfsInputStream extends FSInputStream implements
CanUnbuffer,
private final String eTag; // eTag of the path when
InputStream are created
private final boolean tolerateOobAppends; // whether tolerate Oob Appends
private final boolean readAheadEnabled; // whether enable readAhead;
+ private final boolean readAheadV2Enabled; // whether readAhead V2 is enabled;
private final String inputStreamId;
private final boolean alwaysReadBufferSize;
/*
@@ -130,6 +131,7 @@ public class AbfsInputStream extends FSInputStream
implements CanUnbuffer,
/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
+ private ReadBufferManager readBufferManager;
public AbfsInputStream(
final AbfsClient client,
@@ -150,6 +152,7 @@ public AbfsInputStream(
this.eTag = eTag;
this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
+ this.readAheadV2Enabled = abfsInputStreamContext.isReadAheadV2Enabled();
this.alwaysReadBufferSize
= abfsInputStreamContext.shouldReadBufferSizeAlways();
this.bufferedPreadDisabled = abfsInputStreamContext
@@ -173,9 +176,19 @@ public AbfsInputStream(
this.fsBackRef = abfsInputStreamContext.getFsBackRef();
contextEncryptionAdapter = abfsInputStreamContext.getEncryptionAdapter();
- // Propagate the config values to ReadBufferManager so that the first
instance
- // to initialize can set the readAheadBlockSize
- ReadBufferManager.setReadBufferManagerConfigs(readAheadBlockSize);
+ /*
+ * Initialize the ReadBufferManager based on whether readAheadV2 is
enabled or not.
+ * Precedence is given to ReadBufferManagerV2.
+ * If neither V1 nor V2 is enabled, then no read ahead will be done.
+ */
+ if (readAheadV2Enabled) {
+ ReadBufferManagerV2.setReadBufferManagerConfigs(
+ readAheadBlockSize, client.getAbfsConfiguration());
+ readBufferManager = ReadBufferManagerV2.getBufferManager();
+ } else {
+ ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
+ readBufferManager = ReadBufferManagerV1.getBufferManager();
+ }
if (streamStatistics != null) {
ioStatistics = streamStatistics.getIOStatistics();
}
@@ -491,7 +504,7 @@ private int copyToUserBuffer(byte[] b, int off, int len){
private int readInternal(final long position, final byte[] b, final int
offset, final int length,
final boolean bypassReadAhead) throws IOException {
- if (readAheadEnabled && !bypassReadAhead) {
+ if (isReadAheadEnabled() && !bypassReadAhead) {
// try reading from read-ahead
if (offset != 0) {
throw new IllegalArgumentException("readahead buffers cannot have
non-zero buffer offsets");
@@ -510,7 +523,7 @@ private int readInternal(final long position, final byte[]
b, final int offset,
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
- ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset,
(int) nextSize,
+ readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
new TracingContext(readAheadTracingContext));
nextOffset = nextOffset + nextSize;
numReadAheads--;
@@ -519,7 +532,7 @@ private int readInternal(final long position, final byte[]
b, final int offset,
}
// try reading from buffers first
- receivedBytes = ReadBufferManager.getBufferManager().getBlock(this,
position, length, b);
+ receivedBytes = readBufferManager.getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
@@ -720,7 +733,9 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
- ReadBufferManager.getBufferManager().purgeBuffersForStream(this);
+ if (readBufferManager != null) {
+ readBufferManager.purgeBuffersForStream(this);
+ }
buffer = null; // de-reference the buffer so it can be GC'ed sooner
if (contextEncryptionAdapter != null) {
contextEncryptionAdapter.destroy();
@@ -773,9 +788,14 @@ byte[] getBuffer() {
return buffer;
}
+ /**
+ * Checks if any version of read ahead is enabled.
+ * If both are disabled, read ahead logic is skipped.
+ * @return true if read ahead is enabled, false otherwise.
+ */
@VisibleForTesting
public boolean isReadAheadEnabled() {
- return readAheadEnabled;
+ return (readAheadEnabled || readAheadV2Enabled) && readBufferManager !=
null;
}
@VisibleForTesting
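A condensed sketch of the manager selection introduced above; the helper
method is hypothetical, since the real logic runs inline in the
AbfsInputStream constructor:

    // Hypothetical helper restating the constructor logic: V2 takes precedence
    // when enabled, otherwise V1 is used, and all later read-ahead calls go
    // through the chosen ReadBufferManager reference.
    private static ReadBufferManager chooseBufferManager(
        boolean readAheadV2Enabled, int readAheadBlockSize, AbfsClient client) {
      if (readAheadV2Enabled) {
        ReadBufferManagerV2.setReadBufferManagerConfigs(
            readAheadBlockSize, client.getAbfsConfiguration());
        return ReadBufferManagerV2.getBufferManager();
      }
      ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
      return ReadBufferManagerV1.getBufferManager();
    }

Note that isReadAheadEnabled() now reports true if either version is enabled
and a manager was actually constructed, so the prefetch path and close() stay
consistent with whichever manager was chosen.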
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index fdcad5ac3a0..f6272492d60 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -41,6 +41,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext
{
private boolean isReadAheadEnabled = true;
+ private boolean isReadAheadV2Enabled;
+
private boolean alwaysReadBufferSize;
private int readAheadBlockSize;
@@ -91,6 +93,12 @@ public AbfsInputStreamContext isReadAheadEnabled(
return this;
}
+ public AbfsInputStreamContext isReadAheadV2Enabled(
+ final boolean isReadAheadV2Enabled) {
+ this.isReadAheadV2Enabled = isReadAheadV2Enabled;
+ return this;
+ }
+
public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
@@ -181,6 +189,10 @@ public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}
+ public boolean isReadAheadV2Enabled() {
+ return isReadAheadV2Enabled;
+ }
+
public int getReadAheadRange() {
return readAheadRange;
}
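A hedged usage sketch of the new builder flag, mirroring the
AzureBlobFileSystemStore wiring earlier in this patch; the constructor
argument and build() call are assumed from the existing context API, and the
two local variables stand in for values available in the store:

    AbfsInputStreamContext context =
        new AbfsInputStreamContext(sasTokenRenewPeriodForStreams)
            .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
            .isReadAheadV2Enabled(abfsConfiguration.isReadAheadV2Enabled())
            .build();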
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 031545f57a1..9ee128fbc32 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -15,636 +15,287 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.hadoop.fs.azurebfs.services;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.Stack;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_BLOCK_SIZE;
/**
- * The Read Buffer Manager for Rest AbfsClient.
+ * Abstract class for managing read buffers for Azure Blob File System input
streams.
*/
-final class ReadBufferManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ReadBufferManager.class);
- private static final int ONE_KB = 1024;
- private static final int ONE_MB = ONE_KB * ONE_KB;
-
- private static final int NUM_BUFFERS = 16;
- private static final int NUM_THREADS = 8;
- private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have
to see if 3 seconds is a good threshold
-
- private static int blockSize = 4 * ONE_MB;
- private static int thresholdAgeMilliseconds =
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
- private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data
that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
+public abstract class ReadBufferManager {
+ protected static final Logger LOGGER = LoggerFactory.getLogger(
+ ReadBufferManager.class);
+ protected static final ReentrantLock LOCK = new ReentrantLock();
+ private static int thresholdAgeMilliseconds;
+ private static int blockSize = DEFAULT_READ_AHEAD_BLOCK_SIZE; // default
block size for read-ahead in bytes
+ private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of
requests that are not picked up by any worker thread yet
private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); //
requests being processed by worker threads
private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); //
buffers available for reading
- private static ReadBufferManager bufferManager; // singleton, initialized in
static initialization block
- private static final ReentrantLock LOCK = new ReentrantLock();
-
- static ReadBufferManager getBufferManager() {
- if (bufferManager == null) {
- LOCK.lock();
- try {
- if (bufferManager == null) {
- bufferManager = new ReadBufferManager();
- bufferManager.init();
- }
- } finally {
- LOCK.unlock();
- }
- }
- return bufferManager;
- }
- static void setReadBufferManagerConfigs(int readAheadBlockSize) {
- if (bufferManager == null) {
- LOGGER.debug(
- "ReadBufferManager not initialized yet. Overriding
readAheadBlockSize as {}",
- readAheadBlockSize);
- blockSize = readAheadBlockSize;
- }
- }
-
- private void init() {
- buffers = new byte[NUM_BUFFERS][];
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[blockSize]; // same buffers are reused. The byte
array never goes back to GC
- freeList.add(i);
- }
- for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
- t.setDaemon(true);
- threads[i] = t;
- t.setName("ABFS-prefetch-" + i);
- t.start();
- }
- ReadBufferWorker.UNLEASH_WORKERS.countDown();
- }
-
- // hide instance constructor
- private ReadBufferManager() {
- LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
- }
-
-
- /*
- *
- * AbfsInputStream-facing methods
- *
+ /**
+ * Initializes the ReadBufferManager singleton instance. Creates the read
buffers and threads.
+ * This method should be called once to set up the read buffer manager.
*/
-
+ abstract void init();
/**
- * {@link AbfsInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link AbfsInputStream} for which to do the
read-ahead
- * @param requestedOffset The offset in the file which shoukd be read
- * @param requestedLength The length to read
+ * Queues a read-ahead request from {@link AbfsInputStream}
+ * for a given offset in the file and a given length.
+ * @param stream the input stream requesting the read-ahead
+ * @param requestedOffset the offset in the remote file to start reading
+ * @param requestedLength the number of bytes to read from file
+ * @param tracingContext the tracing context for diagnostics
*/
- void queueReadAhead(final AbfsInputStream stream, final long
requestedOffset, final int requestedLength,
- TracingContext tracingContext) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
- stream.getPath(), requestedOffset, requestedLength);
- }
- ReadBuffer buffer;
- synchronized (this) {
- if (isAlreadyQueued(stream, requestedOffset)) {
- return; // already queued, do not queue again
- }
- if (freeList.isEmpty() && !tryEvict()) {
- return; // no buffers available, cannot queue anything
- }
-
- buffer = new ReadBuffer();
- buffer.setStream(stream);
- buffer.setOffset(requestedOffset);
- buffer.setLength(0);
- buffer.setRequestedLength(requestedLength);
- buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
- buffer.setLatch(new CountDownLatch(1));
- buffer.setTracingContext(tracingContext);
-
- Integer bufferIndex = freeList.pop(); // will return a value, since we
have checked size > 0 already
-
- buffer.setBuffer(buffers[bufferIndex]);
- buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
- notifyAll();
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx
{}",
- stream.getPath(), requestedOffset, buffer.getBufferindex());
- }
- }
- }
-
+ abstract void queueReadAhead(AbfsInputStream stream,
+ long requestedOffset,
+ int requestedLength,
+ TracingContext tracingContext);
/**
+ * Gets a block of data from the data prefetched by the ReadBufferManager.
* {@link AbfsInputStream} calls this method to read any bytes already
available in a buffer (thereby saving a
* remote read). This returns the bytes if the data already exists in
buffer. If there is a buffer that is reading
* the requested offset, then this method blocks until that read completes.
If the data is queued in a read-ahead
* but not picked up by a worker thread yet, then it cancels that read-ahead
and reports cache miss. This is because
- * depending on worker thread availability, the read-ahead may take a while
- the calling thread can do it's own
- * read to get the data faster (copmared to the read waiting in queue for an
indeterminate amount of time).
+ * depending on worker thread availability, the read-ahead may take a while
- the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in queue for an
indeterminate amount of time).
*
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will
be written into from offset 0.
- * @return the number of bytes read
+ * @param stream the input stream requesting the block
+ * @param position the position in the file to read from
+ * @param length the number of bytes to read
+ * @param buffer the buffer to store the read data
+ * @return the number of bytes actually read
+ * @throws IOException if an I/O error occurs
*/
- int getBlock(final AbfsInputStream stream, final long position, final int
length, final byte[] buffer)
- throws IOException {
- // not synchronized, so have to be careful with locking
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("getBlock for file {} position {} thread {}",
- stream.getPath(), position, Thread.currentThread().getName());
- }
-
- waitForProcess(stream, position);
-
- int bytesRead = 0;
- synchronized (this) {
- bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer);
- }
- if (bytesRead > 0) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Done read from Cache for {} position {} length {}",
- stream.getPath(), position, bytesRead);
- }
- return bytesRead;
- }
+ abstract int getBlock(AbfsInputStream stream,
+ long position,
+ int length,
+ byte[] buffer) throws IOException;
- // otherwise, just say we got nothing - calling thread can do its own read
- return 0;
- }
-
- /*
- *
- * Internal methods
+ /**
+ * {@link ReadBufferWorker} calls this to get the next buffer to read from
the read-ahead queue.
+ * The requested read is performed by a background thread.
*
+ * @return the next {@link ReadBuffer} to read
+ * @throws InterruptedException if interrupted while waiting
*/
-
- private void waitForProcess(final AbfsInputStream stream, final long
position) {
- ReadBuffer readBuf;
- synchronized (this) {
- clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
- }
- if (readBuf != null) { // if in in-progress queue, then block for
it
- try {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("got a relevant read buffer for file {} offset {}
buffer idx {}",
- stream.getPath(), readBuf.getOffset(), readBuf.getBufferindex());
- }
- readBuf.getLatch().await(); // blocking wait on the caller stream's
thread
- // Note on correctness: readBuf gets out of inProgressList only in 1
place: after worker thread
- // is done processing it (in doneReading). There, the latch is set
after removing the buffer from
- // inProgressList. So this latch is safe to be outside the
synchronized block.
- // Putting it in synchronized would result in a deadlock, since this
thread would be holding the lock
- // while waiting, so no one will be able to change any state. If this
becomes more complex in the future,
- // then the latch cane be removed and replaced with wait/notify
whenever inProgressList is touched.
- } catch (InterruptedException ex) {
- Thread.currentThread().interrupt();
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("latch done for file {} buffer idx {} length {}",
- stream.getPath(), readBuf.getBufferindex(), readBuf.getLength());
- }
- }
- }
+ abstract ReadBuffer getNextBlockToRead() throws InterruptedException;
/**
- * If any buffer in the completedlist can be reclaimed then reclaim it and
return the buffer to free list.
- * The objective is to find just one buffer - there is no advantage to
evicting more than one.
+ * Marks the specified buffer as done reading and updates its status.
+ * Called by {@link ReadBufferWorker} after reading is complete.
*
- * @return whether the eviction succeeeded - i.e., were we able to free up
one buffer
+ * @param buffer the buffer that was read by worker thread
+ * @param result the status of the read operation
+ * @param bytesActuallyRead the number of bytes actually read by the worker
thread.
*/
- private synchronized boolean tryEvict() {
- ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
- return false; // there are no evict-able buffers
- }
-
- long currentTimeInMs = currentTimeMillis();
-
- // first, try buffers where all bytes have been consumed (approximated as
first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try buffers where any bytes have been consumed (may be a bad
idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
- if (buf.isAnyByteConsumed()) {
- nodeToEvict = buf;
- break;
- }
- }
-
- if (nodeToEvict != null) {
- return evict(nodeToEvict);
- }
-
- // next, try any old nodes that have not been consumed
- // Failed read buffers (with buffer index=-1) that are older than
- // thresholdAge should be cleaned up, but at the same time should not
- // report successful eviction.
- // Queue logic expects that a buffer is freed up for read ahead when
- // eviction is successful, whereas a failed ReadBuffer would have released
- // its buffer when its status was set to READ_FAILED.
- long earliestBirthday = Long.MAX_VALUE;
- ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
- for (ReadBuffer buf : completedReadList) {
- if ((buf.getBufferindex() != -1)
- && (buf.getTimeStamp() < earliestBirthday)) {
- nodeToEvict = buf;
- earliestBirthday = buf.getTimeStamp();
- } else if ((buf.getBufferindex() == -1)
- && (currentTimeInMs - buf.getTimeStamp()) >
thresholdAgeMilliseconds) {
- oldFailedBuffers.add(buf);
- }
- }
-
- for (ReadBuffer buf : oldFailedBuffers) {
- evict(buf);
- }
-
- if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) &&
(nodeToEvict != null)) {
- return evict(nodeToEvict);
- }
-
- LOGGER.trace("No buffer eligible for eviction");
- // nothing can be evicted
- return false;
- }
-
- private boolean evict(final ReadBuffer buf) {
- // As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
- // avoid adding it to freeList.
- if (buf.getBufferindex() != -1) {
- freeList.push(buf.getBufferindex());
- }
-
- completedReadList.remove(buf);
- buf.setTracingContext(null);
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {}
length {}",
- buf.getBufferindex(), buf.getStream().getPath(), buf.getOffset(),
buf.getLength());
- }
- return true;
- }
-
- private boolean isAlreadyQueued(final AbfsInputStream stream, final long
requestedOffset) {
- // returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
- }
-
- private boolean isInList(final Collection<ReadBuffer> list, final
AbfsInputStream stream, final long requestedOffset) {
- return (getFromList(list, stream, requestedOffset) != null);
- }
-
- private ReadBuffer getFromList(final Collection<ReadBuffer> list, final
AbfsInputStream stream, final long requestedOffset) {
- for (ReadBuffer buffer : list) {
- if (buffer.getStream() == stream) {
- if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
- && requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() + buffer.getLength()) {
- return buffer;
- } else if (requestedOffset >= buffer.getOffset()
- && requestedOffset < buffer.getOffset() +
buffer.getRequestedLength()) {
- return buffer;
- }
- }
- }
- return null;
- }
+ abstract void doneReading(ReadBuffer buffer,
+ ReadBufferStatus result,
+ int bytesActuallyRead);
/**
- * Returns buffers that failed or passed from completed queue.
- * @param stream
- * @param requestedOffset
- * @return
+ * Purges the buffers associated with an {@link AbfsInputStream}
+ * from the {@link ReadBufferManager} when the stream is closed.
+ *
+ * @param stream the input stream whose buffers should be purged.
*/
- private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream,
final long requestedOffset) {
- for (ReadBuffer buffer : completedReadList) {
- // Buffer is returned if the requestedOffset is at or above buffer's
- // offset but less than buffer's length or the actual requestedLength
- if ((buffer.getStream() == stream)
- && (requestedOffset >= buffer.getOffset())
- && ((requestedOffset < buffer.getOffset() + buffer.getLength())
- || (requestedOffset < buffer.getOffset() +
buffer.getRequestedLength()))) {
- return buffer;
- }
- }
-
- return null;
- }
-
- private void clearFromReadAheadQueue(final AbfsInputStream stream, final
long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
- if (buffer != null) {
- readAheadQueue.remove(buffer);
- notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
- }
- }
-
- private int getBlockFromCompletedQueue(final AbfsInputStream stream, final
long position, final int length,
- final byte[] buffer) throws
IOException {
- ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
-
- if (buf == null) {
- return 0;
- }
+ abstract void purgeBuffersForStream(AbfsInputStream stream);
- if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
- // To prevent new read requests to fail due to old read-ahead attempts,
- // return exception only from buffers that failed within last
thresholdAgeMilliseconds
- if ((currentTimeMillis() - (buf.getTimeStamp()) <
thresholdAgeMilliseconds)) {
- throw buf.getErrException();
- } else {
- return 0;
- }
- }
- if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
- || (position >= buf.getOffset() + buf.getLength())) {
- return 0;
- }
+ // The following methods are for testing purposes only and should not be
used in production code.
- int cursor = (int) (position - buf.getOffset());
- int availableLengthInBuffer = buf.getLength() - cursor;
- int lengthToCopy = Math.min(length, availableLengthInBuffer);
- System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
- if (cursor == 0) {
- buf.setFirstByteConsumed(true);
- }
- if (cursor + lengthToCopy == buf.getLength()) {
- buf.setLastByteConsumed(true);
- }
- buf.setAnyByteConsumed(true);
- return lengthToCopy;
- }
-
- /*
- *
- * ReadBufferWorker-thread-facing methods
+ /**
+ * Gets the number of buffers currently managed by the read buffer manager.
*
+ * @return the number of buffers
*/
+ @VisibleForTesting
+ abstract int getNumBuffers();
/**
- * ReadBufferWorker thread calls this to get the next buffer that it should
work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
+ * Attempts to evict buffers based on the eviction policy.
*/
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
+ @VisibleForTesting
+ abstract void callTryEvict();
/**
- * ReadBufferWorker thread calls this method to post completion.
+ * Resets the read buffer manager for testing purposes. Cleans up the current
+ * state of readAhead buffers and the lists, and triggers a fresh init.
+ */
+ @VisibleForTesting
+ abstract void testResetReadBufferManager();
+
+ /**
+ * Resets the read buffer manager for testing with the specified block size
and threshold age.
*
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read
operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was
actually able to read
+ * @param readAheadBlockSize the block size for read-ahead
+ * @param thresholdAgeMilliseconds the threshold age in milliseconds
*/
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed read file {} for offset {}
outcome {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), result,
bytesActuallyRead);
- }
- synchronized (this) {
- // If this buffer has already been purged during
- // close of InputStream then we don't update the lists.
- if (inProgressList.contains(buffer)) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setLength(bytesActuallyRead);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer will be deleted as per the eviction policy.
- }
- // completed list also contains FAILED read buffers
- // for sending exception message to clients.
- buffer.setStatus(result);
- buffer.setTimeStamp(currentTimeMillis());
- completedReadList.add(buffer);
- }
- }
+ @VisibleForTesting
+ abstract void testResetReadBufferManager(int readAheadBlockSize, int
thresholdAgeMilliseconds);
- //outside the synchronized, since anyone receiving a wake-up from the
latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
+ /**
+ * Resets the buffer manager instance to null for testing purposes.
+ * This allows for reinitialization in tests.
+ */
+ abstract void resetBufferManager();
/**
- * Similar to System.currentTimeMillis, except implemented with
System.nanoTime().
- * System.currentTimeMillis can go backwards when system clock is changed
(e.g., with NTP time synchronization),
- * making it unsuitable for measuring time intervals. nanotime is strictly
monotonically increasing per CPU core.
- * Note: it is not monotonic across Sockets, and even within a CPU, its only
the
- * more recent parts which share a clock across all cores.
+ * Gets the threshold age in milliseconds for buffer eviction.
*
- * @return current time in milliseconds
+ * @return the threshold age in milliseconds
*/
- private long currentTimeMillis() {
- return System.nanoTime() / 1000 / 1000;
- }
-
@VisibleForTesting
- int getThresholdAgeMilliseconds() {
+ protected static int getThresholdAgeMilliseconds() {
return thresholdAgeMilliseconds;
}
+ /**
+ * Sets the threshold age in milliseconds for buffer eviction.
+ *
+ * @param thresholdAgeMs the threshold age in milliseconds
+ */
@VisibleForTesting
- static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
+ protected static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
thresholdAgeMilliseconds = thresholdAgeMs;
}
+ /**
+ * Gets the block size used for read-ahead operations.
+ *
+ * @return the read-ahead block size in bytes
+ */
@VisibleForTesting
- int getCompletedReadListSize() {
- return completedReadList.size();
- }
-
- @VisibleForTesting
- public synchronized List<ReadBuffer> getCompletedReadListCopy() {
- return new ArrayList<>(completedReadList);
- }
-
- @VisibleForTesting
- public synchronized List<Integer> getFreeListCopy() {
- return new ArrayList<>(freeList);
+ protected static int getReadAheadBlockSize() {
+ return blockSize;
}
+ /**
+ * Sets the block size used for read-ahead operations.
+ *
+ * @param readAheadBlockSize the read-ahead block size in bytes
+ */
@VisibleForTesting
- public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
- return new ArrayList<>(readAheadQueue);
+ protected static void setReadAheadBlockSize(int readAheadBlockSize) {
+ if (readAheadBlockSize <= 0) {
+ throw new IllegalArgumentException("Read-ahead block size must be
positive");
+ }
+ blockSize = readAheadBlockSize;
}
- @VisibleForTesting
- public synchronized List<ReadBuffer> getInProgressCopiedList() {
- return new ArrayList<>(inProgressList);
+ /**
+ * Gets the stack of free buffer indices.
+ *
+ * @return the stack of free buffer indices
+ */
+ public Stack<Integer> getFreeList() {
+ return freeList;
}
- @VisibleForTesting
- void callTryEvict() {
- tryEvict();
+ /**
+ * Gets the queue of read-ahead requests.
+ *
+ * @return the queue of {@link ReadBuffer} objects in the read-ahead queue
+ */
+ public Queue<ReadBuffer> getReadAheadQueue() {
+ return readAheadQueue;
}
-
/**
- * Purging the buffers associated with an {@link AbfsInputStream}
- * from {@link ReadBufferManager} when stream is closed.
- * @param stream input stream.
+ * Gets the list of in-progress read buffers.
+ *
+ * @return the list of {@link ReadBuffer} objects that are currently being
processed
*/
- public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
- LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
- readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
- purgeList(stream, completedReadList);
+ public LinkedList<ReadBuffer> getInProgressList() {
+ return inProgressList;
}
/**
- * Method to remove buffers associated with a {@link AbfsInputStream}
- * when its close method is called.
- * NOTE: This method is not threadsafe and must be called inside a
- * synchronised block. See caller.
- * @param stream associated input stream.
- * @param list list of buffers like {@link this#completedReadList}
- * or {@link this#inProgressList}.
+ * Gets the list of completed read buffers.
+ *
+ * @return the list of {@link ReadBuffer} objects that have been read and
are available for use
*/
- private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
- for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
- ReadBuffer readBuffer = it.next();
- if (readBuffer.getStream() == stream) {
- it.remove();
- // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
- // list in doneReading method, we will skip adding those here again.
- if (readBuffer.getBufferindex() != -1) {
- freeList.push(readBuffer.getBufferindex());
- }
- }
- }
+ public LinkedList<ReadBuffer> getCompletedReadList() {
+ return completedReadList;
}
+
/**
- * Test method that can clean up the current state of readAhead buffers and
- * the lists. Will also trigger a fresh init.
+ * Gets a copy of the list of free buffer indices.
+ *
+ * @return a list of free buffer indices
*/
@VisibleForTesting
- void testResetReadBufferManager() {
- synchronized (this) {
- ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
- for (ReadBuffer buf : completedReadList) {
- if (buf != null) {
- completedBuffers.add(buf);
- }
- }
-
- for (ReadBuffer buf : completedBuffers) {
- evict(buf);
- }
-
- readAheadQueue.clear();
- inProgressList.clear();
- completedReadList.clear();
- freeList.clear();
- for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = null;
- }
- buffers = null;
- resetBufferManager();
- }
+ protected synchronized List<Integer> getFreeListCopy() {
+ return new ArrayList<>(freeList);
}
/**
- * Reset buffer manager to null.
+ * Gets a copy of the read-ahead queue.
+ *
+ * @return a list of {@link ReadBuffer} objects in the read-ahead queue
*/
@VisibleForTesting
- static void resetBufferManager() {
- bufferManager = null;
+ protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
+ return new ArrayList<>(readAheadQueue);
}
/**
- * Reset readAhead buffer to needed readAhead block size and
- * thresholdAgeMilliseconds.
- * @param readAheadBlockSize
- * @param thresholdAgeMilliseconds
+ * Gets a copy of the list of in-progress read buffers.
+ *
+ * @return a list of in-progress {@link ReadBuffer} objects
*/
@VisibleForTesting
- void testResetReadBufferManager(int readAheadBlockSize, int
thresholdAgeMilliseconds) {
- setBlockSize(readAheadBlockSize);
- setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
- testResetReadBufferManager();
+ protected synchronized List<ReadBuffer> getInProgressCopiedList() {
+ return new ArrayList<>(inProgressList);
}
+ /**
+ * Gets a copy of the list of completed read buffers.
+ *
+ * @return a list of completed {@link ReadBuffer} objects
+ */
@VisibleForTesting
- static void setBlockSize(int readAheadBlockSize) {
- blockSize = readAheadBlockSize;
+ protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
+ return new ArrayList<>(completedReadList);
}
+ /**
+ * Gets the size of the completed read list.
+ *
+ * @return the number of completed read buffers
+ */
@VisibleForTesting
- int getReadAheadBlockSize() {
- return blockSize;
+ protected int getCompletedReadListSize() {
+ return completedReadList.size();
}
/**
- * Test method that can mimic no free buffers scenario and also add a
ReadBuffer
- * into completedReadList. This readBuffer will get picked up by TryEvict()
- * next time a new queue request comes in.
- * @param buf that needs to be added to completedReadlist
+ * Simulates full buffer usage and adds a failed buffer for testing.
+ *
+ * @param buf the buffer to add as failed
*/
@VisibleForTesting
- void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
+ protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
freeList.clear();
completedReadList.add(buf);
}
-
- @VisibleForTesting
- int getNumBuffers() {
- return NUM_BUFFERS;
- }
}
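One detail worth keeping in view when reading the eviction logic that moved
into the concrete classes: buffer timestamps come from a nanoTime-based helper
rather than System.currentTimeMillis(), because the wall clock can jump
backwards (for example under NTP adjustment) and is unsafe for measuring
intervals. A minimal sketch of that helper as it is retained in
ReadBufferManagerV1:

    // Monotonic millisecond source for interval math: System.nanoTime() does
    // not jump backwards on wall-clock adjustments, so age comparisons against
    // thresholdAgeMilliseconds stay correct.
    private long currentTimeMillis() {
      return System.nanoTime() / 1000 / 1000;
    }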
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
similarity index 63%
copy from
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
copy to
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index 031545f57a1..fe1ac3fa1f2 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -18,53 +18,60 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-import java.util.Stack;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.classification.VisibleForTesting;
/**
* The Read Buffer Manager for Rest AbfsClient.
+ * V1 implementation of ReadBufferManager.
*/
-final class ReadBufferManager {
- private static final Logger LOGGER =
LoggerFactory.getLogger(ReadBufferManager.class);
- private static final int ONE_KB = 1024;
- private static final int ONE_MB = ONE_KB * ONE_KB;
+final class ReadBufferManagerV1 extends ReadBufferManager {
private static final int NUM_BUFFERS = 16;
private static final int NUM_THREADS = 8;
- private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have
to see if 3 seconds is a good threshold
+ private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000;
- private static int blockSize = 4 * ONE_MB;
- private static int thresholdAgeMilliseconds =
DEFAULT_THRESHOLD_AGE_MILLISECONDS;
private Thread[] threads = new Thread[NUM_THREADS];
- private byte[][] buffers; // array of byte[] buffers, to hold the data
that is read
- private Stack<Integer> freeList = new Stack<>(); // indices in buffers[]
array that are available
+ private byte[][] buffers;
+ private static ReadBufferManagerV1 bufferManager;
- private Queue<ReadBuffer> readAheadQueue = new LinkedList<>(); // queue of
requests that are not picked up by any worker thread yet
- private LinkedList<ReadBuffer> inProgressList = new LinkedList<>(); //
requests being processed by worker threads
- private LinkedList<ReadBuffer> completedReadList = new LinkedList<>(); //
buffers available for reading
- private static ReadBufferManager bufferManager; // singleton, initialized in
static initialization block
- private static final ReentrantLock LOCK = new ReentrantLock();
+ // hide instance constructor
+ private ReadBufferManagerV1() {
+ LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+ }
+
+ /**
+ * Sets the read buffer manager configurations.
+ * @param readAheadBlockSize the size of the read-ahead block in bytes
+ */
+ static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+ if (bufferManager == null) {
+ LOGGER.debug(
+ "ReadBufferManagerV1 not initialized yet. Overriding
readAheadBlockSize as {}",
+ readAheadBlockSize);
+ setReadAheadBlockSize(readAheadBlockSize);
+ setThresholdAgeMilliseconds(DEFAULT_THRESHOLD_AGE_MILLISECONDS);
+ }
+ }
- static ReadBufferManager getBufferManager() {
+ /**
+ * Returns the singleton instance of ReadBufferManagerV1.
+ * @return the singleton instance of ReadBufferManagerV1
+ */
+ static ReadBufferManagerV1 getBufferManager() {
if (bufferManager == null) {
LOCK.lock();
try {
if (bufferManager == null) {
- bufferManager = new ReadBufferManager();
+ bufferManager = new ReadBufferManagerV1();
bufferManager.init();
}
} finally {
@@ -74,23 +81,18 @@ static ReadBufferManager getBufferManager() {
return bufferManager;
}
- static void setReadBufferManagerConfigs(int readAheadBlockSize) {
- if (bufferManager == null) {
- LOGGER.debug(
- "ReadBufferManager not initialized yet. Overriding
readAheadBlockSize as {}",
- readAheadBlockSize);
- blockSize = readAheadBlockSize;
- }
- }
-
- private void init() {
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[blockSize]; // same buffers are reused. The byte
array never goes back to GC
- freeList.add(i);
+ buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are
reused. These byte arrays are never garbage collected
+ getFreeList().add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
- Thread t = new Thread(new ReadBufferWorker(i));
+ Thread t = new Thread(new ReadBufferWorker(i, this));
t.setDaemon(true);
threads[i] = t;
t.setName("ABFS-prefetch-" + i);
@@ -99,28 +101,12 @@ private void init() {
ReadBufferWorker.UNLEASH_WORKERS.countDown();
}
- // hide instance constructor
- private ReadBufferManager() {
- LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
- }
-
-
- /*
- *
- * AbfsInputStream-facing methods
- *
- */
-
-
/**
- * {@link AbfsInputStream} calls this method to queue read-aheads.
- *
- * @param stream The {@link AbfsInputStream} for which to do the
read-ahead
- * @param requestedOffset The offset in the file which shoukd be read
- * @param requestedLength The length to read
+ * {@inheritDoc}
*/
- void queueReadAhead(final AbfsInputStream stream, final long
requestedOffset, final int requestedLength,
- TracingContext tracingContext) {
+ @Override
+ public void queueReadAhead(final AbfsInputStream stream, final long
requestedOffset, final int requestedLength,
+ TracingContext tracingContext) {
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Start Queueing readAhead for {} offset {} length {}",
stream.getPath(), requestedOffset, requestedLength);
@@ -130,7 +116,7 @@ void queueReadAhead(final AbfsInputStream stream, final
long requestedOffset, fi
if (isAlreadyQueued(stream, requestedOffset)) {
return; // already queued, do not queue again
}
- if (freeList.isEmpty() && !tryEvict()) {
+ if (getFreeList().isEmpty() && !tryEvict()) {
return; // no buffers available, cannot queue anything
}
@@ -143,11 +129,11 @@ void queueReadAhead(final AbfsInputStream stream, final
long requestedOffset, fi
buffer.setLatch(new CountDownLatch(1));
buffer.setTracingContext(tracingContext);
- Integer bufferIndex = freeList.pop(); // will return a value, since we
have checked size > 0 already
+ Integer bufferIndex = getFreeList().pop(); // will return a value,
since we have checked size > 0 already
buffer.setBuffer(buffers[bufferIndex]);
buffer.setBufferindex(bufferIndex);
- readAheadQueue.add(buffer);
+ getReadAheadQueue().add(buffer);
notifyAll();
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx
{}",
@@ -156,22 +142,11 @@ void queueReadAhead(final AbfsInputStream stream, final
long requestedOffset, fi
}
}
-
/**
- * {@link AbfsInputStream} calls this method read any bytes already
available in a buffer (thereby saving a
- * remote read). This returns the bytes if the data already exists in
buffer. If there is a buffer that is reading
- * the requested offset, then this method blocks until that read completes.
If the data is queued in a read-ahead
- * but not picked up by a worker thread yet, then it cancels that read-ahead
and reports cache miss. This is because
- * depending on worker thread availability, the read-ahead may take a while
- the calling thread can do it's own
- * read to get the data faster (copmared to the read waiting in queue for an
indeterminate amount of time).
- *
- * @param stream the file to read bytes for
- * @param position the offset in the file to do a read for
- * @param length the length to read
- * @param buffer the buffer to read data into. Note that the buffer will
be written into from offset 0.
- * @return the number of bytes read
+ * {@inheritDoc}
*/
- int getBlock(final AbfsInputStream stream, final long position, final int
length, final byte[] buffer)
+ @Override
+ public int getBlock(final AbfsInputStream stream, final long position, final
int length, final byte[] buffer)
throws IOException {
// not synchronized, so have to be careful with locking
if (LOGGER.isTraceEnabled()) {
@@ -197,17 +172,87 @@ int getBlock(final AbfsInputStream stream, final long
position, final int length
return 0;
}
- /*
- *
- * Internal methods
- *
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ReadBuffer getNextBlockToRead() throws InterruptedException {
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ while (getReadAheadQueue().isEmpty()) {
+ wait();
+ }
+ buffer = getReadAheadQueue().remove();
+ notifyAll();
+ if (buffer == null) {
+ return null; // should never happen
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ getInProgressList().add(buffer);
+ }
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
+ buffer.getStream().getPath(), buffer.getOffset());
+ }
+ return buffer;
+ }
+
+ /**
+ * {@inheritDoc}
*/
+ @Override
+ public void doneReading(final ReadBuffer buffer, final ReadBufferStatus
result, final int bytesActuallyRead) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace("ReadBufferWorker completed read file {} for offset {}
outcome {} bytes {}",
+ buffer.getStream().getPath(), buffer.getOffset(), result,
bytesActuallyRead);
+ }
+ synchronized (this) {
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (getInProgressList().contains(buffer)) {
+ getInProgressList().remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ getFreeList().push(buffer.getBufferindex());
+ // buffer will be deleted as per the eviction policy.
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ getCompletedReadList().add(buffer);
+ }
+ }
+ //outside the synchronized, since anyone receiving a wake-up from the
latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
+ getReadAheadQueue().removeIf(readBuffer -> readBuffer.getStream() ==
stream);
+ purgeList(stream, getCompletedReadList());
+ }
+
+ /**
+ * Waits for any in-progress read-ahead covering the given stream and position.
+ * If the buffer is in progress, it waits for the latch to be released.
+ * If the buffer is not in progress, it clears it from the read-ahead queue.
+ *
+ * @param stream the AbfsInputStream associated with the read request
+ * @param position the position in the stream to wait for
+ */
private void waitForProcess(final AbfsInputStream stream, final long
position) {
ReadBuffer readBuf;
synchronized (this) {
clearFromReadAheadQueue(stream, position);
- readBuf = getFromList(inProgressList, stream, position);
+ readBuf = getFromList(getInProgressList(), stream, position);
}
if (readBuf != null) { // if in in-progress queue, then block for
it
try {
@@ -240,14 +285,14 @@ private void waitForProcess(final AbfsInputStream stream,
final long position) {
*/
private synchronized boolean tryEvict() {
ReadBuffer nodeToEvict = null;
- if (completedReadList.size() <= 0) {
+ if (getCompletedReadList().size() <= 0) {
return false; // there are no evict-able buffers
}
long currentTimeInMs = currentTimeMillis();
// first, try buffers where all bytes have been consumed (approximated as
first and last bytes consumed)
- for (ReadBuffer buf : completedReadList) {
+ for (ReadBuffer buf : getCompletedReadList()) {
if (buf.isFirstByteConsumed() && buf.isLastByteConsumed()) {
nodeToEvict = buf;
break;
@@ -257,8 +302,8 @@ private synchronized boolean tryEvict() {
return evict(nodeToEvict);
}
- // next, try buffers where any bytes have been consumed (may be a bad
idea? have to experiment and see)
- for (ReadBuffer buf : completedReadList) {
+ // next, try buffers where any bytes have been consumed (maybe a bad idea?
have to experiment and see)
+ for (ReadBuffer buf : getCompletedReadList()) {
if (buf.isAnyByteConsumed()) {
nodeToEvict = buf;
break;
@@ -278,13 +323,13 @@ private synchronized boolean tryEvict() {
// its buffer when its status was set to READ_FAILED.
long earliestBirthday = Long.MAX_VALUE;
ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
- for (ReadBuffer buf : completedReadList) {
+ for (ReadBuffer buf : getCompletedReadList()) {
if ((buf.getBufferindex() != -1)
&& (buf.getTimeStamp() < earliestBirthday)) {
nodeToEvict = buf;
earliestBirthday = buf.getTimeStamp();
} else if ((buf.getBufferindex() == -1)
- && (currentTimeInMs - buf.getTimeStamp()) >
thresholdAgeMilliseconds) {
+ && (currentTimeInMs - buf.getTimeStamp()) >
getThresholdAgeMilliseconds()) {
oldFailedBuffers.add(buf);
}
}
@@ -293,7 +338,7 @@ private synchronized boolean tryEvict() {
evict(buf);
}
- if ((currentTimeInMs - earliestBirthday > thresholdAgeMilliseconds) &&
(nodeToEvict != null)) {
+ if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds())
&& (nodeToEvict != null)) {
return evict(nodeToEvict);
}
@@ -302,14 +347,20 @@ private synchronized boolean tryEvict() {
return false;
}
+ /**
+ * Evicts the given buffer by removing it from the completedReadList and
adding its index to the freeList.
+ *
+ * @param buf the ReadBuffer to evict
+ * @return true if eviction was successful, false otherwise
+ */
private boolean evict(final ReadBuffer buf) {
// As failed ReadBuffers (bufferIndx = -1) are saved in completedReadList,
// avoid adding it to freeList.
if (buf.getBufferindex() != -1) {
- freeList.push(buf.getBufferindex());
+ getFreeList().push(buf.getBufferindex());
}
- completedReadList.remove(buf);
+ getCompletedReadList().remove(buf);
buf.setTracingContext(null);
if (LOGGER.isTraceEnabled()) {
LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {}
length {}",
@@ -318,17 +369,38 @@ private boolean evict(final ReadBuffer buf) {
return true;
}
+ /**
+   * Checks if the requested offset is already queued in any of the lists.
+   * @param stream the AbfsInputStream associated with the read request
+   * @param requestedOffset the offset in the stream to check
+   * @return true if any part of the requested buffer is already queued in
any of the lists, false otherwise
+ */
private boolean isAlreadyQueued(final AbfsInputStream stream, final long
requestedOffset) {
// returns true if any part of the buffer is already queued
- return (isInList(readAheadQueue, stream, requestedOffset)
- || isInList(inProgressList, stream, requestedOffset)
- || isInList(completedReadList, stream, requestedOffset));
+ return (isInList(getReadAheadQueue(), stream, requestedOffset)
+ || isInList(getInProgressList(), stream, requestedOffset)
+ || isInList(getCompletedReadList(), stream, requestedOffset));
}
+ /**
+ * Checks if the requested offset is in the given list.
+ * @param list the collection of ReadBuffer to check against
+ * @param stream the AbfsInputStream associated with the read request
+ * @param requestedOffset the offset in the stream to check
+   * @return true if the requested offset is in the list, false otherwise
+ */
private boolean isInList(final Collection<ReadBuffer> list, final
AbfsInputStream stream, final long requestedOffset) {
return (getFromList(list, stream, requestedOffset) != null);
}
+ /**
+   * Returns the ReadBuffer from the given list that matches the stream and
requested offset.
+   * A buffer matches if it belongs to the stream and the requested offset
falls within the buffer's range.
+ * @param list the collection of ReadBuffer to search in
+ * @param stream the AbfsInputStream associated with the read request
+ * @param requestedOffset the offset in the stream to check
+ * @return the ReadBuffer if found, null otherwise
+ */
private ReadBuffer getFromList(final Collection<ReadBuffer> list, final
AbfsInputStream stream, final long requestedOffset) {
for (ReadBuffer buffer : list) {
if (buffer.getStream() == stream) {
@@ -346,37 +418,60 @@ private ReadBuffer getFromList(final
Collection<ReadBuffer> list, final AbfsInpu
}
/**
- * Returns buffers that failed or passed from completed queue.
- * @param stream
- * @param requestedOffset
- * @return
+ * Returns a ReadBuffer from the completedReadList that matches the stream
and requested offset.
+   * The buffer is returned if the requestedOffset is at or above the buffer's
offset but less than the buffer's
+   * offset plus its length or its actual requestedLength.
+ *
+ * @param stream the AbfsInputStream associated with the read request
+ * @param requestedOffset the offset in the stream to check
+ * @return the ReadBuffer if found, null otherwise
*/
private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream,
final long requestedOffset) {
- for (ReadBuffer buffer : completedReadList) {
+ for (ReadBuffer buffer : getCompletedReadList()) {
// Buffer is returned if the requestedOffset is at or above buffer's
// offset but less than buffer's length or the actual requestedLength
if ((buffer.getStream() == stream)
&& (requestedOffset >= buffer.getOffset())
&& ((requestedOffset < buffer.getOffset() + buffer.getLength())
|| (requestedOffset < buffer.getOffset() +
buffer.getRequestedLength()))) {
- return buffer;
- }
+ return buffer;
}
+ }
return null;
}
+ /**
+ * Clears the buffer from the read-ahead queue for the given stream and
requested offset.
+ * This method is called when the stream is waiting for a process to
complete.
+ * It removes the buffer from the read-ahead queue and adds its index back
to the free list.
+ *
+ * @param stream the AbfsInputStream associated with the read request
+ * @param requestedOffset the offset in the stream to check
+ */
private void clearFromReadAheadQueue(final AbfsInputStream stream, final
long requestedOffset) {
- ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset);
+ ReadBuffer buffer = getFromList(getReadAheadQueue(), stream,
requestedOffset);
if (buffer != null) {
- readAheadQueue.remove(buffer);
+ getReadAheadQueue().remove(buffer);
notifyAll(); // lock is held in calling method
- freeList.push(buffer.getBufferindex());
+ getFreeList().push(buffer.getBufferindex());
}
}
+ /**
+ * Gets a block of data from the completed read buffers.
+ * If the buffer is found, it copies the data to the provided buffer and
updates the status of the ReadBuffer.
+ * If the buffer is not found or not available, it returns 0.
+ *
+ * @param stream the AbfsInputStream associated with the read request
+ * @param position the position in the file to read from
+ * @param length the number of bytes to read
+ * @param buffer the buffer to store the read data
+ * @return the number of bytes actually read
+ * @throws IOException if an I/O error occurs while reading from the buffer
+ */
private int getBlockFromCompletedQueue(final AbfsInputStream stream, final
long position, final int length,
- final byte[] buffer) throws
IOException {
+ final byte[] buffer) throws IOException {
ReadBuffer buf = getBufferFromCompletedQueue(stream, position);
if (buf == null) {
@@ -386,7 +481,7 @@ private int getBlockFromCompletedQueue(final
AbfsInputStream stream, final long
if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
// To prevent new read requests to fail due to old read-ahead attempts,
// return exception only from buffers that failed within last
thresholdAgeMilliseconds
- if ((currentTimeMillis() - (buf.getTimeStamp()) <
thresholdAgeMilliseconds)) {
+ if ((currentTimeMillis() - (buf.getTimeStamp()) <
getThresholdAgeMilliseconds())) {
throw buf.getErrException();
} else {
return 0;
@@ -412,76 +507,6 @@ private int getBlockFromCompletedQueue(final
AbfsInputStream stream, final long
return lengthToCopy;
}
- /*
- *
- * ReadBufferWorker-thread-facing methods
- *
- */
-
- /**
- * ReadBufferWorker thread calls this to get the next buffer that it should
work on.
- *
- * @return {@link ReadBuffer}
- * @throws InterruptedException if thread is interrupted
- */
- ReadBuffer getNextBlockToRead() throws InterruptedException {
- ReadBuffer buffer = null;
- synchronized (this) {
- //buffer = readAheadQueue.take(); // blocking method
- while (readAheadQueue.size() == 0) {
- wait();
- }
- buffer = readAheadQueue.remove();
- notifyAll();
- if (buffer == null) {
- return null; // should never happen
- }
- buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
- inProgressList.add(buffer);
- }
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker picked file {} for offset {}",
- buffer.getStream().getPath(), buffer.getOffset());
- }
- return buffer;
- }
-
- /**
- * ReadBufferWorker thread calls this method to post completion.
- *
- * @param buffer the buffer whose read was completed
- * @param result the {@link ReadBufferStatus} after the read
operation in the worker thread
- * @param bytesActuallyRead the number of bytes that the worker thread was
actually able to read
- */
- void doneReading(final ReadBuffer buffer, final ReadBufferStatus result,
final int bytesActuallyRead) {
- if (LOGGER.isTraceEnabled()) {
- LOGGER.trace("ReadBufferWorker completed read file {} for offset {}
outcome {} bytes {}",
- buffer.getStream().getPath(), buffer.getOffset(), result,
bytesActuallyRead);
- }
- synchronized (this) {
- // If this buffer has already been purged during
- // close of InputStream then we don't update the lists.
- if (inProgressList.contains(buffer)) {
- inProgressList.remove(buffer);
- if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
- buffer.setStatus(ReadBufferStatus.AVAILABLE);
- buffer.setLength(bytesActuallyRead);
- } else {
- freeList.push(buffer.getBufferindex());
- // buffer will be deleted as per the eviction policy.
- }
- // completed list also contains FAILED read buffers
- // for sending exception message to clients.
- buffer.setStatus(result);
- buffer.setTimeStamp(currentTimeMillis());
- completedReadList.add(buffer);
- }
- }
-
- //outside the synchronized, since anyone receiving a wake-up from the
latch must see safe-published results
- buffer.getLatch().countDown(); // wake up waiting threads (if any)
- }
-
/**
* Similar to System.currentTimeMillis, except implemented with
System.nanoTime().
* System.currentTimeMillis can go backwards when system clock is changed
(e.g., with NTP time synchronization),
@@ -495,66 +520,13 @@ private long currentTimeMillis() {
return System.nanoTime() / 1000 / 1000;
}
- @VisibleForTesting
- int getThresholdAgeMilliseconds() {
- return thresholdAgeMilliseconds;
- }
-
- @VisibleForTesting
- static void setThresholdAgeMilliseconds(int thresholdAgeMs) {
- thresholdAgeMilliseconds = thresholdAgeMs;
- }
-
- @VisibleForTesting
- int getCompletedReadListSize() {
- return completedReadList.size();
- }
-
- @VisibleForTesting
- public synchronized List<ReadBuffer> getCompletedReadListCopy() {
- return new ArrayList<>(completedReadList);
- }
-
- @VisibleForTesting
- public synchronized List<Integer> getFreeListCopy() {
- return new ArrayList<>(freeList);
- }
-
- @VisibleForTesting
- public synchronized List<ReadBuffer> getReadAheadQueueCopy() {
- return new ArrayList<>(readAheadQueue);
- }
-
- @VisibleForTesting
- public synchronized List<ReadBuffer> getInProgressCopiedList() {
- return new ArrayList<>(inProgressList);
- }
-
- @VisibleForTesting
- void callTryEvict() {
- tryEvict();
- }
-
-
- /**
- * Purging the buffers associated with an {@link AbfsInputStream}
- * from {@link ReadBufferManager} when stream is closed.
- * @param stream input stream.
- */
- public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
- LOGGER.debug("Purging stale buffers for AbfsInputStream {} ", stream);
- readAheadQueue.removeIf(readBuffer -> readBuffer.getStream() == stream);
- purgeList(stream, completedReadList);
- }
-
/**
* Method to remove buffers associated with a {@link AbfsInputStream}
* when its close method is called.
* NOTE: This method is not threadsafe and must be called inside a
* synchronised block. See caller.
* @param stream associated input stream.
- * @param list list of buffers like {@link this#completedReadList}
- * or {@link this#inProgressList}.
+ * @param list list of buffers like completedReadList or inProgressList
*/
private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
@@ -564,21 +536,39 @@ private void purgeList(AbfsInputStream stream,
LinkedList<ReadBuffer> list) {
// As failed ReadBuffers (bufferIndex = -1) are already pushed to free
// list in doneReading method, we will skip adding those here again.
if (readBuffer.getBufferindex() != -1) {
- freeList.push(readBuffer.getBufferindex());
+ getFreeList().push(readBuffer.getBufferindex());
}
}
}
}
/**
- * Test method that can clean up the current state of readAhead buffers and
- * the lists. Will also trigger a fresh init.
+ * {@inheritDoc}
*/
@VisibleForTesting
- void testResetReadBufferManager() {
+ @Override
+ public int getNumBuffers() {
+ return NUM_BUFFERS;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public void callTryEvict() {
+ tryEvict();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public void testResetReadBufferManager() {
synchronized (this) {
ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
- for (ReadBuffer buf : completedReadList) {
+ for (ReadBuffer buf : getCompletedReadList()) {
if (buf != null) {
completedBuffers.add(buf);
}
@@ -588,10 +578,10 @@ void testResetReadBufferManager() {
evict(buf);
}
- readAheadQueue.clear();
- inProgressList.clear();
- completedReadList.clear();
- freeList.clear();
+ getReadAheadQueue().clear();
+ getInProgressList().clear();
+ getCompletedReadList().clear();
+ getFreeList().clear();
for (int i = 0; i < NUM_BUFFERS; i++) {
buffers[i] = null;
}
@@ -601,50 +591,22 @@ void testResetReadBufferManager() {
}
/**
- * Reset buffer manager to null.
+ * {@inheritDoc}
*/
@VisibleForTesting
- static void resetBufferManager() {
- bufferManager = null;
- }
-
- /**
- * Reset readAhead buffer to needed readAhead block size and
- * thresholdAgeMilliseconds.
- * @param readAheadBlockSize
- * @param thresholdAgeMilliseconds
- */
- @VisibleForTesting
- void testResetReadBufferManager(int readAheadBlockSize, int
thresholdAgeMilliseconds) {
- setBlockSize(readAheadBlockSize);
+ @Override
+ public void testResetReadBufferManager(int readAheadBlockSize, int
thresholdAgeMilliseconds) {
+ setReadAheadBlockSize(readAheadBlockSize);
setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
testResetReadBufferManager();
}
- @VisibleForTesting
- static void setBlockSize(int readAheadBlockSize) {
- blockSize = readAheadBlockSize;
+ @Override
+ void resetBufferManager() {
+ setBufferManager(null); // reset the singleton instance
}
- @VisibleForTesting
- int getReadAheadBlockSize() {
- return blockSize;
- }
-
- /**
- * Test method that can mimic no free buffers scenario and also add a
ReadBuffer
- * into completedReadList. This readBuffer will get picked up by TryEvict()
- * next time a new queue request comes in.
- * @param buf that needs to be added to completedReadlist
- */
- @VisibleForTesting
- void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) {
- freeList.clear();
- completedReadList.add(buf);
- }
-
- @VisibleForTesting
- int getNumBuffers() {
- return NUM_BUFFERS;
+ private static void setBufferManager(ReadBufferManagerV1 manager) {
+ bufferManager = manager;
}
}
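
The V1 worker handshake above keeps its pre-refactor contract:
getNextBlockToRead() blocks until a buffer is queued, the worker performs the
remote read, and doneReading() posts the outcome back. A minimal sketch of
that loop (assuming the services package context; performRead() here is a
hypothetical stand-in for the actual remote read):

    void workerLoop(ReadBufferManager manager) {
      while (true) {
        ReadBuffer buffer;
        try {
          // blocks until the read-ahead queue is non-empty
          buffer = manager.getNextBlockToRead();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
        try {
          int bytesRead = performRead(buffer); // hypothetical helper
          manager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);
        } catch (IOException e) {
          // failed buffers stay in the completed list so getBlock() can
          // rethrow the error to readers within thresholdAgeMilliseconds
          manager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
        }
      }
    }
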
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
new file mode 100644
index 00000000000..9cce860127d
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+
+final class ReadBufferManagerV2 extends ReadBufferManager {
+
+ // Thread Pool Configurations
+ private static int minThreadPoolSize;
+ private static int maxThreadPoolSize;
+ private static int executorServiceKeepAliveTimeInMilliSec;
+ private ThreadPoolExecutor workerPool;
+
+ // Buffer Pool Configurations
+ private static int minBufferPoolSize;
+ private static int maxBufferPoolSize;
+ private int numberOfActiveBuffers = 0;
+ private byte[][] bufferPool;
+
+ private static ReadBufferManagerV2 bufferManager;
+
+ // hide instance constructor
+ private ReadBufferManagerV2() {
+ LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
+ }
+
+ /**
+ * Sets the read buffer manager configurations.
+ * @param readAheadBlockSize the size of the read-ahead block in bytes
+ * @param abfsConfiguration the AbfsConfiguration instance for other
configurations
+ */
+ static void setReadBufferManagerConfigs(int readAheadBlockSize,
AbfsConfiguration abfsConfiguration) {
+ if (bufferManager == null) {
+ minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+ maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+ executorServiceKeepAliveTimeInMilliSec =
abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+ minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
+ maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
+
setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
+ setReadAheadBlockSize(readAheadBlockSize);
+ }
+ }
+
+ /**
+ * Returns the singleton instance of ReadBufferManagerV2.
+ * @return the singleton instance of ReadBufferManagerV2
+ */
+ static ReadBufferManagerV2 getBufferManager() {
+ if (bufferManager == null) {
+ LOCK.lock();
+ try {
+ if (bufferManager == null) {
+ bufferManager = new ReadBufferManagerV2();
+ bufferManager.init();
+ }
+ } finally {
+ LOCK.unlock();
+ }
+ }
+ return bufferManager;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ void init() {
+ // Initialize Buffer Pool
+ bufferPool = new byte[maxBufferPoolSize][];
+ for (int i = 0; i < minBufferPoolSize; i++) {
+ bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are
reused. These byte arrays are never garbage collected
+ getFreeList().add(i);
+ numberOfActiveBuffers++;
+ }
+
+    // Initialize a thread pool that scales between minThreadPoolSize and
+    // maxThreadPoolSize threads, pre-starting minThreadPoolSize workers
+ workerPool = new ThreadPoolExecutor(
+ minThreadPoolSize,
+ maxThreadPoolSize,
+ executorServiceKeepAliveTimeInMilliSec,
+ TimeUnit.MILLISECONDS,
+ new SynchronousQueue<>(),
+ namedThreadFactory);
+ workerPool.allowCoreThreadTimeOut(true);
+ for (int i = 0; i < minThreadPoolSize; i++) {
+ ReadBufferWorker worker = new ReadBufferWorker(i, this);
+ workerPool.submit(worker);
+ }
+ ReadBufferWorker.UNLEASH_WORKERS.countDown();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void queueReadAhead(final AbfsInputStream stream,
+ final long requestedOffset,
+ final int requestedLength,
+ final TracingContext tracingContext) {
+ // TODO: To be implemented
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public int getBlock(final AbfsInputStream stream,
+ final long position,
+ final int length,
+ final byte[] buffer) throws IOException {
+ // TODO: To be implemented
+ return 0;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public ReadBuffer getNextBlockToRead() throws InterruptedException {
+ // TODO: To be implemented
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void doneReading(final ReadBuffer buffer,
+ final ReadBufferStatus result,
+ final int bytesActuallyRead) {
+ // TODO: To be implemented
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void purgeBuffersForStream(final AbfsInputStream stream) {
+ // TODO: To be implemented
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public int getNumBuffers() {
+ return numberOfActiveBuffers;
+  }
+
+  /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public void callTryEvict() {
+ // TODO: To be implemented
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public void testResetReadBufferManager() {
+ // TODO: To be implemented
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @VisibleForTesting
+ @Override
+ public void testResetReadBufferManager(final int readAheadBlockSize,
+ final int thresholdAgeMilliseconds) {
+ // TODO: To be implemented
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
+ // TODO: To be implemented
+ }
+
+ private final ThreadFactory namedThreadFactory = new ThreadFactory() {
+ private int count = 0;
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "ReadAheadV2-Thread-" + count++);
+ }
+ };
+
+ @Override
+ void resetBufferManager() {
+ setBufferManager(null); // reset the singleton instance
+ }
+
+ private static void setBufferManager(ReadBufferManagerV2 manager) {
+ bufferManager = manager;
+ }
+}
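
Note that setReadBufferManagerConfigs() is deliberately a no-op once the
singleton exists, so configuration has to be pushed before the first
getBufferManager() call. A hedged sketch of the expected bootstrap order
(from within the same package; blockSize and abfsConfig are placeholders):

    // applies only while the singleton instance is still null
    ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize, abfsConfig);
    // the first call constructs the instance and runs init()
    ReadBufferManagerV2 manager = ReadBufferManagerV2.getBufferManager();
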
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index a30f06261ef..79d5eef955a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -28,9 +28,11 @@ class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new
CountDownLatch(1);
private int id;
+ private ReadBufferManager bufferManager;
- ReadBufferWorker(final int id) {
+ ReadBufferWorker(final int id, final ReadBufferManager bufferManager) {
this.id = id;
+ this.bufferManager = bufferManager;
}
/**
@@ -51,7 +53,6 @@ public void run() {
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
ReadBuffer buffer;
while (true) {
try {
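
With the manager injected through the constructor rather than looked up via a
static getBufferManager() call inside run(), the same worker class can be
driven by either manager implementation. A sketch under that assumption,
from within the services package, with the caller owning the thread and the
UNLEASH_WORKERS latch:

    ReadBufferManager manager = ReadBufferManagerV1.getBufferManager();
    Thread worker = new Thread(new ReadBufferWorker(0, manager));
    worker.start();
    // workers wait on this latch before entering their read loop
    ReadBufferWorker.UNLEASH_WORKERS.countDown();
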
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
index dd32643063a..78cd6bd9d6a 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestGetNameSpaceEnabled.java
@@ -26,7 +26,6 @@
import org.assertj.core.api.Assertions;
import org.mockito.Mockito;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
import
org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
@@ -43,8 +42,11 @@
import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;
+import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_MAX_IO_RETRIES;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_DFS_DOMAIN_NAME;
import static
org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys.FS_AZURE_ACCOUNT_KEY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
@@ -139,7 +141,7 @@ private AzureBlobFileSystem getNewFSWithHnsConf(
this.getAccountName()), isNamespaceEnabledAccount);
rawConfig
.setBoolean(AZURE_CREATE_REMOTE_FILESYSTEM_DURING_INITIALIZATION,
true);
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+ rawConfig.set(FS_DEFAULT_NAME_KEY,
getNonExistingUrl());
return (AzureBlobFileSystem) FileSystem.get(rawConfig);
}
@@ -315,29 +317,29 @@ public void testAccountSpecificConfig() throws Exception {
rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_KEY, testAccountName),
dummyAcountKey);
rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_KEY, otherAccountName),
dummyAcountKey);
// Assert that account specific config takes precedence
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultUri);
+ rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
assertFileSystemInitWithExpectedHNSSettings(rawConfig, false);
// Assert that other account still uses account agnostic config
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, otherUri);
+ rawConfig.set(FS_DEFAULT_NAME_KEY, otherUri);
assertFileSystemInitWithExpectedHNSSettings(rawConfig, true);
// Set only the account specific config for test account
rawConfig.set(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
testAccountName), FALSE_STR);
rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
// Assert that only account specific config is enough for test account
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultUri);
+ rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
assertFileSystemInitWithExpectedHNSSettings(rawConfig, false);
// Set only account agnostic config
rawConfig.set(FS_AZURE_ACCOUNT_IS_HNS_ENABLED, FALSE_STR);
rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
testAccountName));
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultUri);
+ rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
assertFileSystemInitWithExpectedHNSSettings(rawConfig, false);
// Unset both account specific and account agnostic config
rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
testAccountName));
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultUri);
+ rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
rawConfig.set(AZURE_MAX_IO_RETRIES, "0");
// Assert that file system init fails with UnknownHost exception as
getAcl() is needed.
try {
@@ -471,10 +473,10 @@ private Configuration getConfigurationWithoutHnsConfig() {
rawConfig.unset(FS_AZURE_ACCOUNT_IS_HNS_ENABLED);
rawConfig.unset(accountProperty(FS_AZURE_ACCOUNT_IS_HNS_ENABLED,
this.getAccountName()));
- String testAccountName = "testAccount.dfs.core.windows.net";
- String defaultUri = this.getTestUrl().replace(this.getAccountName(),
testAccountName);
+ String defaultUri = getRawConfiguration().get(FS_DEFAULT_NAME_KEY).
+ replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME);
// Assert that account specific config takes precedence
- rawConfig.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
defaultUri);
+ rawConfig.set(FS_DEFAULT_NAME_KEY, defaultUri);
return rawConfig;
}
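
The hardcoded test account URL is gone; the DFS-endpoint URI is now derived
from the configured default by swapping the domain constants. Roughly, and
assuming the standard Azure endpoint values behind those constants:

    // before: abfs://[email protected]
    // after:  abfs://[email protected]
    String defaultUri = getRawConfiguration().get(FS_DEFAULT_NAME_KEY)
        .replace(ABFS_BLOB_DOMAIN_NAME, ABFS_DFS_DOMAIN_NAME);
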
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index a57430fa808..b70f36de318 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -67,7 +67,7 @@ public ITestReadBufferManager() throws Exception {
@Test
public void testPurgeBufferManagerForParallelStreams() throws Exception {
- describe("Testing purging of buffers from ReadBufferManager for "
+ describe("Testing purging of buffers from ReadBufferManagerV1 for "
+ "parallel input streams");
final int numBuffers = 16;
final LinkedList<Integer> freeList = new LinkedList<>();
@@ -99,7 +99,7 @@ public void testPurgeBufferManagerForParallelStreams() throws
Exception {
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ ReadBufferManagerV1 bufferManager =
ReadBufferManagerV1.getBufferManager();
// readahead queue is empty
assertListEmpty("ReadAheadQueue",
bufferManager.getReadAheadQueueCopy());
// verify the in progress list eventually empties out.
@@ -115,7 +115,7 @@ private void assertListEmpty(String listName,
List<ReadBuffer> list) {
@Test
public void testPurgeBufferManagerForSequentialStream() throws Exception {
- describe("Testing purging of buffers in ReadBufferManager for "
+ describe("Testing purging of buffers in ReadBufferManagerV1 for "
+ "sequential input streams");
AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
final String fileName = methodName.getMethodName();
@@ -131,7 +131,7 @@ public void testPurgeBufferManagerForSequentialStream()
throws Exception {
} finally {
IOUtils.closeStream(iStream1);
}
- ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
+ ReadBufferManagerV1 bufferManager =
ReadBufferManagerV1.getBufferManager();
AbfsInputStream iStream2 = null;
try {
iStream2 = (AbfsInputStream)
fs.open(testFilePath).getWrappedStream();
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index e4ed9881ffa..de49da5dc51 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -89,7 +89,7 @@ public class TestAbfsInputStream extends
@Override
public void teardown() throws Exception {
super.teardown();
- ReadBufferManager.getBufferManager().testResetReadBufferManager();
+ getBufferManager().testResetReadBufferManager();
}
private AbfsRestOperation getMockRestOp() {
@@ -164,12 +164,12 @@ public AbfsInputStream getAbfsInputStream(AbfsClient
abfsClient,
private void queueReadAheads(AbfsInputStream inputStream) {
// Mimic AbfsInputStream readAhead queue requests
- ReadBufferManager.getBufferManager()
+ getBufferManager()
.queueReadAhead(inputStream, 0, ONE_KB,
inputStream.getTracingContext());
- ReadBufferManager.getBufferManager()
+ getBufferManager()
.queueReadAhead(inputStream, ONE_KB, ONE_KB,
inputStream.getTracingContext());
- ReadBufferManager.getBufferManager()
+ getBufferManager()
.queueReadAhead(inputStream, TWO_KB, TWO_KB,
inputStream.getTracingContext());
}
@@ -187,15 +187,15 @@ private void verifyReadCallCount(AbfsClient client, int
count)
private void checkEvictedStatus(AbfsInputStream inputStream, int position,
boolean expectedToThrowException)
throws Exception {
// Sleep for the eviction threshold time
-
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()
+ 1000);
+ Thread.sleep(getBufferManager().getThresholdAgeMilliseconds() + 1000);
// Eviction is done only when AbfsInputStream tries to queue new items.
// 1 tryEvict will remove 1 eligible item. To ensure that the current test
buffer
// will get evicted (considering there could be other tests running in
parallel),
// call tryEvict for the number of items that are there in
completedReadList.
- int numOfCompletedReadListItems =
ReadBufferManager.getBufferManager().getCompletedReadListSize();
+ int numOfCompletedReadListItems =
getBufferManager().getCompletedReadListSize();
while (numOfCompletedReadListItems > 0) {
- ReadBufferManager.getBufferManager().callTryEvict();
+ getBufferManager().callTryEvict();
numOfCompletedReadListItems--;
}
@@ -210,7 +210,7 @@ private void checkEvictedStatus(AbfsInputStream
inputStream, int position, boole
public TestAbfsInputStream() throws Exception {
super();
// Reduce thresholdAgeMilliseconds to 3 sec for the tests
-
ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
+
getBufferManager().setThresholdAgeMilliseconds(REDUCED_READ_BUFFER_AGE_THRESHOLD);
}
private void writeBufferToNewFile(Path testFile, byte[] buffer) throws
IOException {
@@ -364,7 +364,7 @@ public void testFailedReadAhead() throws Exception {
public void testFailedReadAheadEviction() throws Exception {
AbfsClient client = getMockAbfsClient();
AbfsRestOperation successOp = getMockRestOp();
-
ReadBufferManager.setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
+
getBufferManager().setThresholdAgeMilliseconds(INCREASED_READ_BUFFER_AGE_THRESHOLD);
// Stub :
// Read request leads to 3 readahead calls: Fail all 3
readahead-client.read()
// Actual read request fails with the failure in readahead thread
@@ -379,7 +379,8 @@ public void testFailedReadAheadEviction() throws Exception {
// Add a failed buffer to completed queue and set to no free buffers to
read ahead.
ReadBuffer buff = new ReadBuffer();
buff.setStatus(ReadBufferStatus.READ_FAILED);
-
ReadBufferManager.getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
+ buff.setStream(inputStream);
+ getBufferManager().testMimicFullUseAndAddFailedBuffer(buff);
// if read failed buffer eviction is tagged as a valid eviction, it will
lead to
// wrong assumption of queue logic that a buffer is freed up and can lead
to :
@@ -387,7 +388,7 @@ public void testFailedReadAheadEviction() throws Exception {
// at java.util.Stack.peek(Stack.java:102)
// at java.util.Stack.pop(Stack.java:84)
// at
org.apache.hadoop.fs.azurebfs.services.ReadBufferManager.queueReadAhead
- ReadBufferManager.getBufferManager().queueReadAhead(inputStream, 0, ONE_KB,
+ getBufferManager().queueReadAhead(inputStream, 0, ONE_KB,
getTestTracingContext(getFileSystem(), true));
}
@@ -429,7 +430,7 @@ public void testOlderReadAheadFailure() throws Exception {
verifyReadCallCount(client, 3);
// Sleep for thresholdAgeMs so that the read ahead buffer qualifies for
being old.
-
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+ Thread.sleep(getBufferManager().getThresholdAgeMilliseconds());
// Second read request should retry the read (and not issue any new
readaheads)
inputStream.read(ONE_KB, new byte[ONE_KB], 0, ONE_KB);
@@ -474,7 +475,7 @@ public void testSuccessfulReadAhead() throws Exception {
any(String.class), any(), any(TracingContext.class));
AbfsInputStream inputStream = getAbfsInputStream(client,
"testSuccessfulReadAhead.txt");
- int beforeReadCompletedListSize =
ReadBufferManager.getBufferManager().getCompletedReadListSize();
+ int beforeReadCompletedListSize =
getBufferManager().getCompletedReadListSize();
// First read request that triggers readAheads.
inputStream.read(new byte[ONE_KB]);
@@ -482,7 +483,7 @@ public void testSuccessfulReadAhead() throws Exception {
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
int newAdditionsToCompletedRead =
- ReadBufferManager.getBufferManager().getCompletedReadListSize()
+ getBufferManager().getCompletedReadListSize()
- beforeReadCompletedListSize;
// read buffer might be dumped if the ReadBufferManager getblock preceded
// the action of buffer being picked for reading from readaheadqueue, so
that
@@ -530,7 +531,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting()
throws Exception {
any(TracingContext.class));
final ReadBufferManager readBufferManager
- = ReadBufferManager.getBufferManager();
+ = getBufferManager();
final int readBufferTotal = readBufferManager.getNumBuffers();
final int expectedFreeListBufferCount = readBufferTotal
@@ -607,7 +608,7 @@ public void testReadAheadManagerForFailedReadAhead() throws
Exception {
// if readAhead failed for specific offset, getBlock should
// throw exception from the ReadBuffer that failed within last
thresholdAgeMilliseconds sec
intercept(IOException.class,
- () -> ReadBufferManager.getBufferManager().getBlock(
+ () -> getBufferManager().getBlock(
inputStream,
0,
ONE_KB,
@@ -655,14 +656,14 @@ public void
testReadAheadManagerForOlderReadAheadFailure() throws Exception {
// AbfsInputStream Read would have waited for the read-ahead for the
requested offset
// as we are testing from ReadAheadManager directly, sleep for
thresholdAgeMilliseconds so that
// read buffer qualifies for to be an old buffer
-
Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
+ Thread.sleep(getBufferManager().getThresholdAgeMilliseconds());
// Only the 3 readAhead threads should have triggered client.read
verifyReadCallCount(client, 3);
// getBlock from a new read request should return 0 if there is a failure
// 30 sec before in read ahead buffer for respective offset.
- int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+ int bytesRead = getBufferManager().getBlock(
inputStream,
ONE_KB,
ONE_KB,
@@ -715,7 +716,7 @@ public void testReadAheadManagerForSuccessfulReadAhead()
throws Exception {
verifyReadCallCount(client, 3);
// getBlock for a new read should return the buffer read-ahead
- int bytesRead = ReadBufferManager.getBufferManager().getBlock(
+ int bytesRead = getBufferManager().getBlock(
inputStream,
ONE_KB,
ONE_KB,
@@ -853,7 +854,7 @@ public AbfsInputStream testReadAheadConfigs(int
readRequestSize,
.describedAs("Unexpected AlwaysReadBufferSize settings")
.isEqualTo(alwaysReadBufferSizeEnabled);
-
Assertions.assertThat(ReadBufferManager.getBufferManager().getReadAheadBlockSize())
+ Assertions.assertThat(getBufferManager().getReadAheadBlockSize())
.describedAs("Unexpected readAhead block size")
.isEqualTo(readAheadBlockSize);
@@ -921,10 +922,14 @@ private AzureBlobFileSystem createTestFile(Path
testFilePath, long testFileSize,
}
private void resetReadBufferManager(int bufferSize, int threshold) {
- ReadBufferManager.getBufferManager()
+ getBufferManager()
.testResetReadBufferManager(bufferSize, threshold);
// Trigger GC as aggressive recreation of ReadBufferManager buffers
// by successive tests can lead to OOM based on the dev VM/machine
capacity.
System.gc();
}
-}
\ No newline at end of file
+
+ private ReadBufferManager getBufferManager() {
+ return ReadBufferManagerV1.getBufferManager();
+ }
+}
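
The tests now reach the manager through this single private helper, so a
future run against the V2 implementation should only need the one indirection
changed. A sketch, assuming V2 eventually exposes the same testing hooks it
currently stubs out:

    private ReadBufferManager getBufferManager() {
      return ReadBufferManagerV2.getBufferManager();
    }
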
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]