This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2e9ef96d8f4 HADOOP-19622: [ABFS][ReadAheadV2] Implement Read Buffer Manager V2 with improved aggressiveness (#7832)
2e9ef96d8f4 is described below
commit 2e9ef96d8f4e91e5861f42b073c2a92bc98bb49f
Author: Anuj Modi <[email protected]>
AuthorDate: Fri Oct 31 12:20:24 2025 +0530
HADOOP-19622: [ABFS][ReadAheadV2] Implement Read Buffer Manager V2 with improved aggressiveness (#7832)
Contributed by Anuj Modi
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 81 +-
.../fs/azurebfs/AzureBlobFileSystemStore.java | 34 +-
.../fs/azurebfs/constants/ConfigurationKeys.java | 38 +
.../constants/FileSystemConfigurations.java | 13 +-
.../fs/azurebfs/services/AbfsInputStream.java | 36 +-
.../azurebfs/services/AbfsInputStreamContext.java | 16 +-
.../hadoop/fs/azurebfs/services/ReadBuffer.java | 37 +
.../fs/azurebfs/services/ReadBufferManager.java | 19 +-
.../fs/azurebfs/services/ReadBufferManagerV1.java | 8 +-
.../fs/azurebfs/services/ReadBufferManagerV2.java | 1027 ++++++++++++++++++--
.../fs/azurebfs/services/ReadBufferWorker.java | 16 +-
.../fs/azurebfs/AbstractAbfsIntegrationTest.java | 27 +
.../fs/azurebfs/ITestAzureBlobFileSystemFlush.java | 20 +-
.../services/AbfsInputStreamTestUtils.java | 61 --
.../fs/azurebfs/services/ITestAbfsInputStream.java | 8 +-
.../services/ITestAbfsInputStreamReadFooter.java | 16 +-
.../ITestAbfsInputStreamSmallFileReads.java | 16 +-
.../azurebfs/services/ITestReadBufferManager.java | 222 ++---
.../services/ITestReadBufferManagerV2.java | 147 +++
.../fs/azurebfs/services/TestAbfsInputStream.java | 22 +-
.../azurebfs/services/TestReadBufferManagerV2.java | 298 ++++++
21 files changed, 1805 insertions(+), 357 deletions(-)
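This patch gates the new manager behind the existing fs.azure.enable.readahead.v2 flag and adds a separate flag for dynamic scaling; both keys are defined in ConfigurationKeys.java below. A minimal, hypothetical sketch of opting in from client code (standard Hadoop Configuration API; the surrounding cluster wiring is assumed):

import org.apache.hadoop.conf.Configuration;

public class EnableReadAheadV2Example {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Key names are taken verbatim from ConfigurationKeys.java in this patch.
    conf.setBoolean("fs.azure.enable.readahead.v2", true);                 // switch streams to ReadBufferManagerV2
    conf.setBoolean("fs.azure.enable.readahead.v2.dynamic.scaling", true); // opt in to CPU/memory based scaling
    // Pass conf to FileSystem.get(...) as usual when opening an ABFS path.
  }
}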
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index c57a0ea2a7f..88e35b890a2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -396,6 +396,11 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_ENABLE_READAHEAD_V2)
private boolean isReadAheadV2Enabled;
+ @BooleanConfigurationValidatorAnnotation(
+ ConfigurationKey = FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING,
+ DefaultValue = DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING)
+ private boolean isReadAheadV2DynamicScalingEnabled;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE,
DefaultValue = DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE)
@@ -416,6 +421,26 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE)
private int maxReadAheadV2BufferPoolSize;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS,
+ DefaultValue = DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS)
+ private int readAheadV2CpuMonitoringIntervalMillis;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE,
+ DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE)
+ private int readAheadV2ThreadPoolUpscalePercentage;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE,
+ DefaultValue = DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE)
+ private int readAheadV2ThreadPoolDownscalePercentage;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS,
+ DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS)
+ private int readAheadV2MemoryMonitoringIntervalMillis;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS,
DefaultValue = DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS)
@@ -426,6 +451,16 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS)
private int readAheadV2CachedBufferTTLMillis;
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE)
+ private int readAheadV2CpuUsageThresholdPercent;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
+ FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENTAGE)
+ private int readAheadV2MemoryUsageThresholdPercent;
+
@LongConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
MinValue = 0,
DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -1548,13 +1583,25 @@ public EncryptionContextProvider createEncryptionContextProvider() {
}
public boolean isReadAheadEnabled() {
- return this.enabledReadAhead;
+ return enabledReadAhead;
+ }
+
+ /**
+ * Checks if the read-ahead v2 feature is enabled by user.
+ * @return true if read-ahead v2 is enabled, false otherwise.
+ */
+ public boolean isReadAheadV2Enabled() {
+ return isReadAheadV2Enabled;
+ }
+
+ public boolean isReadAheadV2DynamicScalingEnabled() {
+ return isReadAheadV2DynamicScalingEnabled;
}
public int getMinReadAheadV2ThreadPoolSize() {
if (minReadAheadV2ThreadPoolSize <= 0) {
// If the minReadAheadV2ThreadPoolSize is not set, use the default value
- return 2 * Runtime.getRuntime().availableProcessors();
+ return DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
}
return minReadAheadV2ThreadPoolSize;
}
@@ -1570,7 +1617,7 @@ public int getMaxReadAheadV2ThreadPoolSize() {
public int getMinReadAheadV2BufferPoolSize() {
if (minReadAheadV2BufferPoolSize <= 0) {
// If the minReadAheadV2BufferPoolSize is not set, use the default value
- return 2 * Runtime.getRuntime().availableProcessors();
+ return DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE;
}
return minReadAheadV2BufferPoolSize;
}
@@ -1583,6 +1630,22 @@ public int getMaxReadAheadV2BufferPoolSize() {
return maxReadAheadV2BufferPoolSize;
}
+ public int getReadAheadV2CpuMonitoringIntervalMillis() {
+ return readAheadV2CpuMonitoringIntervalMillis;
+ }
+
+ public int getReadAheadV2ThreadPoolUpscalePercentage() {
+ return readAheadV2ThreadPoolUpscalePercentage;
+ }
+
+ public int getReadAheadV2ThreadPoolDownscalePercentage() {
+ return readAheadV2ThreadPoolDownscalePercentage;
+ }
+
+ public int getReadAheadV2MemoryMonitoringIntervalMillis() {
+ return readAheadV2MemoryMonitoringIntervalMillis;
+ }
+
public int getReadAheadExecutorServiceTTLInMillis() {
return readAheadExecutorServiceTTLMillis;
}
@@ -1591,12 +1654,12 @@ public int getReadAheadV2CachedBufferTTLMillis() {
return readAheadV2CachedBufferTTLMillis;
}
- /**
- * Checks if the read-ahead v2 feature is enabled by user.
- * @return true if read-ahead v2 is enabled, false otherwise.
- */
- public boolean isReadAheadV2Enabled() {
- return this.isReadAheadV2Enabled;
+ public int getReadAheadV2CpuUsageThresholdPercent() {
+ return readAheadV2CpuUsageThresholdPercent;
+ }
+
+ public int getReadAheadV2MemoryUsageThresholdPercent() {
+ return readAheadV2MemoryUsageThresholdPercent;
}
@VisibleForTesting
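One behavioral note on the getter changes above: when the min pool sizes are unset or non-positive, the fallback moves from the machine-dependent 2 * Runtime.getRuntime().availableProcessors() to the fixed defaults introduced in FileSystemConfigurations.java (8 threads, 16 buffers). A small sketch of the resulting behavior, assuming a hypothetical caller holding an AbfsConfiguration built without overrides:

int minThreads = abfsConfiguration.getMinReadAheadV2ThreadPoolSize(); // 8 when unset; no longer scales with core count
int minBuffers = abfsConfiguration.getMinReadAheadV2BufferPoolSize(); // 16 when unset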
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index 486e4b3cc9b..6d06580726e 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -963,24 +963,24 @@ private AbfsInputStreamContext populateAbfsInputStreamContext(
int footerReadBufferSize = options.map(c -> c.getInt(
AZURE_FOOTER_READ_BUFFER_SIZE,
getAbfsConfiguration().getFooterReadBufferSize()))
.orElse(getAbfsConfiguration().getFooterReadBufferSize());
+
return new AbfsInputStreamContext(getAbfsConfiguration().getSasTokenRenewPeriodForStreamsInSeconds())
- .withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
- .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
- .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
- .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
- .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
- .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
- .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
- .withFooterReadBufferSize(footerReadBufferSize)
- .withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
- .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
- .withShouldReadBufferSizeAlways(
- getAbfsConfiguration().shouldReadBufferSizeAlways())
- .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
- .withBufferedPreadDisabled(bufferedPreadDisabled)
- .withEncryptionAdapter(contextEncryptionAdapter)
- .withAbfsBackRef(fsBackRef)
- .build();
+ .withReadBufferSize(getAbfsConfiguration().getReadBufferSize())
+ .withReadAheadQueueDepth(getAbfsConfiguration().getReadAheadQueueDepth())
+ .withTolerateOobAppends(getAbfsConfiguration().getTolerateOobAppends())
+ .isReadAheadEnabled(getAbfsConfiguration().isReadAheadEnabled())
+ .isReadAheadV2Enabled(getAbfsConfiguration().isReadAheadV2Enabled())
+ .withReadSmallFilesCompletely(getAbfsConfiguration().readSmallFilesCompletely())
+ .withOptimizeFooterRead(getAbfsConfiguration().optimizeFooterRead())
+ .withFooterReadBufferSize(footerReadBufferSize)
+ .withReadAheadRange(getAbfsConfiguration().getReadAheadRange())
+ .withStreamStatistics(new AbfsInputStreamStatisticsImpl())
+ .withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways())
+ .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
+ .withBufferedPreadDisabled(bufferedPreadDisabled)
+ .withEncryptionAdapter(contextEncryptionAdapter)
+ .withAbfsBackRef(fsBackRef)
+ .build();
}
public OutputStream openFileForWrite(final Path path,
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 899e96dadc1..b365217323f 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -276,6 +276,12 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_ENABLE_READAHEAD_V2 =
"fs.azure.enable.readahead.v2";
+ /**
+ * Enable or disable dynamic scaling of thread pool and buffer pool of readahead V2.
+ * Value: {@value}.
+ */
+ public static final String FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING =
+ "fs.azure.enable.readahead.v2.dynamic.scaling";
+
/**
* Minimum number of prefetch threads in the thread pool for readahead V2.
* {@value }
@@ -297,6 +303,28 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_READAHEAD_V2_MAX_BUFFER_POOL_SIZE =
"fs.azure.readahead.v2.max.buffer.pool.size";
+ /**
+ * Interval in milliseconds for periodic monitoring of CPU usage and up/down scaling thread pool size accordingly.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS =
+ "fs.azure.readahead.v2.cpu.monitoring.interval.millis";
+
+ /**
+ * Percentage by which the thread pool size should be upscaled when CPU usage is low.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE =
+ "fs.azure.readahead.v2.thread.pool.upscale.percentage";
+
+ /**
+ * Percentage by which the thread pool size should be downscaled when CPU usage is high.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE =
+ "fs.azure.readahead.v2.thread.pool.downscale.percentage";
+
+ /**
+ * Interval in milliseconds for periodic monitoring of memory usage and up/down scaling buffer pool size accordingly.
+ * {@value }
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS =
+ "fs.azure.readahead.v2.memory.monitoring.interval.millis";
+
/**
* TTL in milliseconds for the idle threads in executor service used by read ahead v2.
*/
@@ -307,6 +335,16 @@ public final class ConfigurationKeys {
*/
public static final String FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS =
"fs.azure.readahead.v2.cached.buffer.ttl.millis";
+ /**
+ * Threshold percentage for CPU usage to scale up/down the thread pool size in read ahead v2.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT =
+ "fs.azure.readahead.v2.cpu.usage.threshold.percent";
+
+ /**
+ * Threshold percentage for memory usage to scale up/down the buffer pool size in read ahead v2.
+ */
+ public static final String FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT =
+ "fs.azure.readahead.v2.memory.usage.threshold.percent";
+
/** Setting this true will make the driver use its own RemoteIterator implementation. */
public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR =
"fs.azure.enable.abfslistiterator";
/** Server side encryption key encoded in Base64 format {@value}.*/
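All of the monitoring and threshold keys above take plain integer values. A hedged tuning sketch (the values shown simply restate the shipped defaults from FileSystemConfigurations.java below; they are illustrative, not recommendations):

Configuration conf = new Configuration();
conf.setInt("fs.azure.readahead.v2.cpu.monitoring.interval.millis", 6000);  // how often CPU usage is sampled
conf.setInt("fs.azure.readahead.v2.cpu.usage.threshold.percent", 50);       // boundary for up/down scaling decisions
conf.setInt("fs.azure.readahead.v2.thread.pool.upscale.percentage", 20);    // grow step when CPU usage is low
conf.setInt("fs.azure.readahead.v2.thread.pool.downscale.percentage", 30);  // shrink step when CPU usage is high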
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index ea77f9d874a..e95ea8b1dde 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -153,12 +153,19 @@ public final class FileSystemConfigurations {
public static final boolean DEFAULT_ENABLE_READAHEAD = true;
public static final boolean DEFAULT_ENABLE_READAHEAD_V2 = false;
- public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = -1;
+ public static final boolean DEFAULT_ENABLE_READAHEAD_V2_DYNAMIC_SCALING = false;
+ public static final int DEFAULT_READAHEAD_V2_MIN_THREAD_POOL_SIZE = 8;
public static final int DEFAULT_READAHEAD_V2_MAX_THREAD_POOL_SIZE = -1;
- public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = -1;
+ public static final int DEFAULT_READAHEAD_V2_MIN_BUFFER_POOL_SIZE = 16;
public static final int DEFAULT_READAHEAD_V2_MAX_BUFFER_POOL_SIZE = -1;
- public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 3_000;
+ public static final int DEFAULT_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS = 6_000;
+ public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_UPSCALE_PERCENTAGE = 20;
+ public static final int DEFAULT_READAHEAD_V2_THREAD_POOL_DOWNSCALE_PERCENTAGE = 30;
+ public static final int DEFAULT_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS = 6_000;
+ public static final int DEFAULT_READAHEAD_V2_EXECUTOR_SERVICE_TTL_MILLIS = 6_000;
public static final int DEFAULT_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS = 6_000;
+ public static final int DEFAULT_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENTAGE = 50;
+ public static final int DEFAULT_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENTAGE = 50;
public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
index ba2bb61802a..9d29614a044 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java
@@ -69,7 +69,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
public static final int FOOTER_SIZE = 16 * ONE_KB;
public static final int MAX_OPTIMIZED_READ_ATTEMPTS = 2;
- private int readAheadBlockSize;
+ private final int readAheadBlockSize;
private final AbfsClient client;
private final Statistics statistics;
private final String path;
@@ -132,7 +132,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
/** ABFS instance to be held by the input stream to avoid GC close. */
private final BackReference fsBackRef;
- private ReadBufferManager readBufferManager;
+ private final ReadBufferManager readBufferManager;
public AbfsInputStream(
final AbfsClient client,
@@ -184,10 +184,13 @@ public AbfsInputStream(
* If none of the V1 and V2 are enabled, then no read ahead will be done.
*/
if (readAheadV2Enabled) {
- LOG.debug("ReadBufferManagerV2 not yet implemented, defaulting to ReadBufferManagerV1");
+ ReadBufferManagerV2.setReadBufferManagerConfigs(
+ readAheadBlockSize, client.getAbfsConfiguration());
+ readBufferManager = ReadBufferManagerV2.getBufferManager();
+ } else {
+ ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
+ readBufferManager = ReadBufferManagerV1.getBufferManager();
}
- ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
- readBufferManager = ReadBufferManagerV1.getBufferManager();
if (streamStatistics != null) {
ioStatistics = streamStatistics.getIOStatistics();
@@ -530,7 +533,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
while (numReadAheads > 0 && nextOffset < contentLength) {
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
- readBufferManager.queueReadAhead(this, nextOffset, (int) nextSize,
+ getReadBufferManager().queueReadAhead(this, nextOffset, (int) nextSize,
new TracingContext(readAheadTracingContext));
nextOffset = nextOffset + nextSize;
numReadAheads--;
@@ -539,7 +542,7 @@ private int readInternal(final long position, final byte[] b, final int offset,
}
// try reading from buffers first
- receivedBytes = readBufferManager.getBlock(this, position, length, b);
+ receivedBytes = getReadBufferManager().getBlock(this, position, length, b);
bytesFromReadAhead += receivedBytes;
if (receivedBytes > 0) {
incrementReadOps();
@@ -743,8 +746,8 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
LOG.debug("Closing {}", this);
closed = true;
- if (readBufferManager != null) {
- readBufferManager.purgeBuffersForStream(this);
+ if (getReadBufferManager() != null) {
+ getReadBufferManager().purgeBuffersForStream(this);
}
buffer = null; // de-reference the buffer so it can be GC'ed sooner
if (contextEncryptionAdapter != null) {
@@ -805,7 +808,7 @@ byte[] getBuffer() {
*/
@VisibleForTesting
public boolean isReadAheadEnabled() {
- return (readAheadEnabled || readAheadV2Enabled) && readBufferManager != null;
+ return (readAheadEnabled || readAheadV2Enabled) && getReadBufferManager() != null;
}
@VisibleForTesting
@@ -823,6 +826,10 @@ public String getStreamID() {
return inputStreamId;
}
+ public String getETag() {
+ return eTag;
+ }
+
/**
* Getter for AbfsInputStreamStatistics.
*
@@ -920,11 +927,20 @@ long getLimit() {
return this.limit;
}
+ boolean isFirstRead() {
+ return this.firstRead;
+ }
+
@VisibleForTesting
BackReference getFsBackRef() {
return fsBackRef;
}
+ @VisibleForTesting
+ ReadBufferManager getReadBufferManager() {
+ return readBufferManager;
+ }
+
@Override
public int minSeekForVectorReads() {
return S_128K;
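The constructor hunk above now selects the manager per stream: ReadBufferManagerV2 when readAheadV2Enabled is set, ReadBufferManagerV1 otherwise, and in both cases it configures the singleton before fetching it. V2 enforces that ordering; its getBufferManager() throws IllegalStateException if configuration never happened (see ReadBufferManagerV2.java below). A sketch of the contract, assuming package-private access as AbfsInputStream has:

// Order matters: configure first, then fetch the singleton.
ReadBufferManagerV2.setReadBufferManagerConfigs(readAheadBlockSize, client.getAbfsConfiguration());
ReadBufferManager manager = ReadBufferManagerV2.getBufferManager(); // would throw if configs were never set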
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index 15ee4809911..cb51fa22900 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -248,11 +248,11 @@ public AbfsInputStreamContext withAbfsBackRef(
* @param contextEncryptionAdapter encryption adapter.
* @return this instance.
*/
- public AbfsInputStreamContext withEncryptionAdapter(
- ContextEncryptionAdapter contextEncryptionAdapter){
- this.contextEncryptionAdapter = contextEncryptionAdapter;
- return this;
- }
+ public AbfsInputStreamContext withEncryptionAdapter(
+ ContextEncryptionAdapter contextEncryptionAdapter){
+ this.contextEncryptionAdapter = contextEncryptionAdapter;
+ return this;
+ }
/**
* Finalizes and validates the context configuration.
@@ -348,7 +348,7 @@ public BackReference getFsBackRef() {
}
/** @return context encryption adapter. */
- public ContextEncryptionAdapter getEncryptionAdapter() {
- return contextEncryptionAdapter;
- }
+ public ContextEncryptionAdapter getEncryptionAdapter() {
+ return contextEncryptionAdapter;
+ }
}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
index 9ce926d841c..a6aa75f59d2 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java
@@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
@@ -29,6 +30,8 @@
class ReadBuffer {
private AbfsInputStream stream;
+ private String eTag;
+ private String path; // path of the file this buffer is for
private long offset; // offset within the file for the buffer
private int length; // actual length, set after the buffer is filled
private int requestedLength; // requested length of the read
@@ -44,6 +47,7 @@ class ReadBuffer {
private boolean isFirstByteConsumed = false;
private boolean isLastByteConsumed = false;
private boolean isAnyByteConsumed = false;
+ private AtomicInteger refCount = new AtomicInteger(0);
private IOException errException = null;
@@ -51,10 +55,26 @@ public AbfsInputStream getStream() {
return stream;
}
+ public String getETag() {
+ return eTag;
+ }
+
+ public String getPath() {
+ return path;
+ }
+
public void setStream(AbfsInputStream stream) {
this.stream = stream;
}
+ public void setETag(String eTag) {
+ this.eTag = eTag;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
public void setTracingContext(TracingContext tracingContext) {
this.tracingContext = tracingContext;
}
@@ -122,6 +142,20 @@ public void setStatus(ReadBufferStatus status) {
}
}
+ public void startReading() {
+ refCount.getAndIncrement();
+ }
+
+ public void endReading() {
+ if (refCount.decrementAndGet() < 0) {
+ throw new IllegalStateException("ReadBuffer refCount cannot be negative");
+ }
+ }
+
+ public int getRefCount() {
+ return refCount.get();
+ }
+
public CountDownLatch getLatch() {
return latch;
}
@@ -162,4 +196,7 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) {
this.isAnyByteConsumed = isAnyByteConsumed;
}
+ public boolean isFullyConsumed() {
+ return isFirstByteConsumed() && isLastByteConsumed();
+ }
}
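The new refCount pins a completed buffer while a reader is copying out of it; evict() in ReadBufferManagerV2 below refuses any buffer whose getRefCount() is above zero. A sketch of the intended pairing, with a try/finally added here for exception safety (the patch itself calls startReading()/endReading() inline in getBlockFromCompletedQueue()):

buf.startReading();       // pin: refCount++
try {
  // copy bytes out of buf.getBuffer() into the caller's array
} finally {
  buf.endReading();       // unpin: refCount--; throws IllegalStateException if it would go negative
}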
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
index 9ee128fbc32..712b04fb499 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java
@@ -119,7 +119,6 @@ abstract void doneReading(ReadBuffer buffer,
*/
abstract void purgeBuffersForStream(AbfsInputStream stream);
-
// Following Methods are for testing purposes only and should not be used in production code.
/**
@@ -206,7 +205,7 @@ protected static void setReadAheadBlockSize(int readAheadBlockSize) {
*
* @return the stack of free buffer indices
*/
- public Stack<Integer> getFreeList() {
+ Stack<Integer> getFreeList() {
return freeList;
}
@@ -215,7 +214,7 @@ public Stack<Integer> getFreeList() {
*
* @return the queue of {@link ReadBuffer} objects in the read-ahead queue
*/
- public Queue<ReadBuffer> getReadAheadQueue() {
+ Queue<ReadBuffer> getReadAheadQueue() {
return readAheadQueue;
}
@@ -224,7 +223,7 @@ public Queue<ReadBuffer> getReadAheadQueue() {
*
* @return the list of {@link ReadBuffer} objects that are currently being processed
*/
- public LinkedList<ReadBuffer> getInProgressList() {
+ LinkedList<ReadBuffer> getInProgressList() {
return inProgressList;
}
@@ -233,7 +232,7 @@ public LinkedList<ReadBuffer> getInProgressList() {
*
* @return the list of {@link ReadBuffer} objects that have been read and are available for use
*/
- public LinkedList<ReadBuffer> getCompletedReadList() {
+ LinkedList<ReadBuffer> getCompletedReadList() {
return completedReadList;
}
@@ -244,7 +243,7 @@ public LinkedList<ReadBuffer> getCompletedReadList() {
* @return a list of free buffer indices
*/
@VisibleForTesting
- protected synchronized List<Integer> getFreeListCopy() {
+ List<Integer> getFreeListCopy() {
return new ArrayList<>(freeList);
}
@@ -254,7 +253,7 @@ protected synchronized List<Integer> getFreeListCopy() {
* @return a list of {@link ReadBuffer} objects in the read-ahead queue
*/
@VisibleForTesting
- protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
+ synchronized List<ReadBuffer> getReadAheadQueueCopy() {
return new ArrayList<>(readAheadQueue);
}
@@ -264,7 +263,7 @@ protected synchronized List<ReadBuffer> getReadAheadQueueCopy() {
* @return a list of in-progress {@link ReadBuffer} objects
*/
@VisibleForTesting
- protected synchronized List<ReadBuffer> getInProgressCopiedList() {
+ synchronized List<ReadBuffer> getInProgressListCopy() {
return new ArrayList<>(inProgressList);
}
@@ -274,7 +273,7 @@ protected synchronized List<ReadBuffer> getInProgressCopiedList() {
* @return a list of completed {@link ReadBuffer} objects
*/
@VisibleForTesting
- protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
+ synchronized List<ReadBuffer> getCompletedReadListCopy() {
return new ArrayList<>(completedReadList);
}
@@ -284,7 +283,7 @@ protected synchronized List<ReadBuffer> getCompletedReadListCopy() {
* @return the number of completed read buffers
*/
@VisibleForTesting
- protected int getCompletedReadListSize() {
+ int getCompletedReadListSize() {
return completedReadList.size();
}
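The accessors above drop from public/protected to package-private, and getInProgressCopiedList() is renamed to getInProgressListCopy(); the *Copy() variants return snapshots so tests can iterate without holding the manager's lock. A sketch of the test-side usage this supports (names from this diff; the test scaffolding is assumed):

// Inside a test in org.apache.hadoop.fs.azurebfs.services:
ReadBufferManager mgr = ReadBufferManagerV2.getBufferManager();
List<ReadBuffer> inProgress = mgr.getInProgressListCopy(); // snapshot, safe to iterate
List<Integer> freeIndices = mgr.getFreeListCopy();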
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
index fe1ac3fa1f2..e476f6d7446 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java
@@ -33,7 +33,7 @@
* The Read Buffer Manager for Rest AbfsClient.
* V1 implementation of ReadBufferManager.
*/
-final class ReadBufferManagerV1 extends ReadBufferManager {
+public final class ReadBufferManagerV1 extends ReadBufferManager {
private static final int NUM_BUFFERS = 16;
private static final int NUM_THREADS = 8;
@@ -41,7 +41,7 @@ final class ReadBufferManagerV1 extends ReadBufferManager {
private Thread[] threads = new Thread[NUM_THREADS];
private byte[][] buffers;
- private static ReadBufferManagerV1 bufferManager;
+ private static ReadBufferManagerV1 bufferManager;
// hide instance constructor
private ReadBufferManagerV1() {
@@ -52,7 +52,7 @@ private ReadBufferManagerV1() {
* Sets the read buffer manager configurations.
* @param readAheadBlockSize the size of the read-ahead block in bytes
*/
- static void setReadBufferManagerConfigs(int readAheadBlockSize) {
+ public static void setReadBufferManagerConfigs(int readAheadBlockSize) {
if (bufferManager == null) {
LOGGER.debug(
"ReadBufferManagerV1 not initialized yet. Overriding
readAheadBlockSize as {}",
@@ -88,7 +88,7 @@ static ReadBufferManagerV1 getBufferManager() {
void init() {
buffers = new byte[NUM_BUFFERS][];
for (int i = 0; i < NUM_BUFFERS; i++) {
- buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. These byte arrays are never garbage collected
+ buffers[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. The byte array never goes back to GC
getFreeList().add(i);
}
for (int i = 0; i < NUM_THREADS; i++) {
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index 9cce860127d..c7e6e4c3d28 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -17,67 +17,112 @@
*/
package org.apache.hadoop.fs.azurebfs.services;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import com.sun.management.OperatingSystemMXBean;
+
import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Stack;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
-import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
-import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
+import org.apache.hadoop.classification.VisibleForTesting;
+
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
-final class ReadBufferManagerV2 extends ReadBufferManager {
+/**
+ * The Improved Read Buffer Manager for Rest AbfsClient.
+ */
+public final class ReadBufferManagerV2 extends ReadBufferManager {
+
+ // Internal constants
+ private static final ReentrantLock LOCK = new ReentrantLock();
// Thread Pool Configurations
private static int minThreadPoolSize;
+
private static int maxThreadPoolSize;
+
+ private static int cpuMonitoringIntervalInMilliSec;
+
+ private static double cpuThreshold;
+
+ private static int threadPoolUpscalePercentage;
+
+ private static int threadPoolDownscalePercentage;
+
private static int executorServiceKeepAliveTimeInMilliSec;
+
+ private static final double THREAD_POOL_REQUIREMENT_BUFFER = 1.2;
+ // 20% more threads than the queue size
+
+ private static boolean isDynamicScalingEnabled;
+
+ private ScheduledExecutorService cpuMonitorThread;
+
private ThreadPoolExecutor workerPool;
+ private final List<ReadBufferWorker> workerRefs = new ArrayList<>();
+
// Buffer Pool Configurations
private static int minBufferPoolSize;
+
private static int maxBufferPoolSize;
- private int numberOfActiveBuffers = 0;
+
+ private static int memoryMonitoringIntervalInMilliSec;
+
+ private static double memoryThreshold;
+
+ private final AtomicInteger numberOfActiveBuffers = new AtomicInteger(0);
+
private byte[][] bufferPool;
+ private final Stack<Integer> removedBufferList = new Stack<>();
+
+ private ScheduledExecutorService memoryMonitorThread;
+
+ // Buffer Manager Structures
private static ReadBufferManagerV2 bufferManager;
- // hide instance constructor
- private ReadBufferManagerV2() {
- LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch");
- }
+ private static AtomicBoolean isConfigured = new AtomicBoolean(false);
/**
- * Sets the read buffer manager configurations.
- * @param readAheadBlockSize the size of the read-ahead block in bytes
- * @param abfsConfiguration the AbfsConfiguration instance for other configurations
+ * Private constructor to prevent instantiation as this needs to be a singleton.
*/
- static void setReadBufferManagerConfigs(int readAheadBlockSize, AbfsConfiguration abfsConfiguration) {
- if (bufferManager == null) {
- minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
- maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
- executorServiceKeepAliveTimeInMilliSec = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
-
- minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
- maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
- setThresholdAgeMilliseconds(abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
- setReadAheadBlockSize(readAheadBlockSize);
- }
+ private ReadBufferManagerV2() {
+ printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch");
}
- /**
- * Returns the singleton instance of ReadBufferManagerV2.
- * @return the singleton instance of ReadBufferManagerV2
- */
static ReadBufferManagerV2 getBufferManager() {
+ if (!isConfigured.get()) {
+ throw new IllegalStateException("ReadBufferManagerV2 is not configured. "
+ + "Please call setReadBufferManagerConfigs() before calling
getBufferManager().");
+ }
if (bufferManager == null) {
LOCK.lock();
try {
if (bufferManager == null) {
bufferManager = new ReadBufferManagerV2();
bufferManager.init();
+ LOGGER.trace("ReadBufferManagerV2 singleton initialized");
}
} finally {
LOCK.unlock();
@@ -87,17 +132,73 @@ static ReadBufferManagerV2 getBufferManager() {
}
/**
- * {@inheritDoc}
+ * Set the ReadBufferManagerV2 configurations from the provided AbfsConfiguration before singleton initialization.
+ * @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2.
+ * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2.
+ */
+ public static void setReadBufferManagerConfigs(final int readAheadBlockSize,
+ final AbfsConfiguration abfsConfiguration) {
+ // Set Configs only before initializations.
+ if (bufferManager == null && !isConfigured.get()) {
+ LOCK.lock();
+ try {
+ if (bufferManager == null && !isConfigured.get()) {
+ minThreadPoolSize = abfsConfiguration.getMinReadAheadV2ThreadPoolSize();
+ maxThreadPoolSize = abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
+ cpuMonitoringIntervalInMilliSec
+ = abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
+ cpuThreshold = abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()
+ / HUNDRED_D;
+ threadPoolUpscalePercentage
+ = abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
+ threadPoolDownscalePercentage
+ = abfsConfiguration.getReadAheadV2ThreadPoolDownscalePercentage();
+ executorServiceKeepAliveTimeInMilliSec
+ = abfsConfiguration.getReadAheadExecutorServiceTTLInMillis();
+
+ minBufferPoolSize = abfsConfiguration.getMinReadAheadV2BufferPoolSize();
+ maxBufferPoolSize = abfsConfiguration.getMaxReadAheadV2BufferPoolSize();
+ memoryMonitoringIntervalInMilliSec
+ = abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
+ memoryThreshold =
+ abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent()
+ / HUNDRED_D;
+ setThresholdAgeMilliseconds(
+ abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
+ isDynamicScalingEnabled
+ = abfsConfiguration.isReadAheadV2DynamicScalingEnabled();
+ setReadAheadBlockSize(readAheadBlockSize);
+ setIsConfigured(true);
+ }
+ } finally {
+ LOCK.unlock();
+ }
+ }
+ }
+
+ /**
+ * Initialize the singleton ReadBufferManagerV2.
*/
@Override
void init() {
- // Initialize Buffer Pool
+ // Initialize Buffer Pool. Size can never be more than max pool size
bufferPool = new byte[maxBufferPoolSize][];
for (int i = 0; i < minBufferPoolSize; i++) {
- bufferPool[i] = new byte[getReadAheadBlockSize()]; // same buffers are reused. These byte arrays are never garbage collected
+ // Start with just minimum number of buffers.
+ bufferPool[i]
+ = new byte[getReadAheadBlockSize()]; // same buffers are reused. The byte array never goes back to GC
getFreeList().add(i);
- numberOfActiveBuffers++;
+ numberOfActiveBuffers.getAndIncrement();
}
+ memoryMonitorThread = Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread t = new Thread(runnable, "ReadAheadV2-Memory-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ memoryMonitorThread.scheduleAtFixedRate(this::scheduledEviction,
+ getMemoryMonitoringIntervalInMilliSec(),
+ getMemoryMonitoringIntervalInMilliSec(), TimeUnit.MILLISECONDS);
// Initialize a Fixed Size Thread Pool with minThreadPoolSize threads
workerPool = new ThreadPoolExecutor(
@@ -106,123 +207,893 @@ void init() {
executorServiceKeepAliveTimeInMilliSec,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
- namedThreadFactory);
+ workerThreadFactory);
workerPool.allowCoreThreadTimeOut(true);
for (int i = 0; i < minThreadPoolSize; i++) {
- ReadBufferWorker worker = new ReadBufferWorker(i, this);
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
workerPool.submit(worker);
}
ReadBufferWorker.UNLEASH_WORKERS.countDown();
+
+ if (isDynamicScalingEnabled) {
+ cpuMonitorThread = Executors.newSingleThreadScheduledExecutor(
+ runnable -> {
+ Thread t = new Thread(runnable, "ReadAheadV2-CPU-Monitor");
+ t.setDaemon(true);
+ return t;
+ });
+ cpuMonitorThread.scheduleAtFixedRate(this::adjustThreadPool,
+ getCpuMonitoringIntervalInMilliSec(),
+ getCpuMonitoringIntervalInMilliSec(),
+ TimeUnit.MILLISECONDS);
+ }
+
+ printTraceLog(
+ "ReadBufferManagerV2 initialized with {} buffers and {} worker
threads",
+ numberOfActiveBuffers.get(), workerRefs.size());
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to queue a read-ahead.
+ * @param stream the stream from which the read-ahead is requested.
+ * @param requestedOffset The offset in the file which should be read.
+ * @param requestedLength The length to read.
*/
@Override
public void queueReadAhead(final AbfsInputStream stream,
final long requestedOffset,
final int requestedLength,
- final TracingContext tracingContext) {
- // TODO: To be implemented
+ TracingContext tracingContext) {
+ printTraceLog(
+ "Start Queueing readAhead for file: {}, with eTag: {}, "
+ + "offset: {}, length: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset, requestedLength,
+ stream.hashCode());
+ ReadBuffer buffer;
+ synchronized (this) {
+ if (isAlreadyQueued(stream.getETag(), requestedOffset)) {
+ // Already queued for this offset, so skip queuing.
+ printTraceLog(
+ "Skipping queuing readAhead for file: {}, with eTag: {}, "
+ + "offset: {}, triggered by stream: {} as it is already
queued",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ stream.hashCode());
+ return;
+ }
+ if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) {
+ // No buffers are available and more buffers cannot be created. Skip queuing.
+ printTraceLog(
+ "Skipping queuing readAhead for file: {}, with eTag: {}, offset: {}, triggered by stream: {} as no buffers are available",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ stream.hashCode());
+ return;
+ }
+
+ // Create a new ReadBuffer to keep the prefetched data and queue.
+ buffer = new ReadBuffer();
+ buffer.setStream(stream); // To map buffer with stream that requested it
+ buffer.setETag(stream.getETag()); // To map buffer with file it belongs to
+ buffer.setPath(stream.getPath());
+ buffer.setOffset(requestedOffset);
+ buffer.setLength(0);
+ buffer.setRequestedLength(requestedLength);
+ buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE);
+ buffer.setLatch(new CountDownLatch(1));
+ buffer.setTracingContext(tracingContext);
+
+ if (isFreeListEmpty()) {
+ /*
+ * By now there should be at least one buffer available.
+ * This is to double sure that after upscaling or eviction,
+ * we still have free buffer available. If not, we skip queueing.
+ */
+ return;
+ }
+ Integer bufferIndex = popFromFreeList();
+ if (bufferIndex > bufferPool.length) {
+ // This should never happen.
+ printTraceLog(
+ "Skipping queuing readAhead for file: {}, with eTag: {}, offset:
{}, triggered by stream: {} as invalid buffer index popped from free list",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ stream.hashCode());
+ return;
+ }
+ buffer.setBuffer(bufferPool[bufferIndex]);
+ buffer.setBufferindex(bufferIndex);
+ getReadAheadQueue().add(buffer);
+ notifyAll();
+ printTraceLog(
+ "Done q-ing readAhead for file: {}, with eTag:{}, offset: {}, "
+ + "buffer idx: {}, triggered by stream: {}",
+ stream.getPath(), stream.getETag(), requestedOffset,
+ buffer.getBufferindex(), stream.hashCode());
+ }
}
/**
- * {@inheritDoc}
+ * {@link AbfsInputStream} calls this method to read any bytes already available in a buffer (thereby saving a
+ * remote read). This returns the bytes if the data already exists in a buffer. If there is a buffer that is reading
+ * the requested offset, then this method blocks until that read completes. If the data is queued in a read-ahead
+ * but not picked up by a worker thread yet, then it cancels that read-ahead and reports a cache miss. This is because,
+ * depending on worker thread availability, the read-ahead may take a while - the calling thread can do its own
+ * read to get the data faster (compared to the read waiting in queue for an indeterminate amount of time).
+ *
+ * @param stream of the file to read bytes for
+ * @param position the offset in the file to do a read for
+ * @param length the length to read
+ * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
+ * @return the number of bytes read
*/
@Override
public int getBlock(final AbfsInputStream stream,
final long position,
final int length,
- final byte[] buffer) throws IOException {
- // TODO: To be implemented
+ final byte[] buffer)
+ throws IOException {
+ // not synchronized, so have to be careful with locking
+ printTraceLog(
+ "getBlock request for file: {}, with eTag: {}, for position: {} "
+ + "for length: {} received from stream: {}",
+ stream.getPath(), stream.getETag(), position, length,
+ stream.hashCode());
+
+ String requestedETag = stream.getETag();
+ boolean isFirstRead = stream.isFirstRead();
+
+ // Wait for any in-progress read to complete.
+ waitForProcess(requestedETag, position, isFirstRead);
+
+ int bytesRead = 0;
+ synchronized (this) {
+ bytesRead = getBlockFromCompletedQueue(requestedETag, position, length,
+ buffer);
+ }
+ if (bytesRead > 0) {
+ printTraceLog(
+ "Done read from Cache for the file with eTag: {}, position: {},
length: {}, requested by stream: {}",
+ requestedETag, position, bytesRead, stream.hashCode());
+ return bytesRead;
+ }
+
+ // otherwise, just say we got nothing - calling thread can do its own read
return 0;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this to get the next buffer that it should work on.
+ * @return {@link ReadBuffer}
+ * @throws InterruptedException if thread is interrupted
*/
@Override
public ReadBuffer getNextBlockToRead() throws InterruptedException {
- // TODO: To be implemented
- return null;
+ ReadBuffer buffer = null;
+ synchronized (this) {
+ // Blocking Call to wait for prefetch to be queued.
+ while (getReadAheadQueue().size() == 0) {
+ wait();
+ }
+
+ buffer = getReadAheadQueue().remove();
+ notifyAll();
+ if (buffer == null) {
+ return null;
+ }
+ buffer.setStatus(ReadBufferStatus.READING_IN_PROGRESS);
+ getInProgressList().add(buffer);
+ }
+ printTraceLog(
+ "ReadBufferWorker picked file: {}, with eTag: {}, for offset: {}, "
+ + "queued by stream: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
+ buffer.getStream().hashCode());
+ return buffer;
}
/**
- * {@inheritDoc}
+ * {@link ReadBufferWorker} thread calls this method to post completion.
+ * @param buffer the buffer whose read was completed
+ * @param result the {@link ReadBufferStatus} after the read operation in the worker thread
+ * @param bytesActuallyRead the number of bytes that the worker thread was actually able to read
*/
@Override
public void doneReading(final ReadBuffer buffer,
final ReadBufferStatus result,
final int bytesActuallyRead) {
- // TODO: To be implemented
+ printTraceLog(
+ "ReadBufferWorker completed prefetch for file: {} with eTag: {}, for
offset: {}, queued by stream: {}, with status: {} and bytes read: {}",
+ buffer.getPath(), buffer.getETag(), buffer.getOffset(),
+ buffer.getStream().hashCode(), result, bytesActuallyRead);
+ synchronized (this) {
+ // If this buffer has already been purged during
+ // close of InputStream then we don't update the lists.
+ if (getInProgressList().contains(buffer)) {
+ getInProgressList().remove(buffer);
+ if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) {
+ // Successful read, so update the buffer status and length
+ buffer.setStatus(ReadBufferStatus.AVAILABLE);
+ buffer.setLength(bytesActuallyRead);
+ } else {
+ // Failed read, reuse buffer for next read, this buffer will be
+ // evicted later based on eviction policy.
+ pushToFreeList(buffer.getBufferindex());
+ }
+ // completed list also contains FAILED read buffers
+ // for sending exception message to clients.
+ buffer.setStatus(result);
+ buffer.setTimeStamp(currentTimeMillis());
+ getCompletedReadList().add(buffer);
+ }
+ }
+
+ // outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results
+ buffer.getLatch().countDown(); // wake up waiting threads (if any)
}
/**
- * {@inheritDoc}
+ * Purging the buffers associated with an {@link AbfsInputStream}
+ * from {@link ReadBufferManagerV2} when stream is closed.
+ * @param stream input stream.
*/
- @Override
- public void purgeBuffersForStream(final AbfsInputStream stream) {
- // TODO: To be implemented
+ public synchronized void purgeBuffersForStream(AbfsInputStream stream) {
+ printDebugLog("Purging stale buffers for AbfsInputStream {} ", stream);
+ getReadAheadQueue().removeIf(
+ readBuffer -> readBuffer.getStream() == stream);
+ purgeList(stream, getCompletedReadList());
}
/**
- * {@inheritDoc}
+ * Check if any buffer is already queued for the requested offset.
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return whether any buffer is already queued
*/
- @VisibleForTesting
- @Override
- public int getNumBuffers() {
- return numberOfActiveBuffers;
+ private boolean isAlreadyQueued(final String eTag,
+ final long requestedOffset) {
+ // returns true if any part of the buffer is already queued
+ return (isInList(getReadAheadQueue(), eTag, requestedOffset)
+ || isInList(getInProgressList(), eTag, requestedOffset)
+ || isInList(getCompletedReadList(), eTag, requestedOffset));
+ }
+
+ /**
+ * Check if any buffer in the list contains the requested offset.
+ * @param list the list to check
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return whether any buffer in the list contains the requested offset
+ */
+ private boolean isInList(final Collection<ReadBuffer> list, final String eTag,
+ final long requestedOffset) {
+ return (getFromList(list, eTag, requestedOffset) != null);
}
+
/**
- * {@inheritDoc}
+ * Get the buffer from the list that contains the requested offset.
+ * @param list the list to check
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return the buffer if found, null otherwise
*/
- @VisibleForTesting
- @Override
- public void callTryEvict() {
- // TODO: To be implemented
+ private ReadBuffer getFromList(final Collection<ReadBuffer> list,
+ final String eTag,
+ final long requestedOffset) {
+ for (ReadBuffer buffer : list) {
+ if (eTag.equals(buffer.getETag())) {
+ if (buffer.getStatus() == ReadBufferStatus.AVAILABLE
+ && requestedOffset >= buffer.getOffset()
+ && requestedOffset < buffer.getOffset() + buffer.getLength()) {
+ return buffer;
+ } else if (requestedOffset >= buffer.getOffset()
+ && requestedOffset
+ < buffer.getOffset() + buffer.getRequestedLength()) {
+ return buffer;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * If any buffer in the completed list can be reclaimed then reclaim it and return the buffer to free list.
+ * The objective is to find just one buffer - there is no advantage to evicting more than one.
+ * @return whether the eviction succeeded - i.e., were we able to free up one buffer
+ */
+ private synchronized boolean tryEvict() {
+ ReadBuffer nodeToEvict = null;
+ if (getCompletedReadList().size() <= 0) {
+ return false; // there are no evict-able buffers
+ }
+
+ long currentTimeInMs = currentTimeMillis();
+
+ // first, try buffers where all bytes have been consumed (approximated as first and last bytes consumed)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isFullyConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try buffers where any bytes have been consumed (maybe a bad idea? have to experiment and see)
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf.isAnyByteConsumed()) {
+ nodeToEvict = buf;
+ break;
+ }
+ }
+
+ if (nodeToEvict != null) {
+ return manualEviction(nodeToEvict);
+ }
+
+ // next, try any old nodes that have not been consumed
+ // Failed read buffers (with buffer index=-1) that are older than
+ // thresholdAge should be cleaned up, but at the same time should not
+ // report successful eviction.
+ // Queue logic expects that a buffer is freed up for read ahead when
+ // eviction is successful, whereas a failed ReadBuffer would have released
+ // its buffer when its status was set to READ_FAILED.
+ long earliestBirthday = Long.MAX_VALUE;
+ ArrayList<ReadBuffer> oldFailedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if ((buf.getBufferindex() != -1)
+ && (buf.getTimeStamp() < earliestBirthday)) {
+ nodeToEvict = buf;
+ earliestBirthday = buf.getTimeStamp();
+ } else if ((buf.getBufferindex() == -1)
+ && (currentTimeInMs - buf.getTimeStamp())
+ > getThresholdAgeMilliseconds()) {
+ oldFailedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : oldFailedBuffers) {
+ manualEviction(buf);
+ }
+
+ if ((currentTimeInMs - earliestBirthday > getThresholdAgeMilliseconds())
+ && (nodeToEvict != null)) {
+ return manualEviction(nodeToEvict);
+ }
+
+ printTraceLog("No buffer eligible for eviction");
+ // nothing can be evicted
+ return false;
+ }
+
+ /**
+ * Evict the given buffer.
+ * @param buf the buffer to evict
+ * @return whether the eviction succeeded
+ */
+ private boolean evict(final ReadBuffer buf) {
+ if (buf.getRefCount() > 0) {
+ // If the buffer is still being read, then we cannot evict it.
+ printTraceLog(
+ "Cannot evict buffer with index: {}, file: {}, with eTag: {},
offset: {} as it is still being read by some input stream",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset());
+ return false;
+ }
+ // As failed ReadBuffers (bufferIndex = -1) are saved in getCompletedReadList(),
+ // avoid adding it to availableBufferList.
+ if (buf.getBufferindex() != -1) {
+ pushToFreeList(buf.getBufferindex());
+ }
+ getCompletedReadList().remove(buf);
+ buf.setTracingContext(null);
+ printTraceLog(
+ "Eviction of Buffer Completed for BufferIndex: {}, file: {}, with
eTag: {}, offset: {}, is fully consumed: {}, is partially consumed: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.isFullyConsumed(), buf.isAnyByteConsumed());
+ return true;
+ }
+
+ /**
+ * Wait for any in-progress read for the requested offset to complete.
+ * @param eTag the eTag of the file
+ * @param position the requested offset
+ * @param isFirstRead whether this is the first read of the stream
+ */
+ private void waitForProcess(final String eTag,
+ final long position,
+ boolean isFirstRead) {
+ ReadBuffer readBuf;
+ synchronized (this) {
+ readBuf = clearFromReadAheadQueue(eTag, position, isFirstRead);
+ if (readBuf == null) {
+ readBuf = getFromList(getInProgressList(), eTag, position);
+ }
+ }
+ if (readBuf != null) { // if in in-progress queue, then block for it
+ try {
+ printTraceLog(
+ "A relevant read buffer for file: {}, with eTag: {}, offset: {}, "
+ + "queued by stream: {}, having buffer idx: {} is being
prefetched, waiting for latch",
+ readBuf.getPath(), readBuf.getETag(), readBuf.getOffset(),
+ readBuf.getStream().hashCode(), readBuf.getBufferindex());
+ readBuf.getLatch()
+ .await(); // blocking wait on the caller stream's thread
+ // Note on correctness: readBuf gets out of getInProgressList() only in 1 place: after worker thread
+ // is done processing it (in doneReading). There, the latch is set after removing the buffer from
+ // getInProgressList(). So this latch is safe to be outside the synchronized block.
+ // Putting it in synchronized would result in a deadlock, since this thread would be holding the lock
+ // while waiting, so no one will be able to change any state. If this becomes more complex in the future,
+ // then the latch can be removed and replaced with wait/notify whenever getInProgressList() is touched.
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ printTraceLog("Latch done for file: {}, with eTag: {}, for offset: {}, "
+ + "buffer index: {} queued by stream: {}", readBuf.getPath(),
+ readBuf.getETag(),
+ readBuf.getOffset(), readBuf.getBufferindex(),
+ readBuf.getStream().hashCode());
+ }
+ }
+
+ /**
+ * Clear the buffer from read-ahead queue if it exists.
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @param isFirstRead whether this is the first read of the stream
+ * @return the buffer if found, null otherwise
+ */
+ private ReadBuffer clearFromReadAheadQueue(final String eTag,
+ final long requestedOffset,
+ boolean isFirstRead) {
+ ReadBuffer buffer = getFromList(getReadAheadQueue(), eTag, requestedOffset);
+ /*
+ * If this prefetch was triggered by first read of this input stream,
+ * we should not remove it from the queue and should let the backend threads complete it.
+ */
+ if (buffer != null && isFirstRead) {
+ return buffer;
+ }
+ if (buffer != null) {
+ getReadAheadQueue().remove(buffer);
+ notifyAll(); // lock is held in calling method
+ pushToFreeList(buffer.getBufferindex());
+ }
+ return null;
+ }
+
+ /**
+ * Get the block from completed queue if it exists.
+ * @param eTag the eTag of the file
+ * @param position the requested offset
+ * @param length the length to read
+ * @param buffer the buffer to read data into
+ * @return the number of bytes read
+ * @throws IOException if an I/O error occurs
+ */
+ private int getBlockFromCompletedQueue(final String eTag, final long position,
+ final int length, final byte[] buffer) throws IOException {
+ ReadBuffer buf = getBufferFromCompletedQueue(eTag, position);
+
+ if (buf == null) {
+ return 0;
+ }
+
+ buf.startReading(); // atomic increment of refCount.
+
+ if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
+ // To prevent new read requests from failing due to old read-ahead attempts,
+ // return exception only from buffers that failed within last getThresholdAgeMilliseconds()
+ if ((currentTimeMillis() - (buf.getTimeStamp())
+ < getThresholdAgeMilliseconds())) {
+ throw buf.getErrException();
+ } else {
+ return 0;
+ }
+ }
+
+ if ((buf.getStatus() != ReadBufferStatus.AVAILABLE)
+ || (position >= buf.getOffset() + buf.getLength())) {
+ return 0;
+ }
+
+ int cursor = (int) (position - buf.getOffset());
+ int availableLengthInBuffer = buf.getLength() - cursor;
+ int lengthToCopy = Math.min(length, availableLengthInBuffer);
+ System.arraycopy(buf.getBuffer(), cursor, buffer, 0, lengthToCopy);
+ if (cursor == 0) {
+ buf.setFirstByteConsumed(true);
+ }
+ if (cursor + lengthToCopy == buf.getLength()) {
+ buf.setLastByteConsumed(true);
+ }
+ buf.setAnyByteConsumed(true);
+
+ buf.endReading(); // atomic decrement of refCount
+ return lengthToCopy;
+ }
+
+ /**
+ * Get the buffer from completed queue that contains the requested offset.
+ * @param eTag the eTag of the file
+ * @param requestedOffset the requested offset
+ * @return the buffer if found, null otherwise
+ */
+ private ReadBuffer getBufferFromCompletedQueue(final String eTag,
+ final long requestedOffset) {
+ for (ReadBuffer buffer : getCompletedReadList()) {
+ // Buffer is returned if the requestedOffset is at or above the buffer's
+ // offset but before the end of its actual or requested length
+ if (eTag.equals(buffer.getETag())
+ && (requestedOffset >= buffer.getOffset())
+ && ((requestedOffset < buffer.getOffset() + buffer.getLength())
+ || (requestedOffset
+ < buffer.getOffset() + buffer.getRequestedLength()))) {
+ return buffer;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Try to upscale memory by adding more buffers to the pool if memory
+ * usage is below the threshold.
+ * @return whether the upscale succeeded
+ */
+ private synchronized boolean tryMemoryUpscale() {
+ if (!isDynamicScalingEnabled) {
+ printTraceLog("Dynamic scaling is disabled, skipping memory upscale");
+ return false; // Dynamic scaling is disabled, so no upscaling.
+ }
+ double memoryLoad = getMemoryLoad();
+ if (memoryLoad < memoryThreshold && getNumBuffers() < maxBufferPoolSize) {
+ // Create and add more buffers to the free list.
+ int nextIndx = getNumBuffers();
+ if (removedBufferList.isEmpty() && nextIndx < bufferPool.length) {
+ bufferPool[nextIndx] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(nextIndx);
+ } else {
+ // Reuse a removed buffer index.
+ int freeIndex = removedBufferList.pop();
+ if (freeIndex >= bufferPool.length || bufferPool[freeIndex] != null) {
+ printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+ freeIndex, bufferPool.length);
+ return false;
+ }
+ bufferPool[freeIndex] = new byte[getReadAheadBlockSize()];
+ pushToFreeList(freeIndex);
+ }
+ incrementActiveBufferCount();
+ printTraceLog(
+ "Current Memory Load: {}. Incrementing buffer pool size to {}",
+ memoryLoad, getNumBuffers());
+ return true;
+ }
+ printTraceLog("Could not Upscale memory. Total buffers: {} Memory Load:
{}",
+ getNumBuffers(), memoryLoad);
+ return false;
+ }
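
For example (threshold value assumed for illustration): with memoryThreshold
= 0.8, new buffers are allocated only while used heap stays below 80% of the
JVM maximum and the pool is still below its configured maximum size.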
+
+ /**
+ * Scheduled Eviction task that runs periodically to evict old buffers.
+ */
+ private void scheduledEviction() {
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (currentTimeMillis() - buf.getTimeStamp()
+ > getThresholdAgeMilliseconds()) {
+ // If the buffer is older than thresholdAge, evict it.
+ printTraceLog(
+ "Scheduled Eviction of Buffer Triggered for BufferIndex: {}, "
+ + "file: {}, with eTag: {}, offset: {}, length: {}, queued by
stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(),
buf.getOffset(),
+ buf.getLength(), buf.getStream().hashCode());
+ evict(buf);
+ }
+ }
+
+ double memoryLoad = getMemoryLoad();
+ if (isDynamicScalingEnabled && memoryLoad > memoryThreshold) {
+ synchronized (this) {
+ if (isFreeListEmpty()) {
+ printTraceLog(
+ "No free buffers available. Skipping downscale of buffer pool");
+ return; // No free buffers available, so cannot downscale.
+ }
+ int freeIndex = popFromFreeList();
+ if (freeIndex >= bufferPool.length || bufferPool[freeIndex] == null) {
+ printTraceLog("Invalid free index: {}. Current buffer pool size: {}",
+ freeIndex, bufferPool.length);
+ return;
+ }
+ bufferPool[freeIndex] = null;
+ removedBufferList.add(freeIndex);
+ decrementActiveBufferCount();
+ printTraceLog(
+ "Current Memory Load: {}. Decrementing buffer pool size to {}",
+ memoryLoad, getNumBuffers());
+ }
+ }
+ }
+
+ /**
+ * Manual Eviction of a buffer.
+ * @param buf the buffer to evict
+ * @return whether the eviction succeeded
+ */
+ private boolean manualEviction(final ReadBuffer buf) {
+ printTraceLog(
+ "Manual Eviction of Buffer Triggered for BufferIndex: {}, file: {},
with eTag: {}, offset: {}, queued by stream: {}",
+ buf.getBufferindex(), buf.getPath(), buf.getETag(), buf.getOffset(),
+ buf.getStream().hashCode());
+ return evict(buf);
+ }
+
+ /**
+ * Adjust the thread pool size based on CPU load and queue size.
+ */
+ private void adjustThreadPool() {
+ int currentPoolSize = workerRefs.size();
+ double cpuLoad = getCpuLoad();
+ int requiredPoolSize = getRequiredThreadPoolSize();
+ int newThreadPoolSize;
+ printTraceLog(
+ "Current CPU load: {}, Current worker pool size: {}, "
+ + "Required worker pool size: {}",
+ cpuLoad, currentPoolSize, requiredPoolSize);
+ if (currentPoolSize < requiredPoolSize && cpuLoad < cpuThreshold) {
+ // Submit more background tasks.
+ newThreadPoolSize = Math.min(maxThreadPoolSize,
+ (int) Math.ceil(
+ (currentPoolSize * (HUNDRED_D + threadPoolUpscalePercentage))
+ / HUNDRED_D));
+ // Create new Worker Threads
+ for (int i = currentPoolSize; i < newThreadPoolSize; i++) {
+ ReadBufferWorker worker = new ReadBufferWorker(i, getBufferManager());
+ workerRefs.add(worker);
+ workerPool.submit(worker);
+ }
+ printTraceLog("Increased worker pool size from {} to {}",
currentPoolSize,
+ newThreadPoolSize);
+ } else if (cpuLoad > cpuThreshold || currentPoolSize > requiredPoolSize) {
+ newThreadPoolSize = Math.max(minThreadPoolSize,
+ (int) Math.ceil(
+ (currentPoolSize * (HUNDRED_D - threadPoolDownscalePercentage))
+ / HUNDRED_D));
+ // Signal the extra workers to stop
+ while (workerRefs.size() > newThreadPoolSize) {
+ ReadBufferWorker worker = workerRefs.remove(workerRefs.size() - 1);
+ worker.stop();
+ }
+ printTraceLog("Decreased worker pool size from {} to {}",
currentPoolSize,
+ newThreadPoolSize);
+ } else {
+ printTraceLog("No change in worker pool size. CPU load: {} Pool size:
{}",
+ cpuLoad, currentPoolSize);
+ }
+ }
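
To make the scaling arithmetic concrete (values assumed for illustration):
with currentPoolSize = 10 and threadPoolUpscalePercentage = 20 the pool grows
to ceil(10 * 120 / 100) = 12 workers, capped at maxThreadPoolSize; with
threadPoolDownscalePercentage = 20 it shrinks to ceil(10 * 80 / 100) = 8,
floored at minThreadPoolSize.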
+
+ /**
+ * Similar to System.currentTimeMillis, except implemented with
+ * System.nanoTime(). System.currentTimeMillis can go backwards when the
+ * system clock is changed (e.g., by NTP time synchronization), making it
+ * unsuitable for measuring time intervals. nanoTime is strictly
+ * monotonically increasing per CPU core. Note: it is not monotonic across
+ * sockets, and even within one CPU it is only the more recent parts that
+ * share a clock across all cores.
+ *
+ * @return current time in milliseconds
+ */
+ private long currentTimeMillis() {
+ return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
+ }
+
+ /**
+ * Purge all buffers associated with the given stream from the given list.
+ * @param stream the stream whose buffers are to be purged
+ * @param list the list to purge from
+ */
+ private void purgeList(AbfsInputStream stream, LinkedList<ReadBuffer> list) {
+ for (Iterator<ReadBuffer> it = list.iterator(); it.hasNext();) {
+ ReadBuffer readBuffer = it.next();
+ if (readBuffer.getStream() == stream) {
+ it.remove();
+ // As failed ReadBuffers (bufferIndex = -1) are already pushed to free
+ // list in doneReading method, we will skip adding those here again.
+ if (readBuffer.getBufferindex() != -1) {
+ pushToFreeList(readBuffer.getBufferindex());
+ }
+ }
+ }
}
/**
- * {@inheritDoc}
+ * Test method that can clean up the current state of readAhead buffers and
+ * the lists. Will also trigger a fresh init.
*/
@VisibleForTesting
@Override
public void testResetReadBufferManager() {
- // TODO: To be implemented
+ synchronized (this) {
+ ArrayList<ReadBuffer> completedBuffers = new ArrayList<>();
+ for (ReadBuffer buf : getCompletedReadList()) {
+ if (buf != null) {
+ completedBuffers.add(buf);
+ }
+ }
+
+ for (ReadBuffer buf : completedBuffers) {
+ manualEviction(buf);
+ }
+
+ getReadAheadQueue().clear();
+ getInProgressList().clear();
+ getCompletedReadList().clear();
+ getFreeList().clear();
+ for (int i = 0; i < maxBufferPoolSize; i++) {
+ bufferPool[i] = null;
+ }
+ bufferPool = null;
+ if (cpuMonitorThread != null) {
+ cpuMonitorThread.shutdownNow();
+ }
+ if (memoryMonitorThread != null) {
+ memoryMonitorThread.shutdownNow();
+ }
+ if (workerPool != null) {
+ workerPool.shutdownNow();
+ }
+ resetBufferManager();
+ }
}
- /**
- * {@inheritDoc}
- */
@VisibleForTesting
@Override
- public void testResetReadBufferManager(final int readAheadBlockSize,
- final int thresholdAgeMilliseconds) {
- // TODO: To be implemented
+ public void testResetReadBufferManager(int readAheadBlockSize,
+ int thresholdAgeMilliseconds) {
+ setReadAheadBlockSize(readAheadBlockSize);
+ setThresholdAgeMilliseconds(thresholdAgeMilliseconds);
+ testResetReadBufferManager();
+ }
+
+ @VisibleForTesting
+ public void callTryEvict() {
+ tryEvict();
+ }
+
+ @VisibleForTesting
+ public int getNumBuffers() {
+ return numberOfActiveBuffers.get();
}
- /**
- * {@inheritDoc}
- */
@Override
- public void testMimicFullUseAndAddFailedBuffer(final ReadBuffer buf) {
- // TODO: To be implemented
+ void resetBufferManager() {
+ setBufferManager(null); // reset the singleton instance
+ setIsConfigured(false);
}
- private final ThreadFactory namedThreadFactory = new ThreadFactory() {
+ private static void setBufferManager(ReadBufferManagerV2 manager) {
+ bufferManager = manager;
+ }
+
+ private static void setIsConfigured(boolean configured) {
+ isConfigured.set(configured);
+ }
+
+ private final ThreadFactory workerThreadFactory = new ThreadFactory() {
private int count = 0;
+
@Override
public Thread newThread(Runnable r) {
- return new Thread(r, "ReadAheadV2-Thread-" + count++);
+ Thread t = new Thread(r, "ReadAheadV2-WorkerThread-" + count++);
+ t.setDaemon(true);
+ return t;
}
};
- @Override
- void resetBufferManager() {
- setBufferManager(null); // reset the singleton instance
+ private void printTraceLog(String message, Object... args) {
+ if (LOGGER.isTraceEnabled()) {
+ LOGGER.trace(message, args);
+ }
}
- private static void setBufferManager(ReadBufferManagerV2 manager) {
- bufferManager = manager;
+ private void printDebugLog(String message, Object... args) {
+ LOGGER.debug(message, args);
+ }
+
+ /**
+ * Get the current memory load of the JVM.
+ * @return the memory load as a double value between 0.0 and 1.0
+ */
+ @VisibleForTesting
+ double getMemoryLoad() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ }
+
+ /**
+ * Get the current CPU load of the system.
+ * @return the CPU load as a double value between 0.0 and 1.0
+ */
+ @VisibleForTesting
+ public double getCpuLoad() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+ OperatingSystemMXBean.class);
+ double cpuLoad = osBean.getSystemCpuLoad();
+ if (cpuLoad < 0) {
+ // If the CPU load is not available, return 0.0
+ return 0.0;
+ }
+ return cpuLoad;
+ }
+
+ @VisibleForTesting
+ static synchronized ReadBufferManagerV2 getInstance() {
+ return bufferManager;
+ }
+
+ @VisibleForTesting
+ public int getMinBufferPoolSize() {
+ return minBufferPoolSize;
+ }
+
+ @VisibleForTesting
+ public int getMaxBufferPoolSize() {
+ return maxBufferPoolSize;
+ }
+
+ @VisibleForTesting
+ public int getCurrentThreadPoolSize() {
+ return workerRefs.size();
+ }
+
+ @VisibleForTesting
+ public int getCpuMonitoringIntervalInMilliSec() {
+ return cpuMonitoringIntervalInMilliSec;
+ }
+
+ @VisibleForTesting
+ public int getMemoryMonitoringIntervalInMilliSec() {
+ return memoryMonitoringIntervalInMilliSec;
+ }
+
+ @VisibleForTesting
+ public ScheduledExecutorService getCpuMonitoringThread() {
+ return cpuMonitorThread;
+ }
+
+ /**
+ * Compute the worker thread count needed for the current demand, with a
+ * 20% safety margin over the queued plus in-progress reads.
+ * @return the required thread pool size
+ */
+ public int getRequiredThreadPoolSize() {
+ return (int) Math.ceil(THREAD_POOL_REQUIREMENT_BUFFER
+ * (getReadAheadQueue().size()
+ + getInProgressList().size())); // 20% more for buffer
+ }
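
Assuming THREAD_POOL_REQUIREMENT_BUFFER is 1.2 (the 20% margin the inline
comment describes), 10 queued reads plus 5 in progress would yield
ceil(1.2 * 15) = 18 required workers.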
+
+ private boolean isFreeListEmpty() {
+ LOCK.lock();
+ try {
+ return getFreeList().isEmpty();
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ private Integer popFromFreeList() {
+ LOCK.lock();
+ try {
+ return getFreeList().pop();
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ private void pushToFreeList(int idx) {
+ LOCK.lock();
+ try {
+ getFreeList().push(idx);
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+ private void incrementActiveBufferCount() {
+ numberOfActiveBuffers.getAndIncrement();
+ }
+
+ private void decrementActiveBufferCount() {
+ numberOfActiveBuffers.getAndDecrement();
}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
index 79d5eef955a..2c6efdc735a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java
@@ -20,7 +20,9 @@
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
@@ -29,6 +31,7 @@ class ReadBufferWorker implements Runnable {
protected static final CountDownLatch UNLEASH_WORKERS = new
CountDownLatch(1);
private int id;
private ReadBufferManager bufferManager;
+ private final AtomicBoolean isRunning = new AtomicBoolean(true);
ReadBufferWorker(final int id, final ReadBufferManager bufferManager) {
this.id = id;
@@ -54,7 +57,7 @@ public void run() {
Thread.currentThread().interrupt();
}
ReadBuffer buffer;
- while (true) {
+ while (isRunning()) {
try {
buffer = bufferManager.getNextBlockToRead(); // blocks, until a
buffer is available for this thread
} catch (InterruptedException ex) {
@@ -72,7 +75,7 @@ public void run() {
// read-ahead buffer size, make sure a valid length is passed
// for remote read
Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
- buffer.getTracingContext());
+ buffer.getTracingContext());
bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE,
bytesRead); // post result back to ReadBufferManager
} catch (IOException ex) {
@@ -85,4 +88,13 @@ public void run() {
}
}
}
+
+ public void stop() {
+ isRunning.set(false);
+ }
+
+ @VisibleForTesting
+ public boolean isRunning() {
+ return isRunning.get();
+ }
}
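
Note on the shutdown semantics above: the isRunning flag is only re-checked
at the top of the worker loop, so a worker blocked inside
getNextBlockToRead() keeps running until it next wakes up; stop() requests a
cooperative drain rather than an immediate halt.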
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
index c35b76e1a73..798b1943e07 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
@@ -38,6 +38,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
@@ -711,12 +712,38 @@ protected void assertPathDns(Path path) {
.contains(expectedDns);
}
+ /**
+ * Return array of random bytes of the given length.
+ *
+ * @param length length of the byte array
+ * @return byte array
+ */
protected byte[] getRandomBytesArray(int length) {
final byte[] b = new byte[length];
new Random().nextBytes(b);
return b;
}
+ /**
+ * Create a file on the file system with the given file name and content.
+ *
+ * @param fs fileSystem that stores the file
+ * @param fileName name of the file
+ * @param fileContent content of the file
+ *
+ * @return path of the file created
+ * @throws IOException exception in writing file on fileSystem
+ */
+ protected Path createFileWithContent(FileSystem fs, String fileName,
+ byte[] fileContent) throws IOException {
+ Path testFilePath = path(fileName);
+ try (FSDataOutputStream oStream = fs.create(testFilePath)) {
+ oStream.write(fileContent);
+ oStream.flush();
+ }
+ return testFilePath;
+ }
+
/**
* Checks a list of futures for exceptions.
*
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
index 1e5ba3689f5..33b7acf4658 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFlush.java
@@ -233,7 +233,7 @@ private void testFlush(boolean disableOutputStreamFlush)
throws Exception {
.setDisableOutputStreamFlush(disableOutputStreamFlush);
final Path testFilePath = path(methodName.getMethodName());
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
// The test case must write "fs.azure.write.request.size" bytes
// to the stream in order for the data to be uploaded to storage.
assertTrue(fs.getAbfsStore().getAbfsConfiguration().getWriteBufferSize()
@@ -265,7 +265,7 @@ private void testFlush(boolean disableOutputStreamFlush)
throws Exception {
@Test
public void testHflushWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
String fileName = UUID.randomUUID().toString();
final Path testFilePath = path(fileName);
@@ -278,7 +278,7 @@ public void testHflushWithFlushEnabled() throws Exception {
@Test
public void testHflushWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
boolean isAppendBlob = false;
if
(fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
@@ -295,7 +295,7 @@ public void testHflushWithFlushDisabled() throws Exception {
@Test
public void testHsyncWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
@@ -332,7 +332,7 @@ public void testTracingHeaderForAppendBlob() throws
Exception {
@Test
public void testStreamCapabilitiesWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
@@ -349,7 +349,7 @@ public void testStreamCapabilitiesWithFlushDisabled()
throws Exception {
@Test
public void testStreamCapabilitiesWithFlushEnabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
try (FSDataOutputStream stream = getStreamAfterWrite(fs, testFilePath,
buffer, true)) {
assertHasStreamCapabilities(stream,
@@ -365,7 +365,7 @@ public void testStreamCapabilitiesWithFlushEnabled() throws
Exception {
@Test
public void testHsyncWithFlushDisabled() throws Exception {
final AzureBlobFileSystem fs = this.getFileSystem();
- byte[] buffer = getRandomBytesArray();
+ byte[] buffer = getRandomBytesArray(TEST_FILE_LENGTH);
final Path testFilePath = path(methodName.getMethodName());
boolean isAppendBlob = false;
if
(fs.getAbfsStore().isAppendBlobKey(fs.makeQualified(testFilePath).toString())) {
@@ -378,12 +378,6 @@ public void testHsyncWithFlushDisabled() throws Exception {
}
}
- private byte[] getRandomBytesArray() {
- final byte[] b = new byte[TEST_FILE_LENGTH];
- new Random().nextBytes(b);
- return b;
- }
-
private FSDataOutputStream getStreamAfterWrite(AzureBlobFileSystem fs, Path
path, byte[] buffer, boolean enableFlush) throws IOException {
fs.getAbfsStore().getAbfsConfiguration().setEnableFlush(enableFlush);
FSDataOutputStream stream = fs.create(path);
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
index 388e662115e..ec249e2b040 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamTestUtils.java
@@ -19,23 +19,17 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.io.IOException;
-import java.util.Random;
-import java.util.UUID;
import org.assertj.core.api.Assertions;
-import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
-import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
-import static
org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest.SHORTENED_GUID_LEN;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_OPTIMIZE_FOOTER_READ;
import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_READ_SMALL_FILES_COMPLETELY;
@@ -49,29 +43,6 @@ public AbfsInputStreamTestUtils(AbstractAbfsIntegrationTest
abstractAbfsIntegrat
this.abstractAbfsIntegrationTest = abstractAbfsIntegrationTest;
}
- private Path path(String filepath) throws IOException {
- return abstractAbfsIntegrationTest.getFileSystem().makeQualified(
- new Path(getTestPath(), getUniquePath(filepath)));
- }
-
- private Path getTestPath() {
- Path path = new Path(UriUtils.generateUniqueTestPath());
- return path;
- }
-
- /**
- * Generate a unique path using the given filepath.
- * @param filepath path string
- * @return unique path created from filepath and a GUID
- */
- private Path getUniquePath(String filepath) {
- if (filepath.equals("/")) {
- return new Path(filepath);
- }
- return new Path(filepath + StringUtils
- .right(UUID.randomUUID().toString(), SHORTENED_GUID_LEN));
- }
-
/**
* Returns AzureBlobFileSystem instance with the required
* readFullFileOptimization configuration.
@@ -90,38 +61,6 @@ public AzureBlobFileSystem getFileSystem(boolean
readSmallFilesCompletely)
return (AzureBlobFileSystem) FileSystem.newInstance(configuration);
}
- /**
- * Return array of random bytes of the given length.
- *
- * @param length length of the byte array
- * @return byte array
- */
- public byte[] getRandomBytesArray(int length) {
- final byte[] b = new byte[length];
- new Random().nextBytes(b);
- return b;
- }
-
- /**
- * Create a file on the file system with the given file name and content.
- *
- * @param fs fileSystem that stores the file
- * @param fileName name of the file
- * @param fileContent content of the file
- *
- * @return path of the file created
- * @throws IOException exception in writing file on fileSystem
- */
- public Path createFileWithContent(FileSystem fs, String fileName,
- byte[] fileContent) throws IOException {
- Path testFilePath = path(fileName);
- try (FSDataOutputStream oStream = fs.create(testFilePath)) {
- oStream.write(fileContent);
- oStream.flush();
- }
- return testFilePath;
- }
-
/**
* Assert that the content read from the subsection of a file is correct.
*
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
index 6bcf31f9e69..938f5f4300c 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
@@ -52,8 +52,8 @@ public void testWithNoOptimization() throws Exception {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(false, false, fileSize);
String fileName = methodName.getMethodName() + i;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testWithNoOptimization(fs, testFilePath, HUNDRED, fileContent);
}
}
@@ -97,8 +97,8 @@ public void testExceptionInOptimization() throws Exception {
int fileSize = i * ONE_MB;
final AzureBlobFileSystem fs = getFileSystem(true, true, fileSize);
String fileName = methodName.getMethodName() + i;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
testExceptionInOptimization(fs, testFilePath, fileSize - HUNDRED,
fileSize / 4, fileContent);
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
index fbafc12490c..9cd3433a88e 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamReadFooter.java
@@ -258,8 +258,8 @@ private void validateSeekAndReadWithConf(boolean
optimizeFooterRead,
try (AzureBlobFileSystem spiedFs = createSpiedFs(
getRawConfiguration())) {
String fileName = methodName.getMethodName() + fileId;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath =
abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(spiedFs, fileName,
fileContent);
for (int readBufferSize : READ_BUFFER_SIZE) {
validateSeekAndReadWithConf(spiedFs, optimizeFooterRead, seekTo,
@@ -389,8 +389,8 @@ public void testPartialReadWithNoData() throws Exception {
futureList.add(executorService.submit(() -> {
try (AzureBlobFileSystem spiedFs = createSpiedFs(
getRawConfiguration())) {
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath =
abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(spiedFs, fileName,
fileContent);
validatePartialReadWithNoData(spiedFs, fileSize, fileContent,
testFilePath);
@@ -461,8 +461,8 @@ public void testPartialReadWithSomeData() throws Exception {
try (AzureBlobFileSystem spiedFs = createSpiedFs(
getRawConfiguration())) {
String fileName = methodName.getMethodName() + fileId;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath =
abfsInputStreamTestUtils.createFileWithContent(spiedFs, fileName,
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(spiedFs, fileName,
fileContent);
validatePartialReadWithSomeData(spiedFs, fileSize, testFilePath,
fileContent);
@@ -583,8 +583,8 @@ private void verifyConfigValueInStream(final
FSDataInputStream inputStream,
private Path createPathAndFileWithContent(final AzureBlobFileSystem fs,
final int fileIdx, final int fileSize) throws Exception {
String fileName = methodName.getMethodName() + fileIdx;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- return abfsInputStreamTestUtils.createFileWithContent(fs, fileName,
fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ return createFileWithContent(fs, fileName, fileContent);
}
private FutureDataInputStreamBuilder getParameterizedBuilder(final Path path,
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
index 5e3879a525c..f92d64359a2 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStreamSmallFileReads.java
@@ -74,8 +74,8 @@ private void validateNumBackendCalls(final boolean
readSmallFilesCompletely,
for (int i = 1; i <= 4; i++) {
String fileName = methodName.getMethodName() + i;
int fileSize = i * ONE_MB;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = ONE_KB;
try (FSDataInputStream iStream = fs.open(testFilePath)) {
byte[] buffer = new byte[length];
@@ -185,8 +185,8 @@ private void validateSeekAndReadWithConf(final SeekTo
seekTo,
for (int i = startFileSizeInMB; i <= endFileSizeInMB; i++) {
String fileName = methodName.getMethodName() + i;
int fileSize = i * ONE_MB;
- byte[] fileContent =
abfsInputStreamTestUtils.getRandomBytesArray(fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
fileName, fileContent);
+ byte[] fileContent = getRandomBytesArray(fileSize);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
int length = ONE_KB;
int seekPos = seekPos(seekTo, fileSize, length);
seekReadAndTest(fs, testFilePath, seekPos, length, fileContent);
@@ -255,9 +255,9 @@ public void testPartialReadWithNoData() throws Exception {
try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
true)) {
String fileName = methodName.getMethodName() + i;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+ byte[] fileContent = getRandomBytesArray(
fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+ Path testFilePath = createFileWithContent(fs,
fileName, fileContent);
partialReadWithNoData(fs, testFilePath, fileSize / 2, fileSize / 4,
fileContent);
@@ -304,9 +304,9 @@ public void testPartialReadWithSomeData() throws Exception {
try (AzureBlobFileSystem fs = abfsInputStreamTestUtils.getFileSystem(
true)) {
String fileName = methodName.getMethodName() + i;
- byte[] fileContent = abfsInputStreamTestUtils.getRandomBytesArray(
+ byte[] fileContent = getRandomBytesArray(
fileSize);
- Path testFilePath = abfsInputStreamTestUtils.createFileWithContent(fs,
+ Path testFilePath = createFileWithContent(fs,
fileName, fileContent);
partialReadWithSomeData(fs, testFilePath, fileSize / 2,
fileSize / 4, fileContent);
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
index 1ead30e9fa2..84b0fbd5196 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java
@@ -18,10 +18,8 @@
package org.apache.hadoop.fs.azurebfs.services;
-import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -29,7 +27,6 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
@@ -62,127 +59,122 @@ public class ITestReadBufferManager extends
AbstractAbfsIntegrationTest {
*/
public static final int PROBE_INTERVAL_MILLIS = 1_000;
- public ITestReadBufferManager() throws Exception {
+ public ITestReadBufferManager() throws Exception {
+ }
+
+ @Test
+ public void testPurgeBufferManagerForParallelStreams() throws Exception {
+ describe("Testing purging of buffers from ReadBufferManagerV1 for "
+ + "parallel input streams");
+ final int numBuffers = 16;
+ final LinkedList<Integer> freeList = new LinkedList<>();
+ for (int i = 0; i < numBuffers; i++) {
+ freeList.add(i);
}
-
- @Test
- public void testPurgeBufferManagerForParallelStreams() throws Exception {
- describe("Testing purging of buffers from ReadBufferManagerV1 for "
- + "parallel input streams");
- final int numBuffers = 16;
- final LinkedList<Integer> freeList = new LinkedList<>();
- for (int i=0; i < numBuffers; i++) {
- freeList.add(i);
- }
- ExecutorService executorService = Executors.newFixedThreadPool(4);
- AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
- // verify that the fs has the capability to validate the fix
- Assertions.assertThat(fs.hasPathCapability(new Path("/"),
CAPABILITY_SAFE_READAHEAD))
- .describedAs("path capability %s in %s",
CAPABILITY_SAFE_READAHEAD, fs)
- .isTrue();
-
- try {
- for (int i = 0; i < 4; i++) {
- final String fileName = methodName.getMethodName() + i;
- executorService.submit((Callable<Void>) () -> {
- byte[] fileContent = getRandomBytesArray(ONE_MB);
- Path testFilePath = createFileWithContent(fs, fileName,
fileContent);
- try (FSDataInputStream iStream = fs.open(testFilePath)) {
- iStream.read();
- }
- return null;
- });
- }
- } finally {
- executorService.shutdown();
- // wait for all tasks to finish
- executorService.awaitTermination(1, TimeUnit.MINUTES);
- }
-
- ReadBufferManagerV1 bufferManager =
ReadBufferManagerV1.getBufferManager();
- // readahead queue is empty
- assertListEmpty("ReadAheadQueue",
bufferManager.getReadAheadQueueCopy());
- // verify the in progress list eventually empties out.
- eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET,
PROBE_INTERVAL_MILLIS, () ->
- assertListEmpty("InProgressList",
bufferManager.getInProgressCopiedList()));
+ ExecutorService executorService = Executors.newFixedThreadPool(4);
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ // verify that the fs has the capability to validate the fix
+ Assertions.assertThat(fs.hasPathCapability(new Path("/"),
CAPABILITY_SAFE_READAHEAD))
+ .describedAs("path capability %s in %s", CAPABILITY_SAFE_READAHEAD, fs)
+ .isTrue();
+
+ try {
+ for (int i = 0; i < 4; i++) {
+ final String fileName = methodName.getMethodName() + i;
+ executorService.submit((Callable<Void>) () -> {
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+ try (FSDataInputStream iStream = fs.open(testFilePath)) {
+ iStream.read();
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
}
- private void assertListEmpty(String listName, List<ReadBuffer> list) {
- Assertions.assertThat(list)
- .describedAs("After closing all streams %s should be empty",
listName)
- .hasSize(0);
+ ReadBufferManager bufferManager = getBufferManager(fs);
+ // readahead queue is empty
+ assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
+ // verify the in progress list eventually empties out.
+ eventually(getTestTimeoutMillis() - TIMEOUT_OFFSET, PROBE_INTERVAL_MILLIS,
() ->
+ assertListEmpty("InProgressList",
bufferManager.getInProgressListCopy()));
+ }
+
+ private void assertListEmpty(String listName, List<ReadBuffer> list) {
+ Assertions.assertThat(list)
+ .describedAs("After closing all streams %s should be empty", listName)
+ .hasSize(0);
+ }
+
+ @Test
+ public void testPurgeBufferManagerForSequentialStream() throws Exception {
+ describe("Testing purging of buffers in ReadBufferManagerV1 for "
+ + "sequential input streams");
+ AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
+ final String fileName = methodName.getMethodName();
+ byte[] fileContent = getRandomBytesArray(ONE_MB);
+ Path testFilePath = createFileWithContent(fs, fileName, fileContent);
+
+ AbfsInputStream iStream1 = null;
+ // stream1 will be closed right away.
+ try {
+ iStream1 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+ // Just reading one byte will trigger all read ahead calls.
+ iStream1.read();
+ } finally {
+ IOUtils.closeStream(iStream1);
}
-
- @Test
- public void testPurgeBufferManagerForSequentialStream() throws Exception {
- describe("Testing purging of buffers in ReadBufferManagerV1 for "
- + "sequential input streams");
- AzureBlobFileSystem fs = getABFSWithReadAheadConfig();
- final String fileName = methodName.getMethodName();
- byte[] fileContent = getRandomBytesArray(ONE_MB);
- Path testFilePath = createFileWithContent(fs, fileName, fileContent);
-
- AbfsInputStream iStream1 = null;
- // stream1 will be closed right away.
- try {
- iStream1 = (AbfsInputStream)
fs.open(testFilePath).getWrappedStream();
- // Just reading one byte will trigger all read ahead calls.
- iStream1.read();
- } finally {
- IOUtils.closeStream(iStream1);
- }
- ReadBufferManagerV1 bufferManager =
ReadBufferManagerV1.getBufferManager();
- AbfsInputStream iStream2 = null;
- try {
- iStream2 = (AbfsInputStream)
fs.open(testFilePath).getWrappedStream();
- iStream2.read();
- // After closing stream1, no queued buffers of stream1 should be
present
- // assertions can't be made about the state of the other lists as
it is
- // too prone to race conditions.
-
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(),
iStream1);
- } finally {
- // closing the stream later.
- IOUtils.closeStream(iStream2);
- }
- // After closing stream2, no queued buffers of stream2 should be
present.
-
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(),
iStream2);
-
- // After closing both the streams, read queue should be empty.
- assertListEmpty("ReadAheadQueue",
bufferManager.getReadAheadQueueCopy());
-
+ ReadBufferManager bufferManager = getBufferManager(fs);
+ AbfsInputStream iStream2 = null;
+ try {
+ iStream2 = (AbfsInputStream) fs.open(testFilePath).getWrappedStream();
+ iStream2.read();
+ // After closing stream1, no queued buffers of stream1 should be present
+ // assertions can't be made about the state of the other lists as it is
+ // too prone to race conditions.
+
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(),
iStream1);
+ } finally {
+ // closing the stream later.
+ IOUtils.closeStream(iStream2);
}
+ // After closing stream2, no queued buffers of stream2 should be present.
+
assertListDoesnotContainBuffersForIstream(bufferManager.getReadAheadQueueCopy(),
iStream2);
+ // After closing both the streams, read queue should be empty.
+ assertListEmpty("ReadAheadQueue", bufferManager.getReadAheadQueueCopy());
- private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer>
list,
- AbfsInputStream
inputStream) {
- for (ReadBuffer buffer : list) {
- Assertions.assertThat(buffer.getStream())
- .describedAs("Buffers associated with closed input streams
shouldn't be present")
- .isNotEqualTo(inputStream);
- }
- }
+ }
- private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
- Configuration conf = getRawConfiguration();
- conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
- conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
- conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
- return (AzureBlobFileSystem) FileSystem.newInstance(conf);
- }
- protected byte[] getRandomBytesArray(int length) {
- final byte[] b = new byte[length];
- new Random().nextBytes(b);
- return b;
+ private void assertListDoesnotContainBuffersForIstream(List<ReadBuffer> list,
+ AbfsInputStream inputStream) {
+ for (ReadBuffer buffer : list) {
+ Assertions.assertThat(buffer.getStream())
+ .describedAs("Buffers associated with closed input streams shouldn't
be present")
+ .isNotEqualTo(inputStream);
}
-
- protected Path createFileWithContent(FileSystem fs, String fileName,
- byte[] fileContent) throws
IOException {
- Path testFilePath = path(fileName);
- try (FSDataOutputStream oStream = fs.create(testFilePath)) {
- oStream.write(fileContent);
- oStream.flush();
- }
- return testFilePath;
+ }
+
+ private AzureBlobFileSystem getABFSWithReadAheadConfig() throws Exception {
+ Configuration conf = getRawConfiguration();
+ conf.setLong(FS_AZURE_READ_AHEAD_QUEUE_DEPTH, 8);
+ conf.setInt(AZURE_READ_BUFFER_SIZE, MIN_BUFFER_SIZE);
+ conf.setInt(FS_AZURE_READ_AHEAD_BLOCK_SIZE, MIN_BUFFER_SIZE);
+ return (AzureBlobFileSystem) FileSystem.newInstance(conf);
+ }
+
+ private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) {
+ int blockSize =
fs.getAbfsStore().getAbfsConfiguration().getReadAheadBlockSize();
+ if (getConfiguration().isReadAheadV2Enabled()) {
+ ReadBufferManagerV2.setReadBufferManagerConfigs(blockSize,
+ getConfiguration());
+ return ReadBufferManagerV2.getBufferManager();
}
+ ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize);
+ return ReadBufferManagerV1.getBufferManager();
+ }
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
new file mode 100644
index 00000000000..c409ee9991d
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManagerV2.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CONNECTIONS_MADE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ITestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+
+ private static final int LESS_NUM_FILES = 2;
+ private static final int MORE_NUM_FILES = 5;
+ private static final int SMALL_FILE_SIZE = 6 * ONE_MB;
+ private static final int LARGE_FILE_SIZE = 50 * ONE_MB;
+ private static final int BLOCK_SIZE = 4 * ONE_MB;
+
+ public ITestReadBufferManagerV2() throws Exception {
+ }
+
+ @Test
+ public void testReadDifferentFilesInParallel() throws Exception {
+ try (AzureBlobFileSystem fs = getConfiguredFileSystem()) {
+ int fileSize = LARGE_FILE_SIZE;
+ int numFiles = MORE_NUM_FILES;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+
+ Path[] testPaths = new Path[numFiles];
+ for (int i = 0; i < numFiles; i++) {
+ final String fileName = methodName.getMethodName() + i;
+ testPaths[i] = createFileWithContent(fs, fileName, fileContent);
+ }
+ ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+ Map<String, Long> metricMap = getInstrumentationMap(fs);
+ long requestsMadeBeforeTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ final int fileIdx = i; // effectively final copy for the lambda
+ executorService.submit((Callable<Void>) () -> {
+ try (FSDataInputStream iStream = fs.open(testPaths[fileIdx])) {
+ byte[] buffer = new byte[fileSize];
+ int bytesRead = iStream.read(buffer, 0, fileSize);
+ assertThat(bytesRead).isEqualTo(fileSize);
+ assertThat(buffer).isEqualTo(fileContent);
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ metricMap = getInstrumentationMap(fs);
+ long requestsMadeAfterTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ int expectedRequests = numFiles // Get Path Status for each file
+ + ((int) Math.ceil((double) fileSize / BLOCK_SIZE))
+ * numFiles; // Read requests for each file
+ assertEquals(expectedRequests,
+ requestsMadeAfterTest - requestsMadeBeforeTest);
+ }
+ }
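
With the constants above (5 files of 50 MB read over 4 MB blocks), the
expectation works out to 5 GetPathStatus calls plus ceil(50 / 4) = 13 read
requests per file, i.e. 5 + 13 * 5 = 70 connections.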
+
+ @Test
+ public void testReadSameFileInParallel() throws Exception {
+ try (AzureBlobFileSystem fs = getConfiguredFileSystem()) {
+ int fileSize = SMALL_FILE_SIZE;
+ int numFiles = LESS_NUM_FILES;
+ byte[] fileContent = getRandomBytesArray(fileSize);
+
+ final String fileName = methodName.getMethodName();
+ Path testPath = createFileWithContent(fs, fileName, fileContent);
+ ExecutorService executorService = Executors.newFixedThreadPool(numFiles);
+ Map<String, Long> metricMap = getInstrumentationMap(fs);
+ long requestsMadeBeforeTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ try {
+ for (int i = 0; i < numFiles; i++) {
+ executorService.submit((Callable<Void>) () -> {
+ try (FSDataInputStream iStream = fs.open(testPath)) {
+ byte[] buffer = new byte[fileSize];
+ int bytesRead = iStream.read(buffer, 0, fileSize);
+ assertThat(bytesRead).isEqualTo(fileSize);
+ assertThat(buffer).isEqualTo(fileContent);
+ }
+ return null;
+ });
+ }
+ } finally {
+ executorService.shutdown();
+ // wait for all tasks to finish
+ executorService.awaitTermination(1, TimeUnit.MINUTES);
+ }
+ metricMap = getInstrumentationMap(fs);
+ long requestsMadeAfterTest = metricMap
+ .get(CONNECTIONS_MADE.getStatName());
+ int expectedRequests = numFiles // Get Path Status for each file
+ + ((int) Math.ceil(
+ (double) fileSize / BLOCK_SIZE)); // Read requests for each file
+ assertEquals(expectedRequests,
+ requestsMadeAfterTest - requestsMadeBeforeTest);
+ }
+ }
+
+ private AzureBlobFileSystem getConfiguredFileSystem() throws Exception {
+ Configuration config = new Configuration(getRawConfiguration());
+ config.set(FS_AZURE_ENABLE_READAHEAD_V2, TRUE);
+ config.set(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, TRUE);
+ AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(config);
+ return fs;
+ }
+}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
index 9b388b57c3e..37d8046203e 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java
@@ -112,7 +112,7 @@ public void teardown() throws Exception {
getBufferManager().testResetReadBufferManager();
}
- private AbfsRestOperation getMockRestOp() {
+ AbfsRestOperation getMockRestOp() {
AbfsRestOperation op = mock(AbfsRestOperation.class);
AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class);
when(httpOp.getBytesReceived()).thenReturn(1024L);
@@ -121,7 +121,7 @@ private AbfsRestOperation getMockRestOp() {
return op;
}
- private AbfsClient getMockAbfsClient() throws URISyntaxException {
+ AbfsClient getMockAbfsClient() throws URISyntaxException {
// Mock failure for client.read()
AbfsClient client = mock(AbfsClient.class);
AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new
URI("abcd")));
@@ -135,7 +135,7 @@ private AbfsClient getMockAbfsClient() throws
URISyntaxException {
return client;
}
- private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
+ AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient,
String fileName) throws IOException {
AbfsInputStreamContext inputStreamContext = new AbfsInputStreamContext(-1);
// Create AbfsInputStream with the client instance
@@ -144,7 +144,10 @@ private AbfsInputStream getAbfsInputStream(AbfsClient
mockAbfsClient,
null,
FORWARD_SLASH + fileName,
THREE_KB,
-
inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10).withReadAheadBlockSize(ONE_KB),
+ inputStreamContext.withReadBufferSize(ONE_KB)
+ .withReadAheadQueueDepth(10)
+ .withReadAheadBlockSize(ONE_KB)
+ .isReadAheadV2Enabled(getConfiguration().isReadAheadV2Enabled()),
"eTag",
getTestTracingContext(null, false));
@@ -182,7 +185,7 @@ public AbfsInputStream getAbfsInputStream(AbfsClient
abfsClient,
return inputStream;
}
- private void queueReadAheads(AbfsInputStream inputStream) {
+ void queueReadAheads(AbfsInputStream inputStream) {
// Mimic AbfsInputStream readAhead queue requests
getBufferManager()
.queueReadAhead(inputStream, 0, ONE_KB,
inputStream.getTracingContext());
@@ -564,7 +567,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting()
throws Exception {
//Sleeping to give ReadBufferWorker to pick the readBuffers for
processing.
Thread.sleep(readBufferTransferToInProgressProbableTime);
- assertThat(readBufferManager.getInProgressCopiedList())
+ assertThat(readBufferManager.getInProgressListCopy())
.describedAs(String.format("InProgressList should have %d elements",
readBufferQueuedCount))
.hasSize(readBufferQueuedCount);
@@ -577,7 +580,7 @@ public void testStreamPurgeDuringReadAheadCallExecuting()
throws Exception {
.hasSize(0);
}
- assertThat(readBufferManager.getInProgressCopiedList())
+ assertThat(readBufferManager.getInProgressListCopy())
.describedAs(String.format("InProgressList should have %d elements",
readBufferQueuedCount))
.hasSize(readBufferQueuedCount);
@@ -1125,6 +1128,11 @@ private void resetReadBufferManager(int bufferSize, int
threshold) {
}
private ReadBufferManager getBufferManager() {
+ if (getConfiguration().isReadAheadV2Enabled()) {
+ ReadBufferManagerV2.setReadBufferManagerConfigs(
+ getConfiguration().getReadAheadBlockSize(), getConfiguration());
+ return ReadBufferManagerV2.getBufferManager();
+ }
return ReadBufferManagerV1.getBufferManager();
}
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
new file mode 100644
index 00000000000..77fbb769042
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -0,0 +1,298 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT;
+import static
org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Unit tests around different components of Read Buffer Manager V2.
+ */
+public class TestReadBufferManagerV2 extends AbstractAbfsIntegrationTest {
+ private volatile boolean running = true;
+ private final List<byte[]> allocations = new ArrayList<>();
+ private static final double HIGH_MEMORY_USAGE_THRESHOLD_PERCENT = 0.8;
+
+ public TestReadBufferManagerV2() throws Exception {
+ super();
+ }
+
+ /**
+ * Test to verify init of ReadBufferManagerV2
+ * @throws Exception if test fails
+ */
+ @Test
+ public void testReadBufferManagerV2Init() throws Exception {
+
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
getConfiguration());
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ assertThat(ReadBufferManagerV2.getInstance())
+ .as("ReadBufferManager should be uninitialized").isNull();
+ intercept(IllegalStateException.class, "ReadBufferManagerV2 is not
configured.", () -> {
+ ReadBufferManagerV2.getBufferManager();
+ });
+ // verify that multiple invocations of getBufferManager return the
+ // same instance.
+
ReadBufferManagerV2.setReadBufferManagerConfigs(getConfiguration().getReadAheadBlockSize(),
getConfiguration());
+ ReadBufferManagerV2 bufferManager = ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager2 =
ReadBufferManagerV2.getBufferManager();
+ ReadBufferManagerV2 bufferManager3 = ReadBufferManagerV2.getInstance();
+ assertThat(bufferManager).isNotNull();
+ assertThat(bufferManager2).isNotNull();
+ assertThat(bufferManager).isSameAs(bufferManager2);
+ assertThat(bufferManager3).isNotNull();
+ assertThat(bufferManager3).isSameAs(bufferManager);
+
+ // Verify default values are valid.
+ assertThat(bufferManager.getMinBufferPoolSize()).isGreaterThan(0);
+ assertThat(bufferManager.getMaxBufferPoolSize()).isGreaterThan(0);
+ }
+
+ /**
+ * Test to verify that the CPU monitor thread is not active when dynamic scaling is disabled.
+ * @throws Exception if test fails
+ */
+ @Test
+ public void testDynamicScalingSwitchingOnAndOff() throws Exception {
+ Configuration conf = new Configuration(getRawConfiguration());
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem)
FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsConfiguration abfsConfiguration =
fs.getAbfsStore().getAbfsConfiguration();
+
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
abfsConfiguration);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+
ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(),
abfsConfiguration);
+ ReadBufferManagerV2 bufferManagerV2 =
ReadBufferManagerV2.getBufferManager();
+ assertThat(bufferManagerV2.getCpuMonitoringThread())
+ .as("CPU Monitor thread should be initialized").isNotNull();
+ bufferManagerV2.resetBufferManager();
+ }
+
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, false);
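+ // With dynamic scaling disabled, no CPU monitoring thread should be created.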
+ try (AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(getFileSystem().getUri(), conf)) {
+ AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfiguration.getReadAheadBlockSize(), abfsConfiguration);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ assertThat(bufferManagerV2.getCpuMonitoringThread())
+ .as("CPU Monitor thread should not be initialized").isNull();
+ bufferManagerV2.resetBufferManager();
+ }
+ }
+
+ @Test
+ public void testThreadPoolDynamicScaling() throws Exception {
+ running = true;
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+ int[] reqOffset = {0};
+ int reqLength = 1;
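+ // Keep queueing read-aheads from a background thread to put sustained load on the worker pool.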
+ Thread t = new Thread(() -> {
+ while (running) {
+ bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+ inputStream.getTracingContext());
+ reqOffset[0] += reqLength;
+ }
+ });
+ t.start();
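+ // Give the CPU monitor at least two monitoring intervals to observe the load and scale the pool up.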
+ Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isGreaterThan(2);
+ running = false;
+ t.join();
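+ // Once the load stops, the monitor should scale the pool back down within a few intervals.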
+ Thread.sleep(4L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isLessThan(4);
+ }
+
+ @Test
+ public void testCpuUpscaleNotAllowedIfCpuAboveThreshold() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, "0"); // 0% threshold: CPU usage is always above it
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+ int[] reqOffset = {0};
+ int reqLength = 1;
+ running = true;
+ Thread t = new Thread(() -> {
+ while (running) {
+ bufferManagerV2.queueReadAhead(inputStream, reqOffset[0], reqLength,
+ inputStream.getTracingContext());
+ reqOffset[0] += reqLength;
+ }
+ });
+ t.start();
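+ // Even under sustained load, the pool must stay at its minimum because CPU usage exceeds the threshold.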
+ Thread.sleep(2L * bufferManagerV2.getCpuMonitoringIntervalInMilliSec());
+ assertThat(bufferManagerV2.getCurrentThreadPoolSize()).isEqualTo(2);
+ running = false;
+ t.join();
+ }
+
+ @Test
+ public void testScheduledEviction() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ // Add a failed buffer to the completed queue and mark all buffers as in use, leaving none free for read-ahead.
+ ReadBuffer buff = new ReadBuffer();
+ buff.setStatus(ReadBufferStatus.READ_FAILED);
+ buff.setStream(inputStream);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
+ assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(2);
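+ // The scheduled memory monitor should evict the failed completed buffers within its monitoring interval.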
+ Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+ assertThat(bufferManagerV2.getCompletedReadListSize()).isEqualTo(0);
+ }
+
+ @Test
+ public void testMemoryUpscaleNotAllowedIfMemoryAboveThreshold() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "0"); // 0% threshold: memory usage is always above it
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ // Add a failed buffer to the completed queue and mark all buffers as in use, leaving none free for read-ahead.
+ ReadBuffer buff = new ReadBuffer();
+ buff.setStatus(ReadBufferStatus.READ_FAILED);
+ buff.setStream(inputStream);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
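+ // All buffers are now in use; with memory usage above the threshold, the pool must not grow.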
+ assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+ bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+ inputStream.getTracingContext());
+ assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+ }
+
+ @Test
+ public void testMemoryUpscaleIfMemoryBelowThreshold() throws Exception {
+ TestAbfsInputStream testAbfsInputStream = new TestAbfsInputStream();
+ AbfsClient client = testAbfsInputStream.getMockAbfsClient();
+ AbfsInputStream inputStream = testAbfsInputStream.getAbfsInputStream(client, "testFailedReadAhead.txt");
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "100"); // 100% threshold: memory usage is always below it
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ // Add a failed buffer to the completed queue and mark all buffers as in use, leaving none free for read-ahead.
+ ReadBuffer buff = new ReadBuffer();
+ buff.setStatus(ReadBufferStatus.READ_FAILED);
+ buff.setStream(inputStream);
+ bufferManagerV2.testMimicFullUseAndAddFailedBuffer(buff);
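+ // All buffers are now in use; with memory usage below the threshold, the next read-ahead should grow the pool.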
+ assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(bufferManagerV2.getMinBufferPoolSize());
+ bufferManagerV2.queueReadAhead(inputStream, 0, ONE_KB,
+ inputStream.getTracingContext());
+ assertThat(bufferManagerV2.getNumBuffers()).isGreaterThan(bufferManagerV2.getMinBufferPoolSize());
+ }
+
+ @Test
+ public void testMemoryDownscaleIfMemoryAboveThreshold() throws Exception {
+ Configuration configuration = getReadAheadV2Configuration();
+ configuration.set(FS_AZURE_READAHEAD_V2_MEMORY_USAGE_THRESHOLD_PERCENT, "2"); // very low threshold so real heap usage exceeds it
+ AbfsConfiguration abfsConfig = new AbfsConfiguration(configuration,
+ getAccountName());
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2.getBufferManager().testResetReadBufferManager();
+ ReadBufferManagerV2.setReadBufferManagerConfigs(abfsConfig.getReadAheadBlockSize(), abfsConfig);
+ ReadBufferManagerV2 bufferManagerV2 = ReadBufferManagerV2.getBufferManager();
+ int initialBuffers = bufferManagerV2.getMinBufferPoolSize();
+ assertThat(bufferManagerV2.getNumBuffers()).isEqualTo(initialBuffers);
+ running = true;
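+ // Allocate memory on a daemon thread to keep heap usage above the configured threshold, so the monitor shrinks the pool.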
+ Thread t = new Thread(() -> {
+ while (running) {
+ long maxMemory = Runtime.getRuntime().maxMemory();
+ long usedMemory = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+ double usage = (double) usedMemory / maxMemory;
+
+ if (usage < HIGH_MEMORY_USAGE_THRESHOLD_PERCENT) {
+ // Allocate more memory
+ allocations.add(new byte[10 * 1024 * 1024]); // 10MB
+ }
+ }
+ }, "MemoryLoadThread");
+ t.setDaemon(true);
+ t.start();
+ Thread.sleep(2L * bufferManagerV2.getMemoryMonitoringIntervalInMilliSec());
+ assertThat(bufferManagerV2.getNumBuffers()).isLessThan(initialBuffers);
+ running = false;
+ t.join();
+ }
+
+ private Configuration getReadAheadV2Configuration() {
+ Configuration conf = new Configuration(getRawConfiguration());
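+ // Aggressive settings for testing: small thread-pool bounds and one-second monitoring/TTL intervals so scaling and eviction can be observed quickly.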
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2, true);
+ conf.setBoolean(FS_AZURE_ENABLE_READAHEAD_V2_DYNAMIC_SCALING, true);
+ conf.setInt(FS_AZURE_READAHEAD_V2_MIN_THREAD_POOL_SIZE, 2);
+ conf.setInt(FS_AZURE_READAHEAD_V2_MAX_THREAD_POOL_SIZE, 4);
+ conf.setInt(FS_AZURE_READAHEAD_V2_CPU_USAGE_THRESHOLD_PERCENT, HUNDRED);
+ conf.setInt(FS_AZURE_READAHEAD_V2_CPU_MONITORING_INTERVAL_MILLIS, 1_000);
+ conf.setInt(FS_AZURE_READAHEAD_V2_MEMORY_MONITORING_INTERVAL_MILLIS, 1_000);
+ conf.setInt(FS_AZURE_READAHEAD_V2_CACHED_BUFFER_TTL_MILLIS, 1_000);
+ return conf;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]