This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8468317dac4 KAFKA-19467; Add a metric for controller thread idleness (#20422)
8468317dac4 is described below

commit 8468317dac490e8c3ba9c7b35ac8ff64f5f1fbfe
Author: Mahsa Seifikar <[email protected]>
AuthorDate: Thu Oct 2 14:02:47 2025 -0400

    KAFKA-19467; Add a metric for controller thread idleness (#20422)
    
    This change adds the metric ControllerEventManager::AvgIdleRatio, which
    measures the fraction of time the controller event thread spends blocked
    waiting for events, as opposed to processing them. A value of 1.0 means
    that the controller spent the entire interval blocked waiting for events;
    a value of 0.0 means it spent the entire interval processing events.
    
    Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
     <[email protected]>, Alyssa Huang <[email protected]>, TengYao
     Chi <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 checkstyle/import-control.xml                      |  1 +
 docs/upgrade.html                                  |  4 ++
 .../apache/kafka/controller/QuorumController.java  |  9 ++++-
 .../metrics/QuorumControllerMetrics.java           | 23 ++++++++++-
 .../metrics/QuorumControllerMetricsTest.java       | 30 ++++++++++++++
 .../kafka/raft/internals/KafkaRaftMetrics.java     |  1 +
 .../org/apache/kafka/queue/KafkaEventQueue.java    | 46 +++++++++++++++-------
 .../apache/kafka/server/metrics}/TimeRatio.java    | 31 ++++++++++++---
 .../apache/kafka/queue/KafkaEventQueueTest.java    | 44 +++++++++++++++++++++
 .../kafka/server/metrics}/TimeRatioTest.java       |  2 +-
 10 files changed, 168 insertions(+), 23 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b1ef62ca3a2..c7f9eaad7ea 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -497,6 +497,7 @@
     <allow pkg="org.apache.kafka.server.common.serialization" />
     <allow pkg="org.apache.kafka.server.config" />
     <allow pkg="org.apache.kafka.server.fault"/>
+    <allow pkg="org.apache.kafka.server.metrics" />
     <allow pkg="org.apache.kafka.server.util" />
     <allow pkg="org.apache.kafka.test"/>
     <allow pkg="com.fasterxml.jackson" />
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d28898590f8..b5501bd2a74 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -182,6 +182,10 @@
         
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
         For further details, please refer to <a 
href="https://cwiki.apache.org/confluence/x/6oqMEw";>KIP-1100</a>.
     </li>
+    <li>
+        A new metric <code>AvgIdleRatio</code> has been added to the 
<code>ControllerEventManager</code> group. This metric measures the average 
idle ratio of the controller event queue thread,
+        providing visibility into how much time the controller spends waiting 
for events versus processing them. The metric value ranges from 0.0 (always 
busy) to 1.0 (always idle).
+    </li>
 </ul>
 
 <h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e1dd69723b..dfde76ecba5 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -406,7 +406,14 @@ public final class QuorumController implements Controller {
 
             KafkaEventQueue queue = null;
             try {
-                queue = new KafkaEventQueue(time, logContext, 
threadNamePrefix);
+                queue = new KafkaEventQueue(
+                    time,
+                    logContext,
+                    threadNamePrefix,
+                    EventQueue.VoidEvent.INSTANCE,
+                    controllerMetrics::updateIdleTime
+                );
+
                 return new QuorumController(
                     nonFatalFaultHandler,
                     fatalFaultHandler,
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 310a2c1dd61..4a251faafc4 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller.metrics;
 
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.metrics.TimeRatio;
 
 import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Histogram;
@@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
         "ControllerEventManager", "EventQueueTimeMs");
     private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS = 
getMetricName(
         "ControllerEventManager", "EventQueueProcessingTimeMs");
+    private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
+        "ControllerEventManager", "AvgIdleRatio");
     private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private static final MetricName LAST_COMMITTED_RECORD_OFFSET = 
getMetricName(
@@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
         "KafkaController", "EventQueueOperationsTimedOutCount");
     private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT = 
getMetricName(
         "KafkaController", "NewActiveControllersCount");
+
     private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME 
= "TimeSinceLastHeartbeatReceivedMs";
     private static final String BROKER_ID_TAG = "broker";
 
@@ -75,6 +79,7 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
     private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
     private final Consumer<Long> eventQueueTimeUpdater;
     private final Consumer<Long> eventQueueProcessingTimeUpdater;
+    private final TimeRatio avgIdleTimeRatio;
 
     private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
     private final AtomicLong operationsStarted = new AtomicLong(0);
@@ -109,6 +114,7 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
         this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
         this.eventQueueProcessingTimeUpdater = 
newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
         this.sessionTimeoutMs = sessionTimeoutMs;
+        this.avgIdleTimeRatio = new TimeRatio(1);
         registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new 
Gauge<Long>() {
             @Override
             public Long value() {
@@ -157,6 +163,20 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
                 return newActiveControllers();
             }
         }));
+        registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new 
Gauge<Double>() {
+            @Override
+            public Double value() {
+                synchronized (avgIdleTimeRatio) {
+                    return avgIdleTimeRatio.measure();
+                }
+            }
+        }));
+    }
+
+    public void updateIdleTime(long idleDurationMs) {
+        synchronized (avgIdleTimeRatio) {
+            avgIdleTimeRatio.record((double) idleDurationMs, 
time.milliseconds());
+        }
     }
 
     public void addTimeSinceLastHeartbeatMetric(int brokerId) {
@@ -291,7 +311,8 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
             TIMED_OUT_BROKER_HEARTBEAT_COUNT,
             EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
             EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
-            NEW_ACTIVE_CONTROLLERS_COUNT
+            NEW_ACTIVE_CONTROLLERS_COUNT,
+            AVERAGE_IDLE_RATIO
         ).forEach(r::removeMetric));
         removeTimeSinceLastHeartbeatMetrics();
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 491d22f1cd8..4aa50a561df 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -45,6 +45,7 @@ public class QuorumControllerMetricsTest {
                 Set<String> expected = Set.of(
                     
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
                     
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+                    
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
                     
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
                     
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
                     
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
@@ -189,6 +190,35 @@ public class QuorumControllerMetricsTest {
         }
     }
 
+    @Test
+    public void testAvgIdleRatio() {
+        final double delta = 0.001;
+        MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
+        try (QuorumControllerMetrics metrics = new 
QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
+            Gauge<Double> avgIdleRatio = (Gauge<Double>) 
registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));
+
+            // No idle time recorded yet; returns default ratio of 1.0
+            assertEquals(1.0, avgIdleRatio.value(), delta);
+
+            // First recording is dropped to establish the interval start time
+            // This is because TimeRatio needs an initial timestamp to measure 
intervals from
+            metrics.updateIdleTime(10);
+            time.sleep(40);
+            metrics.updateIdleTime(20);
+            // avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
+            assertEquals(0.5, avgIdleRatio.value(), delta);
+
+            time.sleep(20);
+            metrics.updateIdleTime(1);
+            // avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
+            assertEquals(0.05, avgIdleRatio.value(), delta);
+
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     private static void assertMetricHistogram(MetricsRegistry registry, 
MetricName metricName, long count, double sum) {
         Histogram histogram = (Histogram) 
registry.allMetrics().get(metricName);
 
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index 87c5b217d8e..a90928d35f3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -29,6 +29,7 @@ import org.apache.kafka.raft.LogOffsetMetadata;
 import org.apache.kafka.raft.QuorumState;
 import org.apache.kafka.raft.ReplicaKey;
 import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.metrics.TimeRatio;
 
 import java.util.List;
 import java.util.OptionalLong;
diff --git 
a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java 
b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index 42ebdd37d20..ad2c916e3fc 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.UnaryOperator;
 
@@ -278,22 +279,22 @@ public final class KafkaEventQueue implements EventQueue {
                         remove(toRun);
                         continue;
                     }
-                    if (awaitNs == Long.MAX_VALUE) {
-                        try {
+
+                    long startIdleMs = time.milliseconds();
+                    try {
+                        if (awaitNs == Long.MAX_VALUE) {
                             cond.await();
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a new 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
-                        }
-                    } else {
-                        try {
+                        } else {
                             cond.awaitNanos(awaitNs);
-                        } catch (InterruptedException e) {
-                            log.warn("Interrupted while waiting for a deferred 
event. " +
-                                "Shutting down event queue");
-                            interrupted = true;
                         }
+                    } catch (InterruptedException e) {
+                        log.warn(
+                                "Interrupted while waiting for a {} event. 
Shutting down event queue",
+                                (awaitNs == Long.MAX_VALUE) ? "new" : 
"deferred"
+                        );
+                        interrupted = true;
+                    } finally {
+                        idleTimeCallback.accept(Math.max(time.milliseconds() - 
startIdleMs, 0));
                     }
                 } finally {
                     lock.unlock();
@@ -440,12 +441,18 @@ public final class KafkaEventQueue implements EventQueue {
      */
     private boolean interrupted;
 
+    /**
+     * Optional callback for queue idle time tracking.
+     */
+    private final Consumer<Long> idleTimeCallback;
+
+
     public KafkaEventQueue(
         Time time,
         LogContext logContext,
         String threadNamePrefix
     ) {
-        this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
+        this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> { 
});
     }
 
     public KafkaEventQueue(
@@ -453,6 +460,16 @@ public final class KafkaEventQueue implements EventQueue {
         LogContext logContext,
         String threadNamePrefix,
         Event cleanupEvent
+    ) {
+        this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
+    }
+
+    public KafkaEventQueue(
+        Time time,
+        LogContext logContext,
+        String threadNamePrefix,
+        Event cleanupEvent,
+        Consumer<Long> idleTimeCallback
     ) {
         this.time = time;
         this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -463,6 +480,7 @@ public final class KafkaEventQueue implements EventQueue {
             this.eventHandler, false);
         this.shuttingDown = false;
         this.interrupted = false;
+        this.idleTimeCallback = Objects.requireNonNull(idleTimeCallback);
         this.eventHandlerThread.start();
     }
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java 
b/server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
similarity index 80%
rename from raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
rename to 
server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
index 357682b6fe2..8a1572c0273 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
+++ b/server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.raft.internals;
+package org.apache.kafka.server.metrics;
 
 import org.apache.kafka.common.metrics.MeasurableStat;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -46,11 +46,26 @@ public class TimeRatio implements MeasurableStat {
 
     @Override
     public double measure(MetricConfig config, long currentTimestampMs) {
+        return measure();
+    }
+
+    @Override
+    public void record(MetricConfig config, double value, long 
currentTimestampMs) {
+        record(value, currentTimestampMs);
+    }
+
+    /**
+     * Measures the ratio of recorded duration to the interval duration
+     * since the last measurement.
+     *
+     * @return The measured ratio value between 0.0 and 1.0
+     */
+    public double measure() {
         if (lastRecordedTimestampMs < 0) {
             // Return the default value if no recordings have been captured.
             return defaultRatio;
         } else {
-            // We measure the ratio over the
+            // We measure the ratio over the interval
             double intervalDurationMs = Math.max(lastRecordedTimestampMs - 
intervalStartTimestampMs, 0);
             final double ratio;
             if (intervalDurationMs == 0) {
@@ -61,15 +76,20 @@ public class TimeRatio implements MeasurableStat {
                 ratio = totalRecordedDurationMs / intervalDurationMs;
             }
 
-            // The next interval begins at the
+            // The next interval begins at the last recorded timestamp
             intervalStartTimestampMs = lastRecordedTimestampMs;
             totalRecordedDurationMs = 0;
             return ratio;
         }
     }
 
-    @Override
-    public void record(MetricConfig config, double value, long 
currentTimestampMs) {
+    /**
+     * Records a duration value at the specified timestamp.
+     *
+     * @param value The duration value to record
+     * @param currentTimestampMs The current timestamp in milliseconds
+     */
+    public void record(double value, long currentTimestampMs) {
         if (intervalStartTimestampMs < 0) {
             // Discard the initial value since the value occurred prior to the 
interval start
             intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +98,4 @@ public class TimeRatio implements MeasurableStat {
             lastRecordedTimestampMs = currentTimestampMs;
         }
     }
-
 }
diff --git 
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java 
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 54fc65a604a..d2d4526eef7 100644
--- 
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -424,4 +424,48 @@ public class KafkaEventQueueTest {
             assertEquals(InterruptedException.class, 
ieTrapper2.exception.get().getClass());
         }
     }
+
+    @Test
+    public void testIdleTimeCallback() throws Exception {
+        MockTime time = new MockTime();
+        AtomicLong lastIdleTimeMs = new AtomicLong(0);
+
+        try (KafkaEventQueue queue = new KafkaEventQueue(
+                time,
+                logContext,
+                "testIdleTimeCallback",
+                EventQueue.VoidEvent.INSTANCE,
+                lastIdleTimeMs::set)) {
+            time.sleep(2);
+            assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be 
0ms");
+
+            // Test 1: Two events with a wait in between using FutureEvent
+            CompletableFuture<String> event1 = new CompletableFuture<>();
+            queue.append(new FutureEvent<>(event1, () -> {
+                time.sleep(1);
+                return "event1-processed";
+            }));
+            assertEquals("event1-processed", event1.get());
+
+            long waitTime5Ms = 5;
+            time.sleep(waitTime5Ms);
+            CompletableFuture<String> event2 = new CompletableFuture<>();
+            queue.append(new FutureEvent<>(event2, () -> {
+                time.sleep(1);
+                return "event2-processed";
+            }));
+            assertEquals("event2-processed", event2.get());
+            assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Idle time should 
be " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
+
+            // Test 2: Deferred event
+            long waitTime2Ms = 2;
+            CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
+            queue.scheduleDeferred("deferred2",
+                    __ -> OptionalLong.of(time.nanoseconds() + 
TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
+                    () -> deferredEvent2.complete(null));
+            time.sleep(waitTime2Ms);
+            deferredEvent2.get();
+            assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Idle time should 
be " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
+        }
+    }
 }
diff --git 
a/raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java 
b/server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
similarity index 98%
rename from 
raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
rename to 
server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
index 94e8844734d..2c194a1448a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.raft.internals;
+package org.apache.kafka.server.metrics;
 
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.utils.MockTime;

Reply via email to