This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7dcf76ded59 HADOOP-19472: [ABFS] Improve write workload performance
for ABFS by efficient concurrency utilization (#7669)
7dcf76ded59 is described below
commit 7dcf76ded594cf3323662523dcc4df4816802499
Author: Anmol Asrani <[email protected]>
AuthorDate: Thu Oct 30 16:10:26 2025 +0530
HADOOP-19472: [ABFS] Improve write workload performance for ABFS by
efficient concurrency utilization (#7669)
Contributed by Anmol Asrani.
---
.../hadoop/fs/azurebfs/AbfsConfiguration.java | 87 ++-
.../fs/azurebfs/AzureBlobFileSystemStore.java | 34 +-
.../fs/azurebfs/WriteThreadPoolSizeManager.java | 397 +++++++++++
.../fs/azurebfs/constants/AbfsHttpConstants.java | 2 +
.../fs/azurebfs/constants/ConfigurationKeys.java | 28 +
.../constants/FileSystemConfigurations.java | 123 +++-
.../contracts/exceptions/AbfsDriverException.java | 20 +
.../security/AbfsDelegationTokenIdentifier.java | 19 +-
.../security/AbfsDelegationTokenManager.java | 15 +
.../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 22 +
.../azurebfs/services/AbfsInputStreamContext.java | 119 ++++
.../azurebfs/TestWriteThreadPoolSizeManager.java | 770 +++++++++++++++++++++
.../azurebfs/services/ITestAbfsOutputStream.java | 2 +-
.../fs/azurebfs/services/TestAbfsOutputStream.java | 4 +-
14 files changed, 1621 insertions(+), 21 deletions(-)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
index 7c355671cf8..c57a0ea2a7f 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
@@ -483,6 +483,53 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_APACHE_HTTP_CLIENT_MAX_IO_EXCEPTION_RETRIES)
private int maxApacheHttpClientIoExceptionsRetries;
+ @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT,
+ DefaultValue = DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT)
+ private boolean dynamicWriteThreadPoolEnablement;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS,
+ DefaultValue = DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS)
+ private int writeThreadPoolKeepAliveTime;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
+ MinValue = MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
+ MaxValue = MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS,
+ DefaultValue = DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS)
+ private int writeCpuMonitoringInterval;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
+ MinValue = MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
+ MaxValue = MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT)
+ private int writeHighCpuThreshold;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
+ MinValue = MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
+ MaxValue = MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT)
+ private int writeMediumCpuThreshold;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT,
+ MinValue = MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT,
+ MaxValue = MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT,
+ DefaultValue = DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT)
+ private int writeLowCpuThreshold;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
+ MinValue = MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER,
+ DefaultValue = DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER)
+ private int lowTierMemoryMultiplier;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
+ MinValue = MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER,
+ DefaultValue = DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER)
+ private int mediumTierMemoryMultiplier;
+
+ @IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
+ MinValue = MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER,
+ DefaultValue = DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER)
+ private int highTierMemoryMultiplier;
+
@IntegerConfigurationValidatorAnnotation(ConfigurationKey =
FS_AZURE_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, DefaultValue =
DEFAULT_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE,
MinValue = MIN_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE, MaxValue =
MAX_APACHE_HTTP_CLIENT_MAX_CACHE_SIZE)
@@ -1641,16 +1688,52 @@ public ExponentialRetryPolicy
getOauthTokenFetchRetryPolicy() {
oauthTokenFetchRetryDeltaBackoff);
}
- public int getWriteMaxConcurrentRequestCount() {
+ public int getWriteConcurrentRequestCount() {
if (this.writeMaxConcurrentRequestCount < 1) {
return 4 * Runtime.getRuntime().availableProcessors();
}
return this.writeMaxConcurrentRequestCount;
}
+ public int getWriteThreadPoolKeepAliveTime() {
+ return writeThreadPoolKeepAliveTime;
+ }
+
+ public int getWriteCpuMonitoringInterval() {
+ return writeCpuMonitoringInterval;
+ }
+
+ public boolean isDynamicWriteThreadPoolEnablement() {
+ return dynamicWriteThreadPoolEnablement;
+ }
+
+ public int getWriteLowCpuThreshold() {
+ return writeLowCpuThreshold;
+ }
+
+ public int getWriteMediumCpuThreshold() {
+ return writeMediumCpuThreshold;
+ }
+
+ public int getWriteHighCpuThreshold() {
+ return writeHighCpuThreshold;
+ }
+
+ public int getLowTierMemoryMultiplier() {
+ return lowTierMemoryMultiplier;
+ }
+
+ public int getMediumTierMemoryMultiplier() {
+ return mediumTierMemoryMultiplier;
+ }
+
+ public int getHighTierMemoryMultiplier() {
+ return highTierMemoryMultiplier;
+ }
+
public int getMaxWriteRequestsToQueue() {
if (this.maxWriteRequestsToQueue < 1) {
- return 2 * getWriteMaxConcurrentRequestCount();
+ return 2 * getWriteConcurrentRequestCount();
}
return this.maxWriteRequestsToQueue;
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
index d51559de18e..486e4b3cc9b 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
@@ -45,6 +45,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -203,6 +204,7 @@ public class AzureBlobFileSystemStore implements Closeable,
ListingSupport {
private int blockOutputActiveBlocks;
/** Bounded ThreadPool for this instance. */
private ExecutorService boundedThreadPool;
+ private WriteThreadPoolSizeManager poolSizeManager;
/** ABFS instance reference to be held by the store to avoid GC close. */
private BackReference fsBackRef;
@@ -277,11 +279,19 @@ public AzureBlobFileSystemStore(
}
this.blockFactory = abfsStoreBuilder.blockFactory;
this.blockOutputActiveBlocks = abfsStoreBuilder.blockOutputActiveBlocks;
- this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
- abfsConfiguration.getWriteMaxConcurrentRequestCount(),
- abfsConfiguration.getMaxWriteRequestsToQueue(),
- 10L, TimeUnit.SECONDS,
- "abfs-bounded");
+ if (abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+ this.poolSizeManager = WriteThreadPoolSizeManager.getInstance(
+ getClient().getFileSystem() + "-" + UUID.randomUUID(),
+ abfsConfiguration);
+ poolSizeManager.startCPUMonitoring();
+ this.boundedThreadPool = poolSizeManager.getExecutorService();
+ } else {
+ this.boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
+ abfsConfiguration.getWriteConcurrentRequestCount(),
+ abfsConfiguration.getMaxWriteRequestsToQueue(),
+ 10L, TimeUnit.SECONDS,
+ "abfs-bounded");
+ }
}
/**
@@ -320,17 +330,19 @@ public void close() throws IOException {
}
try {
Futures.allAsList(futures).get();
- // shutdown the threadPool and set it to null.
- HadoopExecutors.shutdown(boundedThreadPool, LOG,
- 30, TimeUnit.SECONDS);
- boundedThreadPool = null;
+ if (!abfsConfiguration.isDynamicWriteThreadPoolEnablement()) {
+ // shutdown the threadPool and set it to null.
+ HadoopExecutors.shutdown(boundedThreadPool, LOG,
+ 30, TimeUnit.SECONDS);
+ boundedThreadPool = null;
+ }
} catch (InterruptedException e) {
LOG.error("Interrupted freeing leases", e);
Thread.currentThread().interrupt();
} catch (ExecutionException e) {
LOG.error("Error freeing leases", e);
} finally {
- IOUtils.cleanupWithLogger(LOG, getClientHandler());
+ IOUtils.cleanupWithLogger(LOG, poolSizeManager, getClientHandler());
}
}
@@ -797,7 +809,7 @@ private AbfsOutputStreamContext
populateAbfsOutputStreamContext(
.disableOutputStreamFlush(abfsConfiguration.isOutputStreamFlushDisabled())
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
-
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteMaxConcurrentRequestCount())
+
.withWriteMaxConcurrentRequestCount(abfsConfiguration.getWriteConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConfiguration.getMaxWriteRequestsToQueue())
.withLease(lease)
.withEncryptionAdapter(contextEncryptionAdapter)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
new file mode 100644
index 00000000000..d7887b67c64
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.lang.management.MemoryUsage;
+
+import com.sun.management.OperatingSystemMXBean;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.LOW_HEAP_SPACE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_MEDIUM_HEAP_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HEAP_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_REDUCTION_FACTOR;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
+
+/**
+ * Manages a thread pool for writing operations, adjusting the pool size based
+ * on CPU utilization and available heap memory.
+ */
+public final class WriteThreadPoolSizeManager implements Closeable {
+
+ /* Maximum allowed size for the thread pool. */
+ private final int maxThreadPoolSize;
+ /* Executor for periodically monitoring CPU usage. */
+ private final ScheduledExecutorService cpuMonitorExecutor;
+ /* Thread pool whose size is dynamically managed. */
+ private volatile ExecutorService boundedThreadPool;
+ /* Lock to ensure thread-safe updates to the thread pool. */
+ private final Lock lock = new ReentrantLock();
+ /* New computed max size for the thread pool after adjustment. */
+ private volatile int newMaxPoolSize;
+ /* Logger instance for logging events from WriteThreadPoolSizeManager. */
+ private static final Logger LOG = LoggerFactory.getLogger(
+ WriteThreadPoolSizeManager.class);
+ /* Map to maintain a WriteThreadPoolSizeManager instance per filesystem. */
+ private static final ConcurrentHashMap<String, WriteThreadPoolSizeManager>
+ POOL_SIZE_MANAGER_MAP = new ConcurrentHashMap<>();
+ /* Name of the filesystem associated with this manager. */
+ private final String filesystemName;
+ /* Initial size for the thread pool when created. */
+ private final int initialPoolSize;
+ /* Heap memory available when this manager was created, in gigabytes. */
+ private final long initialAvailableHeapMemory;
+ /* The configuration instance. */
+ private final AbfsConfiguration abfsConfiguration;
+
+ /**
+  * Private constructor that builds the write thread pool and the CPU monitor
+  * executor from system resources and the ABFS configuration.
+  *
+  * <p>The pool starts at the configured write concurrency (at least 1). The
+  * upper bound used for later resizing is the larger of that initial size and
+  * a value derived from the processor count and the heap available now.
+  *
+  * @param filesystemName Name of the ABFS filesystem.
+  * @param abfsConfiguration Configuration containing pool size parameters.
+  */
+ private WriteThreadPoolSizeManager(String filesystemName,
+     AbfsConfiguration abfsConfiguration) {
+   this.filesystemName = filesystemName;
+   this.abfsConfiguration = abfsConfiguration;
+   int availableProcessors = Runtime.getRuntime().availableProcessors();
+   /* Get the heap space available when the instance is created */
+   this.initialAvailableHeapMemory = getAvailableHeapMemory();
+   /* Compute the max pool size */
+   int computedMaxPoolSize = getComputedMaxPoolSize(availableProcessors,
+       initialAvailableHeapMemory);
+
+   /* Get the initial pool size from config, fallback to at least 1 */
+   this.initialPoolSize = Math.max(1,
+       abfsConfiguration.getWriteConcurrentRequestCount());
+
+   /* Set the upper bound for the thread pool size */
+   this.maxThreadPoolSize = Math.max(computedMaxPoolSize, initialPoolSize);
+   AtomicInteger threadCount = new AtomicInteger(1);
+   this.boundedThreadPool = Executors.newFixedThreadPool(
+       initialPoolSize,
+       r -> {
+         Thread t = new Thread(r);
+         t.setName("abfs-boundedwrite-" + threadCount.getAndIncrement());
+         return t;
+       }
+   );
+   ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
+   /*
+    * The keep-alive setting is read from a "...keep.alive.time.millis" key,
+    * so it must be applied in milliseconds, not seconds.
+    */
+   executor.setKeepAliveTime(
+       abfsConfiguration.getWriteThreadPoolKeepAliveTime(),
+       TimeUnit.MILLISECONDS);
+   /* Let idle core threads exit so a shrunken or idle pool releases threads */
+   executor.allowCoreThreadTimeOut(true);
+   /* Create a scheduled executor for CPU monitoring and pool adjustment */
+   this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
+ }
+
+ /**
+  * Accessor for the configuration this manager was created with.
+  *
+  * @return the {@link AbfsConfiguration} held by this instance.
+  */
+ private AbfsConfiguration getAbfsConfiguration() {
+   return this.abfsConfiguration;
+ }
+
+ /**
+ * Computes the maximum thread pool size based on the available processors
+ * and the initial available heap memory. The calculation uses a tiered
+ * multiplier derived from the memory-to-core ratio — systems with higher
+ * memory per core allow for a larger thread pool.
+ *
+ * @param availableProcessors the number of available CPU cores.
+ * @param initialAvailableHeapMemory the initial available heap memory, in
bytes or GB (depending on implementation).
+ * @return the computed maximum thread pool size.
+ */
+ private int getComputedMaxPoolSize(final int availableProcessors, long
initialAvailableHeapMemory) {
+ int maxpoolSize = getMemoryTierMaxThreads(initialAvailableHeapMemory,
availableProcessors);
+ LOG.debug("Computed max thread pool size: {} | Available processors: {} |
Heap memory (GB): {}",
+ maxpoolSize, availableProcessors, initialAvailableHeapMemory);
+ return maxpoolSize;
+ }
+
+ /**
+ * Calculates the available heap memory in gigabytes.
+ * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap
memory
+ * allowed for the JVM and subtracts the currently used memory (total - free)
+ * to determine how much heap memory is still available.
+ * The result is rounded up to the nearest gigabyte.
+ *
+ * @return the available heap memory in gigabytes
+ */
+ private long getAvailableHeapMemory() {
+ MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
+ MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
+ long availableHeapBytes = memoryUsage.getMax() - memoryUsage.getUsed();
+ return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ }
+
+ /**
+ * Returns aggressive thread count = CPU cores × multiplier based on heap
tier.
+ */
+ private int getMemoryTierMaxThreads(long availableHeapGB, int
availableProcessors) {
+ int multiplier;
+ if (availableHeapGB <= LOW_HEAP_SPACE_FACTOR) {
+ multiplier = abfsConfiguration.getLowTierMemoryMultiplier();
+ } else if (availableHeapGB <= MEDIUM_HEAP_SPACE_FACTOR) {
+ multiplier = abfsConfiguration.getMediumTierMemoryMultiplier();
+ } else {
+ multiplier = abfsConfiguration.getHighTierMemoryMultiplier();
+ }
+ return availableProcessors * multiplier;
+ }
+
+ /**
+  * Returns the WriteThreadPoolSizeManager registered for the given
+  * filesystem name, creating and registering a new one when none exists or
+  * when the registered one no longer has a live thread pool.
+  *
+  * @param filesystemName the name of the filesystem.
+  * @param abfsConfiguration the configuration for the ABFS.
+  *
+  * @return the manager instance for {@code filesystemName}.
+  */
+ public static synchronized WriteThreadPoolSizeManager getInstance(
+     String filesystemName, AbfsConfiguration abfsConfiguration) {
+   /* Check if an instance already exists in the map for the given filesystem */
+   WriteThreadPoolSizeManager existingInstance = POOL_SIZE_MANAGER_MAP.get(
+       filesystemName);
+
+   if (existingInstance != null) {
+     /*
+      * Read the volatile field exactly once. close() nulls it while holding
+      * the instance monitor, not the class monitor held here, so a second
+      * read could observe null after the null check has passed.
+      */
+     ExecutorService existingPool = existingInstance.boundedThreadPool;
+     if (existingPool != null && !existingPool.isShutdown()) {
+       return existingInstance;
+     }
+   }
+
+   /* Otherwise, create a new instance, put it in the map, and return it */
+   LOG.debug(
+       "Creating new WriteThreadPoolSizeManager instance for filesystem: {}",
+       filesystemName);
+   WriteThreadPoolSizeManager newInstance = new WriteThreadPoolSizeManager(
+       filesystemName, abfsConfiguration);
+   POOL_SIZE_MANAGER_MAP.put(filesystemName, newInstance);
+   return newInstance;
+ }
+
+ /**
+  * Adjusts the thread pool so that both its core and maximum size equal the
+  * specified value. Does nothing when the pool has already been released by
+  * {@link #close()}.
+  *
+  * @param newMaxPoolSize the new maximum pool size.
+  */
+ private void adjustThreadPoolSize(int newMaxPoolSize) {
+   synchronized (this) {
+     ThreadPoolExecutor threadPoolExecutor
+         = ((ThreadPoolExecutor) boundedThreadPool);
+     if (threadPoolExecutor == null) {
+       /* close() sets the pool to null under this same monitor */
+       LOG.debug("Thread pool already closed for filesystem: {}. Skipping resize.",
+           filesystemName);
+       return;
+     }
+     int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();
+
+     /*
+      * Order matters: the core size must never exceed the maximum size, so
+      * raise the maximum first when growing and lower the core first when
+      * shrinking.
+      */
+     if (newMaxPoolSize >= currentCorePoolSize) {
+       threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
+       threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
+     } else {
+       threadPoolExecutor.setCorePoolSize(newMaxPoolSize);
+       threadPoolExecutor.setMaximumPoolSize(newMaxPoolSize);
+     }
+     LOG.debug("ThreadPool Info - New max pool size: {}, Current pool size: {}, Active threads: {}",
+         newMaxPoolSize, threadPoolExecutor.getPoolSize(),
+         threadPoolExecutor.getActiveCount());
+   }
+ }
+
+ /**
+  * Starts monitoring the CPU utilization and adjusts the thread pool size
+  * accordingly. The check runs immediately and then at the configured write
+  * CPU monitoring interval (milliseconds) on the CPU monitor executor.
+  *
+  * <p>If an adjustment is interrupted, the thread's interrupt status is
+  * restored before the failure is rethrown as a {@link RuntimeException}.
+  */
+ synchronized void startCPUMonitoring() {
+   cpuMonitorExecutor.scheduleAtFixedRate(() -> {
+     double cpuUtilization = getCpuUtilization();
+     LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
+     try {
+       adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
+     } catch (InterruptedException e) {
+       /* Preserve the interrupt so the executor thread can observe it */
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(String.format(
+           "Thread pool size adjustment interrupted for filesystem %s",
+           filesystemName), e);
+     }
+   }, 0, getAbfsConfiguration().getWriteCpuMonitoringInterval(),
+       TimeUnit.MILLISECONDS);
+ }
+
+ /**
+ * Gets the current system CPU utilization.
+ *
+ * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if
unavailable.
+ */
+ private double getCpuUtilization() {
+ OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
+ OperatingSystemMXBean.class);
+ double cpuLoad = osBean.getSystemCpuLoad();
+ if (cpuLoad < 0) {
+ LOG.warn("System CPU load value unavailable (returned -1.0). Defaulting
to 0.0.");
+ return 0.0;
+ }
+ return cpuLoad;
+ }
+
+ /**
+  * Dynamically adjusts the thread pool size based on current CPU utilization
+  * and available heap memory relative to the initially available heap.
+  *
+  * <p>Utilization above the high or medium threshold shrinks the pool,
+  * utilization below the low threshold grows it (heap permitting), and
+  * anything in between leaves it unchanged. The call is a no-op once the pool
+  * has been released by {@link #close()}.
+  *
+  * @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
+  * @throws InterruptedException if thread locking is interrupted
+  */
+ public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization)
+     throws InterruptedException {
+   lock.lock();
+   try {
+     ThreadPoolExecutor executor = (ThreadPoolExecutor) this.boundedThreadPool;
+     if (executor == null) {
+       /* close() nulls the pool; a late monitor tick or caller must not NPE */
+       LOG.debug("Thread pool already closed for filesystem: {}. Skipping adjustment.",
+           filesystemName);
+       return;
+     }
+     int currentPoolSize = executor.getMaximumPoolSize();
+     long currentHeap = getAvailableHeapMemory();
+     long initialHeap = initialAvailableHeapMemory;
+     LOG.debug("Available heap memory: {} GB, Initial heap memory: {} GB",
+         currentHeap, initialHeap);
+     LOG.debug("Current CPU Utilization: {}", cpuUtilization);
+
+     /* Thresholds are configured as percentages; utilization is a fraction */
+     if (cpuUtilization
+         > (abfsConfiguration.getWriteHighCpuThreshold() / HUNDRED_D)) {
+       newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize,
+           currentHeap, initialHeap);
+     } else if (cpuUtilization
+         > (abfsConfiguration.getWriteMediumCpuThreshold() / HUNDRED_D)) {
+       newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize,
+           currentHeap, initialHeap);
+     } else if (cpuUtilization
+         < (abfsConfiguration.getWriteLowCpuThreshold() / HUNDRED_D)) {
+       newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize,
+           currentHeap, initialHeap);
+     } else {
+       newMaxPoolSize = currentPoolSize;
+       LOG.debug("CPU load normal ({}). No change: current={}",
+           cpuUtilization, currentPoolSize);
+     }
+     if (newMaxPoolSize != currentPoolSize) {
+       LOG.debug("Resizing thread pool from {} to {}", currentPoolSize,
+           newMaxPoolSize);
+       adjustThreadPoolSize(newMaxPoolSize);
+     }
+   } finally {
+     lock.unlock();
+   }
+ }
+
+ /**
+  * Calculates reduced pool size under high CPU utilization.
+  *
+  * <p>When the available heap has dropped to at most
+  * {@code initialHeap / HIGH_MEDIUM_HEAP_FACTOR} the pool is divided by
+  * {@code HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR}; otherwise it is reduced by
+  * {@code 1 / HIGH_CPU_REDUCTION_FACTOR} of its size. The result never goes
+  * below the initial pool size.
+  *
+  * @param currentPoolSize current maximum pool size.
+  * @param currentHeap currently available heap, in gigabytes.
+  * @param initialHeap heap available at creation time, in gigabytes.
+  * @return the reduced pool size.
+  */
+ private int calculateReducedPoolSizeHighCPU(int currentPoolSize,
+     long currentHeap, long initialHeap) {
+   if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
+     /* Compute once so the log shows the floored value that is returned */
+     int reduced = Math.max(initialPoolSize,
+         currentPoolSize / HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+     LOG.debug("High CPU & low heap. Aggressively reducing: current={}, new={}",
+         currentPoolSize, reduced);
+     return reduced;
+   }
+   int reduced = Math.max(initialPoolSize,
+       currentPoolSize - currentPoolSize / HIGH_CPU_REDUCTION_FACTOR);
+   LOG.debug("High CPU ({}). Reducing pool size moderately: current={}, new={}",
+       abfsConfiguration.getWriteHighCpuThreshold(), currentPoolSize,
+       reduced);
+   return reduced;
+ }
+
+ /**
+ * Calculates reduced pool size under medium CPU utilization.
+ */
+ private int calculateReducedPoolSizeMediumCPU(int currentPoolSize, long
currentHeap, long initialHeap) {
+ if (currentHeap <= initialHeap / HIGH_MEDIUM_HEAP_FACTOR) {
+ int reduced = Math.max(initialPoolSize, currentPoolSize -
currentPoolSize / MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR);
+ LOG.debug("Medium CPU & low heap. Reducing: current={}, new={}",
currentPoolSize, reduced);
+ return reduced;
+ }
+ int reduced = Math.max(initialPoolSize, currentPoolSize - currentPoolSize
/ MEDIUM_CPU_REDUCTION_FACTOR);
+ LOG.debug("Medium CPU ({}). Moderate reduction: current={}, new={}",
+ abfsConfiguration.getWriteMediumCpuThreshold(), currentPoolSize,
reduced);
+ return reduced;
+ }
+
+ /**
+  * Calculates the pool size under low CPU utilization.
+  *
+  * <p>With a healthy heap (at least {@code LOW_CPU_HEAP_FACTOR} of the
+  * initial heap) the pool grows by {@code LOW_CPU_POOL_SIZE_INCREASE_FACTOR},
+  * by at least one thread, capped at the maximum pool size. With insufficient
+  * heap the pool is scaled by {@code LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR},
+  * never going below one thread.
+  *
+  * @param currentPoolSize current maximum pool size.
+  * @param currentHeap currently available heap, in gigabytes.
+  * @param initialHeap heap available at creation time, in gigabytes.
+  * @return the new pool size.
+  */
+ private int calculateIncreasedPoolSizeLowCPU(int currentPoolSize,
+     long currentHeap, long initialHeap) {
+   if (currentHeap >= initialHeap * LOW_CPU_HEAP_FACTOR) {
+     int scaled = (int) (currentPoolSize * LOW_CPU_POOL_SIZE_INCREASE_FACTOR);
+     /*
+      * Integer truncation maps 1 * 1.5 back to 1, so a pool that has shrunk
+      * to a single thread would never grow again; step up by at least one.
+      */
+     int increased = Math.min(maxThreadPoolSize,
+         Math.max(scaled, currentPoolSize + 1));
+     LOG.debug("Low CPU & healthy heap. Increasing: current={}, new={}",
+         currentPoolSize, increased);
+     return increased;
+   } else {
+     // Decrease by 10%
+     int decreased = Math.max(1,
+         (int) (currentPoolSize * LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR));
+     LOG.debug("Low CPU but insufficient heap ({} GB). Decreasing: current={}, new={}",
+         currentHeap, currentPoolSize, decreased);
+     return decreased;
+   }
+ }
+
+
+ /**
+  * Exposes the write thread pool managed by this instance.
+  *
+  * @return the managed executor service; {@code null} once {@link #close()}
+  *         has released the pool.
+  */
+ public ExecutorService getExecutorService() {
+   return this.boundedThreadPool;
+ }
+
+ /**
+  * Exposes the scheduler that runs the periodic CPU check and pool resize.
+  *
+  * @return the {@link ScheduledExecutorService} used for CPU monitoring.
+  */
+ public ScheduledExecutorService getCpuMonitorExecutor() {
+   return this.cpuMonitorExecutor;
+ }
+
+ /**
+  * Closes this manager by shutting down executors and cleaning up resources.
+  * Removes the instance from the active manager map.
+  *
+  * <p>The write pool is given {@code THIRTY_SECONDS} to drain before it is
+  * forcibly stopped. If the wait is interrupted, the pool is stopped
+  * immediately and the caller's interrupt status is restored. Failures are
+  * logged rather than thrown, and the map entry is removed in every case,
+  * but only if it still refers to this instance.
+  *
+  * @throws IOException declared by {@link Closeable}; not thrown by this
+  *         implementation.
+  */
+ @Override
+ public void close() throws IOException {
+   synchronized (this) {
+     try {
+       // Shutdown CPU monitor
+       if (cpuMonitorExecutor != null && !cpuMonitorExecutor.isShutdown()) {
+         cpuMonitorExecutor.shutdown();
+       }
+       // Gracefully shutdown the bounded thread pool
+       if (boundedThreadPool != null && !boundedThreadPool.isShutdown()) {
+         boundedThreadPool.shutdown();
+         if (!boundedThreadPool.awaitTermination(THIRTY_SECONDS,
+             TimeUnit.SECONDS)) {
+           LOG.warn("Bounded thread pool did not terminate in time, forcing shutdownNow for filesystem: {}",
+               filesystemName);
+           boundedThreadPool.shutdownNow();
+         }
+         boundedThreadPool = null;
+       }
+     } catch (InterruptedException e) {
+       // Stop waiting: force the pool down and keep the interrupt visible
+       if (boundedThreadPool != null) {
+         boundedThreadPool.shutdownNow();
+         boundedThreadPool = null;
+       }
+       Thread.currentThread().interrupt();
+       LOG.warn("Failed to properly close instance for filesystem: {}",
+           filesystemName, e);
+     } catch (Exception e) {
+       LOG.warn("Failed to properly close instance for filesystem: {}",
+           filesystemName, e);
+     }
+     // Remove from the map, but never evict a newer instance under this name
+     POOL_SIZE_MANAGER_MAP.remove(filesystemName, this);
+     LOG.debug("Closed and removed instance for filesystem: {}",
+         filesystemName);
+   }
+ }
+}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
index fe4991c9582..a751101cf57 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
@@ -173,6 +173,8 @@ public final class AbfsHttpConstants {
public static final char CHAR_EQUALS = '=';
public static final char CHAR_STAR = '*';
public static final char CHAR_PLUS = '+';
+ public static final int LOW_HEAP_SPACE_FACTOR = 4;
+ public static final double MEDIUM_HEAP_SPACE_FACTOR = 8;
public static final int SPLIT_NO_LIMIT = -1;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
index 9afb37e35c7..899e96dadc1 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
@@ -459,6 +459,34 @@ public static String containerProperty(String property,
String fsName, String ac
public static final String FS_AZURE_BLOB_DIR_RENAME_MAX_THREAD =
"fs.azure.blob.dir.rename.max.thread";
/**Maximum number of thread per blob-delete orchestration: {@value}*/
public static final String FS_AZURE_BLOB_DIR_DELETE_MAX_THREAD =
"fs.azure.blob.dir.delete.max.thread";
+
+ /** Configuration key for the keep-alive time (ms) for the write thread
pool. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS
= "fs.azure.write.threadpool.keep.alive.time.millis";
+
+ /** Configuration key for the CPU monitoring interval (ms) during write
operations. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_CPU_MONITORING_INTERVAL_MILLIS =
"fs.azure.write.cpu.monitoring.interval.millis";
+
+ /** Configuration key to enable or disable dynamic write thread pool
adjustment. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT =
"fs.azure.write.dynamic.threadpool.enablement";
+
+ /** Configuration key for the high CPU utilization threshold (%) for write
scaling. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_HIGH_CPU_THRESHOLD_PERCENT =
"fs.azure.write.high.cpu.threshold.percent";
+
+ /** Configuration key for the medium CPU utilization threshold (%) for write
scaling. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT =
"fs.azure.write.medium.cpu.threshold.percent";
+
+ /** Configuration key for the low CPU utilization threshold (%) for write
scaling. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_LOW_CPU_THRESHOLD_PERCENT =
"fs.azure.write.low.cpu.threshold.percent";
+
+ /** Configuration key for the low-tier memory multiplier for write
workloads. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_LOW_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.low.tier.memory.multiplier";
+
+ /** Configuration key for the medium-tier memory multiplier for write
workloads. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.medium.tier.memory.multiplier";
+
+ /** Configuration key for the high-tier memory multiplier for write
workloads. Value: {@value}. */
+ public static final String FS_AZURE_WRITE_HIGH_TIER_MEMORY_MULTIPLIER =
"fs.azure.write.high.tier.memory.multiplier";
+
/**Flag to enable/disable sending client transactional ID during
create/rename operations: {@value}*/
public static final String FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
"fs.azure.enable.client.transaction.id";
/**Flag to enable/disable create idempotency during create operation:
{@value}*/
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
index 83f636bf1d1..ea77f9d874a 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
@@ -36,7 +36,25 @@ public final class FileSystemConfigurations {
public static final boolean
DEFAULT_FS_AZURE_ACCOUNT_IS_EXPECT_HEADER_ENABLED = true;
public static final String USER_HOME_DIRECTORY_PREFIX = "/user";
- private static final int SIXTY_SECONDS = 60_000;
+ public static final int SIXTY_SECONDS = 60; // value is now in seconds; NOTE(review): was 60_000 (private) before this change - confirm no existing user of this constant still expects milliseconds
+ public static final int THIRTY_SECONDS = 30;
+ /**
+ * Number of bytes in a gigabyte.
+ */
+ public static final long BYTES_PER_GIGABYTE = 1024L * 1024 * 1024;
+ /**
+ * Factor by which the pool size is increased when CPU utilization is low.
+ */
+ public static final double LOW_CPU_POOL_SIZE_INCREASE_FACTOR = 1.5;
+ public static final double LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR = 0.9;
+ public static final int HIGH_CPU_REDUCTION_FACTOR = 3;
+ public static final int HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR = 2;
+ public static final int MEDIUM_CPU_REDUCTION_FACTOR = 5;
+ public static final int MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR = 3;
+ public static final int HIGH_MEDIUM_HEAP_FACTOR = 2;
+ public static final double LOW_CPU_HEAP_FACTOR = 0.8;
+
+
// Retry parameter defaults.
public static final int DEFAULT_MIN_BACKOFF_INTERVAL = 500; // 500ms
@@ -73,7 +91,7 @@ public final class FileSystemConfigurations {
public static final int ONE_KB = 1024;
public static final int ONE_MB = ONE_KB * ONE_KB;
- // Default upload and download buffer size
+ /** Default buffer sizes and optimization flags. */
public static final int DEFAULT_WRITE_BUFFER_SIZE = 8 * ONE_MB; // 8 MB
public static final int APPENDBLOB_MAX_WRITE_BUFFER_SIZE = 4 * ONE_MB; // 4
MB
public static final boolean DEFAULT_AZURE_ENABLE_SMALL_WRITE_OPTIMIZATION =
false;
@@ -217,6 +235,7 @@ public final class FileSystemConfigurations {
public static final int ZERO = 0;
public static final int HUNDRED = 100;
+ public static final double HUNDRED_D = 100.0;
public static final long THOUSAND = 1000L;
public static final HttpOperationType DEFAULT_NETWORKING_LIBRARY
@@ -262,6 +281,106 @@ public final class FileSystemConfigurations {
public static final int DEFAULT_FS_AZURE_BLOB_DELETE_THREAD =
DEFAULT_FS_AZURE_LISTING_ACTION_THREADS;
+ /**
+ * Whether dynamic write thread pool adjustment is enabled by default.
+ */
+ public static final boolean DEFAULT_WRITE_DYNAMIC_THREADPOOL_ENABLEMENT =
false;
+
+ /**
+ * Default keep-alive time (in milliseconds) for write thread pool threads.
+ */
+ public static final int DEFAULT_WRITE_THREADPOOL_KEEP_ALIVE_TIME_MILLIS =
30_000;
+
+ /**
+ * Minimum interval (in milliseconds) for CPU monitoring during write
operations.
+ */
+ public static final int MIN_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 10_000;
+
+ /**
+ * Maximum interval (in milliseconds) for CPU monitoring during write
operations.
+ */
+ public static final int MAX_WRITE_CPU_MONITORING_INTERVAL_MILLIS = 60_000;
+
+ /**
+ * Default interval (in milliseconds) for CPU monitoring during write
operations.
+ */
+ public static final int DEFAULT_WRITE_CPU_MONITORING_INTERVAL_MILLIS =
15_000;
+
+ /**
+ * Minimum CPU utilization percentage considered as high threshold for write
scaling.
+ */
+ public static final int MIN_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 65;
+
+ /**
+ * Maximum CPU utilization percentage considered as high threshold for write
scaling.
+ */
+ public static final int MAX_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 90;
+
+ /**
+ * Default CPU utilization percentage considered as high threshold for write
scaling.
+ */
+ public static final int DEFAULT_WRITE_HIGH_CPU_THRESHOLD_PERCENT = 80;
+
+ /**
+ * Minimum CPU utilization percentage considered as medium threshold for
write scaling.
+ */
+ public static final int MIN_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 45;
+
+ /**
+ * Maximum CPU utilization percentage considered as medium threshold for
write scaling.
+ */
+ public static final int MAX_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 65;
+
+ /**
+ * Default CPU utilization percentage considered as medium threshold for
write scaling.
+ */
+ public static final int DEFAULT_WRITE_MEDIUM_CPU_THRESHOLD_PERCENT = 60;
+
+ /**
+ * Minimum CPU utilization percentage considered as low threshold for write
scaling.
+ */
+ public static final int MIN_WRITE_LOW_CPU_THRESHOLD_PERCENT = 10;
+
+ /**
+ * Maximum CPU utilization percentage considered as low threshold for write
scaling.
+ */
+ public static final int MAX_WRITE_LOW_CPU_THRESHOLD_PERCENT = 40;
+
+ /**
+ * Default CPU utilization percentage considered as low threshold for write
scaling.
+ */
+ public static final int DEFAULT_WRITE_LOW_CPU_THRESHOLD_PERCENT = 35;
+
+ /**
+ * Minimum multiplier applied to available memory for low-tier write
workloads.
+ */
+ public static final int MIN_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 3;
+
+ /**
+ * Default multiplier applied to available memory for low-tier write
workloads.
+ */
+ public static final int DEFAULT_WRITE_LOW_TIER_MEMORY_MULTIPLIER = 4;
+
+ /**
+ * Minimum multiplier applied to available memory for medium-tier write
workloads.
+ */
+ public static final int MIN_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
+
+ /**
+ * Default multiplier applied to available memory for medium-tier write
workloads.
+ */
+ public static final int DEFAULT_WRITE_MEDIUM_TIER_MEMORY_MULTIPLIER = 8;
+
+ /**
+ * Minimum multiplier applied to available memory for high-tier write
workloads.
+ */
+ public static final int MIN_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 12;
+
+ /**
+ * Default multiplier applied to available memory for high-tier write
workloads.
+ */
+ public static final int DEFAULT_WRITE_HIGH_TIER_MEMORY_MULTIPLIER = 16;
+
public static final boolean DEFAULT_FS_AZURE_ENABLE_CLIENT_TRANSACTION_ID =
true;
public static final boolean DEFAULT_FS_AZURE_ENABLE_CREATE_BLOB_IDEMPOTENCY
= true;
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
index 7b2d03d6923..1b201f3349d 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
@@ -30,8 +30,14 @@
@InterfaceStability.Evolving
public class AbfsDriverException extends AbfsRestOperationException {
+ /** Default error message used when no inner exception is provided. */
private static final String ERROR_MESSAGE = "Runtime Exception Occurred In
ABFS Driver";
+ /**
+ * Constructs an {@code AbfsDriverException} with the specified inner
exception.
+ *
+ * @param innerException the underlying exception that caused the failure
+ */
public AbfsDriverException(final Exception innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
@@ -42,6 +48,13 @@ public AbfsDriverException(final Exception innerException) {
innerException);
}
+ /**
+ * Constructs an {@code AbfsDriverException} with the specified inner
exception
+ * and activity ID for correlation.
+ *
+ * @param innerException the underlying exception that caused the failure
+ * @param activityId the request or operation ID for traceability
+ */
public AbfsDriverException(final Exception innerException, final String
activityId) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
@@ -52,6 +65,13 @@ public AbfsDriverException(final Exception innerException,
final String activity
null);
}
+ /**
+ * Constructs an {@code AbfsDriverException} with a custom error message and
+ * inner exception.
+ *
+ * @param errorMessage a custom error message describing the failure
+ * @param innerException the underlying exception that caused the failure
+ */
public AbfsDriverException(final String errorMessage, final Exception
innerException) {
super(
AzureServiceErrorCode.UNKNOWN.getStatusCode(),
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
index 7272c13297e..91eca509211 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenIdentifier.java
@@ -37,16 +37,29 @@ public class AbfsDelegationTokenIdentifier extends
DelegationTokenIdentifier {
*/
public static final Text TOKEN_KIND = new Text("ABFS delegation");
- public AbfsDelegationTokenIdentifier(){
+ /** Creates an {@code AbfsDelegationTokenIdentifier} with the default ABFS
token kind. */
+ public AbfsDelegationTokenIdentifier() {
super(TOKEN_KIND);
}
+ /**
+ * Creates an {@code AbfsDelegationTokenIdentifier} with the specified token
kind.
+ *
+ * @param kind the token kind to use
+ */
public AbfsDelegationTokenIdentifier(Text kind) {
super(kind);
}
- public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
- Text realUser) {
+ /**
+ * Creates an {@code AbfsDelegationTokenIdentifier} with the specified
details.
+ *
+ * @param kind the token kind
+ * @param owner the token owner
+ * @param renewer the token renewer
+ * @param realUser the real user on whose behalf the token was issued
+ */
+ public AbfsDelegationTokenIdentifier(Text kind, Text owner, Text renewer,
Text realUser) {
super(kind, owner, renewer, realUser);
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
index 85d0434a687..c976fbd0e0c 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/security/AbfsDelegationTokenManager.java
@@ -143,23 +143,38 @@ public Token<DelegationTokenIdentifier>
getDelegationToken(
return token;
}
+ /** Renews the given delegation token through the configured token manager.
+ * @param token the delegation token to renew
+ * @return the new expiration time of the token
+ * @throws IOException if renewal fails
+ */
public long renewDelegationToken(Token<?> token)
throws IOException {
return tokenManager.renewDelegationToken(token);
}
+ /** Cancels the given delegation token through the configured token manager.
+ * @param token the delegation token to cancel
+ * @throws IOException if cancellation fails
+ */
public void cancelDelegationToken(Token<?> token)
throws IOException {
tokenManager.cancelDelegationToken(token);
}
+ /** Returns the current {@link CustomDelegationTokenManager} instance (for
testing).
+ * @return the token manager instance
+ */
@VisibleForTesting
public CustomDelegationTokenManager getTokenManager() {
return tokenManager;
}
+ /** Returns a string representation of this token manager for debugging
purposes.
+ * @return a string describing this instance
+ */
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index f574f4704ab..cd94ee1c8fd 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -154,6 +154,17 @@
*/
public class AbfsDfsClient extends AbfsClient {
+ /**
+ * Creates an {@code AbfsDfsClient} instance.
+ *
+ * @param baseUrl the base URL of the DFS endpoint
+ * @param sharedKeyCredentials the shared key credentials
+ * @param abfsConfiguration the ABFS configuration
+ * @param tokenProvider the access token provider for authentication
+ * @param encryptionContextProvider the encryption context provider
+ * @param abfsClientContext the ABFS client context
+ * @throws IOException if client initialization fails
+ */
public AbfsDfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
@@ -164,6 +175,17 @@ public AbfsDfsClient(final URL baseUrl,
encryptionContextProvider, abfsClientContext, AbfsServiceType.DFS);
}
+ /**
+ * Creates an {@code AbfsDfsClient} instance.
+ *
+ * @param baseUrl the base URL of the DFS endpoint
+ * @param sharedKeyCredentials the shared key credentials
+ * @param abfsConfiguration the ABFS configuration
+ * @param sasTokenProvider the SAS token provider
+ * @param encryptionContextProvider the encryption context provider
+ * @param abfsClientContext the ABFS client context
+ * @throws IOException if client initialization fails
+ */
public AbfsDfsClient(final URL baseUrl,
final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
index f6272492d60..15ee4809911 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java
@@ -64,15 +64,33 @@ public class AbfsInputStreamContext extends
AbfsStreamContext {
private ContextEncryptionAdapter contextEncryptionAdapter = null;
+ /**
+ * Constructs a new {@link AbfsInputStreamContext}.
+ *
+ * @param sasTokenRenewPeriodForStreamsInSeconds SAS token renewal interval
in seconds.
+ */
public AbfsInputStreamContext(final long
sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
+ /**
+ * Sets the read buffer size.
+ *
+ * @param readBufferSize buffer size in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadBufferSize(final int readBufferSize) {
this.readBufferSize = readBufferSize;
return this;
}
+ /**
+ * Sets the read-ahead queue depth.
+ * Defaults to the number of available processors if negative.
+ *
+ * @param readAheadQueueDepth queue depth.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadAheadQueueDepth(
final int readAheadQueueDepth) {
this.readAheadQueueDepth = (readAheadQueueDepth >= 0)
@@ -81,83 +99,169 @@ public AbfsInputStreamContext withReadAheadQueueDepth(
return this;
}
+ /**
+ * Enables or disables tolerance for out-of-band appends.
+ *
+ * @param tolerateOobAppends whether OOB appends should be tolerated.
+ * @return this instance.
+ */
public AbfsInputStreamContext withTolerateOobAppends(
final boolean tolerateOobAppends) {
this.tolerateOobAppends = tolerateOobAppends;
return this;
}
+ /**
+ * Enables or disables read-ahead feature.
+ *
+ * @param isReadAheadEnabled whether read-ahead is enabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext isReadAheadEnabled(
final boolean isReadAheadEnabled) {
this.isReadAheadEnabled = isReadAheadEnabled;
return this;
}
+ /**
+ * Enables or disables read-ahead version 2.
+ *
+ * @param isReadAheadV2Enabled whether read-ahead V2 is enabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext isReadAheadV2Enabled(
final boolean isReadAheadV2Enabled) {
this.isReadAheadV2Enabled = isReadAheadV2Enabled;
return this;
}
+ /**
+ * Sets the read-ahead range.
+ *
+ * @param readAheadRange range in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadAheadRange(
final int readAheadRange) {
this.readAheadRange = readAheadRange;
return this;
}
+ /**
+ * Sets stream statistics collector.
+ *
+ * @param streamStatistics statistics instance.
+ * @return this instance.
+ */
public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
return this;
}
+ /**
+ * Enables or disables complete read of small files in a single operation.
+ *
+ * @param readSmallFilesCompletely whether small files should be fully read.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadSmallFilesCompletely(
final boolean readSmallFilesCompletely) {
this.readSmallFilesCompletely = readSmallFilesCompletely;
return this;
}
+ /**
+ * Enables or disables footer read optimization.
+ *
+ * @param optimizeFooterRead whether footer read optimization is enabled.
+ * @return this instance.
+ */
public AbfsInputStreamContext withOptimizeFooterRead(
final boolean optimizeFooterRead) {
this.optimizeFooterRead = optimizeFooterRead;
return this;
}
+ /**
+ * Sets the footer read buffer size.
+ *
+ * @param footerReadBufferSize size in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withFooterReadBufferSize(final int
footerReadBufferSize) {
this.footerReadBufferSize = footerReadBufferSize;
return this;
}
+ /**
+ * Forces use of the configured read buffer size always.
+ *
+ * @param alwaysReadBufferSize whether to always use configured buffer size.
+ * @return this instance.
+ */
public AbfsInputStreamContext withShouldReadBufferSizeAlways(
final boolean alwaysReadBufferSize) {
this.alwaysReadBufferSize = alwaysReadBufferSize;
return this;
}
+ /**
+ * Sets the read-ahead block size.
+ *
+ * @param readAheadBlockSize block size in bytes.
+ * @return this instance.
+ */
public AbfsInputStreamContext withReadAheadBlockSize(
final int readAheadBlockSize) {
this.readAheadBlockSize = readAheadBlockSize;
return this;
}
+ /**
+ * Sets whether buffered positional reads (pread) are disabled.
+ *
+ * @param bufferedPreadDisabled {@code true} to disable buffered pread.
+ * @return this instance.
+ */
public AbfsInputStreamContext withBufferedPreadDisabled(
final boolean bufferedPreadDisabled) {
this.bufferedPreadDisabled = bufferedPreadDisabled;
return this;
}
+ /**
+ * Sets a back reference to the filesystem that created this stream.
+ *
+ * @param fsBackRef filesystem back reference.
+ * @return this instance.
+ */
public AbfsInputStreamContext withAbfsBackRef(
final BackReference fsBackRef) {
this.fsBackRef = fsBackRef;
return this;
}
+ /**
+ * Sets the context encryption adapter.
+ *
+ * @param contextEncryptionAdapter encryption adapter.
+ * @return this instance.
+ */
public AbfsInputStreamContext withEncryptionAdapter(
ContextEncryptionAdapter contextEncryptionAdapter){
this.contextEncryptionAdapter = contextEncryptionAdapter;
return this;
}
+ /**
+ * Finalizes and validates the context configuration.
+ * <p>
+ * Ensures read-ahead range is valid and aligns read-ahead block size with
+ * read request size if necessary.
+ *
+ * @return this instance.
+ */
public AbfsInputStreamContext build() {
if (readBufferSize > readAheadBlockSize) {
LOG.debug(
@@ -173,62 +277,77 @@ public AbfsInputStreamContext build() {
return this;
}
+ /** @return configured read buffer size. */
public int getReadBufferSize() {
return readBufferSize;
}
+ /** @return read-ahead queue depth. */
public int getReadAheadQueueDepth() {
return readAheadQueueDepth;
}
+ /** @return whether out-of-band appends are tolerated. */
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}
+ /** @return whether read-ahead is enabled. */
public boolean isReadAheadEnabled() {
return isReadAheadEnabled;
}
+ /** @return whether read-ahead V2 is enabled. */
public boolean isReadAheadV2Enabled() {
return isReadAheadV2Enabled;
}
+ /** @return read-ahead range. */
public int getReadAheadRange() {
return readAheadRange;
}
+ /** @return stream statistics collector. */
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
+ /** @return whether small files should be read completely. */
public boolean readSmallFilesCompletely() {
return this.readSmallFilesCompletely;
}
+ /** @return whether footer read optimization is enabled. */
public boolean optimizeFooterRead() {
return this.optimizeFooterRead;
}
+ /** @return footer read buffer size. */
public int getFooterReadBufferSize() {
return footerReadBufferSize;
}
+ /** @return whether the configured buffer size is always used. */
public boolean shouldReadBufferSizeAlways() {
return alwaysReadBufferSize;
}
+ /** @return read-ahead block size. */
public int getReadAheadBlockSize() {
return readAheadBlockSize;
}
+ /** @return whether buffered pread is disabled. */
public boolean isBufferedPreadDisabled() {
return bufferedPreadDisabled;
}
+ /** @return filesystem back reference. */
public BackReference getFsBackRef() {
return fsBackRef;
}
+ /** @return context encryption adapter. */
public ContextEncryptionAdapter getEncryptionAdapter() {
return contextEncryptionAdapter;
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
new file mode 100644
index 00000000000..3d4c9aa48e8
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
@@ -0,0 +1,770 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerArray;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
+
+ private AbfsConfiguration mockConfig;
+ private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
+ private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+ private static final int THREAD_SLEEP_DURATION_MS = 200;
+ private static final String TEST_FILE_PATH = "testFilePath";
+ private static final String TEST_DIR_PATH = "testDirPath";
+ private static final int TEST_FILE_LENGTH = 1024 * 1024 * 8;
+ private static final int CONCURRENT_REQUEST_COUNT = 15;
+ private static final int THREAD_POOL_KEEP_ALIVE_TIME = 10;
+ private static final int LOW_TIER_MEMORY_MULTIPLIER = 4;
+ private static final int MEDIUM_TIER_MEMORY_MULTIPLIER = 6;
+ private static final int HIGH_TIER_MEMORY_MULTIPLIER = 8;
+ private static final int HIGH_CPU_THRESHOLD = 15;
+ private static final int MEDIUM_CPU_THRESHOLD = 10;
+ private static final int LOW_CPU_THRESHOLD = 5;
+ private static final int CPU_MONITORING_INTERVAL = 15;
+ private static final int WAIT_DURATION_MS = 3000;
+ private static final int LATCH_TIMEOUT_SECONDS = 60;
+ private static final int RESIZE_WAIT_TIME_MS = 6_000;
+ private static final double HIGH_CPU_USAGE_RATIO = 0.95;
+ private static final double LOW_CPU_USAGE_RATIO = 0.05;
+ private static final int SLEEP_DURATION_MS = 150;
+ private static final int AWAIT_TIMEOUT_SECONDS = 45;
+ private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
+ private static final int WAIT_TIMEOUT_MS = 5000;
+ private static final int SLEEP_DURATION_30S_MS = 30000;
+ private static final int SMALL_PAUSE_MS = 50;
+ private static final int BURST_LOAD = 50;
+ private static final long LOAD_SLEEP_DURATION_MS = 2000;
+
+ TestWriteThreadPoolSizeManager() throws Exception {
+ super.setup();
+ }
+
+ /**
+ * Common setup to prepare a mock configuration for each test.
+ */
+ @BeforeEach
+ public void setUp() {
+ mockConfig = mock(AbfsConfiguration.class);
+
when(mockConfig.getWriteConcurrentRequestCount()).thenReturn(CONCURRENT_REQUEST_COUNT);
+
when(mockConfig.getWriteThreadPoolKeepAliveTime()).thenReturn(THREAD_POOL_KEEP_ALIVE_TIME);
+
when(mockConfig.getLowTierMemoryMultiplier()).thenReturn(LOW_TIER_MEMORY_MULTIPLIER);
+
when(mockConfig.getMediumTierMemoryMultiplier()).thenReturn(MEDIUM_TIER_MEMORY_MULTIPLIER);
+
when(mockConfig.getHighTierMemoryMultiplier()).thenReturn(HIGH_TIER_MEMORY_MULTIPLIER);
+ when(mockConfig.getWriteHighCpuThreshold()).thenReturn(HIGH_CPU_THRESHOLD);
+
when(mockConfig.getWriteMediumCpuThreshold()).thenReturn(MEDIUM_CPU_THRESHOLD);
+ when(mockConfig.getWriteLowCpuThreshold()).thenReturn(LOW_CPU_THRESHOLD);
+
when(mockConfig.getWriteCpuMonitoringInterval()).thenReturn(CPU_MONITORING_INTERVAL);
+ }
+
+ /**
+ * Ensures that {@link WriteThreadPoolSizeManager#getInstance(String,
AbfsConfiguration)} returns one shared instance per key and a distinct instance for a different key.
+ */
+ @Test
+ void testGetInstanceReturnsSingleton() {
+ WriteThreadPoolSizeManager instance1
+ = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+ WriteThreadPoolSizeManager instance2
+ = WriteThreadPoolSizeManager.getInstance("testfs", mockConfig);
+ WriteThreadPoolSizeManager instance3 =
+ WriteThreadPoolSizeManager.getInstance("newFs", mockConfig);
+ Assertions.assertThat(instance1)
+ .as("Expected the same singleton instance for the same key")
+ .isSameAs(instance2);
+ Assertions.assertThat(instance1)
+ .as("Expected a different instance for a different key")
+ .isNotSameAs(instance3);
+ }
+
+ /**
+ * Tests that high CPU usage results in thread pool downscaling
+ * (or leaves the maximum pool size unchanged).
+ */
+ @Test
+ void testAdjustThreadPoolSizeBasedOnHighCPU() throws InterruptedException,
IOException {
+ // Get the executor service (ThreadPoolExecutor)
+ WriteThreadPoolSizeManager instance
+ = WriteThreadPoolSizeManager.getInstance("testfsHigh",
+ getAbfsStore(getFileSystem()).getAbfsConfiguration());
+ ExecutorService executor = instance.getExecutorService();
+ ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
+
+ // Simulate high CPU usage (e.g., 95% CPU utilization)
+ int initialMaxSize = threadPoolExecutor.getMaximumPoolSize();
+ instance.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_UTILIZATION_THRESHOLD);
// High CPU
+
+ // Get the new maximum pool size after adjustment
+ int newMaxSize = threadPoolExecutor.getMaximumPoolSize();
+
+ // Assert that the pool size has decreased or is equal to initial PoolSize
based on high CPU usage
+ Assertions.assertThat(newMaxSize)
+ .as("Expected pool size to decrease under high CPU usage")
+ .isLessThanOrEqualTo(initialMaxSize);
+ instance.close();
+ }
+
+  /**
+   * Tests that reporting low CPU usage to the manager never shrinks the write
+   * thread pool: the maximum pool size after the adjustment must be greater
+   * than or equal to the size observed before it.
+   *
+   * @throws InterruptedException if the test thread is interrupted
+   * @throws IOException if the file system or the manager fails to set up or
+   *     close
+   */
+  @Test
+  void testAdjustThreadPoolSizeBasedOnLowCPU()
+      throws InterruptedException, IOException {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsLow",
+        getAbfsStore(getFileSystem()).getAbfsConfiguration());
+    try {
+      ThreadPoolExecutor executor
+          = (ThreadPoolExecutor) instance.getExecutorService();
+      int initialSize = executor.getMaximumPoolSize();
+
+      // Simulate low CPU usage.
+      instance.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_UTILIZATION_THRESHOLD);
+
+      int newSize = executor.getMaximumPoolSize();
+      Assertions.assertThat(newSize)
+          .as("Expected pool size to increase or stay the same under low CPU usage")
+          .isGreaterThanOrEqualTo(initialSize);
+    } finally {
+      // Close the manager even when the assertion fails.
+      instance.close();
+    }
+  }
+
+
+  /**
+   * Confirms that the executor returned by the manager is initialized and has
+   * not been shut down.
+   *
+   * @throws IOException if closing the manager fails
+   */
+  @Test
+  void testExecutorServiceIsNotNull() throws IOException {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testfsExec", mockConfig);
+    try {
+      ExecutorService executor = instance.getExecutorService();
+      Assertions.assertThat(executor)
+          .as("Executor service should be initialized")
+          .isNotNull();
+      Assertions.assertThat(executor.isShutdown())
+          .as("Executor service should not be shut down")
+          .isFalse();
+    } finally {
+      // Close the manager even when an assertion fails.
+      instance.close();
+    }
+  }
+
+
+  /**
+   * Verifies that {@link WriteThreadPoolSizeManager#close()} stops the write
+   * executor that was obtained from the manager before closing it.
+   */
+  @Test
+  void testCloseCleansUp() throws Exception {
+    WriteThreadPoolSizeManager manager
+        = WriteThreadPoolSizeManager.getInstance("testfsClose", mockConfig);
+    // Grab the executor first so its state can be inspected after close().
+    ExecutorService writeExecutor = manager.getExecutorService();
+
+    manager.close();
+
+    boolean stopped
+        = writeExecutor.isShutdown() || writeExecutor.isTerminated();
+    Assertions.assertThat(stopped)
+        .as("Executor service should be shut down or terminated after close()")
+        .isTrue();
+  }
+
+  /**
+   * Tests that the CPU monitoring task is scheduled when
+   * startCPUMonitoring() is called. The test checks that:
+   * 1. the CPU monitor executor exposed by the manager is not null, and
+   * 2. that executor has at least one thread in its pool after a short wait.
+   *
+   * @throws InterruptedException if the test is interrupted during the sleep
+   * @throws IOException if closing the manager fails
+   */
+  @Test
+  void testStartCPUMonitoringSchedulesTask()
+      throws InterruptedException, IOException {
+    WriteThreadPoolSizeManager instance
+        = WriteThreadPoolSizeManager.getInstance("testScheduler", mockConfig);
+    try {
+      // Schedule the monitoring task.
+      instance.startCPUMonitoring();
+
+      // Give the scheduler a moment to start a worker thread.
+      Thread.sleep(THREAD_SLEEP_DURATION_MS);
+
+      ScheduledThreadPoolExecutor monitor
+          = (ScheduledThreadPoolExecutor) instance.getCpuMonitorExecutor();
+
+      Assertions.assertThat(monitor)
+          .as("CPU Monitor Executor should not be null")
+          .isNotNull();
+
+      // A non-empty pool shows that the scheduler has started a thread.
+      Assertions.assertThat(monitor.getPoolSize())
+          .as("Thread pool size should be greater than 0, indicating that the task is running")
+          .isGreaterThan(ZERO);
+    } finally {
+      // Close the manager even when an assertion fails.
+      instance.close();
+    }
+  }
+
+  /**
+   * Verifies that ABFS write tasks complete successfully while a background
+   * thread generates artificial CPU load. The test also checks that the
+   * maximum size of the write thread pool stays within its valid bounds and
+   * that no work is left in the executor queue afterwards.
+   *
+   * <p>Failures inside the write tasks are recorded and asserted on the test
+   * thread. An exception thrown inside a task passed to
+   * {@code executor.submit(...)} is captured by the returned Future, so
+   * failing inside the task would never fail the test.
+   *
+   * @throws Exception if the file system, the manager or the wait fails
+   */
+  @Test
+  void testABFSWritesUnderCPUStress() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager instance =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
+            getConfiguration());
+
+    // Background thread that burns CPU for WAIT_DURATION_MS to create
+    // contention while the writes are running.
+    Thread stressThread = new Thread(() -> {
+      long end = System.currentTimeMillis() + WAIT_DURATION_MS;
+      while (System.currentTimeMillis() < end) {
+        // Busy work only; the result is intentionally discarded.
+        Math.pow(Math.random(), Math.random());
+      }
+    });
+
+    try {
+      ThreadPoolExecutor executor =
+          (ThreadPoolExecutor) instance.getExecutorService();
+
+      // Start CPU monitoring so pool size adjustments can happen under load.
+      instance.startCPUMonitoring();
+      stressThread.start();
+
+      // Prepare the write workload: each task writes to its own file.
+      int taskCount = 10;
+      CountDownLatch latch = new CountDownLatch(taskCount);
+      Path testFile = new Path(TEST_FILE_PATH);
+      final byte[] b = new byte[TEST_FILE_LENGTH];
+      new Random().nextBytes(b);
+
+      // First failure seen by any write task; checked on the test thread.
+      final java.util.concurrent.atomic.AtomicReference<Throwable> firstFailure
+          = new java.util.concurrent.atomic.AtomicReference<>();
+
+      for (int i = 0; i < taskCount; i++) {
+        int finalI = i;
+        executor.submit(() -> {
+          try (FSDataOutputStream out = fs.create(
+              new Path(testFile + "_" + finalI), true)) {
+            for (int j = 0; j < 5; j++) {
+              out.write(b); // several writes to add sustained pressure
+            }
+            out.hflush(); // flush to force actual I/O
+          } catch (IOException | RuntimeException e) {
+            firstFailure.compareAndSet(null, e);
+          } finally {
+            latch.countDown();
+          }
+        });
+      }
+
+      // Wait for all tasks; the timeout guards against deadlock/starvation.
+      boolean finished = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      // Record the pool size once the workload is done.
+      int resizedPoolSize = executor.getMaximumPoolSize();
+
+      // 1. All tasks must finish within the timeout.
+      Assertions.assertThat(finished)
+          .as("All ABFS write tasks should complete without starvation")
+          .isTrue();
+
+      // 2. No write task may have failed.
+      Assertions.assertThat(firstFailure.get())
+          .as("Write task failed with exception")
+          .isNull();
+
+      // 3. Pool size must stay within valid bounds. This is a bounds check
+      // only; it does not prove that a resize actually happened.
+      Assertions.assertThat(resizedPoolSize)
+          .as("Thread pool size should dynamically adjust under CPU stress")
+          .isBetween(1,
+              getAbfsStore(fs).getAbfsConfiguration()
+                  .getWriteConcurrentRequestCount());
+
+      // 4. Task queue must be empty once the workload is done.
+      Assertions.assertThat(executor.getQueue().size())
+          .as("No backlog should remain in task queue after completion")
+          .isEqualTo(0);
+    } finally {
+      // Do not leave the stress thread or the manager behind, even when an
+      // assertion fails. join() returns immediately if the thread was never
+      // started.
+      try {
+        stressThread.join();
+      } finally {
+        instance.close();
+      }
+    }
+  }
+
+
+  /**
+   * Ensures that resizing the write thread pool during an active ABFS write
+   * workload does not cause deadlocks, task loss, or task duplication, and
+   * that the executor queue drains once the workload is done.
+   *
+   * <p>Worker tasks wait on a barrier together with a resizer thread. The
+   * resizer then alternates high and low CPU readings on the manager while
+   * the writes are in flight. Failures inside worker tasks are recorded and
+   * asserted on the test thread, because an exception thrown inside a task
+   * passed to {@code executor.submit(...)} is captured by the returned
+   * Future.
+   *
+   * @throws Exception if the file system, the manager or a wait fails
+   */
+  @Test
+  void testDynamicResizeNoDeadlocksNoTaskLoss() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    WriteThreadPoolSizeManager mgr =
+        WriteThreadPoolSizeManager.getInstance(getFileSystemName(),
+            mockConfig);
+    final int taskCount = 10;
+    final Path base = new Path(TEST_DIR_PATH);
+
+    try {
+      ThreadPoolExecutor executor
+          = (ThreadPoolExecutor) mgr.getExecutorService();
+
+      // Enable monitoring (the resizer below also calls the adjustment
+      // method directly).
+      mgr.startCPUMonitoring();
+
+      final int writesPerTask = 5;
+      final byte[] b = new byte[TEST_FILE_LENGTH];
+      new Random().nextBytes(b);
+      fs.mkdirs(base);
+
+      // Barrier makes all tasks and the resizer start together, so resizing
+      // happens mid-flight.
+      // NOTE(review): this needs taskCount pool threads to run at once; if
+      // the pool maximum is smaller, the barrier can only time out. Confirm
+      // against the configured pool size.
+      final CyclicBarrier startBarrier = new CyclicBarrier(taskCount + 1);
+      final CountDownLatch done = new CountDownLatch(taskCount);
+
+      // Execution bookkeeping.
+      final AtomicIntegerArray completed = new AtomicIntegerArray(taskCount);
+      final AtomicInteger duplicates = new AtomicInteger(0);
+      final AtomicInteger rejected = new AtomicInteger(0);
+      // First failure seen by any worker; checked on the test thread.
+      final java.util.concurrent.atomic.AtomicReference<Throwable> firstFailure
+          = new java.util.concurrent.atomic.AtomicReference<>();
+
+      // Submit ABFS write tasks.
+      for (int i = 0; i < taskCount; i++) {
+        final int id = i;
+        try {
+          executor.submit(() -> {
+            try {
+              // Hold until all parties have arrived, then start together.
+              startBarrier.await(10, TimeUnit.SECONDS);
+
+              // Each task writes to its own file, flushing intermittently.
+              Path subPath = new Path(base, "part-" + id);
+              try (FSDataOutputStream out = fs.create(subPath)) {
+                for (int w = 0; w < writesPerTask; w++) {
+                  out.write(b);
+                  if ((w & 1) == 1) {
+                    out.hflush(); // extra syncs to increase contention
+                  }
+                }
+                out.hflush();
+              }
+
+              // Mark the task as completed once; a second completion for the
+              // same id counts as a duplicate.
+              if (!completed.compareAndSet(id, 0, 1)) {
+                duplicates.incrementAndGet();
+              }
+            } catch (Exception e) {
+              firstFailure.compareAndSet(null, e);
+            } finally {
+              done.countDown();
+            }
+          });
+        } catch (RejectedExecutionException rex) {
+          rejected.incrementAndGet();
+        }
+      }
+
+      // Smallest and largest maximum pool size seen while resizing.
+      final AtomicInteger observedMinMax
+          = new AtomicInteger(executor.getMaximumPoolSize());
+      final AtomicInteger observedMaxMax
+          = new AtomicInteger(executor.getMaximumPoolSize());
+
+      // Thread that simulates fluctuating CPU load while tasks are running.
+      Thread resizer = new Thread(() -> {
+        try {
+          // Release worker tasks.
+          startBarrier.await(10, TimeUnit.SECONDS);
+
+          long end = System.currentTimeMillis() + RESIZE_WAIT_TIME_MS;
+          boolean high = true;
+          while (System.currentTimeMillis() < end) {
+            // Alternate between high load (shrink) and low load (expand).
+            if (high) {
+              mgr.adjustThreadPoolSizeBasedOnCPU(HIGH_CPU_USAGE_RATIO);
+            } else {
+              mgr.adjustThreadPoolSizeBasedOnCPU(LOW_CPU_USAGE_RATIO);
+            }
+            high = !high;
+
+            int cur = executor.getMaximumPoolSize();
+            observedMinMax.updateAndGet(prev -> Math.min(prev, cur));
+            observedMaxMax.updateAndGet(prev -> Math.max(prev, cur));
+
+            Thread.sleep(SLEEP_DURATION_MS);
+          }
+        } catch (InterruptedException e) {
+          // Preserve the interrupt status and stop resizing.
+          Thread.currentThread().interrupt();
+        } catch (Exception ignored) {
+          // Best-effort simulation: a broken barrier also fails the workers,
+          // which is reported through firstFailure.
+        }
+      }, "resizer-thread");
+
+      resizer.start();
+
+      // Wait for all tasks to finish (ensures no deadlock).
+      boolean finished = done.await(AWAIT_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+      // Join resizer thread.
+      resizer.join(RESIZER_JOIN_TIMEOUT_MS);
+
+      // 1. All tasks must complete in time: no deadlocks.
+      Assertions.assertThat(finished)
+          .as("All tasks must complete within timeout (no deadlock)")
+          .isTrue();
+
+      // 2. No worker may have failed; surfaces the real cause instead of a
+      // bare "task loss" count mismatch.
+      Assertions.assertThat(firstFailure.get())
+          .as("No ABFS write task should fail during resizing")
+          .isNull();
+
+      // 3. Every task should complete exactly once: no task loss.
+      int completedCount = 0;
+      for (int i = 0; i < taskCount; i++) {
+        completedCount += completed.get(i);
+      }
+      Assertions.assertThat(completedCount)
+          .as("Every task should complete exactly once (no task loss)")
+          .isEqualTo(taskCount);
+
+      // 4. No task should mark itself as done more than once.
+      Assertions.assertThat(duplicates.get())
+          .as("No task should report completion more than once (no duplication)")
+          .isZero();
+
+      // 5. The executor should not reject tasks.
+      Assertions.assertThat(rejected.get())
+          .as("Tasks should not be rejected during active resizing")
+          .isZero();
+
+      // 6. Executor queue should be empty once all tasks finish.
+      Assertions.assertThat(executor.getQueue().size())
+          .as("Executor queue should drain after workload")
+          .isEqualTo(0);
+
+      // 7. Executor should still be running until explicitly closed.
+      Assertions.assertThat(executor.isShutdown())
+          .as("Executor should remain running until manager.close()")
+          .isFalse();
+
+      // 8. Sanity bounds on the observed maximum pool sizes.
+      // NOTE(review): these checks do not prove that the pool both grew and
+      // shrank; they only verify that the observed values are consistent.
+      int minObserved = observedMinMax.get();
+      int maxObserved = observedMaxMax.get();
+
+      Assertions.assertThat(maxObserved)
+          .as("Observed maximum pool size should always be positive")
+          .isGreaterThan(0);
+
+      Assertions.assertThat(minObserved)
+          .as("Smallest observed pool maximum should not exceed the largest")
+          .isLessThanOrEqualTo(maxObserved);
+    } finally {
+      // Remove test data and close the manager even when an assertion fails.
+      try {
+        for (int i = 0; i < taskCount; i++) {
+          Path p = new Path(base, "part-" + i);
+          try {
+            fs.delete(p, false);
+          } catch (IOException ignored) {
+            // Delete failures are non-fatal for test cleanup.
+          }
+        }
+        try {
+          fs.delete(base, true);
+        } catch (IOException ignored) {
+          // Cleanup failures are non-fatal in tests.
+        }
+      } finally {
+        mgr.close();
+      }
+    }
+  }
+
+
+
+  /**
+   * Verifies that, while two threads generate CPU load and ABFS write tasks
+   * run on the write thread pool, all writes complete, the maximum pool size
+   * does not end up above its initial value, and the executor queue is empty
+   * afterwards.
+   *
+   * <p>Failures inside the write tasks are recorded and asserted on the test
+   * thread, because an exception thrown inside a task passed to
+   * {@code executor.submit(...)} is captured by the returned Future.
+   *
+   * @throws Exception if the file system, the manager or a wait fails
+   */
+  @Test
+  void testThreadPoolScalesDownOnHighCpuLoad() throws Exception {
+    try (FileSystem fileSystem =
+        FileSystem.newInstance(getRawConfiguration())) {
+      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
+      WriteThreadPoolSizeManager instance =
+          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+              getConfiguration());
+      final int taskCount = 10;
+      final Path base = new Path(TEST_DIR_PATH);
+
+      try {
+        ThreadPoolExecutor executor =
+            (ThreadPoolExecutor) instance.getExecutorService();
+
+        // Start monitoring CPU load.
+        instance.startCPUMonitoring();
+
+        // Baseline pool size for the comparison below.
+        int initialMax = executor.getMaximumPoolSize();
+
+        // CPU-bound task: tight loop of math ops for WAIT_TIMEOUT_MS.
+        Runnable cpuBurn = () -> {
+          long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+          while (System.currentTimeMillis() < end) {
+            // Busy work only; the result is intentionally discarded.
+            Math.sin(Math.random());
+            Math.cos(Math.random());
+          }
+        };
+
+        // Launch two CPU hogs in parallel.
+        Thread cpuHog1 = new Thread(cpuBurn, "cpu-hog-thread-1");
+        Thread cpuHog2 = new Thread(cpuBurn, "cpu-hog-thread-2");
+        cpuHog1.start();
+        cpuHog2.start();
+
+        // Submit multiple write tasks while the CPU is under stress.
+        CountDownLatch latch = new CountDownLatch(taskCount);
+        abfs.mkdirs(base);
+        final byte[] buffer = new byte[TEST_FILE_LENGTH];
+        new Random().nextBytes(buffer);
+
+        // First failure seen by any write task; checked on the test thread.
+        final java.util.concurrent.atomic.AtomicReference<Throwable>
+            firstFailure
+            = new java.util.concurrent.atomic.AtomicReference<>();
+
+        for (int i = 0; i < taskCount; i++) {
+          final Path part = new Path(base, "part-" + i);
+          executor.submit(() -> {
+            try (FSDataOutputStream out = abfs.create(part, true)) {
+              for (int j = 0; j < 5; j++) {
+                out.write(buffer);
+                out.hflush();
+              }
+            } catch (IOException | RuntimeException e) {
+              firstFailure.compareAndSet(null, e);
+            } finally {
+              latch.countDown();
+            }
+          });
+        }
+
+        // Ensure all tasks complete (guards against deadlock/starvation).
+        boolean finished
+            = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        // Wait for the CPU hogs to end and give the monitor time to react.
+        cpuHog1.join();
+        cpuHog2.join();
+        Thread.sleep(SLEEP_DURATION_30S_MS);
+
+        int resizedMax = executor.getMaximumPoolSize();
+
+        // 1. All write tasks finished despite CPU pressure.
+        Assertions.assertThat(finished)
+            .as("All ABFS write tasks must complete despite CPU stress")
+            .isTrue();
+
+        // 2. No write task failed.
+        Assertions.assertThat(firstFailure.get())
+            .as("Write task failed under CPU stress")
+            .isNull();
+
+        // 3. Thread pool did not grow beyond its initial maximum.
+        Assertions.assertThat(resizedMax)
+            .as("Thread pool should scale down under high CPU load")
+            .isLessThanOrEqualTo(initialMax);
+
+        // 4. No leftover tasks in the queue.
+        Assertions.assertThat(executor.getQueue().size())
+            .as("No backlog should remain in the queue after workload")
+            .isEqualTo(0);
+      } finally {
+        // Remove test data and close the manager even when an assertion
+        // fails.
+        try {
+          for (int i = 0; i < taskCount; i++) {
+            try {
+              abfs.delete(new Path(base, "part-" + i), false);
+            } catch (IOException ignored) {
+              // Cleanup failures are non-fatal in tests.
+            }
+          }
+          try {
+            abfs.delete(base, true);
+          } catch (IOException ignored) {
+            // Cleanup failures are non-fatal in tests.
+          }
+        } finally {
+          instance.close();
+        }
+      }
+    }
+  }
+
+
+  /**
+   * Verifies that, while two threads allocate memory in parallel and ABFS
+   * write tasks run on the write thread pool, all writes complete, the
+   * maximum pool size does not end up above its initial value, and the
+   * executor queue is empty afterwards.
+   *
+   * <p>Failures inside the write tasks are recorded and asserted on the test
+   * thread, because an exception thrown inside a task passed to
+   * {@code executor.submit(...)} is captured by the returned Future.
+   *
+   * @throws Exception if the file system, the manager or a wait fails
+   */
+  @Test
+  void testScalesDownOnParallelHighMemoryLoad() throws Exception {
+    try (FileSystem fileSystem =
+        FileSystem.newInstance(getRawConfiguration())) {
+      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
+      WriteThreadPoolSizeManager instance =
+          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+              getConfiguration());
+      final int taskCount = 10;
+      final Path base = new Path(TEST_DIR_PATH);
+
+      try {
+        ThreadPoolExecutor executor =
+            (ThreadPoolExecutor) instance.getExecutorService();
+
+        // Begin monitoring resource usage.
+        instance.startCPUMonitoring();
+
+        // Initial maximum pool size for the comparison below.
+        int initialMax = executor.getMaximumPoolSize();
+
+        // Workload that keeps allocating 5 MB chunks for WAIT_TIMEOUT_MS to
+        // simulate memory pressure in the JVM.
+        Runnable memoryBurn = () -> {
+          List<byte[]> allocations = new ArrayList<>();
+          long end = System.currentTimeMillis() + WAIT_TIMEOUT_MS;
+          while (System.currentTimeMillis() < end) {
+            try {
+              allocations.add(new byte[5 * 1024 * 1024]); // allocate 5 MB
+              Thread.sleep(SMALL_PAUSE_MS); // pause to avoid instant OOM
+            } catch (OutOfMemoryError oom) {
+              // Release everything if the JVM runs out of memory, then
+              // continue.
+              allocations.clear();
+            } catch (InterruptedException e) {
+              // Restore the flag and stop: with the flag set, every further
+              // sleep() would throw at once and the loop would allocate
+              // without any pause.
+              Thread.currentThread().interrupt();
+              return;
+            }
+          }
+        };
+
+        // Start two memory hogs in parallel.
+        Thread memHog1 = new Thread(memoryBurn, "mem-hog-thread-1");
+        Thread memHog2 = new Thread(memoryBurn, "mem-hog-thread-2");
+        memHog1.start();
+        memHog2.start();
+
+        // Submit several write tasks while memory is under stress.
+        CountDownLatch latch = new CountDownLatch(taskCount);
+        abfs.mkdirs(base);
+        final byte[] buffer = new byte[TEST_FILE_LENGTH];
+        new Random().nextBytes(buffer);
+
+        // First failure seen by any write task; checked on the test thread.
+        final java.util.concurrent.atomic.AtomicReference<Throwable>
+            firstFailure
+            = new java.util.concurrent.atomic.AtomicReference<>();
+
+        for (int i = 0; i < taskCount; i++) {
+          final Path part = new Path(base, "part-" + i);
+          executor.submit(() -> {
+            try (FSDataOutputStream out = abfs.create(part, true)) {
+              for (int j = 0; j < 5; j++) {
+                out.write(buffer);
+                out.hflush();
+              }
+            } catch (IOException | RuntimeException e) {
+              firstFailure.compareAndSet(null, e);
+            } finally {
+              latch.countDown();
+            }
+          });
+        }
+
+        // Ensure all tasks finish within a timeout.
+        boolean finished
+            = latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+
+        // Wait for the memory hogs to finish.
+        memHog1.join();
+        memHog2.join();
+
+        // Give the monitoring thread time to react.
+        Thread.sleep(SLEEP_DURATION_30S_MS);
+
+        int resizedMax = executor.getMaximumPoolSize();
+
+        // 1. All ABFS writes finished despite memory stress.
+        Assertions.assertThat(finished)
+            .as("All ABFS write tasks must complete despite parallel memory stress")
+            .isTrue();
+
+        // 2. No write task failed.
+        Assertions.assertThat(firstFailure.get())
+            .as("Write task failed under memory stress")
+            .isNull();
+
+        // 3. Thread pool did not grow beyond its initial maximum.
+        Assertions.assertThat(resizedMax)
+            .as("Thread pool should scale down under parallel high memory load")
+            .isLessThanOrEqualTo(initialMax);
+
+        // 4. No tasks remain queued after workload completion.
+        Assertions.assertThat(executor.getQueue().size())
+            .as("No backlog should remain in the queue after workload")
+            .isEqualTo(0);
+      } finally {
+        // Remove test data and close the manager even when an assertion
+        // fails.
+        try {
+          for (int i = 0; i < taskCount; i++) {
+            try {
+              abfs.delete(new Path(base, "part-" + i), false);
+            } catch (IOException ignored) {
+              // Cleanup failures are non-fatal in tests.
+            }
+          }
+          try {
+            abfs.delete(base, true);
+          } catch (IOException ignored) {
+            // Cleanup failures are non-fatal in tests.
+          }
+        } finally {
+          instance.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests that after an idle period the write thread pool handles a sudden
+   * burst of tasks: the pool size during the burst is not smaller than the
+   * idle size, and all burst tasks finish within the timeout.
+   *
+   * @throws Exception if the file system, the manager or a wait fails
+   */
+  @Test
+  void testThreadPoolScalesUpAfterIdleBurstLoad() throws Exception {
+    try (FileSystem fileSystem = FileSystem.newInstance(
+        getRawConfiguration())) {
+      AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem;
+      WriteThreadPoolSizeManager instance =
+          WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
+              abfs.getAbfsStore().getAbfsConfiguration());
+      try {
+        ThreadPoolExecutor executor =
+            (ThreadPoolExecutor) instance.getExecutorService();
+
+        // --- Step 1: Simulate idle period ---
+        // Let the executor sit idle with no work submitted.
+        Thread.sleep(WAIT_TIMEOUT_MS);
+        int poolSizeAfterIdle = executor.getPoolSize();
+
+        // After idling, the pool must not exceed its core size.
+        Assertions.assertThat(poolSizeAfterIdle)
+            .as("Pool size should remain minimal after idle")
+            .isLessThanOrEqualTo(executor.getCorePoolSize());
+
+        // --- Step 2: Submit a sudden burst of tasks ---
+        // Launch many short, CPU-heavy tasks at once to simulate burst load.
+        int burstLoad = BURST_LOAD;
+        CountDownLatch latch = new CountDownLatch(burstLoad);
+        for (int i = 0; i < burstLoad; i++) {
+          executor.submit(() -> {
+            // Busy loop for THREAD_SLEEP_DURATION_MS to simulate CPU work.
+            long end = System.currentTimeMillis() + THREAD_SLEEP_DURATION_MS;
+            while (System.currentTimeMillis() < end) {
+              Math.sqrt(Math.random()); // burn CPU cycles
+            }
+            latch.countDown();
+          });
+        }
+
+        // --- Step 3: Give pool time to react ---
+        Thread.sleep(LOAD_SLEEP_DURATION_MS);
+        int poolSizeDuringBurst = executor.getPoolSize();
+
+        // The pool must not be smaller than it was while idle.
+        // NOTE(review): equality also passes, so this does not strictly
+        // prove that the pool scaled up.
+        Assertions.assertThat(poolSizeDuringBurst)
+            .as("Pool size should increase after burst load")
+            .isGreaterThanOrEqualTo(poolSizeAfterIdle);
+
+        // --- Step 4: Verify completion ---
+        // All tasks must complete within half the latch timeout, guarding
+        // against deadlock under burst load.
+        Assertions.assertThat(
+            latch.await(LATCH_TIMEOUT_SECONDS / 2, TimeUnit.SECONDS))
+            .as("All burst tasks should finish in reasonable time")
+            .isTrue();
+      } finally {
+        // Close the manager even when an assertion fails.
+        instance.close();
+      }
+    }
+  }
+}
+
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
index 372dbfc8033..54521c9c971 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsOutputStream.java
@@ -105,7 +105,7 @@ public void testMaxRequestsAndQueueCapacityDefaults()
throws Exception {
AbfsOutputStream stream = (AbfsOutputStream) out.getWrappedStream();
int maxConcurrentRequests
- = getConfiguration().getWriteMaxConcurrentRequestCount();
+ = getConfiguration().getWriteConcurrentRequestCount();
if (stream.isAppendBlobStream()) {
maxConcurrentRequests = 1;
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
index a4eefce0cb8..146eed84e80 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsOutputStream.java
@@ -96,7 +96,7 @@ private AbfsOutputStreamContext
populateAbfsOutputStreamContext(
.disableOutputStreamFlush(disableOutputStreamFlush)
.withStreamStatistics(new AbfsOutputStreamStatisticsImpl())
.withAppendBlob(isAppendBlob)
- .withWriteMaxConcurrentRequestCount(abfsConf.getWriteMaxConcurrentRequestCount())
+ .withWriteMaxConcurrentRequestCount(abfsConf.getWriteConcurrentRequestCount())
.withMaxWriteRequestsToQueue(abfsConf.getMaxWriteRequestsToQueue())
.withClientHandler(clientHandler)
.withPath(path)
@@ -613,7 +613,7 @@ private ExecutorService createExecutorService(
AbfsConfiguration abfsConf) {
ExecutorService executorService =
new
SemaphoredDelegatingExecutor(BlockingThreadPoolExecutorService.newInstance(
- abfsConf.getWriteMaxConcurrentRequestCount(),
+ abfsConf.getWriteConcurrentRequestCount(),
abfsConf.getMaxWriteRequestsToQueue(),
10L, TimeUnit.SECONDS,
"abfs-test-bounded"),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]