This is an automated email from the ASF dual-hosted git repository.
anujmodi pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new e7cab8ae513 HADOOP-19737: ABFS: [BugFix] Add metrics to identify
improvements with read and write aggressiveness (#8135)
e7cab8ae513 is described below
commit e7cab8ae51385ce8025b282d8055b14d8aa9660b
Author: Anmol Asrani <[email protected]>
AuthorDate: Thu Dec 18 15:36:09 2025 +0530
HADOOP-19737: ABFS: [BugFix] Add metrics to identify improvements with read
and write aggressiveness (#8135)
Contributed by Anmol Asrani
---
.../hadoop/fs/azurebfs/JvmUniqueIdProvider.java | 70 ++++++++++
.../fs/azurebfs/WriteThreadPoolSizeManager.java | 82 ++++++------
.../fs/azurebfs/services/AbfsBlobClient.java | 6 +-
.../hadoop/fs/azurebfs/services/AbfsDfsClient.java | 6 +-
.../AbfsReadResourceUtilizationMetrics.java | 49 ++++++-
.../AbfsWriteResourceUtilizationMetrics.java | 50 ++++++-
.../AbstractAbfsResourceUtilizationMetrics.java | 104 ++++++++-------
.../azurebfs/services/AzureBlobIngressHandler.java | 6 +-
.../azurebfs/services/AzureDFSIngressHandler.java | 6 +-
.../AzureDfsToBlobIngressFallbackHandler.java | 6 +-
.../fs/azurebfs/services/ReadBufferManagerV2.java | 34 +++--
.../services/ResourceUtilizationStats.java | 54 ++++----
.../azurebfs/utils/ResourceUtilizationUtils.java | 144 +++++++++++----------
.../azurebfs/TestWriteThreadPoolSizeManager.java | 75 ++++++++++-
.../azurebfs/services/TestReadBufferManagerV2.java | 2 +-
15 files changed, 465 insertions(+), 229 deletions(-)
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/JvmUniqueIdProvider.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/JvmUniqueIdProvider.java
new file mode 100644
index 00000000000..3effbccb830
--- /dev/null
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/JvmUniqueIdProvider.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.util.Random;
+
+/**
+ * Provides a JVM-scoped identifier.
+ *
+ * <p>The identifier is computed once, in this class's static initializer, and
+ * never changes afterwards. It is the current system time plus a small random
+ * offset, reduced modulo 900,000 and shifted by 100,000, i.e. a value in
+ * [100000, 999999]; the random offset lowers the chance of JVM collisions.</p>
+ *
+ * <p>The identifier is intended for lightweight JVM-level identification,
+ * such as tagging metrics or log entries. It is not the OS process id; it is
+ * best-effort only and is not guaranteed to be globally unique.</p>
+ *
+ * <p>This class is utility-only and cannot be instantiated.</p>
+ */
+public final class JvmUniqueIdProvider {
+
+ /** Lower bound (inclusive) for the generated JVM identifier. */
+ private static final int MIN_JVM_ID = 100_000;
+
+ /** Number of distinct identifier values; used as the modulus below. */
+ private static final int JVM_ID_RANGE = 900_000;
+
+ /** Upper bound (exclusive) of the random offset added to the timestamp. */
+ private static final int RANDOM_ENTROPY_BOUND = 1_000;
+
+ /** JVM-scoped identifier, assigned exactly once by the static initializer. */
+ private static final int JVM_UNIQUE_ID;
+
+ static {
+ long time = System.currentTimeMillis();
+ int random = new Random().nextInt(RANDOM_ENTROPY_BOUND);
+ JVM_UNIQUE_ID = (int) ((time + random) % JVM_ID_RANGE) + MIN_JVM_ID;
+ }
+
+ /** Prevents instantiation. */
+ private JvmUniqueIdProvider() {
+ }
+
+ /**
+ * Returns the JVM-scoped identifier computed by the static initializer.
+ *
+ * @return an identifier that remains constant for the lifetime of the JVM
+ */
+ public static int getJvmId() {
+ return JVM_UNIQUE_ID;
+ }
+}
+
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
index 24aecb2d977..de11037780b 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/WriteThreadPoolSizeManager.java
@@ -44,7 +44,6 @@
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.MEDIUM_HEAP_SPACE_FACTOR;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_LOW_MEMORY_REDUCTION_FACTOR;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HIGH_CPU_REDUCTION_FACTOR;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_HIGH_MEMORY_DECREASE_FACTOR;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.LOW_CPU_POOL_SIZE_INCREASE_FACTOR;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MEDIUM_CPU_LOW_MEMORY_REDUCTION_FACTOR;
@@ -55,7 +54,6 @@
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.THIRTY_SECONDS;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* Manages a thread pool for writing operations, adjusting the pool size based
on CPU utilization.
@@ -91,11 +89,11 @@ public final class WriteThreadPoolSizeManager implements
Closeable {
/* Tracks the last scale direction applied, or empty if none. */
private volatile String lastScaleDirection = EMPTY_STRING;
/* Maximum CPU utilization observed during the monitoring interval. */
- private volatile double maxJvmCpuUtilization = 0.0;
+ private volatile long maxJvmCpuUtilization = 0L;
/** High memory usage threshold used to trigger thread pool downscaling. */
- private final double highMemoryThreshold;
+ private final long highMemoryThreshold;
/** Low memory usage threshold used to allow thread pool upscaling. */
- private final double lowMemoryThreshold;
+ private final long lowMemoryThreshold;
/**
* Private constructor to initialize the write thread pool and CPU monitor
executor
@@ -136,8 +134,8 @@ private WriteThreadPoolSizeManager(String filesystemName,
executor.allowCoreThreadTimeOut(true);
/* Create a scheduled executor for CPU monitoring and pool adjustment */
this.cpuMonitorExecutor = Executors.newScheduledThreadPool(1);
- highMemoryThreshold =
abfsConfiguration.getWriteHighMemoryUsageThresholdPercent() / HUNDRED_D;
- lowMemoryThreshold =
abfsConfiguration.getWriteLowMemoryUsageThresholdPercent() / HUNDRED_D;
+ highMemoryThreshold =
abfsConfiguration.getWriteHighMemoryUsageThresholdPercent();
+ lowMemoryThreshold =
abfsConfiguration.getWriteLowMemoryUsageThresholdPercent();
}
/** Returns the internal {@link AbfsConfiguration}. */
@@ -245,7 +243,7 @@ public synchronized void startCPUMonitoring() {
if (!isMonitoringStarted()) {
isMonitoringStarted = true;
cpuMonitorExecutor.scheduleAtFixedRate(() -> {
- double cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
+ long cpuUtilization = ResourceUtilizationUtils.getJvmCpuLoad();
LOG.debug("Current CPU Utilization is this: {}", cpuUtilization);
try {
adjustThreadPoolSizeBasedOnCPU(cpuUtilization);
@@ -266,24 +264,27 @@ public synchronized void startCPUMonitoring() {
* @param cpuUtilization Current system CPU utilization (0.0 to 1.0)
* @throws InterruptedException if the resizing operation is interrupted
while acquiring the lock
*/
- public void adjustThreadPoolSizeBasedOnCPU(double cpuUtilization) throws
InterruptedException {
+ public void adjustThreadPoolSizeBasedOnCPU(long cpuUtilization) throws
InterruptedException {
lock.lock();
try {
ThreadPoolExecutor executor = (ThreadPoolExecutor)
this.boundedThreadPool;
int currentPoolSize = executor.getMaximumPoolSize();
- double memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
+ long memoryLoad = ResourceUtilizationUtils.getMemoryLoad();
+ long usedHeapMemory = ResourceUtilizationUtils.getUsedHeapMemory();
+ long availableMemory = ResourceUtilizationUtils.getAvailableHeapMemory();
+ long committedMemory = ResourceUtilizationUtils.getCommittedHeapMemory();
LOG.debug("The memory load is {} and CPU utilization is {}", memoryLoad,
cpuUtilization);
- if (cpuUtilization >
(abfsConfiguration.getWriteHighCpuThreshold()/HUNDRED_D)) {
+ if (cpuUtilization > (abfsConfiguration.getWriteHighCpuThreshold())) {
newMaxPoolSize = calculateReducedPoolSizeHighCPU(currentPoolSize,
memoryLoad);
if (currentPoolSize == initialPoolSize && newMaxPoolSize ==
initialPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
}
- } else if (cpuUtilization >
(abfsConfiguration.getWriteMediumCpuThreshold()/HUNDRED_D)) {
+ } else if (cpuUtilization >
(abfsConfiguration.getWriteMediumCpuThreshold())) {
newMaxPoolSize = calculateReducedPoolSizeMediumCPU(currentPoolSize,
memoryLoad);
if (currentPoolSize == initialPoolSize && newMaxPoolSize ==
initialPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_DOWN_AT_MIN;
}
- } else if (cpuUtilization <
(abfsConfiguration.getWriteLowCpuThreshold()/HUNDRED_D)) {
+ } else if (cpuUtilization <
(abfsConfiguration.getWriteLowCpuThreshold())) {
newMaxPoolSize = calculateIncreasedPoolSizeLowCPU(currentPoolSize,
memoryLoad);
if (currentPoolSize == maxThreadPoolSize && newMaxPoolSize ==
maxThreadPoolSize) {
lastScaleDirection = SCALE_DIRECTION_NO_UP_AT_MAX;
@@ -294,7 +295,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double
cpuUtilization) throws Interru
}
boolean willResize = newMaxPoolSize != currentPoolSize;
if (!willResize && !lastScaleDirection.equals(EMPTY_STRING)) {
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad);
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad,
+ usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics
snapshot.
writeThreadPoolMetrics.update(stats);
}
@@ -304,7 +306,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double
cpuUtilization) throws Interru
if (!willResize) {
try {
// Capture the latest thread pool statistics (pool size, CPU,
memory, etc.).
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad);
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad,
+ usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics
snapshot.
writeThreadPoolMetrics.update(stats);
} catch (Exception e) {
@@ -320,7 +323,8 @@ public void adjustThreadPoolSizeBasedOnCPU(double
cpuUtilization) throws Interru
adjustThreadPoolSize(newMaxPoolSize);
try {
// Capture the latest thread pool statistics (pool size, CPU,
memory, etc.).
- WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad);
+ WriteThreadPoolStats stats = getCurrentStats(cpuUtilization,
memoryLoad,
+ usedHeapMemory, availableMemory, committedMemory);
// Update the write thread pool metrics with the latest statistics
snapshot.
writeThreadPoolMetrics.update(stats);
} catch (Exception e) {
@@ -437,7 +441,7 @@ public synchronized boolean isMonitoringStarted() {
* @return the highest JVM CPU utilization percentage recorded
*/
@VisibleForTesting
- public double getMaxJvmCpuUtilization() {
+ public long getMaxJvmCpuUtilization() {
return maxJvmCpuUtilization;
}
@@ -500,9 +504,9 @@ public static class WriteThreadPoolStats extends
ResourceUtilizationStats {
*/
public WriteThreadPoolStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
- double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
- double committedHeapGB, double usedHeapGB, double maxHeapGB, double
memoryLoad, String lastScaleDirection,
- double maxCpuUtilization, long jvmProcessId) {
+ long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
+ long committedHeapGB, long usedHeapGB, long maxHeapGB, long
memoryLoad, String lastScaleDirection,
+ long maxCpuUtilization, long jvmProcessId) {
super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
jvmCpuLoad, systemCpuUtilization, availableHeapGB,
committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad,
lastScaleDirection,
@@ -511,22 +515,28 @@ public WriteThreadPoolStats(int currentPoolSize,
}
/**
- * Returns the latest statistics for the write thread pool and system
resources.
- * The snapshot includes thread counts, JVM and system CPU utilization, and
the
- * current heap usage. These metrics are used for monitoring and making
dynamic
- * sizing decisions for the write thread pool.
+ * Returns a snapshot of the current write thread pool and JVM/system
resource
+ * statistics.
*
- * @param jvmCpuUtilization current JVM CPU usage (%)
- * @param memoryLoad current JVM memory load (used/committed)
- * @return a {@link WriteThreadPoolStats} object containing the current
metrics
+ * <p>The snapshot includes thread pool size and activity, JVM and system CPU
+ * utilization, and JVM heap memory metrics. These values are used for
monitoring
+ * and for making dynamic scaling decisions for the write thread pool.</p>
+ *
+ * @param jvmCpuUtilization current JVM CPU utilization
+ * @param memoryLoad current JVM memory load ratio (used / max)
+ * @param usedMemory current used JVM heap memory
+ * @param availableMemory current available JVM heap memory
+ * @param committedMemory current committed JVM heap memory
+ *
+ * @return a {@link WriteThreadPoolStats} instance containing the current
metrics
*/
- synchronized WriteThreadPoolStats getCurrentStats(double jvmCpuUtilization,
- double memoryLoad) {
+ synchronized WriteThreadPoolStats getCurrentStats(long jvmCpuUtilization,
+ long memoryLoad, long usedMemory, long availableMemory, long
committedMemory) {
if (boundedThreadPool == null) {
return new WriteThreadPoolStats(
- ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D,
- ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+ ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
+ ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
}
ThreadPoolExecutor exec = (ThreadPoolExecutor) this.boundedThreadPool;
@@ -544,15 +554,15 @@ synchronized WriteThreadPoolStats getCurrentStats(double
jvmCpuUtilization,
activeThreads, // Busy threads
idleThreads, // Idle threads
jvmCpuUtilization, // JVM CPU usage (ratio)
- ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage
(ratio)
- ResourceUtilizationUtils.getAvailableHeapMemory(), // Free heap
(GB)
- ResourceUtilizationUtils.getCommittedHeapMemory(), // Committed
heap (GB)
- ResourceUtilizationUtils.getUsedHeapMemory(), // Used heap (GB)
+ ResourceUtilizationUtils.getSystemCpuLoad(), // System CPU usage
(ratio)
+ availableMemory, // Free heap (GB)
+ committedMemory, // Committed heap (GB)
+ usedMemory, // Used heap (GB)
ResourceUtilizationUtils.getMaxHeapMemory(), // Max heap (GB)
memoryLoad, // used/max
currentScaleDirection, // "I", "D", or ""
getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far
- ResourceUtilizationUtils.getJvmProcessId() // JVM PID
+ JvmUniqueIdProvider.getJvmId() // JVM PID
);
}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
index 43171aef073..52fbd3182fd 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java
@@ -1338,7 +1338,11 @@ public AbfsRestOperation read(final String path,
AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics =
retrieveReadResourceUtilizationMetrics();
// If metrics are available, record them in the tracing context for
diagnostics or logging.
if (readResourceUtilizationMetrics != null) {
-
tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+ String readMetrics = readResourceUtilizationMetrics.toString();
+ tracingContext.setResourceUtilizationMetricResults(readMetrics);
+ if (!readMetrics.isEmpty()) {
+ readResourceUtilizationMetrics.markPushed();
+ }
}
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
index cf2449ea918..5ddb9770ac5 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java
@@ -1065,7 +1065,11 @@ public AbfsRestOperation read(final String path,
AbfsReadResourceUtilizationMetrics readResourceUtilizationMetrics =
retrieveReadResourceUtilizationMetrics();
// If metrics are available, record them in the tracing context for
diagnostics or logging.
if (readResourceUtilizationMetrics != null) {
-
tracingContext.setResourceUtilizationMetricResults(readResourceUtilizationMetrics.toString());
+ String readMetrics = readResourceUtilizationMetrics.toString();
+ tracingContext.setResourceUtilizationMetricResults(readMetrics);
+ if (!readMetrics.isEmpty()) {
+ readResourceUtilizationMetrics.markPushed();
+ }
}
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = getAbfsRestOperation(
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
index a38dd08c557..82fef4f4b62 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsReadResourceUtilizationMetrics.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.fs.azurebfs.services;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import java.util.concurrent.atomic.AtomicLong;
import
org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -36,6 +36,36 @@ public class AbfsReadResourceUtilizationMetrics
extends
AbstractAbfsResourceUtilizationMetrics<AbfsReadResourceUtilizationMetricsEnum> {
+ /**
+ * A version counter incremented each time a metric update occurs.
+ * Used to detect whether metrics have changed since the last serialization.
+ */
+ private final AtomicLong updateVersion = new AtomicLong(0);
+
+ /**
+ * The last version number that was serialized and pushed out.
+ */
+ private final AtomicLong lastPushedVersion = new AtomicLong(0);
+
+ @Override
+ protected boolean isUpdated() { // true while updateVersion is ahead of lastPushedVersion
+ return updateVersion.get() > lastPushedVersion.get();
+ }
+
+ protected synchronized void markUpdated() { // bumps the update counter; invoked by update() after the gauges are set
+ updateVersion.incrementAndGet();
+ }
+
+ @Override
+ protected long getUpdateVersion() { // current value of the update counter
+ return updateVersion.get();
+ }
+
+ @Override
+ protected long getLastPushedVersion() { // version recorded by the most recent markPushed() call (0 initially)
+ return lastPushedVersion.get();
+ }
+
/**
* Creates a metrics set for read operations, initializing all
* metric keys defined in {@link AbfsReadResourceUtilizationMetricsEnum}.
@@ -44,6 +74,15 @@ public AbfsReadResourceUtilizationMetrics() {
super(AbfsReadResourceUtilizationMetricsEnum.values(),
FSOperationType.READ.toString());
}
+ /**
+ * Marks the current metrics version as pushed by copying the current value
+ * of {@code updateVersion} into {@code lastPushedVersion}; afterwards
+ * {@code isUpdated()} is false until the next {@code markUpdated()}.
+ * Must be called only after the metrics string is actually emitted.
+ *
+ * <p>NOTE(review): the version is read here, at call time, not when the
+ * metrics string was built, so an update landing between {@code toString()}
+ * and this call would be marked as pushed without having been emitted;
+ * confirm that this is acceptable.</p>
+ */
+ @Override
+ public synchronized void markPushed() {
+ lastPushedVersion.set(updateVersion.get());
+ }
+
/**
* Updates all read-thread-pool metrics using the latest stats snapshot.
* <p>
@@ -71,16 +110,16 @@ public synchronized void
update(ReadBufferManagerV2.ReadThreadPoolStats stats) {
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_POOL_SIZE,
stats.getMaxPoolSize());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.ACTIVE_THREADS,
stats.getActiveThreads());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.IDLE_THREADS,
stats.getIdleThreads());
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION,
stats.getJvmCpuLoad() * HUNDRED_D);
-
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION,
stats.getSystemCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION,
stats.getJvmCpuLoad());
+
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION,
stats.getSystemCpuUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY,
stats.getMemoryUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY,
stats.getCommittedHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY,
stats.getUsedHeapGB());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY,
stats.getMaxHeapGB());
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD,
stats.getMemoryLoad() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MEMORY_LOAD,
stats.getMemoryLoad());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
- setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION,
stats.getMaxCpuUtilization() * HUNDRED_D);
+ setMetricValue(AbfsReadResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION,
stats.getMaxCpuUtilization());
setMetricValue(AbfsReadResourceUtilizationMetricsEnum.JVM_PROCESS_ID,
stats.getJvmProcessId());
markUpdated();
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
index bace4421d30..6c44c20b31b 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsWriteResourceUtilizationMetrics.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import java.util.concurrent.atomic.AtomicLong;
import
org.apache.hadoop.fs.azurebfs.enums.AbfsWriteResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -36,6 +36,17 @@ public class AbfsWriteResourceUtilizationMetrics
extends
AbstractAbfsResourceUtilizationMetrics<AbfsWriteResourceUtilizationMetricsEnum>
{
+ /**
+ * A version counter incremented each time a metric update occurs.
+ * Used to detect whether metrics have changed since the last serialization.
+ */
+ private final AtomicLong updateVersion = new AtomicLong(0);
+
+ /**
+ * The last version number that was serialized and pushed out.
+ */
+ private final AtomicLong lastPushedVersion = new AtomicLong(0);
+
/**
* Creates a metrics set for write operations, pre-initializing
* all metric keys defined in {@link
AbfsWriteResourceUtilizationMetricsEnum}.
@@ -44,6 +55,34 @@ public AbfsWriteResourceUtilizationMetrics() {
super(AbfsWriteResourceUtilizationMetricsEnum.values(),
FSOperationType.WRITE.toString());
}
+ @Override
+ protected boolean isUpdated() { // true while updateVersion is ahead of lastPushedVersion
+ return updateVersion.get() > lastPushedVersion.get();
+ }
+
+ protected void markUpdated() { // bumps the update counter; invoked as the last step of the synchronized update()
+ updateVersion.incrementAndGet();
+ }
+
+ @Override
+ protected long getUpdateVersion() { // current value of the update counter
+ return updateVersion.get();
+ }
+
+ @Override
+ protected long getLastPushedVersion() { // version recorded by the most recent markPushed() call (0 initially)
+ return lastPushedVersion.get();
+ }
+
+ /**
+ * Marks the current metrics version as pushed by copying the current value
+ * of {@code updateVersion} into {@code lastPushedVersion}; afterwards
+ * {@code isUpdated()} is false until the next {@code markUpdated()}.
+ * Must be called only after the metrics string is actually emitted.
+ *
+ * <p>NOTE(review): the version is read here, at call time, not when the
+ * metrics string was built, so an update landing between {@code toString()}
+ * and this call would be marked as pushed without having been emitted;
+ * confirm that this is acceptable.</p>
+ */
+ @Override
+ public synchronized void markPushed() {
+ lastPushedVersion.set(updateVersion.get());
+ }
+
/**
* Updates all write-thread-pool metrics using the latest stats snapshot.
* Each field in {@link WriteThreadPoolSizeManager.WriteThreadPoolStats}
@@ -60,19 +99,18 @@ public synchronized void
update(WriteThreadPoolSizeManager.WriteThreadPoolStats
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_POOL_SIZE,
stats.getMaxPoolSize());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.ACTIVE_THREADS,
stats.getActiveThreads());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.IDLE_THREADS,
stats.getIdleThreads());
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION,
stats.getJvmCpuLoad() * HUNDRED_D);
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION,
stats.getSystemCpuUtilization() * HUNDRED_D);
+
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_CPU_UTILIZATION,
stats.getJvmCpuLoad());
+
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.SYSTEM_CPU_UTILIZATION,
stats.getSystemCpuUtilization());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.AVAILABLE_MEMORY,
stats.getMemoryUtilization());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.COMMITTED_MEMORY,
stats.getCommittedHeapGB());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.USED_MEMORY,
stats.getUsedHeapGB());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY,
stats.getMaxHeapGB());
- setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD,
stats.getMemoryLoad() * HUNDRED_D);
+ setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MEMORY_LOAD,
stats.getMemoryLoad());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.LAST_SCALE_DIRECTION,
stats.getLastScaleDirectionNumeric(stats.getLastScaleDirection()));
-
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION,
stats.getMaxCpuUtilization() * HUNDRED_D);
+
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.MAX_CPU_UTILIZATION,
stats.getMaxCpuUtilization());
setMetricValue(AbfsWriteResourceUtilizationMetricsEnum.JVM_PROCESS_ID,
stats.getJvmProcessId());
markUpdated();
}
}
-
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
index 2e5a172e624..4a080d07214 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbstractAbfsResourceUtilizationMetrics.java
@@ -19,19 +19,19 @@
package org.apache.hadoop.fs.azurebfs.services;
import java.util.Arrays;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import
org.apache.hadoop.fs.azurebfs.enums.AbfsReadResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.azurebfs.enums.StatisticTypeEnum;
import org.apache.hadoop.fs.azurebfs.enums.AbfsResourceUtilizationMetricsEnum;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_EQUALS;
import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
+import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
import static
org.apache.hadoop.fs.azurebfs.constants.MetricsConstants.CHAR_DOLLAR;
import static
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
@@ -47,21 +47,10 @@ public abstract class
AbstractAbfsResourceUtilizationMetrics<T extends Enum<T> &
private static final Logger LOG = LoggerFactory.getLogger(
AbstractAbfsResourceUtilizationMetrics.class);
- /**
- * Tracks whether any metric has been updated at least once.
- */
- private final AtomicBoolean updatedAtLeastOnce = new AtomicBoolean(false);
-
- /**
- * A version counter incremented each time a metric update occurs.
- * Used to detect whether metrics have changed since the last serialization.
- */
- private final AtomicLong updateVersion = new AtomicLong(0);
-
- /**
- * The last version number that was serialized and pushed out.
- */
- private final AtomicLong lastPushedVersion = new AtomicLong(-1);
+ protected abstract boolean isUpdated(); // toString() returns EMPTY_STRING when this is false
+ protected abstract long getUpdateVersion(); // version that toString() compares against getLastPushedVersion()
+ protected abstract long getLastPushedVersion(); // toString() returns EMPTY_STRING when this equals getUpdateVersion()
+ public abstract void markPushed(); // callers invoke this after a non-empty toString() result was handed to the tracing context
/**
* The set of metrics supported by this metrics instance.
@@ -128,56 +117,65 @@ protected void setMetricValue(T metric, double value) {
}
}
- /**
- * Marks that a metric update has occurred.
- * Increments the version so consumers know that new data is available.
- */
- protected void markUpdated() {
- updatedAtLeastOnce.set(true);
- updateVersion.incrementAndGet();
- }
-
- /**
- * Returns a flag indicating whether any metric has been updated since
initialization.
- *
- * @return the {@link AtomicBoolean} tracking whether at least one update
occurred
- */
- public boolean getUpdatedAtLeastOnce() {
- return updatedAtLeastOnce.get();
- }
-
- /**
- * Serializes the current metrics to a compact string format suitable for
logs.
- * @return a serialized metrics string or an empty string if no updates
occurred
- */
@Override
public String toString() {
- if (!updatedAtLeastOnce.get()) {
+ if (!isUpdated()) {
return EMPTY_STRING;
}
- long currentVersion = updateVersion.get();
- if (currentVersion == lastPushedVersion.get()) {
+ long currentVersion = getUpdateVersion();
+ if (currentVersion == getLastPushedVersion()) {
return EMPTY_STRING;
}
- synchronized (this) {
- if (currentVersion == lastPushedVersion.get()) {
- return EMPTY_STRING;
- }
+ StringBuilder sb = new StringBuilder(operationType).append(CHAR_EQUALS);
- StringBuilder sb = new StringBuilder(operationType).append(CHAR_EQUALS);
+ for (T metric : metrics) {
+ long value = lookupGaugeValue(metric.getName());
- for (T metric : metrics) {
+ if (isMemoryMetric(metric.getName())) {
sb.append(metric.getName())
.append(CHAR_EQUALS)
- .append(lookupGaugeValue(metric.getName()))
+ .append(convertMemoryValue(value))
+ .append(CHAR_DOLLAR);
+ } else {
+ sb.append(metric.getName())
+ .append(CHAR_EQUALS)
+ .append(value)
.append(CHAR_DOLLAR);
}
-
- lastPushedVersion.set(currentVersion);
- return sb.toString();
}
+
+ return sb.toString();
+ }
+
+ /**
+ * Determines whether the given metric represents a JVM heap memory metric.
+ *
+ * <p>A metric counts as a memory metric when its name equals the name of
+ * AVAILABLE_MEMORY, COMMITTED_MEMORY, USED_MEMORY or MAX_HEAP_MEMORY as
+ * defined in {@link AbfsReadResourceUtilizationMetricsEnum}.</p>
+ *
+ * <p>NOTE(review): the comparison always uses the read enum's names, also
+ * when this base class serves write metrics; this presumably relies on the
+ * write enum using identical names - confirm.</p>
+ *
+ * @param metricName the metric name
+ * @return {@code true} if the metric is a memory metric, {@code false}
+ * otherwise
+ */
+ private boolean isMemoryMetric(String metricName) {
+ return metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.AVAILABLE_MEMORY.getName())
+ || metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.COMMITTED_MEMORY.getName())
+ || metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.USED_MEMORY.getName())
+ || metricName.equals(
+ AbfsReadResourceUtilizationMetricsEnum.MAX_HEAP_MEMORY.getName());
+ }
-}
+ /**
+ * Converts a raw memory metric value to a decimal representation for
+ * logging by dividing it by {@code HUNDRED_D}.
+ *
+ * <p>NOTE(review): this presumably undoes a x100 scaling applied where the
+ * memory values are collected, so that fractional values survive storage in
+ * a {@code long} gauge - confirm against the producer of these values.</p>
+ *
+ * @param value the raw memory value as read from the gauge
+ * @return {@code value / HUNDRED_D} as a {@code double}
+ */
+ private double convertMemoryValue(long value) {
+ return value / HUNDRED_D;
+ }
+}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
index 98ef0bc1a19..6877d5a03a9 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureBlobIngressHandler.java
@@ -128,7 +128,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock
blockToUpload,
// Fetches write thread pool metrics from the ABFS client and adds them
to the tracing context.
AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics =
getWriteResourceUtilizationMetrics();
if (writeResourceUtilizationMetrics != null) {
-
tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ String writeMetrics = writeResourceUtilizationMetrics.toString();
+ tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
+ if (!writeMetrics.isEmpty()) {
+ writeResourceUtilizationMetrics.markPushed();
+ }
}
try {
LOG.trace("Starting remote write for block with ID {} and offset {}",
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
index ccf50808eb5..bc5e6b7c521 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDFSIngressHandler.java
@@ -120,7 +120,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock
blockToUpload,
// Fetches write thread pool metrics from the ABFS client and adds them to
the tracing context.
AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics =
getWriteResourceUtilizationMetrics();
if (writeResourceUtilizationMetrics != null) {
-
tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ String writeMetrics = writeResourceUtilizationMetrics.toString();
+ tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
+ if (!writeMetrics.isEmpty()) {
+ writeResourceUtilizationMetrics.markPushed();
+ }
}
String threadIdStr = String.valueOf(Thread.currentThread().getId());
if (tracingContextAppend.getIngressHandler().equals(EMPTY_STRING)) {
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
index 4664f1fb26e..8db94cc3e4c 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AzureDfsToBlobIngressFallbackHandler.java
@@ -113,7 +113,11 @@ protected AbfsRestOperation remoteWrite(AbfsBlock
blockToUpload,
// Fetches write thread pool metrics from the ABFS client and adds them to
the tracing context.
AbfsWriteResourceUtilizationMetrics writeResourceUtilizationMetrics =
getWriteResourceUtilizationMetrics();
if (writeResourceUtilizationMetrics != null) {
-
tracingContextAppend.setResourceUtilizationMetricResults(writeResourceUtilizationMetrics.toString());
+ String writeMetrics = writeResourceUtilizationMetrics.toString();
+ tracingContextAppend.setResourceUtilizationMetricResults(writeMetrics);
+ if (!writeMetrics.isEmpty()) {
+ writeResourceUtilizationMetrics.markPushed();
+ }
}
String threadIdStr = String.valueOf(Thread.currentThread().getId());
tracingContextAppend.setIngressHandler(FALLBACK_APPEND + " T " +
threadIdStr);
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
index d64c7246c5b..071c1b16849 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
+import org.apache.hadoop.fs.azurebfs.JvmUniqueIdProvider;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;
import java.io.IOException;
@@ -51,7 +52,6 @@
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_NO_UP_AT_MAX;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SCALE_DIRECTION_UP;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* The Improved Read Buffer Manager for Rest AbfsClient.
@@ -68,7 +68,7 @@ public final class ReadBufferManagerV2 extends
ReadBufferManager {
private static int cpuMonitoringIntervalInMilliSec;
- private static double cpuThreshold;
+ private static long cpuThreshold;
private static int threadPoolUpscalePercentage;
@@ -94,7 +94,7 @@ public final class ReadBufferManagerV2 extends
ReadBufferManager {
private static int memoryMonitoringIntervalInMilliSec;
- private static double memoryThreshold;
+ private static long memoryThreshold;
private final AtomicInteger numberOfActiveBuffers = new AtomicInteger(0);
@@ -116,7 +116,7 @@ public final class ReadBufferManagerV2 extends
ReadBufferManager {
/* Tracks the last scale direction applied, or empty if none. */
private volatile String lastScaleDirection = EMPTY_STRING;
/* Maximum CPU utilization observed during the monitoring interval. */
- private volatile double maxJvmCpuUtilization = 0.0;
+ private volatile long maxJvmCpuUtilization = 0L;
/**
* Private constructor to prevent instantiation as this needs to be
singleton.
@@ -171,8 +171,7 @@ public static void setReadBufferManagerConfigs(final int
readAheadBlockSize,
maxThreadPoolSize =
abfsConfiguration.getMaxReadAheadV2ThreadPoolSize();
cpuMonitoringIntervalInMilliSec
= abfsConfiguration.getReadAheadV2CpuMonitoringIntervalMillis();
- cpuThreshold =
abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent()
- / HUNDRED_D;
+ cpuThreshold =
abfsConfiguration.getReadAheadV2CpuUsageThresholdPercent();
threadPoolUpscalePercentage
= abfsConfiguration.getReadAheadV2ThreadPoolUpscalePercentage();
threadPoolDownscalePercentage
@@ -185,8 +184,7 @@ public static void setReadBufferManagerConfigs(final int
readAheadBlockSize,
memoryMonitoringIntervalInMilliSec
=
abfsConfiguration.getReadAheadV2MemoryMonitoringIntervalMillis();
memoryThreshold =
- abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent()
- / HUNDRED_D;
+ abfsConfiguration.getReadAheadV2MemoryUsageThresholdPercent();
setThresholdAgeMilliseconds(
abfsConfiguration.getReadAheadV2CachedBufferTTLMillis());
isDynamicScalingEnabled
@@ -854,7 +852,7 @@ private boolean manualEviction(final ReadBuffer buf) {
*/
private void adjustThreadPool() {
int currentPoolSize = workerRefs.size();
- double cpuLoad = ResourceUtilizationUtils.getJvmCpuLoad();
+ long cpuLoad = ResourceUtilizationUtils.getJvmCpuLoad();
if (cpuLoad > maxJvmCpuUtilization) {
maxJvmCpuUtilization = cpuLoad;
}
@@ -1096,7 +1094,7 @@ public ScheduledExecutorService getCpuMonitoringThread() {
* @return the highest JVM CPU utilization percentage recorded
*/
@VisibleForTesting
- public double getMaxJvmCpuUtilization() {
+ public long getMaxJvmCpuUtilization() {
return maxJvmCpuUtilization;
}
@@ -1168,10 +1166,10 @@ public static class ReadThreadPoolStats extends
ResourceUtilizationStats {
*/
public ReadThreadPoolStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
- double jvmCpuLoad,
- double systemCpuUtilization, double availableHeapGB,
- double committedHeapGB, double usedHeapGB, double maxHeapGB, double
memoryLoad,
- String lastScaleDirection, double maxCpuUtilization, long
jvmProcessId) {
+ long jvmCpuLoad,
+ long systemCpuUtilization, long availableHeapGB,
+ long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad,
+ String lastScaleDirection, long maxCpuUtilization, long jvmProcessId) {
super(currentPoolSize, maxPoolSize, activeThreads, idleThreads,
jvmCpuLoad, systemCpuUtilization, availableHeapGB,
committedHeapGB, usedHeapGB, maxHeapGB, memoryLoad,
lastScaleDirection,
@@ -1189,10 +1187,10 @@ public ReadThreadPoolStats(int currentPoolSize,
* @return a {@link ReadThreadPoolStats} object containing the current
thread pool
* and system resource statistics
*/
- synchronized ReadThreadPoolStats getCurrentStats(double jvmCpuLoad) {
+ synchronized ReadThreadPoolStats getCurrentStats(long jvmCpuLoad) {
if (workerPool == null) {
- return new ReadThreadPoolStats(ZERO, ZERO, ZERO, ZERO, ZERO_D, ZERO_D,
- ZERO_D, ZERO_D, ZERO_D, ZERO_D, ZERO_D, EMPTY_STRING, ZERO_D, ZERO);
+ return new ReadThreadPoolStats(ZERO, ZERO, ZERO, ZERO, ZERO, ZERO,
+ ZERO, ZERO, ZERO, ZERO, ZERO, EMPTY_STRING, ZERO, ZERO);
}
ThreadPoolExecutor exec = this.workerPool;
@@ -1217,7 +1215,7 @@ synchronized ReadThreadPoolStats getCurrentStats(double
jvmCpuLoad) {
ResourceUtilizationUtils.getMemoryLoad(), //
used/max
currentScaleDirection, // "I", "D", or ""
getMaxJvmCpuUtilization(), // Peak JVM CPU usage so far,
- ResourceUtilizationUtils.getJvmProcessId() // JVM process
id.
+ JvmUniqueIdProvider.getJvmId() // JVM-scoped identifier.
);
}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
index 4fa0712e765..6ddd2e9f6f2 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ResourceUtilizationStats.java
@@ -18,7 +18,7 @@
package org.apache.hadoop.fs.azurebfs.services;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.HUNDRED_D;
+import static
org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_ACTION_NEEDED;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_DOWN_AT_MIN;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.NO_SCALE_UP_AT_MAX;
@@ -40,15 +40,15 @@ public abstract class ResourceUtilizationStats {
private final int maxPoolSize; // Maximum allowed pool size
private final int activeThreads; // Number of threads currently executing
tasks
private final int idleThreads; // Number of threads not executing
tasks
- private final double jvmCpuLoad; // Current JVM CPU utilization (%)
- private final double systemCpuUtilization; // Current system CPU utilization
(%)
- private final double availableHeapGB; // Available heap memory (GB)
- private final double committedHeapGB; // Total committed heap memory (GB)
- private final double usedHeapGB; // Used heap memory (GB)
- private final double maxHeapGB; // Max heap memory (GB)
- private final double memoryLoad; // Heap usage ratio (used/max)
- private final String lastScaleDirection; // Last resize direction: "I"
(increase) or "D" (decrease)
- private final double maxCpuUtilization; // Peak JVM CPU observed in the
current interval
+ private final long jvmCpuLoad; // Current JVM CPU utilization (%)
+ private final long systemCpuUtilization; // Current system CPU utilization
(%)
+ private final long availableHeapGB; // Available heap memory (GB)
+ private final long committedHeapGB; // Total committed heap memory (GB)
+ private final long usedHeapGB; // Used heap memory (GB)
+ private final long maxHeapGB; // Max heap memory (GB)
+ private final long memoryLoad; // Heap usage ratio (used/max)
+ private String lastScaleDirection = EMPTY_STRING; // Last resize direction:
"I" (increase) or "D" (decrease)
+ private long maxCpuUtilization = 0L; // Peak JVM CPU observed in the current
interval
private final long jvmProcessId; // JVM Process ID
/**
@@ -73,9 +73,9 @@ public abstract class ResourceUtilizationStats {
*/
public ResourceUtilizationStats(int currentPoolSize,
int maxPoolSize, int activeThreads, int idleThreads,
- double jvmCpuLoad, double systemCpuUtilization, double availableHeapGB,
- double committedHeapGB, double usedHeapGB, double maxHeapGB, double
memoryLoad, String lastScaleDirection,
- double maxCpuUtilization, long jvmProcessId) {
+ long jvmCpuLoad, long systemCpuUtilization, long availableHeapGB,
+ long committedHeapGB, long usedHeapGB, long maxHeapGB, long memoryLoad,
String lastScaleDirection,
+ long maxCpuUtilization, long jvmProcessId) {
this.currentPoolSize = currentPoolSize;
this.maxPoolSize = maxPoolSize;
this.activeThreads = activeThreads;
@@ -113,32 +113,32 @@ public int getIdleThreads() {
}
/** @return the overall system CPU utilization percentage. */
- public double getSystemCpuUtilization() {
+ public long getSystemCpuUtilization() {
return systemCpuUtilization;
}
/** @return the available heap memory in gigabytes. */
- public double getMemoryUtilization() {
+ public long getMemoryUtilization() {
return availableHeapGB;
}
/** @return the total committed heap memory in gigabytes */
- public double getCommittedHeapGB() {
+ public long getCommittedHeapGB() {
return committedHeapGB;
}
/** @return the used heap memory in gigabytes */
- public double getUsedHeapGB() {
+ public long getUsedHeapGB() {
return usedHeapGB;
}
/** @return the max heap memory in gigabytes */
- public double getMaxHeapGB() {
+ public long getMaxHeapGB() {
return maxHeapGB;
}
/** @return the current JVM memory load (used / committed) as a value
between 0.0 and 1.0 */
- public double getMemoryLoad() {
+ public long getMemoryLoad() {
return memoryLoad;
}
@@ -148,12 +148,12 @@ public String getLastScaleDirection() {
}
/** @return the JVM process CPU utilization percentage. */
- public double getJvmCpuLoad() {
+ public long getJvmCpuLoad() {
return jvmCpuLoad;
}
/** @return the max JVM process CPU utilization percentage. */
- public double getMaxCpuUtilization() {
+ public long getMaxCpuUtilization() {
return maxCpuUtilization;
}
@@ -190,13 +190,13 @@ public int getLastScaleDirectionNumeric(String
lastScaleDirection) {
public String toString() {
return String.format(
"currentPoolSize=%d, maxPoolSize=%d, activeThreads=%d, idleThreads=%d,
"
- + "jvmCpuLoad=%.2f%%, systemCpuUtilization=%.2f%%, "
- + "availableHeap=%.2fGB, committedHeap=%.2fGB, memoryLoad=%.2f%%, "
- + "scaleDirection=%s, maxCpuUtilization=%.2f%%, jvmProcessId=%d",
+ + "jvmCpuLoad=%d, systemCpuUtilization=%d, "
+ + "availableHeap=%d, committedHeap=%d, memoryLoad=%d, "
+ + "scaleDirection=%s, maxCpuUtilization=%d, jvmProcessId=%d",
currentPoolSize, maxPoolSize, activeThreads,
- idleThreads, jvmCpuLoad * HUNDRED_D, systemCpuUtilization * HUNDRED_D,
- availableHeapGB, committedHeapGB, memoryLoad * HUNDRED_D,
- lastScaleDirection, maxCpuUtilization * HUNDRED_D, jvmProcessId
+ idleThreads, jvmCpuLoad, systemCpuUtilization,
+ availableHeapGB, committedHeapGB, memoryLoad,
+ lastScaleDirection, maxCpuUtilization, jvmProcessId
);
}
}
diff --git
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
index c151a483b18..c6bda87c3a0 100644
---
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
+++
b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ResourceUtilizationUtils.java
@@ -27,139 +27,140 @@
import org.apache.hadoop.classification.VisibleForTesting;
import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.BYTES_PER_GIGABYTE;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO;
-import static
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO_D;
/**
* Utility class for retrieving JVM- and system-level resource utilization
* metrics such as CPU load, memory usage, and available heap memory.
+ * Metrics are returned as long values scaled by 100 (2-decimal precision),
except getAvailableMaxHeapMemory(), which returns whole gigabytes rounded up.
*/
public final class ResourceUtilizationUtils {
+ private static final long SCALE_FACTOR = 100L; // 2 decimal places
+
private ResourceUtilizationUtils() {
// Prevent instantiation
}
/**
- * Calculates the available heap memory in gigabytes.
- * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap
memory
- * allowed for the JVM and subtracts the currently used memory (total - free)
- * to determine how much heap memory is still available.
- * The result is rounded up to the nearest gigabyte.
+ * Scales a double value by {@link #SCALE_FACTOR} to store 2-decimal
precision as long.
+ *
+ * @param value value to scale
+ * @return scaled long value
+ */
+ private static long scale(double value) {
+ return Math.round(value * SCALE_FACTOR);
+ }
+
+ /**
+ * Returns the available heap memory in gigabytes, calculated as the
difference between
+ * the committed heap and used heap memory.
+ * <p>
+ * The result is scaled by 100 for 2-decimal precision.
+ * </p>
*
- * @return the available heap memory in gigabytes
+ * @return available heap memory in GB (scaled by 100)
*/
public static long getAvailableHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long availableHeapBytes = memoryUsage.getCommitted() -
memoryUsage.getUsed();
- return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ double gb = (mu.getCommitted() - mu.getUsed()) / (double)
BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Returns the currently committed JVM heap memory in bytes.
- * This reflects the amount of heap the JVM has reserved from the OS and may
grow as needed.
+ * Returns the JVM heap memory currently committed.
+ * <p>
+ * Committed memory is the amount of memory guaranteed to be available for
the JVM.
+ * </p>
*
- * @return committed heap memory in bytes
+ * @return committed heap memory in GB (scaled by 100)
*/
@VisibleForTesting
- public static double getCommittedHeapMemory() {
+ public static long getCommittedHeapMemory() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- return (double) memoryUsage.getCommitted() / BYTES_PER_GIGABYTE;
+ double gb = memoryUsage.getCommitted() / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Get the current CPU load of the system.
- * @return the CPU load as a double value between 0.0 and 1.0
+ * Returns the system-wide CPU load as a percentage (the 0.0-1.0 fraction
scaled by 100 and rounded to the nearest whole number).
+ * <p>
+ * The value ranges between 0 (no load) and 100 (full load). Returns 0 if
CPU load cannot be obtained.
+ * </p>
+ *
+ * @return system CPU load (scaled by 100)
*/
@VisibleForTesting
- public static double getSystemCpuLoad() {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
- OperatingSystemMXBean.class);
+ public static long getSystemCpuLoad() {
+ OperatingSystemMXBean osBean =
ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
double cpuLoad = osBean.getSystemCpuLoad();
if (cpuLoad < 0) {
- // If the CPU load is not available, return 0.0
- return 0.0;
+ return 0L;
}
- return cpuLoad;
+ return scale(cpuLoad);
}
-
/**
- * Gets the current system CPU utilization.
+ * Returns the JVM process CPU load as a percentage (the 0.0-1.0 fraction
scaled by 100 and rounded to the nearest whole number).
+ * <p>
+ * The value ranges between 0 (no load) and 100 (full CPU used by this
process). Returns 0 if CPU load cannot be obtained.
+ * </p>
*
- * @return the CPU utilization as a fraction (0.0 to 1.0), or 0.0 if
unavailable.
+ * @return JVM process CPU load (scaled by 100)
*/
@VisibleForTesting
- public static double getJvmCpuLoad() {
- OperatingSystemMXBean osBean = ManagementFactory.getPlatformMXBean(
- OperatingSystemMXBean.class);
+ public static long getJvmCpuLoad() {
+ OperatingSystemMXBean osBean =
ManagementFactory.getPlatformMXBean(OperatingSystemMXBean.class);
double cpuLoad = osBean.getProcessCpuLoad();
- if (cpuLoad < ZERO) {
- return ZERO_D;
+ if (cpuLoad < 0) {
+ return 0L;
}
- return cpuLoad;
+ return scale(cpuLoad);
}
/**
- * Get the current memory load of the JVM.
- * @return the memory load as a double value between 0.0 and 1.0
+ * Returns the heap memory usage as a fraction of max heap (scaled by 100).
+ *
+ * @return memory load (used/max heap) scaled by 100
*/
@VisibleForTesting
- public static double getMemoryLoad() {
+ public static long getMemoryLoad() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- return (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ double memLoad = (double) memoryUsage.getUsed() / memoryUsage.getMax();
+ return scale(memLoad);
}
/**
- * Calculates the used heap memory in gigabytes.
- * This method returns the amount of heap memory currently used by the JVM.
- * The result is rounded up to the nearest gigabyte.
+ * Returns the currently used heap memory in gigabytes.
*
- * @return the used heap memory in gigabytes
+ * @return used heap memory in GB (scaled by 100)
*/
public static long getUsedHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long usedHeapBytes = memoryUsage.getUsed();
- return (usedHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ double gb = mu.getUsed() / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Calculates the maximum heap memory allowed for the JVM in gigabytes.
- * This is the upper bound the JVM may expand its heap to.
+ * Returns the maximum heap memory that the JVM can use.
*
- * @return the maximum heap memory in gigabytes
+ * @return max heap memory in GB (scaled by 100)
*/
public static long getMaxHeapMemory() {
- MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
- MemoryUsage memoryUsage = osBean.getHeapMemoryUsage();
- long maxHeapBytes = memoryUsage.getMax();
- return (maxHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
- }
-
-
- /**
- * Returns the process ID (PID) of the currently running JVM.
- * This method uses {@link ProcessHandle#current()} to obtain the ID of the
- * Java process.
- *
- * @return the PID of the current JVM process
- */
- public static long getJvmProcessId() {
- return ProcessHandle.current().pid();
+ MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
+ double gb = mu.getMax() / (double) BYTES_PER_GIGABYTE;
+ return scale(gb);
}
/**
- * Calculates the available max heap memory in gigabytes.
- * This method uses {@link Runtime#getRuntime()} to obtain the maximum heap
memory
- * allowed for the JVM and subtracts the currently used memory (total - free)
- * to determine how much heap memory is still available.
- * The result is rounded up to the nearest gigabyte.
+ * Returns the available heap memory relative to the max heap.
+ * <p>
+ * This method calculates the difference between max heap and currently used
heap,
+ * then converts it to gigabytes rounded up.
+ * </p>
*
- * @return the available heap memory in gigabytes
+ * @return available heap memory in GB (rounded up)
*/
public static long getAvailableMaxHeapMemory() {
MemoryMXBean osBean = ManagementFactory.getMemoryMXBean();
@@ -168,3 +169,4 @@ public static long getAvailableMaxHeapMemory() {
return (availableHeapBytes + BYTES_PER_GIGABYTE - 1) / BYTES_PER_GIGABYTE;
}
}
+
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
index a332c30bc03..735b3f276ba 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestWriteThreadPoolSizeManager.java
@@ -26,9 +26,13 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
@@ -56,8 +60,8 @@
class TestWriteThreadPoolSizeManager extends AbstractAbfsIntegrationTest {
private AbfsConfiguration mockConfig;
- private static final double HIGH_CPU_UTILIZATION_THRESHOLD = 0.95;
- private static final double LOW_CPU_UTILIZATION_THRESHOLD = 0.05;
+ private static final long HIGH_CPU_UTILIZATION_THRESHOLD = 95;
+ private static final long LOW_CPU_UTILIZATION_THRESHOLD = 5;
private static final int LOW_MEMORY_USAGE_THRESHOLD_PERCENT = 100;
private static final int THREAD_SLEEP_DURATION_MS = 200;
private static final String TEST_FILE_PATH = "testFilePath";
@@ -75,8 +79,8 @@ class TestWriteThreadPoolSizeManager extends
AbstractAbfsIntegrationTest {
private static final int WAIT_DURATION_MS = 3000;
private static final int LATCH_TIMEOUT_SECONDS = 60;
private static final int RESIZE_WAIT_TIME_MS = 6_000;
- private static final double HIGH_CPU_USAGE_RATIO = 0.95;
- private static final double LOW_CPU_USAGE_RATIO = 0.05;
+ private static final long HIGH_CPU_USAGE_RATIO = 95;
+ private static final long LOW_CPU_USAGE_RATIO = 5;
private static final int SLEEP_DURATION_MS = 150;
private static final int AWAIT_TIMEOUT_SECONDS = 45;
private static final int RESIZER_JOIN_TIMEOUT_MS = 2_000;
@@ -178,7 +182,7 @@ void testAdjustThreadPoolSizeBasedOnLowCPU()
try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) {
WriteThreadPoolSizeManager instance
= WriteThreadPoolSizeManager.getInstance(abfs.getFileSystemId(),
- getAbfsStore(abfs).getAbfsConfiguration(),
+ mockConfig,
abfs.getAbfsClient().getAbfsCounters());
ExecutorService executor = instance.getExecutorService();
int initialSize = ((ThreadPoolExecutor) executor).getMaximumPoolSize();
@@ -833,7 +837,11 @@ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
.getAbfsWriteResourceUtilizationMetrics();
WriteThreadPoolSizeManager.WriteThreadPoolStats statsBefore =
- instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
ResourceUtilizationUtils.getMemoryLoad());
+ instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
+ ResourceUtilizationUtils.getMemoryLoad(),
+ ResourceUtilizationUtils.getUsedHeapMemory(),
+ ResourceUtilizationUtils.getAvailableHeapMemory(),
+ ResourceUtilizationUtils.getCommittedHeapMemory());
ThreadPoolExecutor executor =
(ThreadPoolExecutor) instance.getExecutorService();
@@ -868,7 +876,11 @@ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
Thread.sleep(SLEEP_DURATION_30S_MS);
WriteThreadPoolSizeManager.WriteThreadPoolStats statsAfter =
- instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
ResourceUtilizationUtils.getMemoryLoad());
+ instance.getCurrentStats(ResourceUtilizationUtils.getJvmCpuLoad(),
+ ResourceUtilizationUtils.getMemoryLoad(),
+ ResourceUtilizationUtils.getUsedHeapMemory(),
+ ResourceUtilizationUtils.getAvailableHeapMemory(),
+ ResourceUtilizationUtils.getCommittedHeapMemory());
//--- Validate that metrics and stats changed ---
Assertions.assertThat(statsAfter)
@@ -898,5 +910,54 @@ void testThreadPoolOnLowCpuLoadAndMetricsUpdate()
instance.close();
}
}
+
+ /**
+ * Verifies that the JVM identifier is initialized once and remains
+ * constant across multiple invocations within the same JVM process.
+ */
+ @Test
+ public void testJvmIdIsSingletonWithinJvm() {
+ int firstId = JvmUniqueIdProvider.getJvmId();
+ int secondId = JvmUniqueIdProvider.getJvmId();
+ int thirdId = JvmUniqueIdProvider.getJvmId();
+
+ assertEquals(firstId, secondId,
+ "Subsequent calls to getJvmId() should return the same value");
+ assertEquals(secondId, thirdId,
+ "JVM-scoped identifier must remain constant for the lifetime of the
JVM");
+ }
+
+ /**
+ * Verifies that the JVM identifier is safely shared across multiple threads
+ * and that concurrent access returns the same value.
+ *
+ * <p>This test ensures that static initialization of the identifier is
+ * thread-safe and occurs only once per JVM.</p>
+ */
+ @Test
+ public void testJvmIdIsSameAcrossThreads()
+ throws ExecutionException, InterruptedException {
+
+ ExecutorService executor = Executors.newFixedThreadPool(4);
+
+ try {
+ Callable<Integer> task = JvmUniqueIdProvider::getJvmId;
+ Future<Integer> f1 = executor.submit(task);
+ Future<Integer> f2 = executor.submit(task);
+ Future<Integer> f3 = executor.submit(task);
+ Future<Integer> f4 = executor.submit(task);
+
+ int expectedId = f1.get();
+ assertEquals(expectedId, f2.get(),
+ "JVM ID should be identical when accessed from different threads");
+ assertEquals(expectedId, f3.get(),
+ "JVM ID should be identical when accessed concurrently");
+ assertEquals(expectedId, f4.get(),
+ "JVM ID should be initialized once and shared across all threads");
+ } finally {
+ executor.shutdown();
+ executor.awaitTermination(5, TimeUnit.SECONDS);
+ }
+ }
}
diff --git
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
index 77ae7ff71df..e94c535bd39 100644
---
a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
+++
b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestReadBufferManagerV2.java
@@ -342,7 +342,7 @@ public void testReadMetricUpdation() throws Exception {
.as("Thread pool stats should update after CPU load")
.isNotEqualTo(statsBefore);
- boolean updatedMetrics = metrics.getUpdatedAtLeastOnce();
+ boolean updatedMetrics = metrics.isUpdated();
Assertions.assertThat(updatedMetrics)
.as("Metrics should be updated at least once after CPU load")
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]