This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch KAFKA-15183
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 2816888c74bc4f275f2abe94feaba7ad78b22599
Author: Colin P. McCabe <[email protected]>
AuthorDate: Wed Jul 12 11:26:03 2023 -0700

    KAFKA-15183: Add more controller, loader, snapshot emitter metrics
    
    Implement some of the metrics from KIP-938: Add more metrics for
    measuring KRaft performance.
    
    Add these metrics to QuorumControllerMetrics:
        kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
        
kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
        
kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
        kafka.controller:type=KafkaController,name=NewActiveControllersCount
    
    Create MetadataLoaderMetrics with these new metrics:
        kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
        kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
    
    Create SnapshotEmitterMetrics with these new metrics:
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
---
 checkstyle/import-control-metadata.xml             |  16 +-
 .../src/main/scala/kafka/server/SharedServer.scala |  30 ++-
 .../server/metadata/BrokerServerMetrics.scala      |  11 +-
 .../apache/kafka/controller/QuorumController.java  |  54 ++++--
 .../metrics/QuorumControllerMetrics.java           |  83 +++++++-
 .../apache/kafka/image/loader/MetadataLoader.java  |  66 ++++---
 .../kafka/image/loader/MetadataLoaderMetrics.java  |  46 -----
 .../loader/metrics/MetadataLoaderMetrics.java      | 137 +++++++++++++
 .../publisher/metrics/SnapshotEmitterMetrics.java  | 108 +++++++++++
 .../QuorumControllerIntegrationTestUtils.java      | 213 +++++++++++++++++++++
 .../QuorumControllerMetricsIntegrationTest.java    | 202 +++++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     | 122 +-----------
 .../metrics/QuorumControllerMetricsTest.java       |   9 +-
 .../kafka/image/loader/MetadataLoaderTest.java     |  24 +++
 .../loader/metrics/MetadataLoaderMetricsTest.java  | 148 ++++++++++++++
 15 files changed, 1042 insertions(+), 227 deletions(-)

diff --git a/checkstyle/import-control-metadata.xml 
b/checkstyle/import-control-metadata.xml
index 2cbb5504293..464006f0e38 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -63,7 +63,6 @@
     </subpackage>
 
     <subpackage name="controller">
-        <allow pkg="com.yammer.metrics"/>
         <allow pkg="org.apache.kafka.clients" />
         <allow pkg="org.apache.kafka.clients.admin" />
         <allow pkg="org.apache.kafka.common.acl" />
@@ -73,7 +72,6 @@
         <allow pkg="org.apache.kafka.common.internals" />
         <allow pkg="org.apache.kafka.common.message" />
         <allow pkg="org.apache.kafka.common.metadata" />
-        <allow pkg="org.apache.kafka.common.metrics" />
         <allow pkg="org.apache.kafka.common.network" />
         <allow pkg="org.apache.kafka.common.protocol" />
         <allow pkg="org.apache.kafka.common.quota" />
@@ -93,13 +91,17 @@
         <allow pkg="org.apache.kafka.server.common" />
         <allow pkg="org.apache.kafka.server.config" />
         <allow pkg="org.apache.kafka.server.fault" />
-        <allow pkg="org.apache.kafka.server.metrics" />
         <allow pkg="org.apache.kafka.server.mutable" />
         <allow pkg="org.apache.kafka.server.policy"/>
         <allow pkg="org.apache.kafka.server.util"/>
         <allow pkg="org.apache.kafka.snapshot" />
         <allow pkg="org.apache.kafka.test" />
         <allow pkg="org.apache.kafka.timeline" />
+        <subpackage name="metrics">
+            <allow pkg="com.yammer.metrics"/>
+            <allow pkg="org.apache.kafka.common.metrics" />
+            <allow pkg="org.apache.kafka.server.metrics" />
+        </subpackage>
     </subpackage>
 
     <subpackage name="image">
@@ -122,6 +124,14 @@
         <allow pkg="org.apache.kafka.server.util" />
         <allow pkg="org.apache.kafka.snapshot" />
         <allow pkg="org.apache.kafka.test" />
+        <subpackage name="loader">
+            <subpackage name="metrics">
+                <allow pkg="com.yammer.metrics"/>
+                <allow pkg="org.apache.kafka.common.metrics" />
+                <allow pkg="org.apache.kafka.controller.metrics" />
+                <allow pkg="org.apache.kafka.server.metrics" />
+            </subpackage>
+        </subpackage>
     </subpackage>
 
     <subpackage name="metadata">
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala 
b/core/src/main/scala/kafka/server/SharedServer.scala
index e58b33a8d57..892c528885a 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -25,7 +25,9 @@ import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
+import org.apache.kafka.image.MetadataProvenance
 import org.apache.kafka.image.loader.MetadataLoader
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
 import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -34,7 +36,7 @@ import org.apache.kafka.server.fault.{FaultHandler, 
LoggingFaultHandler, Process
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.util
-import java.util.{Collections, Optional}
+import java.util.Optional
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
@@ -106,6 +108,7 @@ class SharedServer(
   val snapshotsDisabledReason = new AtomicReference[String](null)
   @volatile var snapshotEmitter: SnapshotEmitter = _
   @volatile var snapshotGenerator: SnapshotGenerator = _
+  @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
 
   def isUsed(): Boolean = synchronized {
     usedByController || usedByBroker
@@ -259,15 +262,24 @@ class SharedServer(
         raftManager = _raftManager
         _raftManager.startup()
 
+        metadataLoaderMetrics = if (brokerMetrics != null) {
+          new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+            elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
+            batchSize => brokerMetrics.updateBatchSize(batchSize),
+            brokerMetrics.lastAppliedImageProvenance)
+        } else {
+          new 
MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+            _ => {},
+            _ => {},
+            new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
+        }
         val loaderBuilder = new MetadataLoader.Builder().
           setNodeId(metaProps.nodeId).
           setTime(time).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           setFaultHandler(metadataLoaderFaultHandler).
-          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
-        if (brokerMetrics != null) {
-          loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
-        }
+          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
+          setMetrics(metadataLoaderMetrics)
         loader = loaderBuilder.build()
         snapshotEmitter = new SnapshotEmitter.Builder().
           setNodeId(metaProps.nodeId).
@@ -282,15 +294,15 @@ class SharedServer(
           setDisabledReason(snapshotsDisabledReason).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           build()
-        _raftManager.register(loader)
         try {
-          
loader.installPublishers(Collections.singletonList(snapshotGenerator))
+          loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()
         } catch {
           case t: Throwable => {
             error("Unable to install metadata publishers", t)
             throw new RuntimeException("Unable to install metadata 
publishers.", t)
           }
         }
+        _raftManager.register(loader)
         debug("Completed SharedServer startup.")
         started = true
       } catch {
@@ -326,6 +338,10 @@ class SharedServer(
         CoreUtils.swallow(loader.close(), this)
         loader = null
       }
+      if (metadataLoaderMetrics != null) {
+        CoreUtils.swallow(metadataLoaderMetrics.close(), this)
+        metadataLoaderMetrics = null
+      }
       if (snapshotGenerator != null) {
         CoreUtils.swallow(snapshotGenerator.close(), this)
         snapshotGenerator = null
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 212909101f2..ff183324166 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.Gauge
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.image.MetadataProvenance
-import org.apache.kafka.image.loader.MetadataLoaderMetrics
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 
 import java.util.Collections
@@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
 
 final class BrokerServerMetrics private (
   metrics: Metrics
-) extends MetadataLoaderMetrics {
+) extends AutoCloseable {
   import BrokerServerMetrics._
 
   private val batchProcessingTimeHistName = 
KafkaMetricsGroup.explicitMetricName("kafka.server",
@@ -123,15 +122,15 @@ final class BrokerServerMetrics private (
     ).foreach(metrics.removeMetric)
   }
 
-  override def updateBatchProcessingTime(elapsedNs: Long): Unit =
+  def updateBatchProcessingTime(elapsedNs: Long): Unit =
     batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))
 
-  override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
+  def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
 
-  override def updateLastAppliedImageProvenance(provenance: 
MetadataProvenance): Unit =
+  def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
     lastAppliedImageProvenance.set(provenance)
 
-  override def lastAppliedOffset(): Long = 
lastAppliedImageProvenance.get().lastContainedOffset()
+  def lastAppliedOffset(): Long = 
lastAppliedImageProvenance.get().lastContainedOffset()
 
   def lastAppliedTimestamp(): Long = 
lastAppliedImageProvenance.get().lastContainedLogTimeMs()
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 582d774e6e9..2f32c3638df 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -461,6 +461,12 @@ public final class QuorumController implements Controller {
     private Throwable handleEventException(String name,
                                            OptionalLong startProcessingTimeNs,
                                            Throwable exception) {
+        if (!startProcessingTimeNs.isPresent() &&
+                ControllerExceptions.isTimeoutException(exception)) {
+            // If the event never started, and the exception is a timeout, 
increment the timed
+            // out metric.
+            controllerMetrics.incrementOperationsTimedOut();
+        }
         Throwable externalException =
                 ControllerExceptions.toExternalException(exception, () -> 
latestController());
         if (!startProcessingTimeNs.isPresent()) {
@@ -492,6 +498,15 @@ public final class QuorumController implements Controller {
         return externalException;
     }
 
+    private long updateEventStartMetricsAndGetTime(OptionalLong 
eventCreatedTimeNs) {
+        long now = time.nanoseconds();
+        controllerMetrics.incrementOperationsStarted();
+        if (eventCreatedTimeNs.isPresent()) {
+            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - 
eventCreatedTimeNs.getAsLong()));
+        }
+        return now;
+    }
+
     /**
      * A controller event for handling internal state changes, such as Raft 
inputs.
      */
@@ -508,9 +523,8 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - 
eventCreatedTimeNs));
-            startProcessingTimeNs = OptionalLong.of(now);
+            startProcessingTimeNs = OptionalLong.of(
+                
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
             log.debug("Executing {}.", this);
             handler.run();
             handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
@@ -527,11 +541,16 @@ public final class QuorumController implements Controller 
{
         }
     }
 
-    private void appendControlEvent(String name, Runnable handler) {
+    void appendControlEvent(String name, Runnable handler) {
         ControllerEvent event = new ControllerEvent(name, handler);
         queue.append(event);
     }
 
+    void appendControlEventWithDeadline(String name, Runnable handler, long 
deadlineNs) {
+        ControllerEvent event = new ControllerEvent(name, handler);
+        queue.appendWithDeadline(deadlineNs, event);
+    }
+
     /**
      * A controller event that reads the committed internal state in order to 
expose it
      * to an API.
@@ -555,9 +574,8 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - 
eventCreatedTimeNs));
-            startProcessingTimeNs = OptionalLong.of(now);
+            startProcessingTimeNs = OptionalLong.of(
+                
updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
             T value = handler.get();
             handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
             future.complete(value);
@@ -692,12 +710,11 @@ public final class QuorumController implements Controller 
{
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
-                // We exclude deferred events from the event queue time metric 
to prevent
-                // incorrectly including the deferral time in the queue time.
-                
controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - 
eventCreatedTimeNs));
-            }
+            // Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to 
prevent incorrectly
+            // including their deferral time in the event queue time.
+            startProcessingTimeNs = OptionalLong.of(
+                
updateEventStartMetricsAndGetTime(flags.contains(DOES_NOT_UPDATE_QUEUE_TIME) ?
+                    OptionalLong.empty() : 
OptionalLong.of(eventCreatedTimeNs)));
             int controllerEpoch = curClaimEpoch;
             if (!isActiveController(controllerEpoch)) {
                 throw 
ControllerExceptions.newWrongControllerException(latestController());
@@ -706,7 +723,6 @@ public final class QuorumController implements Controller {
                 log.info("Cannot run write operation {} in pre-migration mode. 
Returning NOT_CONTROLLER.", name);
                 throw 
ControllerExceptions.newPreMigrationException(latestController());
             }
-            startProcessingTimeNs = OptionalLong.of(now);
             ControllerResult<T> result = op.generateRecordsAndResult();
             if (result.records().isEmpty()) {
                 op.processBatchEndOffset(writeOffset);
@@ -1063,6 +1079,9 @@ public final class QuorumController implements Controller 
{
             appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", 
() -> {
                 final String newLeaderName = newLeader.leaderId().isPresent() ?
                         String.valueOf(newLeader.leaderId().getAsInt()) : 
"(none)";
+                if (newLeader.leaderId().isPresent()) {
+                    controllerMetrics.incrementNewActiveControllers();
+                }
                 if (isActiveController()) {
                     if (newLeader.isLeader(nodeId)) {
                         log.warn("We were the leader in epoch {}, and are 
still the leader " +
@@ -1308,7 +1327,7 @@ public final class QuorumController implements Controller 
{
         }
     }
 
-    private void renounce() {
+    void renounce() {
         try {
             if (curClaimEpoch == -1) {
                 throw new RuntimeException("Cannot renounce leadership because 
we are not the " +
@@ -2307,4 +2326,9 @@ public final class QuorumController implements Controller 
{
     Time time() {
         return time;
     }
+
+    // VisibleForTesting
+    QuorumControllerMetrics controllerMetrics() {
+        return controllerMetrics;
+    }
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 566245ed096..a95fa11d1c1 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -52,6 +52,14 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
         "KafkaController", "LastAppliedRecordTimestamp");
     private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
         "KafkaController", "LastAppliedRecordLagMs");
+    private final static MetricName TIMED_OUT_BROKER_HEARTBEAT_COUNT = 
getMetricName(
+        "KafkaController", "TimedOutBrokerHeartbeatCount");
+    private final static MetricName EVENT_QUEUE_OPERATIONS_STARTED_COUNT = 
getMetricName(
+        "KafkaController", "EventQueueOperationsStartedCount");
+    private final static MetricName EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT = 
getMetricName(
+        "KafkaController", "EventQueueOperationsTimedOutCount");
+    private final static MetricName NEW_ACTIVE_CONTROLLERS_COUNT = 
getMetricName(
+        "KafkaController", "NewActiveControllersCount");
 
     private final Optional<MetricsRegistry> registry;
     private volatile boolean active;
@@ -61,6 +69,9 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
     private final Consumer<Long> eventQueueTimeUpdater;
     private final Consumer<Long> eventQueueProcessingTimeUpdater;
     private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
+    private final AtomicLong operationsStarted = new AtomicLong(0);
+    private final AtomicLong operationsTimedOut = new AtomicLong(0);
+    private final AtomicLong newActiveControllers = new AtomicLong(0);
 
     private Consumer<Long> newHistogram(MetricName name, boolean biased) {
         if (registry.isPresent()) {
@@ -109,6 +120,30 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, 
new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return operationsStarted();
+            }
+        }));
+        registry.ifPresent(r -> 
r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return operationsTimedOut();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(NEW_ACTIVE_CONTROLLERS_COUNT, new 
Gauge<Long>() {
+            @Override
+            public Long value() {
+                return newActiveControllers();
+            }
+        }));
     }
 
     public void setActive(boolean active) {
@@ -152,17 +187,53 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
     }
 
     public void incrementTimedOutHeartbeats() {
-        timedOutHeartbeats.addAndGet(1);
+        timedOutHeartbeats.incrementAndGet();
     }
 
-    public void setTimedOutHeartbeats(long heartbeats) {
-        timedOutHeartbeats.set(heartbeats);
+    public void setTimedOutHeartbeats(long value) {
+        timedOutHeartbeats.set(value);
     }
 
     public long timedOutHeartbeats() {
         return timedOutHeartbeats.get();
     }
 
+    public void incrementOperationsStarted() {
+        operationsStarted.incrementAndGet();
+    }
+
+    public void setOperationsStarted(long value) {
+        operationsStarted.set(value);
+    }
+
+    public long operationsStarted() {
+        return operationsStarted.get();
+    }
+
+    public void incrementOperationsTimedOut() {
+        operationsTimedOut.incrementAndGet();
+    }
+
+    public void setOperationsTimedOut(long value) {
+        operationsTimedOut.set(value);
+    }
+
+    public long operationsTimedOut() {
+        return operationsTimedOut.get();
+    }
+
+    public void incrementNewActiveControllers() {
+        newActiveControllers.incrementAndGet();
+    }
+
+    public void setNewActiveControllers(long value) {
+        newActiveControllers.set(value);
+    }
+
+    public long newActiveControllers() {
+        return newActiveControllers.get();
+    }
+
     @Override
     public void close() {
         registry.ifPresent(r -> Arrays.asList(
@@ -172,7 +243,11 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
             LAST_APPLIED_RECORD_OFFSET,
             LAST_COMMITTED_RECORD_OFFSET,
             LAST_APPLIED_RECORD_TIMESTAMP,
-            LAST_APPLIED_RECORD_LAG_MS
+            LAST_APPLIED_RECORD_LAG_MS,
+            TIMED_OUT_BROKER_HEARTBEAT_COUNT,
+            EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
+            EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
+            NEW_ACTIVE_CONTROLLERS_COUNT
         ).forEach(r::removeMetric));
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 768fcb2574b..c2c066418b9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.image.writer.ImageReWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -35,14 +36,17 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
 import org.slf4j.Logger;
 
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -69,28 +73,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         private Time time = Time.SYSTEM;
         private LogContext logContext = null;
         private FaultHandler faultHandler = (m, e) -> new 
FaultHandlerException(m, e);
-        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
-            private volatile long lastAppliedOffset = -1L;
-
-            @Override
-            public void updateBatchProcessingTime(long elapsedNs) { }
-
-            @Override
-            public void updateBatchSize(int size) { }
-
-            @Override
-            public void updateLastAppliedImageProvenance(MetadataProvenance 
provenance) {
-                this.lastAppliedOffset = provenance.lastContainedOffset();
-            }
-
-            @Override
-            public long lastAppliedOffset() {
-                return lastAppliedOffset;
-            }
-
-            @Override
-            public void close() throws Exception { }
-        };
+        private MetadataLoaderMetrics metrics = null;
         private Supplier<OptionalLong> highWaterMarkAccessor = null;
 
         public Builder setNodeId(int nodeId) {
@@ -113,13 +96,13 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             return this;
         }
 
-        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) 
{
-            this.metrics = metrics;
+        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> 
highWaterMarkAccessor) {
+            this.highWaterMarkAccessor = highWaterMarkAccessor;
             return this;
         }
 
-        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> 
highWaterMarkAccessor) {
-            this.highWaterMarkAccessor = highWaterMarkAccessor;
+        public Builder setMetrics(MetadataLoaderMetrics metrics) {
+            this.metrics = metrics;
             return this;
         }
 
@@ -130,6 +113,12 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
             if (highWaterMarkAccessor == null) {
                 throw new RuntimeException("You must set the high water mark 
accessor.");
             }
+            if (metrics == null) {
+                metrics = new MetadataLoaderMetrics(Optional.empty(),
+                    __ -> { },
+                    __ -> { },
+                    new AtomicReference<>(MetadataProvenance.EMPTY));
+            }
             return new MetadataLoader(
                 time,
                 logContext,
@@ -221,6 +210,11 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                 new ShutdownEvent());
     }
 
+    // VisibleForTesting
+    MetadataLoaderMetrics metrics() {
+        return metrics;
+    }
+
     private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
             log.trace("{}: we are not in the initial catching up state.", 
where);
@@ -349,6 +343,9 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                     }
                 }
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (delta.featuresDelta() != null) {
+                    
metrics.setCurrentMetadataVersion(image.features().metadataVersion());
+                }
                 if (uninitializedPublishers.isEmpty()) {
                     scheduleInitializeNewPublishers(0);
                 }
@@ -406,7 +403,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
         MetadataProvenance provenance =
                 new MetadataProvenance(lastOffset, lastEpoch, 
lastContainedLogTimeMs);
         long elapsedNs = time.nanoseconds() - startNs;
-        metrics.updateBatchProcessingTime(elapsedNs);
+        metrics.updateBatchProcessingTimeNs(elapsedNs);
         return new LogDeltaManifest(provenance,
                 currentLeaderAndEpoch,
                 numBatches,
@@ -418,24 +415,30 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
     public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> 
reader) {
         eventQueue.append(() -> {
             try {
+                long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+                String snapshotName = 
Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                log.info("handleLoadSnapshot({}): incrementing 
HandleLoadSnapshotCount to {}.",
+                    snapshotName, numLoaded);
                 MetadataDelta delta = new MetadataDelta.Builder().
                         setImage(image).
                         build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleLoadSnapshot: generated a metadata delta from 
a snapshot at offset {} " +
-                        "in {} us.", 
manifest.provenance().lastContainedOffset(),
+                log.info("handleLoadSnapshot({}): generated a metadata delta 
between offset {} " +
+                        "and this snapshot in {} us.", snapshotName,
+                        image.provenance().lastContainedOffset(),
                         NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
                     faultHandler.handleFault("Error generating new metadata 
image from " +
-                            "snapshot at offset " + 
reader.lastContainedLogOffset(), e);
+                            "snapshot " + snapshotName, e);
                     return;
                 }
                 if (stillNeedToCatchUp("handleLoadSnapshot", 
manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.info("handleLoadSnapshot: publishing new snapshot image 
with provenance {}.", image.provenance());
+                log.info("handleLoadSnapshot({}): publishing new snapshot 
image to {} publisher(s).",
+                        snapshotName, publishers.size());
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.onMetadataUpdate(delta, image, manifest);
@@ -446,6 +449,7 @@ public class MetadataLoader implements 
RaftClient.Listener<ApiMessageAndVersion>
                     }
                 }
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                
metrics.setCurrentMetadataVersion(image.features().metadataVersion());
                 if (uninitializedPublishers.isEmpty()) {
                     scheduleInitializeNewPublishers(0);
                 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
deleted file mode 100644
index 654bc9dd505..00000000000
--- 
a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.image.loader;
-
-import org.apache.kafka.image.MetadataProvenance;
-
-
-/**
- * An interface for the metadata loader metrics.
- */
-public interface MetadataLoaderMetrics extends AutoCloseable {
-    /**
-     * Update the batch processing time histogram.
-     */
-    void updateBatchProcessingTime(long elapsedNs);
-
-    /**
-     * Update the batch size histogram.
-     */
-    void updateBatchSize(int size);
-
-    /**
-     * Set the provenance of the last image which has been processed by all 
publishers.
-     */
-    void updateLastAppliedImageProvenance(MetadataProvenance provenance);
-
-    /**
-     * Retrieve the last offset which has been processed by all publishers.
-     */
-    long lastAppliedOffset();
-}
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
new file mode 100644
index 00000000000..351ac4fc5c3
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * These are the metrics which are managed by the MetadataLoader class.
+ */
+public final class MetadataLoaderMetrics implements AutoCloseable {
+    private final static MetricName CURRENT_METADATA_VERSION = getMetricName(
+        "MetadataLoader", "CurrentMetadataVersion");
+    private final static MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName(
+        "MetadataLoader", "HandleLoadSnapshotCount");
+
+    private final Optional<MetricsRegistry> registry;
+    private final AtomicReference<MetadataVersion> currentMetadataVersion =
+            new AtomicReference<>(MetadataVersion.MINIMUM_KRAFT_VERSION);
+    private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0);
+    private final Consumer<Long> batchProcessingTimeNsUpdater;
+    private final Consumer<Integer> batchSizesUpdater;
+    private final AtomicReference<MetadataProvenance> lastAppliedProvenance;
+
+    /**
+     * Create a new MetadataLoaderMetrics object.
+     *
+     * @param registry                      The metrics registry, or 
Optional.empty if this is a
+     *                                      test and we don't have one.
+     * @param batchProcessingTimeNsUpdater  Updates the batch processing time histogram.
+     * @param batchSizesUpdater             Updates the batch sizes histogram.
+     * @param lastAppliedProvenance         Holds the provenance of the last image applied by all publishers.
+    public MetadataLoaderMetrics(
+        Optional<MetricsRegistry> registry,
+        Consumer<Long> batchProcessingTimeNsUpdater,
+        Consumer<Integer> batchSizesUpdater,
+        AtomicReference<MetadataProvenance> lastAppliedProvenance
+    ) {
+        this.registry = registry;
+        this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater;
+        this.batchSizesUpdater = batchSizesUpdater;
+        this.lastAppliedProvenance = lastAppliedProvenance;
+        registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new 
Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return 
Integer.valueOf(currentMetadataVersion().featureLevel());
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(HANDLE_LOAD_SNAPSHOT_COUNT, new 
Gauge<Long>() {
+            @Override
+            public Long value() {
+                return handleLoadSnapshotCount();
+            }
+        }));
+    }
+
+    /**
+     * Update the batch processing time histogram.
+     */
+    public void updateBatchProcessingTimeNs(long elapsedNs) {
+        batchProcessingTimeNsUpdater.accept(elapsedNs);
+    }
+
+    /**
+     * Update the batch size histogram.
+     */
+    public void updateBatchSize(int size) {
+        batchSizesUpdater.accept(size);
+    }
+
+    /**
+     * Set the provenance of the last image which has been processed by all 
publishers.
+     */
+    public void updateLastAppliedImageProvenance(MetadataProvenance 
lastAppliedProvenance) {
+        this.lastAppliedProvenance.set(lastAppliedProvenance);
+    }
+
+    /**
+     * Retrieve the last offset which has been processed by all publishers.
+     */
+    public long lastAppliedOffset() {
+        return this.lastAppliedProvenance.get().lastContainedOffset();
+    }
+
+    public void setCurrentMetadataVersion(MetadataVersion metadataVersion) {
+        this.currentMetadataVersion.set(metadataVersion);
+    }
+
+    public MetadataVersion currentMetadataVersion() {
+        return this.currentMetadataVersion.get();
+    }
+
+    public long incrementHandleLoadSnapshotCount() {
+        return this.handleLoadSnapshotCount.incrementAndGet();
+    }
+
+    public long handleLoadSnapshotCount() {
+        return this.handleLoadSnapshotCount.get();
+    }
+
+    @Override
+    public void close() {
+        registry.ifPresent(r -> Arrays.asList(
+            CURRENT_METADATA_VERSION,
+            HANDLE_LOAD_SNAPSHOT_COUNT
+        ).forEach(r::removeMetric));
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+    }
+}
diff --git 
a/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
new file mode 100644
index 00000000000..0a2cd308463
--- /dev/null
+++ 
b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {
+    private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = 
getMetricName(
+        "SnapshotEmitter", "LatestSnapshotGeneratedBytes");
+    private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = 
getMetricName(
+        "SnapshotEmitter", "LatestSnapshotGeneratedAgeMs");
+
+    private final Optional<MetricsRegistry> registry;
+    private final Time time;
+    private final AtomicLong latestSnapshotGeneratedBytes;
+    private final AtomicLong latestSnapshotGeneratedTimeMs;
+
+    /**
+     * Create a new SnapshotEmitterMetrics object.
+     *
+     * @param registry                             The metrics registry, or Optional.empty if this is a
+     *                                             test and we don't have one.
+     * @param time                                 The clock used to timestamp snapshot generation.
+     * @param initialLatestSnapshotGeneratedBytes  The initial size, in bytes, of the latest generated snapshot.
+     */
+    public SnapshotEmitterMetrics(
+        Optional<MetricsRegistry> registry,
+        Time time,
+        long initialLatestSnapshotGeneratedBytes
+    ) {
+        this.registry = registry;
+        this.time = time;
+        this.latestSnapshotGeneratedBytes = new 
AtomicLong(initialLatestSnapshotGeneratedBytes);
+        this.latestSnapshotGeneratedTimeMs = new AtomicLong(monoTimeInMs());
+        registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, 
new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return latestSnapshotGeneratedBytes();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, 
new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return latestSnapshotGeneratedAgeMs();
+            }
+        }));
+    }
+
+    long monoTimeInMs() {
+        return TimeUnit.NANOSECONDS.toMillis(time.nanoseconds());
+    }
+
+    public void setLatestSnapshotGeneratedBytes(long value) {
+        this.latestSnapshotGeneratedBytes.set(value);
+    }
+
+    public long latestSnapshotGeneratedBytes() {
+        return this.latestSnapshotGeneratedBytes.get();
+    }
+
+    public void setLatestSnapshotGeneratedTimeMs(long timeMs) {
+        this.latestSnapshotGeneratedTimeMs.set(timeMs);
+    }
+
+    public long latestSnapshotGeneratedTimeMs() {
+        return this.latestSnapshotGeneratedTimeMs.get();
+    }
+
+    public long latestSnapshotGeneratedAgeMs() {
+        // Use the same monotonic clock that latestSnapshotGeneratedTimeMs was recorded
+        // with (see monoTimeInMs); mixing in wall-clock time.milliseconds() here would
+        // produce a meaningless (possibly negative) age.
+        return monoTimeInMs() - this.latestSnapshotGeneratedTimeMs.get();
+    }
+
+    @Override
+    public void close() {
+        registry.ifPresent(r -> Arrays.asList(
+            LATEST_SNAPSHOT_GENERATED_BYTES,
+            LATEST_SNAPSHOT_GENERATED_AGE_MS
+        ).forEach(r::removeMetric));
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+    }
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
new file mode 100644
index 00000000000..83758c0fff1
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.common.Uuid;
+import 
org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import 
org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Utility functions for use in QuorumController integration tests.
+ */
+class QuorumControllerIntegrationTestUtils {
+    private final static Logger log = 
LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
+
+    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
+        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
+    }
+
+    /**
+     * Create a broker features collection for use in a registration request. 
We only set MV. here.
+     *
+     * @param minVersion    The minimum supported MV.
+     * @param maxVersion    The maximum supported MV.
+     */
+    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
+        MetadataVersion minVersion,
+        MetadataVersion maxVersion
+    ) {
+        BrokerRegistrationRequestData.FeatureCollection features = new 
BrokerRegistrationRequestData.FeatureCollection();
+        features.add(new BrokerRegistrationRequestData.Feature()
+                         .setName(MetadataVersion.FEATURE_NAME)
+                         .setMinSupportedVersion(minVersion.featureLevel())
+                         .setMaxSupportedVersion(maxVersion.featureLevel()));
+        return features;
+    }
+
+    /**
+     * Register the given number of brokers.
+     *
+     * @param controller    The active controller.
+     * @param numBrokers    The number of brokers to register. We will start 
at 0 and increment.
+     *
+     * @return              A map from broker IDs to broker epochs.
+     */
+    static Map<Integer, Long> registerBrokersAndUnfence(
+        QuorumController controller,
+        int numBrokers
+    ) throws Exception {
+        Map<Integer, Long> brokerEpochs = new HashMap<>();
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            BrokerRegistrationReply reply = 
controller.registerBroker(ANONYMOUS_CONTEXT,
+                new BrokerRegistrationRequestData()
+                    .setBrokerId(brokerId)
+                    .setRack(null)
+                    .setClusterId(controller.clusterId())
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_6_IV0))
+                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" 
+ brokerId))
+                    .setListeners(new ListenerCollection(
+                        Arrays.asList(
+                            new Listener()
+                                .setName("PLAINTEXT")
+                                .setHost("localhost")
+                                .setPort(9092 + brokerId)
+                            ).iterator()
+                        )
+                    )
+            ).get();
+            brokerEpochs.put(brokerId, reply.epoch());
+
+            // Send heartbeat to unfence
+            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000L)
+            ).get();
+        }
+
+        return brokerEpochs;
+    }
+
+    /**
+     * Send broker heartbeats for the provided brokers.
+     *
+     * @param controller    The active controller.
+     * @param brokers       The broker IDs to send heartbeats for.
+     * @param brokerEpochs  A map from broker ID to broker epoch.
+     */
+    static void sendBrokerHeartbeat(
+        QuorumController controller,
+        List<Integer> brokers,
+        Map<Integer, Long> brokerEpochs
+    ) throws Exception {
+        if (brokers.isEmpty()) {
+            return;
+        }
+        for (Integer brokerId : brokers) {
+            BrokerHeartbeatReply reply = 
controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000)
+            ).get();
+            assertEquals(new BrokerHeartbeatReply(true, false, false, false), 
reply);
+        }
+    }
+
+    /**
+     * Create some topics directly on the controller.
+     *
+     * @param controller            The active controller.
+     * @param prefix                The prefix to use for topic names.
+     * @param numTopics             The number of topics to create.
+     * @param replicationFactor     The replication factor to use.
+     */
+    static void createTopics(
+        QuorumController controller,
+        String prefix,
+        int numTopics,
+        int replicationFactor
+    ) throws Exception {
+        HashSet<String> describable = new HashSet<>();
+        for (int i = 0; i < numTopics; i++) {
+            describable.add(prefix + i);
+        }
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        for (int i = 0; i < numTopics; i++) {
+            request.topics().add(
+                new CreatableTopic().
+                    setName(prefix + i).
+                    setNumPartitions(-1).
+                    setReplicationFactor((short) replicationFactor));
+        }
+        CreateTopicsResponseData response =
+            controller.createTopics(ANONYMOUS_CONTEXT, request, 
describable).get();
+        for (int i = 0; i < numTopics; i++) {
+            CreatableTopicResult result = response.topics().find(prefix + i);
+            assertEquals((short) 0, result.errorCode());
+        }
+    }
+
+    /**
+     * Add an event to the controller event queue that will pause it 
temporarily.
+     *
+     * @param controller    The controller.
+     * @return              The latch that can be used to unpause the 
controller.
+     */
+    static CountDownLatch pause(QuorumController controller) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        controller.appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
+
+    /**
+     * Force the current controller to renounce.
+     *
+     * @param controller    The controller.
+     */
+    static void forceRenounce(QuorumController controller) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        controller.appendControlEvent("forceRenounce", () -> {
+            controller.renounce();
+            future.complete(null);
+        });
+        future.get();
+    }
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
new file mode 100644
index 00000000000..5cd6fe4b1b9
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerMetricsIntegrationTest {
+    private final static Logger log = 
LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
+
+    static class MockControllerMetrics extends QuorumControllerMetrics {
+        final AtomicBoolean closed = new AtomicBoolean(false);
+
+        MockControllerMetrics() {
+            super(Optional.empty(), Time.SYSTEM);
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            closed.set(true);
+        }
+    }
+
+    /**
+     * Test that closing the QuorumController closes the metrics object.
+     */
+    @Test
+    public void testClosingQuorumControllerClosesMetrics() throws Throwable {
+        MockControllerMetrics metrics = new MockControllerMetrics();
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setMetrics(metrics);
+                }).
+                build()
+        ) {
+            assertEquals(1, 
controlEnv.activeController().controllerMetrics().newActiveControllers());
+        }
+        assertTrue(metrics.closed.get(), "metrics were not closed");
+    }
+
+    /**
+     * Test that failing over to a new controller increments 
NewActiveControllersCount on both the
+     * active and inactive controllers.
+     */
+    @Test
+    public void testFailingOverIncrementsNewActiveControllerCount() throws 
Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            controlEnv.activeController(); // wait for a controller to become 
active.
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                for (QuorumController controller : controlEnv.controllers()) {
+                    assertEquals(1, 
controller.controllerMetrics().newActiveControllers());
+                }
+            });
+            forceRenounce(controlEnv.activeController());
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                for (QuorumController controller : controlEnv.controllers()) {
+                    assertEquals(2, 
controller.controllerMetrics().newActiveControllers());
+                }
+            });
+        }
+    }
+
+    /**
+     * Test the heartbeat and general operation timeout metrics.
+     * These are incremented on the active controller only.
+     */
+    @Test
+    public void testTimeoutMetrics() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = 
registerBrokersAndUnfence(active, 3);
+            assertEquals(0L, active.controllerMetrics().timedOutHeartbeats());
+            assertEquals(0L, active.controllerMetrics().operationsTimedOut());
+
+            // We pause the controller so that the heartbeat event will 
definitely be expired
+            // rather than processed.
+            CountDownLatch latch = pause(active);
+            ControllerRequestContext expiredTimeoutContext = new 
ControllerRequestContext(
+                new RequestHeaderData(),
+                KafkaPrincipal.ANONYMOUS,
+                OptionalLong.of(active.time().nanoseconds()));
+            CompletableFuture<BrokerHeartbeatReply> replyFuture =
+                active.processBrokerHeartbeat(expiredTimeoutContext,
+                    new BrokerHeartbeatRequestData()
+                        .setWantFence(false)
+                        .setBrokerEpoch(brokerEpochs.get(0))
+                        .setBrokerId(0)
+                        .setCurrentMetadataOffset(100000));
+            latch.countDown(); // Unpause the controller.
+            assertEquals(TimeoutException.class,
+                assertThrows(ExecutionException.class, () -> 
replyFuture.get()).
+                    getCause().getClass());
+            assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+            assertEquals(1L, active.controllerMetrics().operationsTimedOut());
+
+            // Inject a new timed out operation.
+            CountDownLatch latch2 = pause(active);
+            active.appendControlEventWithDeadline("fakeTimeoutOperation",
+                () -> { },
+                active.time().nanoseconds());
+            latch2.countDown();
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                // The fake timeout increments operationsTimedOut but not 
timedOutHeartbeats.
+                assertEquals(1L, 
active.controllerMetrics().timedOutHeartbeats());
+                assertEquals(2L, 
active.controllerMetrics().operationsTimedOut());
+            });
+            for (QuorumController controller : controlEnv.controllers()) {
+                // Inactive controllers don't set these metrics.
+                if (!controller.isActive()) {
+                    assertEquals(false, 
controller.controllerMetrics().active());
+                    assertEquals(0L, 
controller.controllerMetrics().timedOutHeartbeats());
+                    assertEquals(0L, 
controller.controllerMetrics().operationsTimedOut());
+                }
+            }
+        }
+    }
+
+    /**
+     * Test the event queue operations started metric.
+     */
+    @Test
+    public void testEventQueueOperationsStartedMetric() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(3).
+                                                build();
+            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
+                                                     build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = 
registerBrokersAndUnfence(active, 3);
+
+            // Test that a new operation increments operationsStarted. We 
retry this if needed
+            // to handle the case where another operation is performed in 
between loading
+            // expectedOperationsStarted and running the new control event.
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                long expectedOperationsStarted = 
active.controllerMetrics().operationsStarted() + 1;
+                CompletableFuture<Long> actualOperationsStarted = new 
CompletableFuture<>();
+                active.appendControlEvent("checkOperationsStarted", () -> {
+                    
actualOperationsStarted.complete(active.controllerMetrics().operationsStarted());
+                });
+                assertEquals(expectedOperationsStarted, 
actualOperationsStarted.get());
+            });
+        }
+    }
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index d55ff5f67c2..261d3c91b28 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -88,7 +87,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import 
org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
-import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -124,6 +122,10 @@ import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER
 import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
 import static 
org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
 import static 
org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static 
org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -140,39 +142,6 @@ public class QuorumControllerTest {
     static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
             fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided 
bootstrap");
 
-    static class MockControllerMetrics extends QuorumControllerMetrics {
-        final AtomicBoolean closed = new AtomicBoolean(false);
-
-        MockControllerMetrics() {
-            super(Optional.empty(), Time.SYSTEM);
-        }
-
-        @Override
-        public void close() {
-            super.close();
-            closed.set(true);
-        }
-    }
-
-    /**
-     * Test creating a new QuorumController and closing it.
-     */
-    @Test
-    public void testCreateAndClose() throws Throwable {
-        MockControllerMetrics metrics = new MockControllerMetrics();
-        try (
-            LocalLogManagerTestEnv logEnv = new 
LocalLogManagerTestEnv.Builder(1).
-                build();
-            QuorumControllerTestEnv controlEnv = new 
QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setMetrics(metrics);
-                }).
-                build()
-        ) {
-        }
-        assertTrue(metrics.closed.get(), "metrics were not closed");
-    }
-
     /**
      * Test setting some configuration values and reading them back.
      */
@@ -610,22 +579,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
-        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, 
MetadataVersion.latest());
-    }
-
-    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
-        MetadataVersion minVersion,
-        MetadataVersion maxVersion
-    ) {
-        BrokerRegistrationRequestData.FeatureCollection features = new 
BrokerRegistrationRequestData.FeatureCollection();
-        features.add(new BrokerRegistrationRequestData.Feature()
-            .setName(MetadataVersion.FEATURE_NAME)
-            .setMinSupportedVersion(minVersion.featureLevel())
-            .setMaxSupportedVersion(maxVersion.featureLevel()));
-        return features;
-    }
-
     private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
         MetadataVersion minVersion,
         MetadataVersion maxVersion
@@ -782,7 +735,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
-            CountDownLatch countDownLatch = controller.pause();
+            CountDownLatch countDownLatch = pause(controller);
             long now = controller.time().nanoseconds();
             ControllerRequestContext context0 = new ControllerRequestContext(
                 new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, 
OptionalLong.of(now));
@@ -846,7 +799,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
-            CountDownLatch countDownLatch = controller.pause();
+            CountDownLatch countDownLatch = pause(controller);
             CompletableFuture<CreateTopicsResponseData> createFuture =
                 controller.createTopics(ANONYMOUS_CONTEXT, new 
CreateTopicsRequestData().
                     setTimeoutMs(120000), Collections.emptySet());
@@ -891,7 +844,7 @@ public class QuorumControllerTest {
         ) {
             QuorumController controller = controlEnv.activeController();
 
-            Map<Integer, Long> brokerEpochs = registerBrokers(controller, 
numBrokers);
+            Map<Integer, Long> brokerEpochs = 
registerBrokersAndUnfence(controller, numBrokers);
 
             // Create a lot of partitions
             List<CreatableReplicaAssignment> partitions = IntStream
@@ -980,62 +933,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private Map<Integer, Long> registerBrokers(QuorumController controller, 
int numBrokers) throws Exception {
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
-            BrokerRegistrationReply reply = 
controller.registerBroker(ANONYMOUS_CONTEXT,
-                new BrokerRegistrationRequestData()
-                    .setBrokerId(brokerId)
-                    .setRack(null)
-                    .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, 
MetadataVersion.IBP_3_6_IV0))
-                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" 
+ brokerId))
-                    .setListeners(
-                        new ListenerCollection(
-                            Arrays.asList(
-                                new Listener()
-                                .setName("PLAINTEXT")
-                                .setHost("localhost")
-                                .setPort(9092 + brokerId)
-                                ).iterator()
-                            )
-                        )
-                    ).get();
-            brokerEpochs.put(brokerId, reply.epoch());
-
-            // Send heartbeat to unfence
-            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
-                new BrokerHeartbeatRequestData()
-                    .setWantFence(false)
-                    .setBrokerEpoch(brokerEpochs.get(brokerId))
-                    .setBrokerId(brokerId)
-                    .setCurrentMetadataOffset(100000L)
-            ).get();
-        }
-
-        return brokerEpochs;
-    }
-
-    private void sendBrokerHeartbeat(
-        QuorumController controller,
-        List<Integer> brokers,
-        Map<Integer, Long> brokerEpochs
-    ) throws Exception {
-        if (brokers.isEmpty()) {
-            return;
-        }
-        for (Integer brokerId : brokers) {
-            BrokerHeartbeatReply reply = 
controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
-                new BrokerHeartbeatRequestData()
-                    .setWantFence(false)
-                    .setBrokerEpoch(brokerEpochs.get(brokerId))
-                    .setBrokerId(brokerId)
-                    .setCurrentMetadataOffset(100000)
-            ).get();
-            assertEquals(new BrokerHeartbeatReply(true, false, false, false), 
reply);
-        }
-    }
-
     @Test
     public void testConfigResourceExistenceChecker() throws Throwable {
         try (
@@ -1048,7 +945,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
-            registerBrokers(active, 5);
+            registerBrokersAndUnfence(active, 5);
             active.createTopics(ANONYMOUS_CONTEXT, new 
CreateTopicsRequestData().
                 setTopics(new CreatableTopicCollection(Collections.singleton(
                     new CreatableTopic().setName("foo").
@@ -1508,7 +1405,8 @@ public class QuorumControllerTest {
 
         featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, 
Optional.empty(), true);
         assertEquals(MetadataVersion.IBP_3_4_IV0, 
featureControl.metadataVersion());
-        assertEquals(ZkMigrationState.PRE_MIGRATION, 
featureControl.zkMigrationState());    }
+        assertEquals(ZkMigrationState.PRE_MIGRATION, 
featureControl.zkMigrationState());
+    }
 
     @Test
     public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws 
Exception {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index 619100f1ed8..5bba7d0658b 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -40,14 +40,17 @@ public class QuorumControllerMetricsTest {
             try (QuorumControllerMetrics metrics = new 
QuorumControllerMetrics(Optional.of(registry), time)) {
                 ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.controller",
                     new HashSet<>(Arrays.asList(
-                        
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
                         
"kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+                        
"kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
                         
"kafka.controller:type=KafkaController,name=ActiveControllerCount",
-                        
"kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                        
"kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
+                        
"kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
                         
"kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
                         
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
                         
"kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
-                        
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"
+                        
"kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                        
"kafka.controller:type=KafkaController,name=NewActiveControllersCount",
+                        
"kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount"
                     )));
             }
             ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.controller",
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 1eefa15a8c3..372700b1fb6 100644
--- 
a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -55,6 +56,7 @@ import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -251,6 +253,13 @@ public class MetadataLoaderTest {
                     )
                 );
                 loader.handleLoadSnapshot(snapshotReader);
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    assertEquals(1L, 
loader.metrics().handleLoadSnapshotCount());
+                });
+            } else {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    assertEquals(0L, 
loader.metrics().handleLoadSnapshotCount());
+                });
             }
             loader.waitForAllEventsToBeHandled();
             if (sameObject) {
@@ -328,6 +337,8 @@ public class MetadataLoaderTest {
             assertEquals(300L, loader.lastAppliedOffset());
             assertEquals(new SnapshotManifest(new MetadataProvenance(300, 100, 
4000), 3000000L),
                 publishers.get(0).latestSnapshotManifest);
+            assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
+                loader.metrics().currentMetadataVersion());
         }
         assertTrue(publishers.get(0).closed);
         assertEquals(MetadataVersion.IBP_3_0_IV1,
@@ -587,14 +598,27 @@ public class MetadataLoaderTest {
 
             loadTestSnapshot(loader, 200);
             assertEquals(200L, loader.lastAppliedOffset());
+            assertEquals(IBP_3_3_IV1.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
             assertFalse(publishers.get(0).latestDelta.image().isEmpty());
 
             loadTestSnapshot2(loader, 400);
             assertEquals(400L, loader.lastAppliedOffset());
+            assertEquals(IBP_3_3_IV2.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
 
             // Make sure the topic in the initial snapshot was overwritten by 
loading the new snapshot.
             
assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
             
assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
+
+            loader.handleCommit(new MockBatchReader(500, asList(
+                MockBatchReader.newBatch(500, 100, asList(
+                    new ApiMessageAndVersion(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(IBP_3_5_IV0.featureLevel()), (short) 
0))))));
+            loader.waitForAllEventsToBeHandled();
+            assertEquals(IBP_3_5_IV0.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
         }
         faultHandler.maybeRethrowFirstException();
     }
diff --git 
a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
new file mode 100644
index 00000000000..c8d33754106
--- /dev/null
+++ 
b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.apache.kafka.image.MetadataProvenance;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class MetadataLoaderMetricsTest {
+    private static class FakeMetadataLoaderMetrics implements AutoCloseable {
+        final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
+        final AtomicInteger batchSize = new AtomicInteger(0);
+        final AtomicReference<MetadataProvenance> provenance =
+            new AtomicReference<>(MetadataProvenance.EMPTY);
+        final MetadataLoaderMetrics metrics;
+
+        FakeMetadataLoaderMetrics() {
+            this(Optional.empty());
+        }
+
+        FakeMetadataLoaderMetrics(MetricsRegistry registry) {
+            this(Optional.of(registry));
+        }
+
+        FakeMetadataLoaderMetrics(Optional<MetricsRegistry> registry) {
+            metrics = new MetadataLoaderMetrics(
+                registry,
+                n -> batchProcessingTimeNs.set(n),
+                n -> batchSize.set(n),
+                provenance);
+        }
+
+        @Override
+        public void close() {
+            metrics.close();
+        }
+    }
+
+    @Test
+    public void testMetricNames() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+                    new HashSet<>(Arrays.asList(
+                        
"kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+                        
"kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+                    )));
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+                    Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
+    @Test
+    public void testUpdateBatchProcessingTimeNs() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateBatchProcessingTimeNs(123L);
+            assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get());
+        }
+    }
+
+    @Test
+    public void testUpdateBatchSize() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateBatchSize(50);
+            assertEquals(50, fakeMetrics.batchSize.get());
+        }
+    }
+
+    @Test
+    public void testUpdateLastAppliedImageProvenance() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateLastAppliedImageProvenance(new 
MetadataProvenance(1L, 2, 3L));
+            assertEquals(new MetadataProvenance(1L, 2, 3L), 
fakeMetrics.provenance.get());
+        }
+    }
+
+    @Test
+    public void testManagedMetrics() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (FakeMetadataLoaderMetrics fakeMetrics = new 
FakeMetadataLoaderMetrics(registry)) {
+                fakeMetrics.metrics.setCurrentMetadataVersion(IBP_3_3_IV2);
+                fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+                fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+
+                @SuppressWarnings("unchecked")
+                Gauge<Integer> currentMetadataVersion = (Gauge<Integer>) 
registry
+                    .allMetrics()
+                    .get(metricName("MetadataLoader", 
"CurrentMetadataVersion"));
+                assertEquals(IBP_3_3_IV2.featureLevel(),
+                    currentMetadataVersion.value().shortValue());
+
+                @SuppressWarnings("unchecked")
+                Gauge<Long> loadSnapshotCount = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("MetadataLoader", 
"HandleLoadSnapshotCount"));
+                assertEquals(2L, loadSnapshotCount.value().longValue());
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, 
"kafka.server",
+                Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
+    private static MetricName metricName(String type, String name) {
+        String mBeanName = String.format("kafka.server:type=%s,name=%s", type, 
name);
+        return new MetricName("kafka.server", type, name, null, mBeanName);
+    }
+}

Reply via email to