This is an automated email from the ASF dual-hosted git repository.
jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8468317dac4 KAFKA-19467; Add a metric for controller thread idleness
(#20422)
8468317dac4 is described below
commit 8468317dac490e8c3ba9c7b35ac8ff64f5f1fbfe
Author: Mahsa Seifikar <[email protected]>
AuthorDate: Thu Oct 2 14:02:47 2025 -0400
KAFKA-19467; Add a metric for controller thread idleness (#20422)
This change adds the metric ControllerEventManager::AvgIdleRatio, which
reports the fraction of each measurement interval that the controller
event queue thread spends blocked waiting for events rather than
processing them. A value of 1.0 means that the controller spent the
entire interval blocked waiting for events; a value of 0.0 means that
it was busy processing events for the entire interval.
Reviewers: José Armando García Sancio <[email protected]>, Kevin Wu
<[email protected]>, Alyssa Huang <[email protected]>, TengYao
Chi <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
checkstyle/import-control.xml | 1 +
docs/upgrade.html | 4 ++
.../apache/kafka/controller/QuorumController.java | 9 ++++-
.../metrics/QuorumControllerMetrics.java | 23 ++++++++++-
.../metrics/QuorumControllerMetricsTest.java | 30 ++++++++++++++
.../kafka/raft/internals/KafkaRaftMetrics.java | 1 +
.../org/apache/kafka/queue/KafkaEventQueue.java | 46 +++++++++++++++-------
.../apache/kafka/server/metrics}/TimeRatio.java | 31 ++++++++++++---
.../apache/kafka/queue/KafkaEventQueueTest.java | 44 +++++++++++++++++++++
.../kafka/server/metrics}/TimeRatioTest.java | 2 +-
10 files changed, 168 insertions(+), 23 deletions(-)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b1ef62ca3a2..c7f9eaad7ea 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -497,6 +497,7 @@
<allow pkg="org.apache.kafka.server.common.serialization" />
<allow pkg="org.apache.kafka.server.config" />
<allow pkg="org.apache.kafka.server.fault"/>
+ <allow pkg="org.apache.kafka.server.metrics" />
<allow pkg="org.apache.kafka.server.util" />
<allow pkg="org.apache.kafka.test"/>
<allow pkg="com.fasterxml.jackson" />
diff --git a/docs/upgrade.html b/docs/upgrade.html
index d28898590f8..b5501bd2a74 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -182,6 +182,10 @@
<code>kafka.log.remote:type=RemoteStorageThreadPool.RemoteLogReaderAvgIdlePercent</code>.
For further details, please refer to <a
href="https://cwiki.apache.org/confluence/x/6oqMEw">KIP-1100</a>.
</li>
+ <li>
+ A new metric <code>AvgIdleRatio</code> has been added to the
<code>ControllerEventManager</code> group. This metric measures the average
idle ratio of the controller event queue thread,
+ providing visibility into how much time the controller spends waiting
for events versus processing them. The metric value ranges from 0.0 (always
busy) to 1.0 (always idle).
+ </li>
</ul>
<h4><a id="upgrade_4_1_0" href="#upgrade_4_1_0">Upgrading to 4.1.0</a></h4>
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 3e1dd69723b..dfde76ecba5 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -406,7 +406,14 @@ public final class QuorumController implements Controller {
KafkaEventQueue queue = null;
try {
- queue = new KafkaEventQueue(time, logContext,
threadNamePrefix);
+ queue = new KafkaEventQueue(
+ time,
+ logContext,
+ threadNamePrefix,
+ EventQueue.VoidEvent.INSTANCE,
+ controllerMetrics::updateIdleTime
+ );
+
return new QuorumController(
nonFatalFaultHandler,
fatalFaultHandler,
diff --git
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 310a2c1dd61..4a251faafc4 100644
---
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller.metrics;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+import org.apache.kafka.server.metrics.TimeRatio;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
@@ -48,6 +49,8 @@ public class QuorumControllerMetrics implements AutoCloseable
{
"ControllerEventManager", "EventQueueTimeMs");
private static final MetricName EVENT_QUEUE_PROCESSING_TIME_MS =
getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
+ private static final MetricName AVERAGE_IDLE_RATIO = getMetricName(
+ "ControllerEventManager", "AvgIdleRatio");
private static final MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
"KafkaController", "LastAppliedRecordOffset");
private static final MetricName LAST_COMMITTED_RECORD_OFFSET =
getMetricName(
@@ -64,6 +67,7 @@ public class QuorumControllerMetrics implements AutoCloseable
{
"KafkaController", "EventQueueOperationsTimedOutCount");
private static final MetricName NEW_ACTIVE_CONTROLLERS_COUNT =
getMetricName(
"KafkaController", "NewActiveControllersCount");
+
private static final String TIME_SINCE_LAST_HEARTBEAT_RECEIVED_METRIC_NAME
= "TimeSinceLastHeartbeatReceivedMs";
private static final String BROKER_ID_TAG = "broker";
@@ -75,6 +79,7 @@ public class QuorumControllerMetrics implements AutoCloseable
{
private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Consumer<Long> eventQueueTimeUpdater;
private final Consumer<Long> eventQueueProcessingTimeUpdater;
+ private final TimeRatio avgIdleTimeRatio;
private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
private final AtomicLong operationsStarted = new AtomicLong(0);
@@ -109,6 +114,7 @@ public class QuorumControllerMetrics implements
AutoCloseable {
this.eventQueueTimeUpdater = newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTimeUpdater =
newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
this.sessionTimeoutMs = sessionTimeoutMs;
+ this.avgIdleTimeRatio = new TimeRatio(1);
registry.ifPresent(r -> r.newGauge(LAST_APPLIED_RECORD_OFFSET, new
Gauge<Long>() {
@Override
public Long value() {
@@ -157,6 +163,20 @@ public class QuorumControllerMetrics implements
AutoCloseable {
return newActiveControllers();
}
}));
+ registry.ifPresent(r -> r.newGauge(AVERAGE_IDLE_RATIO, new
Gauge<Double>() {
+ @Override
+ public Double value() {
+ synchronized (avgIdleTimeRatio) {
+ return avgIdleTimeRatio.measure();
+ }
+ }
+ }));
+ }
+
+ public void updateIdleTime(long idleDurationMs) {
+ synchronized (avgIdleTimeRatio) {
+ avgIdleTimeRatio.record((double) idleDurationMs,
time.milliseconds());
+ }
}
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
@@ -291,7 +311,8 @@ public class QuorumControllerMetrics implements
AutoCloseable {
TIMED_OUT_BROKER_HEARTBEAT_COUNT,
EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
- NEW_ACTIVE_CONTROLLERS_COUNT
+ NEW_ACTIVE_CONTROLLERS_COUNT,
+ AVERAGE_IDLE_RATIO
).forEach(r::removeMetric));
removeTimeSinceLastHeartbeatMetrics();
}
diff --git
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 491d22f1cd8..4aa50a561df 100644
---
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -45,6 +45,7 @@ public class QuorumControllerMetricsTest {
Set<String> expected = Set.of(
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+
"kafka.controller:type=ControllerEventManager,name=AvgIdleRatio",
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
@@ -189,6 +190,35 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testAvgIdleRatio() {
+ final double delta = 0.001;
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ try (QuorumControllerMetrics metrics = new
QuorumControllerMetrics(Optional.of(registry), time, 9000)) {
+ Gauge<Double> avgIdleRatio = (Gauge<Double>)
registry.allMetrics().get(metricName("ControllerEventManager", "AvgIdleRatio"));
+
+ // No idle time recorded yet; returns default ratio of 1.0
+ assertEquals(1.0, avgIdleRatio.value(), delta);
+
+ // First recording is dropped to establish the interval start time
+ // This is because TimeRatio needs an initial timestamp to measure
intervals from
+ metrics.updateIdleTime(10);
+ time.sleep(40);
+ metrics.updateIdleTime(20);
+ // avgIdleRatio = (20ms idle) / (40ms interval) = 0.5
+ assertEquals(0.5, avgIdleRatio.value(), delta);
+
+ time.sleep(20);
+ metrics.updateIdleTime(1);
+ // avgIdleRatio = (1ms idle) / (20ms interval) = 0.05
+ assertEquals(0.05, avgIdleRatio.value(), delta);
+
+ } finally {
+ registry.shutdown();
+ }
+ }
+
private static void assertMetricHistogram(MetricsRegistry registry,
MetricName metricName, long count, double sum) {
Histogram histogram = (Histogram)
registry.allMetrics().get(metricName);
diff --git
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index 87c5b217d8e..a90928d35f3 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -29,6 +29,7 @@ import org.apache.kafka.raft.LogOffsetMetadata;
import org.apache.kafka.raft.QuorumState;
import org.apache.kafka.raft.ReplicaKey;
import org.apache.kafka.server.common.OffsetAndEpoch;
+import org.apache.kafka.server.metrics.TimeRatio;
import java.util.List;
import java.util.OptionalLong;
diff --git
a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
index 42ebdd37d20..ad2c916e3fc 100644
--- a/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
+++ b/server-common/src/main/java/org/apache/kafka/queue/KafkaEventQueue.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.UnaryOperator;
@@ -278,22 +279,22 @@ public final class KafkaEventQueue implements EventQueue {
remove(toRun);
continue;
}
- if (awaitNs == Long.MAX_VALUE) {
- try {
+
+ long startIdleMs = time.milliseconds();
+ try {
+ if (awaitNs == Long.MAX_VALUE) {
cond.await();
- } catch (InterruptedException e) {
- log.warn("Interrupted while waiting for a new
event. " +
- "Shutting down event queue");
- interrupted = true;
- }
- } else {
- try {
+ } else {
cond.awaitNanos(awaitNs);
- } catch (InterruptedException e) {
- log.warn("Interrupted while waiting for a deferred
event. " +
- "Shutting down event queue");
- interrupted = true;
}
+ } catch (InterruptedException e) {
+ log.warn(
+ "Interrupted while waiting for a {} event.
Shutting down event queue",
+ (awaitNs == Long.MAX_VALUE) ? "new" :
"deferred"
+ );
+ interrupted = true;
+ } finally {
+ idleTimeCallback.accept(Math.max(time.milliseconds() -
startIdleMs, 0));
}
} finally {
lock.unlock();
@@ -440,12 +441,18 @@ public final class KafkaEventQueue implements EventQueue {
*/
private boolean interrupted;
+ /**
+ * Optional callback for queue idle time tracking.
+ */
+ private final Consumer<Long> idleTimeCallback;
+
+
public KafkaEventQueue(
Time time,
LogContext logContext,
String threadNamePrefix
) {
- this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE);
+ this(time, logContext, threadNamePrefix, VoidEvent.INSTANCE, __ -> {
});
}
public KafkaEventQueue(
@@ -453,6 +460,16 @@ public final class KafkaEventQueue implements EventQueue {
LogContext logContext,
String threadNamePrefix,
Event cleanupEvent
+ ) {
+ this(time, logContext, threadNamePrefix, cleanupEvent, __ -> { });
+ }
+
+ public KafkaEventQueue(
+ Time time,
+ LogContext logContext,
+ String threadNamePrefix,
+ Event cleanupEvent,
+ Consumer<Long> idleTimeCallback
) {
this.time = time;
this.cleanupEvent = Objects.requireNonNull(cleanupEvent);
@@ -463,6 +480,7 @@ public final class KafkaEventQueue implements EventQueue {
this.eventHandler, false);
this.shuttingDown = false;
this.interrupted = false;
+ this.idleTimeCallback = Objects.requireNonNull(idleTimeCallback);
this.eventHandlerThread.start();
}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
b/server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
similarity index 80%
rename from raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
rename to
server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
index 357682b6fe2..8a1572c0273 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/TimeRatio.java
+++ b/server-common/src/main/java/org/apache/kafka/server/metrics/TimeRatio.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.internals;
+package org.apache.kafka.server.metrics;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -46,11 +46,26 @@ public class TimeRatio implements MeasurableStat {
@Override
public double measure(MetricConfig config, long currentTimestampMs) {
+ return measure();
+ }
+
+ @Override
+ public void record(MetricConfig config, double value, long
currentTimestampMs) {
+ record(value, currentTimestampMs);
+ }
+
+ /**
+ * Measures the ratio of recorded duration to the interval duration
+ * since the last measurement.
+ *
+ * @return The measured ratio value between 0.0 and 1.0
+ */
+ public double measure() {
if (lastRecordedTimestampMs < 0) {
// Return the default value if no recordings have been captured.
return defaultRatio;
} else {
- // We measure the ratio over the
+ // We measure the ratio over the interval
double intervalDurationMs = Math.max(lastRecordedTimestampMs -
intervalStartTimestampMs, 0);
final double ratio;
if (intervalDurationMs == 0) {
@@ -61,15 +76,20 @@ public class TimeRatio implements MeasurableStat {
ratio = totalRecordedDurationMs / intervalDurationMs;
}
- // The next interval begins at the
+ // The next interval begins at the last recorded timestamp
intervalStartTimestampMs = lastRecordedTimestampMs;
totalRecordedDurationMs = 0;
return ratio;
}
}
- @Override
- public void record(MetricConfig config, double value, long
currentTimestampMs) {
+ /**
+ * Records a duration value at the specified timestamp.
+ *
+ * @param value The duration value to record
+ * @param currentTimestampMs The current timestamp in milliseconds
+ */
+ public void record(double value, long currentTimestampMs) {
if (intervalStartTimestampMs < 0) {
// Discard the initial value since the value occurred prior to the
interval start
intervalStartTimestampMs = currentTimestampMs;
@@ -78,5 +98,4 @@ public class TimeRatio implements MeasurableStat {
lastRecordedTimestampMs = currentTimestampMs;
}
}
-
}
diff --git
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
index 54fc65a604a..d2d4526eef7 100644
---
a/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
+++
b/server-common/src/test/java/org/apache/kafka/queue/KafkaEventQueueTest.java
@@ -424,4 +424,48 @@ public class KafkaEventQueueTest {
assertEquals(InterruptedException.class,
ieTrapper2.exception.get().getClass());
}
}
+
+ @Test
+ public void testIdleTimeCallback() throws Exception {
+ MockTime time = new MockTime();
+ AtomicLong lastIdleTimeMs = new AtomicLong(0);
+
+ try (KafkaEventQueue queue = new KafkaEventQueue(
+ time,
+ logContext,
+ "testIdleTimeCallback",
+ EventQueue.VoidEvent.INSTANCE,
+ lastIdleTimeMs::set)) {
+ time.sleep(2);
+ assertEquals(0, lastIdleTimeMs.get(), "Last idle time should be
0ms");
+
+ // Test 1: Two events with a wait in between using FutureEvent
+ CompletableFuture<String> event1 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event1, () -> {
+ time.sleep(1);
+ return "event1-processed";
+ }));
+ assertEquals("event1-processed", event1.get());
+
+ long waitTime5Ms = 5;
+ time.sleep(waitTime5Ms);
+ CompletableFuture<String> event2 = new CompletableFuture<>();
+ queue.append(new FutureEvent<>(event2, () -> {
+ time.sleep(1);
+ return "event2-processed";
+ }));
+ assertEquals("event2-processed", event2.get());
+ assertEquals(waitTime5Ms, lastIdleTimeMs.get(), "Idle time should
be " + waitTime5Ms + "ms, was: " + lastIdleTimeMs.get());
+
+ // Test 2: Deferred event
+ long waitTime2Ms = 2;
+ CompletableFuture<Void> deferredEvent2 = new CompletableFuture<>();
+ queue.scheduleDeferred("deferred2",
+ __ -> OptionalLong.of(time.nanoseconds() +
TimeUnit.MILLISECONDS.toNanos(waitTime2Ms)),
+ () -> deferredEvent2.complete(null));
+ time.sleep(waitTime2Ms);
+ deferredEvent2.get();
+ assertEquals(waitTime2Ms, lastIdleTimeMs.get(), "Idle time should
be " + waitTime2Ms + "ms, was: " + lastIdleTimeMs.get());
+ }
+ }
}
diff --git
a/raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
b/server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
similarity index 98%
rename from
raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
rename to
server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
index 94e8844734d..2c194a1448a 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/TimeRatioTest.java
+++
b/server-common/src/test/java/org/apache/kafka/server/metrics/TimeRatioTest.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.raft.internals;
+package org.apache.kafka.server.metrics;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.utils.MockTime;