This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 22efb4865e1 KAFKA-19791; Add Idle Thread Ratio Metric to MetadataLoader (#20724)
22efb4865e1 is described below

commit 22efb4865e1368ac8f626ee7a112093632be1f1c
Author: Mahsa Seifikar <[email protected]>
AuthorDate: Wed Oct 29 12:55:04 2025 -0400

    KAFKA-19791; Add Idle Thread Ratio Metric to MetadataLoader (#20724)
    
    This change adds the metric MetadataLoader::AvgIdleRatio, following the
    same pattern as the existing ControllerEventManager::AvgIdleRatio
    metric.
    
    This metric measures the average idle ratio of the metadata loader event
    queue thread, indicating how much time the metadata loader spends
    waiting for events versus actively processing them. The metric value
    ranges from 0.0 (always busy processing) to 1.0 (always idle waiting for
    events).
    
    Reviewers: José Armando García Sancio <[email protected]>
---
 .../src/main/scala/kafka/server/SharedServer.scala |  6 ++--
 docs/upgrade.html                                  |  4 +--
 .../metrics/QuorumControllerMetrics.java           |  4 +--
 .../apache/kafka/image/loader/MetadataLoader.java  |  8 +++--
 .../loader/metrics/MetadataLoaderMetrics.java      | 22 +++++++++++-
 .../metrics/QuorumControllerMetricsTest.java       |  6 ++--
 .../loader/metrics/MetadataLoaderMetricsTest.java  | 39 ++++++++++++++++++++--
 .../org/apache/kafka/queue/KafkaEventQueue.java    | 18 ++++++----
 .../apache/kafka/queue/KafkaEventQueueTest.java    |  2 +-
 9 files changed, 86 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index aba9035cb7e..9c245765569 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -301,12 +301,14 @@ class SharedServer(
 
         nodeMetrics = new NodeMetrics(metrics, 
controllerConfig.unstableFeatureVersionsEnabled)
         metadataLoaderMetrics = if (brokerMetrics != null) {
-          new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+          new MetadataLoaderMetrics(
+            Optional.of(KafkaYammerMetrics.defaultRegistry()),
             elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
             batchSize => brokerMetrics.updateBatchSize(batchSize),
             brokerMetrics.lastAppliedImageProvenance)
         } else {
-          new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+          new MetadataLoaderMetrics(
+            Optional.of(KafkaYammerMetrics.defaultRegistry()),
             _ => {},
             _ => {},
             new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
diff --git a/docs/upgrade.html b/docs/upgrade.html
index c0d22cd0fbf..36c1ca6a929 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -190,8 +190,8 @@
         For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
     </li>
     <li>
-        A new metric <code>AvgIdleRatio</code> has been added to the 
<code>ControllerEventManager</code> group. This metric measures the average 
idle ratio of the controller event queue thread,
-        providing visibility into how much time the controller spends waiting 
for events versus processing them. The metric value ranges from 0.0 (always 
busy) to 1.0 (always idle).
+        A new metric <code>AvgIdleRatio</code> has been added to the 
<code>ControllerEventManager</code> and <code>MetadataLoader</code> groups. 
These metrics measure the average idle ratio of their respective event queue 
threads,
+        providing visibility into how much time each component spends waiting 
for events versus processing them. The metric value ranges from 0.0 (always 
busy) to 1.0 (always idle).
     </li>
     <li>
         Deprecated 
<code>org.apache.kafka.streams.KafkaStreams$CloseOptions</code> and its related 
methods, such as
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 4a251faafc4..9593950d138 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -173,9 +173,9 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
         }));
     }
 
-    public void updateIdleTime(long idleDurationMs) {
+    public void updateIdleTime(long idleDurationMs, long currentTimeMs) {
         synchronized (avgIdleTimeRatio) {
-            avgIdleTimeRatio.record((double) idleDurationMs, 
time.milliseconds());
+            avgIdleTimeRatio.record((double) idleDurationMs, currentTimeMs);
         }
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index a1513a0c4c0..f0b7b004b15 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -120,7 +120,8 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                 throw new RuntimeException("You must set the high water mark 
accessor.");
             }
             if (metrics == null) {
-                metrics = new MetadataLoaderMetrics(Optional.empty(),
+                metrics = new MetadataLoaderMetrics(
+                    Optional.empty(),
                     __ -> { },
                     __ -> { },
                     new AtomicReference<>(MetadataProvenance.EMPTY));
@@ -217,10 +218,11 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             faultHandler,
             this::maybePublishMetadata);
         this.eventQueue = new KafkaEventQueue(
-            Time.SYSTEM,
+            time,
             logContext,
             threadNamePrefix + "metadata-loader-",
-            new ShutdownEvent());
+            new ShutdownEvent(),
+            metrics::updateIdleTime);
     }
 
     // VisibleForTesting
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
index a819e9230a5..4e4931cf94c 100644
--- 
a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -21,6 +21,7 @@ import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.metrics.TimeRatio;
 
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.MetricName;
@@ -47,6 +48,8 @@ public final class MetadataLoaderMetrics implements 
AutoCloseable {
         "MetadataLoader", "HandleLoadSnapshotCount");
     private static final MetricName CURRENT_CONTROLLER_ID = getMetricName(
         "MetadataLoader", "CurrentControllerId");
+    private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
+        "MetadataLoader", "AvgIdleRatio");
     private static final String FINALIZED_LEVEL_METRIC_NAME = "FinalizedLevel";
     private static final String FEATURE_NAME_TAG = "featureName";
 
@@ -59,6 +62,7 @@ public final class MetadataLoaderMetrics implements 
AutoCloseable {
     private final Consumer<Long> batchProcessingTimeNsUpdater;
     private final Consumer<Integer> batchSizesUpdater;
     private final AtomicReference<MetadataProvenance> lastAppliedProvenance;
+    private final TimeRatio avgIdleTimeRatio;
 
     /**
      * Create a new LoaderMetrics object.
@@ -78,6 +82,7 @@ public final class MetadataLoaderMetrics implements 
AutoCloseable {
         this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater;
         this.batchSizesUpdater = batchSizesUpdater;
         this.lastAppliedProvenance = lastAppliedProvenance;
+        this.avgIdleTimeRatio = new TimeRatio(1);
         registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new 
Gauge<Integer>() {
             @Override
             public Integer value() {
@@ -96,6 +101,20 @@ public final class MetadataLoaderMetrics implements 
AutoCloseable {
                 return handleLoadSnapshotCount();
             }
         }));
+        registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new 
Gauge<Double>() {
+            @Override
+            public Double value() {
+                synchronized (avgIdleTimeRatio) {
+                    return avgIdleTimeRatio.measure();
+                }
+            }
+        }));
+    }
+
+    public void updateIdleTime(long idleDurationMs, long currentTimeMs) {
+        synchronized (avgIdleTimeRatio) {
+            avgIdleTimeRatio.record((double) idleDurationMs, currentTimeMs);
+        }
     }
 
     private void addFinalizedFeatureLevelMetric(String featureName) {
@@ -223,7 +242,8 @@ public final class MetadataLoaderMetrics implements 
AutoCloseable {
         registry.ifPresent(r -> List.of(
             CURRENT_METADATA_VERSION,
             CURRENT_CONTROLLER_ID,
-            HANDLE_LOAD_SNAPSHOT_COUNT
+            HANDLE_LOAD_SNAPSHOT_COUNT,
+            AVERAGE_IDLE_RATIO
         ).forEach(r::removeMetric));
         for (var featureName : finalizedFeatureLevels.keySet()) {
             removeFinalizedFeatureLevelMetric(featureName);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 4aa50a561df..8842e814b71 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -203,14 +203,14 @@ public class QuorumControllerMetricsTest {
 
             // First recording is dropped to establish the interval start time
             // This is because TimeRatio needs an initial timestamp to measure 
intervals from
-            metrics.updateIdleTime(10);
+            metrics.updateIdleTime(10, time.milliseconds());
             time.sleep(40);
-            metrics.updateIdleTime(20);
+            metrics.updateIdleTime(20, time.milliseconds());
             // avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
             assertEquals(0.5, avgIdleRatio.value(), delta);
 
             time.sleep(20);
-            metrics.updateIdleTime(1);
+            metrics.updateIdleTime(1, time.milliseconds());
             // avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
             assertEquals(0.05, avgIdleRatio.value(), delta);
 
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
index 3acf3be23dd..fcbc2714cf6 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.image.loader.metrics;
 
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
 import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.server.common.KRaftVersion;
@@ -44,6 +45,7 @@ public class MetadataLoaderMetricsTest {
         final AtomicInteger batchSize = new AtomicInteger(0);
         final AtomicReference<MetadataProvenance> provenance =
             new AtomicReference<>(MetadataProvenance.EMPTY);
+        final MockTime time = new MockTime();
         final MetadataLoaderMetrics metrics;
 
         FakeMetadataLoaderMetrics(MetricsRegistry registry) {
@@ -73,7 +75,8 @@ public class MetadataLoaderMetricsTest {
                     Set.of(
                         
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
                         
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
-                        
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+                        
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+                        "kafka.server:type=MetadataLoader,name=AvgIdleRatio"
                     )
                 );
 
@@ -86,6 +89,7 @@ public class MetadataLoaderMetricsTest {
                         
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
                         
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
                         
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+                        "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion"
                     )
@@ -180,7 +184,8 @@ public class MetadataLoaderMetricsTest {
                     Set.of(
                         
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
                         
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
-                        
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+                        
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+                        "kafka.server:type=MetadataLoader,name=AvgIdleRatio"
                     )
                 );
 
@@ -213,6 +218,7 @@ public class MetadataLoaderMetricsTest {
                         
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
                         
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
                         
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+                        "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
@@ -227,6 +233,7 @@ public class MetadataLoaderMetricsTest {
                         
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
                         
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
                         
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+                        "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion"
                     )
@@ -244,6 +251,7 @@ public class MetadataLoaderMetricsTest {
                         
"kafka.server:type=MetadataLoader,name=CurrentControllerId",
                         
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
                         
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount",
+                        "kafka.server:type=MetadataLoader,name=AvgIdleRatio",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=metadataVersion",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=kraftVersion",
                         
"kafka.server:type=MetadataLoader,name=FinalizedLevel,featureName=transactionVersion"
@@ -256,6 +264,33 @@ public class MetadataLoaderMetricsTest {
             registry.shutdown();
         }
     }
+    @Test
+    public void testAvgIdleRatio() {
+        final double delta = 0.001;
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+            @SuppressWarnings("unchecked")
+            Gauge<Double> avgIdleRatio = (Gauge<Double>) 
registry.allMetrics().get(metricName("MetadataLoader", "AvgIdleRatio"));
+
+            // No idle time recorded yet; returns default ratio of 1.0
+            assertEquals(1.0, avgIdleRatio.value(), delta);
+
+            // The first updateIdleTime call is ignored by the TimeRatio 
sensor.
+            // This establishes the baseline timestamp for subsequent 
measurements.
+            fakeMetrics.metrics.updateIdleTime(10, 
fakeMetrics.time.milliseconds());
+            fakeMetrics.time.sleep(40);
+            fakeMetrics.metrics.updateIdleTime(20, 
fakeMetrics.time.milliseconds());
+            // avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
+            assertEquals(0.5, avgIdleRatio.value(), delta);
+
+            fakeMetrics.time.sleep(20);
+            fakeMetrics.metrics.updateIdleTime(1, 
fakeMetrics.time.milliseconds());
+            // avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
+            assertEquals(0.05, avgIdleRatio.value(), delta);
+        } finally {
+            registry.shutdown();
+        }
+    }
 
     private static MetricName metricName(String type, String name) {
         String mBeanName = String.format("kafka.server:type=%s,name=%s", type, 
name);
diff --git 
a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java 
b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index ad2c916e3fc..ea55d85f919 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -33,7 +33,7 @@ import java.util.TreeMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.function.Consumer;
+import java.util.function.BiConsumer;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
 
@@ -294,7 +294,8 @@ public final class KafkaEventQueue implements EventQueue {
                         );
                         interrupted = true;
                     } finally {
-                        idleTimeCallback.accept(Math.max(time.milliseconds() - 
startIdleMs, 0));
+                        long currentTimeMs = time.milliseconds();
+                        idleTimeCallback.accept(Math.max(currentTimeMs - 
startIdleMs, 0), currentTimeMs);
                     }
                 } finally {
                     lock.unlock();
@@ -442,9 +443,12 @@ public final class KafkaEventQueue implements EventQueue {
     private boolean interrupted;
 
     /**
-     * Optional callback for queue idle time tracking.
+     * Optional callback for tracking queue idle time. The BiConsumer accepts 
two parameters:
+     * the first Long is the idle duration in milliseconds, and the second 
Long is the current
+     * time in milliseconds when the idle period ended. Both values are 
captured at the same
+     * moment to ensure timing consistency for metric calculations.
      */
-    private final Consumer<Long> idleTimeCallback;
+    private final BiConsumer<Long, Long> idleTimeCallback;
 
 
     public KafkaEventQueue(
@@ -452,7 +456,7 @@ public final class KafkaEventQueue implements EventQueue {
         LogContext logContext,
         String threadNamePrefix
     ) {
-        this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> { 
});
+        this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, (__, ___) 
-> { });
     }
 
     public KafkaEventQueue(
@@ -461,7 +465,7 @@ public final class KafkaEventQueue implements EventQueue {
         String threadNamePrefix,
         Event cleanupEvent
     ) {
-        this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
+        this(time, logContext, threadNamePrefix, cleanupEvent, (__, ___) -> { 
});
     }
 
     public KafkaEventQueue(
@@ -469,7 +473,7 @@ public final class KafkaEventQueue implements EventQueue {
         LogContext logContext,
         String threadNamePrefix,
         Event cleanupEvent,
-        Consumer<Long> idleTimeCallback
+        BiConsumer<Long, Long> idleTimeCallback
     ) {
         this.time = time;
         this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
diff --git 
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java 
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index d2d4526eef7..41972f9fb44 100644
--- 
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -435,7 +435,7 @@ public class KafkaEventQueueTest {
                 logContext,
                 "testIdleTimeCallback",
                 EventQueue.VoidEvent.INSTANCE,
-                lastIdleTimeMs::set)) {
+                (idleDuration, currentTime) -> 
lastIdleTimeMs.set(idleDuration))) {
             time.sleep(2);
             assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 
0ms");
 

Reply via email to