This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c7de30f38bf KAFKA-15183: Add more controller, loader, snapshot emitter metrics (#14010)
c7de30f38bf is described below

commit c7de30f38bfd6e2d62a0b5c09b5dc9707e58096b
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Mon Jul 24 21:13:58 2023 -0700

    KAFKA-15183: Add more controller, loader, snapshot emitter metrics (#14010)
    
    Implement some of the metrics from KIP-938: Add more metrics for
    measuring KRaft performance.
    
    Add these metrics to QuorumControllerMetrics:
        kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount
        kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount
        kafka.controller:type=KafkaController,name=NewActiveControllersCount
    
    Create LoaderMetrics with these new metrics:
        kafka.server:type=MetadataLoader,name=CurrentMetadataVersion
        kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount
    
    Create SnapshotEmitterMetrics with these new metrics:
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes
        kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs
    
    Reviewers: Ron Dagostino <[email protected]>
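    
    For orientation (not part of this commit): a hedged sketch of reading one
    of the new gauges back from the Yammer registry. It assumes a controller
    has registered its metrics with the default registry; the GaugeReadSketch
    class name is illustrative.
    
        import com.yammer.metrics.core.Gauge;
        import com.yammer.metrics.core.MetricName;
        import com.yammer.metrics.core.MetricsRegistry;
        import org.apache.kafka.server.metrics.KafkaYammerMetrics;
    
        public final class GaugeReadSketch {
            public static void main(String[] args) {
                // Look up NewActiveControllersCount in the default registry.
                MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
                MetricName name = KafkaYammerMetrics.getMetricName(
                    "kafka.controller", "KafkaController", "NewActiveControllersCount");
                // Null if no controller has registered its metrics yet.
                Gauge<?> gauge = (Gauge<?>) registry.allMetrics().get(name);
                System.out.println(((Number) gauge.value()).longValue());
            }
        }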
---
 README.md                                          |   2 +-
 checkstyle/import-control-metadata.xml             |  24 ++-
 .../src/main/scala/kafka/server/SharedServer.scala |  30 ++-
 .../server/metadata/BrokerServerMetrics.scala      |  11 +-
 .../kafka/server/KRaftClusterTest.scala            |   4 +-
 .../apache/kafka/controller/QuorumController.java  |  66 ++++---
 .../metrics/QuorumControllerMetrics.java           |  72 ++++++-
 .../apache/kafka/image/loader/MetadataLoader.java  |  66 ++++---
 .../kafka/image/loader/MetadataLoaderMetrics.java  |  46 -----
 .../loader/metrics/MetadataLoaderMetrics.java      | 137 +++++++++++++
 .../kafka/image/publisher/SnapshotEmitter.java     |  51 ++++-
 .../publisher/metrics/SnapshotEmitterMetrics.java  | 102 ++++++++++
 .../kafka/image/writer/RaftSnapshotWriter.java     |  11 +-
 .../QuorumControllerIntegrationTestUtils.java      | 213 +++++++++++++++++++++
 .../QuorumControllerMetricsIntegrationTest.java    | 202 +++++++++++++++++++
 .../kafka/controller/QuorumControllerTest.java     | 122 +-----------
 .../metrics/QuorumControllerMetricsTest.java       | 112 +++++++----
 .../org/apache/kafka/image/FakeSnapshotWriter.java |  93 +++++++++
 .../kafka/image/loader/MetadataLoaderTest.java     |  24 +++
 .../loader/metrics/MetadataLoaderMetricsTest.java  | 149 ++++++++++++++
 .../kafka/image/publisher/SnapshotEmitterTest.java |  75 ++------
 .../metrics/SnapshotEmitterMetricsTest.java        | 111 +++++++++++
 .../kafka/image/writer/RaftSnapshotWriterTest.java |  63 +-----
 .../kafka/snapshot/FileRawSnapshotWriter.java      |  14 +-
 .../kafka/snapshot/RecordsSnapshotWriter.java      |   3 +-
 .../org/apache/kafka/snapshot/SnapshotWriter.java  |   4 +-
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    |  22 +--
 .../kafka/snapshot/MockRawSnapshotWriter.java      |   2 +-
 28 files changed, 1404 insertions(+), 427 deletions(-)

diff --git a/README.md b/README.md
index 9b85f083508..49dd49ef659 100644
--- a/README.md
+++ b/README.md
@@ -288,4 +288,4 @@ See [vagrant/README.md](vagrant/README.md).
 Apache Kafka is interested in building the community; we would welcome any thoughts or [patches](https://issues.apache.org/jira/browse/KAFKA). You can reach us [on the Apache mailing lists](http://kafka.apache.org/contact.html).
 
 To contribute follow the instructions here:
- * https://kafka.apache.org/contributing.html
+ * https://kafka.apache.org/contributing.html 
diff --git a/checkstyle/import-control-metadata.xml b/checkstyle/import-control-metadata.xml
index 2cbb5504293..73b74933a94 100644
--- a/checkstyle/import-control-metadata.xml
+++ b/checkstyle/import-control-metadata.xml
@@ -63,7 +63,6 @@
     </subpackage>
 
     <subpackage name="controller">
-        <allow pkg="com.yammer.metrics"/>
         <allow pkg="org.apache.kafka.clients" />
         <allow pkg="org.apache.kafka.clients.admin" />
         <allow pkg="org.apache.kafka.common.acl" />
@@ -73,7 +72,6 @@
         <allow pkg="org.apache.kafka.common.internals" />
         <allow pkg="org.apache.kafka.common.message" />
         <allow pkg="org.apache.kafka.common.metadata" />
-        <allow pkg="org.apache.kafka.common.metrics" />
         <allow pkg="org.apache.kafka.common.network" />
         <allow pkg="org.apache.kafka.common.protocol" />
         <allow pkg="org.apache.kafka.common.quota" />
@@ -93,13 +91,17 @@
         <allow pkg="org.apache.kafka.server.common" />
         <allow pkg="org.apache.kafka.server.config" />
         <allow pkg="org.apache.kafka.server.fault" />
-        <allow pkg="org.apache.kafka.server.metrics" />
         <allow pkg="org.apache.kafka.server.mutable" />
         <allow pkg="org.apache.kafka.server.policy"/>
         <allow pkg="org.apache.kafka.server.util"/>
         <allow pkg="org.apache.kafka.snapshot" />
         <allow pkg="org.apache.kafka.test" />
         <allow pkg="org.apache.kafka.timeline" />
+        <subpackage name="metrics">
+            <allow pkg="com.yammer.metrics"/>
+            <allow pkg="org.apache.kafka.common.metrics" />
+            <allow pkg="org.apache.kafka.server.metrics" />
+        </subpackage>
     </subpackage>
 
     <subpackage name="image">
@@ -122,6 +124,22 @@
         <allow pkg="org.apache.kafka.server.util" />
         <allow pkg="org.apache.kafka.snapshot" />
         <allow pkg="org.apache.kafka.test" />
+        <subpackage name="loader">
+            <subpackage name="metrics">
+                <allow pkg="com.yammer.metrics"/>
+                <allow pkg="org.apache.kafka.common.metrics" />
+                <allow pkg="org.apache.kafka.controller.metrics" />
+                <allow pkg="org.apache.kafka.server.metrics" />
+            </subpackage>
+        </subpackage>
+        <subpackage name="publisher">
+            <subpackage name="metrics">
+                <allow pkg="com.yammer.metrics"/>
+                <allow pkg="org.apache.kafka.common.metrics" />
+                <allow pkg="org.apache.kafka.controller.metrics" />
+                <allow pkg="org.apache.kafka.server.metrics" />
+            </subpackage>
+        </subpackage>
     </subpackage>
 
     <subpackage name="metadata">
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index e58b33a8d57..892c528885a 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -25,7 +25,9 @@ import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.controller.metrics.ControllerMetadataMetrics
+import org.apache.kafka.image.MetadataProvenance
 import org.apache.kafka.image.loader.MetadataLoader
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics
 import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -34,7 +36,7 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.util
-import java.util.{Collections, Optional}
+import java.util.Optional
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.concurrent.atomic.AtomicReference
 
@@ -106,6 +108,7 @@ class SharedServer(
   val snapshotsDisabledReason = new AtomicReference[String](null)
   @volatile var snapshotEmitter: SnapshotEmitter = _
   @volatile var snapshotGenerator: SnapshotGenerator = _
+  @volatile var metadataLoaderMetrics: MetadataLoaderMetrics = _
 
   def isUsed(): Boolean = synchronized {
     usedByController || usedByBroker
@@ -259,15 +262,24 @@ class SharedServer(
         raftManager = _raftManager
         _raftManager.startup()
 
+        metadataLoaderMetrics = if (brokerMetrics != null) {
+          new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+            elapsedNs => brokerMetrics.updateBatchProcessingTime(elapsedNs),
+            batchSize => brokerMetrics.updateBatchSize(batchSize),
+            brokerMetrics.lastAppliedImageProvenance)
+        } else {
+          new MetadataLoaderMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry()),
+            _ => {},
+            _ => {},
+            new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY))
+        }
         val loaderBuilder = new MetadataLoader.Builder().
           setNodeId(metaProps.nodeId).
           setTime(time).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           setFaultHandler(metadataLoaderFaultHandler).
-          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark())
-        if (brokerMetrics != null) {
-          loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
-        }
+          setHighWaterMarkAccessor(() => _raftManager.client.highWatermark()).
+          setMetrics(metadataLoaderMetrics)
         loader = loaderBuilder.build()
         snapshotEmitter = new SnapshotEmitter.Builder().
           setNodeId(metaProps.nodeId).
@@ -282,15 +294,15 @@ class SharedServer(
           setDisabledReason(snapshotsDisabledReason).
           setThreadNamePrefix(s"kafka-${sharedServerConfig.nodeId}-").
           build()
-        _raftManager.register(loader)
         try {
-          loader.installPublishers(Collections.singletonList(snapshotGenerator))
+          loader.installPublishers(util.Arrays.asList(snapshotGenerator)).get()
         } catch {
           case t: Throwable => {
             error("Unable to install metadata publishers", t)
            throw new RuntimeException("Unable to install metadata publishers.", t)
           }
         }
+        _raftManager.register(loader)
         debug("Completed SharedServer startup.")
         started = true
       } catch {
@@ -326,6 +338,10 @@ class SharedServer(
         CoreUtils.swallow(loader.close(), this)
         loader = null
       }
+      if (metadataLoaderMetrics != null) {
+        CoreUtils.swallow(metadataLoaderMetrics.close(), this)
+        metadataLoaderMetrics = null
+      }
       if (snapshotGenerator != null) {
         CoreUtils.swallow(snapshotGenerator.close(), this)
         snapshotGenerator = null
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 212909101f2..ff183324166 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -23,7 +23,6 @@ import org.apache.kafka.common.metrics.Gauge
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.MetricConfig
 import org.apache.kafka.image.MetadataProvenance
-import org.apache.kafka.image.loader.MetadataLoaderMetrics
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 
 import java.util.Collections
@@ -31,7 +30,7 @@ import java.util.concurrent.TimeUnit.NANOSECONDS
 
 final class BrokerServerMetrics private (
   metrics: Metrics
-) extends MetadataLoaderMetrics {
+) extends AutoCloseable {
   import BrokerServerMetrics._
 
  private val batchProcessingTimeHistName = KafkaMetricsGroup.explicitMetricName("kafka.server",
@@ -123,15 +122,15 @@ final class BrokerServerMetrics private (
     ).foreach(metrics.removeMetric)
   }
 
-  override def updateBatchProcessingTime(elapsedNs: Long): Unit =
+  def updateBatchProcessingTime(elapsedNs: Long): Unit =
     batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))
 
-  override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
+  def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
 
-  override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
+  def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
     lastAppliedImageProvenance.set(provenance)
 
-  override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
+  def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().lastContainedOffset()
 
   def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
 }
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 37bd66cd2ae..94e6118d7d4 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -35,7 +35,7 @@ import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity,
 import org.apache.kafka.common.requests.{ApiError, DescribeClusterRequest, DescribeClusterResponse}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Cluster, Endpoint, Reconfigurable, TopicPartition, TopicPartitionInfo}
-import org.apache.kafka.controller.QuorumController
+import org.apache.kafka.controller.{QuorumController, QuorumControllerIntegrationTestUtils}
 import org.apache.kafka.image.ClusterImage
 import org.apache.kafka.metadata.BrokerState
 import org.apache.kafka.server.authorizer._
@@ -1153,7 +1153,7 @@ class KRaftClusterTest {
       val controller = cluster.controllers().values().iterator().next()
       controller.controller.waitForReadyBrokers(3).get()
       TestUtils.retry(60000) {
-        val latch = controller.controller.asInstanceOf[QuorumController].pause()
+        val latch = QuorumControllerIntegrationTestUtils.pause(controller.controller.asInstanceOf[QuorumController])
         Thread.sleep(1001)
         latch.countDown()
         assertEquals(0, controller.sharedServer.controllerServerMetrics.fencedBrokerCount())
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 111937b3fa3..199c679cb35 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -126,7 +126,6 @@ import java.util.OptionalLong;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -461,6 +460,12 @@ public final class QuorumController implements Controller {
     private Throwable handleEventException(String name,
                                            OptionalLong startProcessingTimeNs,
                                            Throwable exception) {
+        if (!startProcessingTimeNs.isPresent() &&
+                ControllerExceptions.isTimeoutException(exception)) {
+            // If the event never started, and the exception is a timeout, increment the timed
+            // out metric.
+            controllerMetrics.incrementOperationsTimedOut();
+        }
         Throwable externalException =
                 ControllerExceptions.toExternalException(exception, () -> latestController());
         if (!startProcessingTimeNs.isPresent()) {
@@ -492,6 +497,15 @@ public final class QuorumController implements Controller {
         return externalException;
     }
 
+    private long updateEventStartMetricsAndGetTime(OptionalLong eventCreatedTimeNs) {
+        long now = time.nanoseconds();
+        controllerMetrics.incrementOperationsStarted();
+        if (eventCreatedTimeNs.isPresent()) {
+            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs.getAsLong()));
+        }
+        return now;
+    }
+
     /**
      * A controller event for handling internal state changes, such as Raft inputs.
      */
@@ -508,9 +522,8 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            startProcessingTimeNs = OptionalLong.of(now);
+            startProcessingTimeNs = OptionalLong.of(
+                updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
             log.debug("Executing {}.", this);
             handler.run();
             handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
@@ -527,11 +540,16 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void appendControlEvent(String name, Runnable handler) {
+    void appendControlEvent(String name, Runnable handler) {
         ControllerEvent event = new ControllerEvent(name, handler);
         queue.append(event);
     }
 
+    void appendControlEventWithDeadline(String name, Runnable handler, long deadlineNs) {
+        ControllerEvent event = new ControllerEvent(name, handler);
+        queue.appendWithDeadline(deadlineNs, event);
+    }
+
     /**
      * A controller event that reads the committed internal state in order to expose it
      * to an API.
@@ -555,9 +573,8 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            startProcessingTimeNs = OptionalLong.of(now);
+            startProcessingTimeNs = OptionalLong.of(
+                updateEventStartMetricsAndGetTime(OptionalLong.of(eventCreatedTimeNs)));
             T value = handler.get();
             handleEventEnd(this.toString(), startProcessingTimeNs.getAsLong());
             future.complete(value);
@@ -692,12 +709,11 @@ public final class QuorumController implements Controller {
 
         @Override
         public void run() throws Exception {
-            long now = time.nanoseconds();
-            if (!flags.contains(DOES_NOT_UPDATE_QUEUE_TIME)) {
-                // We exclude deferred events from the event queue time metric to prevent
-                // incorrectly including the deferral time in the queue time.
-                controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
-            }
+            // Deferred events set the DOES_NOT_UPDATE_QUEUE_TIME flag to prevent incorrectly
+            // including their deferral time in the event queue time.
+            startProcessingTimeNs = OptionalLong.of(
+                updateEventStartMetricsAndGetTime(flags.contains(DOES_NOT_UPDATE_QUEUE_TIME) ?
+                    OptionalLong.empty() : OptionalLong.of(eventCreatedTimeNs)));
             int controllerEpoch = curClaimEpoch;
             if (!isActiveController(controllerEpoch)) {
                 throw ControllerExceptions.newWrongControllerException(latestController());
@@ -706,7 +722,6 @@ public final class QuorumController implements Controller {
                 log.info("Cannot run write operation {} in pre-migration mode. Returning NOT_CONTROLLER.", name);
                 throw ControllerExceptions.newPreMigrationException(latestController());
             }
-            startProcessingTimeNs = OptionalLong.of(now);
             ControllerResult<T> result = op.generateRecordsAndResult();
             if (result.records().isEmpty()) {
                 op.processBatchEndOffset(writeOffset);
@@ -1063,6 +1078,9 @@ public final class QuorumController implements Controller {
             appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
                 final String newLeaderName = newLeader.leaderId().isPresent() ?
                         String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
+                if (newLeader.leaderId().isPresent()) {
+                    controllerMetrics.incrementNewActiveControllers();
+                }
                 if (isActiveController()) {
                     if (newLeader.isLeader(nodeId)) {
                         log.warn("We were the leader in epoch {}, and are still the leader " +
@@ -1308,7 +1326,7 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void renounce() {
+    void renounce() {
         try {
             if (curClaimEpoch == -1) {
                 throw new RuntimeException("Cannot renounce leadership because we are not the " +
@@ -2302,20 +2320,12 @@ public final class QuorumController implements Controller {
     }
 
     // VisibleForTesting
-    public CountDownLatch pause() {
-        final CountDownLatch latch = new CountDownLatch(1);
-        appendControlEvent("pause", () -> {
-            try {
-                latch.await();
-            } catch (InterruptedException e) {
-                log.info("Interrupted while waiting for unpause.", e);
-            }
-        });
-        return latch;
+    Time time() {
+        return time;
     }
 
     // VisibleForTesting
-    Time time() {
-        return time;
+    QuorumControllerMetrics controllerMetrics() {
+        return controllerMetrics;
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index e267ebdfb9a..6c7aa581a79 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -45,7 +45,7 @@ public class QuorumControllerMetrics implements AutoCloseable {
     private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
         "ControllerEventManager", "EventQueueProcessingTimeMs");
     private final static MetricName ZK_WRITE_BEHIND_LAG = getMetricName(
-            "KafkaController", "ZKWriteBehindLag");
+        "KafkaController", "ZKWriteBehindLag");
     private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -54,6 +54,14 @@ public class QuorumControllerMetrics implements AutoCloseable {
         "KafkaController", "LastAppliedRecordTimestamp");
     private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
         "KafkaController", "LastAppliedRecordLagMs");
+    private final static MetricName TIMED_OUT_BROKER_HEARTBEAT_COUNT = getMetricName(
+        "KafkaController", "TimedOutBrokerHeartbeatCount");
+    private final static MetricName EVENT_QUEUE_OPERATIONS_STARTED_COUNT = getMetricName(
+        "KafkaController", "EventQueueOperationsStartedCount");
+    private final static MetricName EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT = getMetricName(
+        "KafkaController", "EventQueueOperationsTimedOutCount");
+    private final static MetricName NEW_ACTIVE_CONTROLLERS_COUNT = getMetricName(
+        "KafkaController", "NewActiveControllersCount");
 
     private final Optional<MetricsRegistry> registry;
     private volatile boolean active;
@@ -64,6 +72,9 @@ public class QuorumControllerMetrics implements AutoCloseable {
     private final Consumer<Long> eventQueueTimeUpdater;
     private final Consumer<Long> eventQueueProcessingTimeUpdater;
     private final AtomicLong timedOutHeartbeats = new AtomicLong(0);
+    private final AtomicLong operationsStarted = new AtomicLong(0);
+    private final AtomicLong operationsTimedOut = new AtomicLong(0);
+    private final AtomicLong newActiveControllers = new AtomicLong(0);
 
     private Consumer<Long> newHistogram(MetricName name, boolean biased) {
         if (registry.isPresent()) {
@@ -113,7 +124,30 @@ public class QuorumControllerMetrics implements AutoCloseable {
                 return time.milliseconds() - lastAppliedRecordTimestamp();
             }
         }));
-
+        registry.ifPresent(r -> r.newGauge(TIMED_OUT_BROKER_HEARTBEAT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return timedOutHeartbeats();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_STARTED_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return operationsStarted();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return operationsTimedOut();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(NEW_ACTIVE_CONTROLLERS_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return newActiveControllers();
+            }
+        }));
         if (zkMigrationState) {
             registry.ifPresent(r -> r.newGauge(ZK_WRITE_BEHIND_LAG, new Gauge<Long>() {
                 @Override
@@ -176,17 +210,37 @@ public class QuorumControllerMetrics implements AutoCloseable {
     }
 
     public void incrementTimedOutHeartbeats() {
-        timedOutHeartbeats.addAndGet(1);
-    }
-
-    public void setTimedOutHeartbeats(long heartbeats) {
-        timedOutHeartbeats.set(heartbeats);
+        timedOutHeartbeats.incrementAndGet();
     }
 
     public long timedOutHeartbeats() {
         return timedOutHeartbeats.get();
     }
 
+    public void incrementOperationsStarted() {
+        operationsStarted.incrementAndGet();
+    }
+
+    public long operationsStarted() {
+        return operationsStarted.get();
+    }
+
+    public void incrementOperationsTimedOut() {
+        operationsTimedOut.incrementAndGet();
+    }
+
+    public long operationsTimedOut() {
+        return operationsTimedOut.get();
+    }
+
+    public void incrementNewActiveControllers() {
+        newActiveControllers.incrementAndGet();
+    }
+
+    public long newActiveControllers() {
+        return newActiveControllers.get();
+    }
+
     @Override
     public void close() {
         registry.ifPresent(r -> Arrays.asList(
@@ -197,6 +251,10 @@ public class QuorumControllerMetrics implements AutoCloseable {
             LAST_COMMITTED_RECORD_OFFSET,
             LAST_APPLIED_RECORD_TIMESTAMP,
             LAST_APPLIED_RECORD_LAG_MS,
+            TIMED_OUT_BROKER_HEARTBEAT_COUNT,
+            EVENT_QUEUE_OPERATIONS_STARTED_COUNT,
+            EVENT_QUEUE_OPERATIONS_TIMED_OUT_COUNT,
+            NEW_ACTIVE_CONTROLLERS_COUNT,
             ZK_WRITE_BEHIND_LAG
         ).forEach(r::removeMetric));
     }
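
For orientation (not part of this commit): a hedged sketch of the counter
contract added above. The increment/read pairs are exactly as shown in this
hunk; the (registry, time, zkMigrationState) constructor shape is an
assumption, since the constructor itself is not part of this diff.

    import java.util.Optional;
    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.controller.metrics.QuorumControllerMetrics;

    public final class CounterSketch {
        public static void main(String[] args) {
            // Registry-less construction, as the Optional<MetricsRegistry> field permits.
            QuorumControllerMetrics metrics =
                new QuorumControllerMetrics(Optional.empty(), Time.SYSTEM, false);
            metrics.incrementOperationsStarted();     // once per event that begins executing
            metrics.incrementOperationsTimedOut();    // when an event times out before starting
            metrics.incrementNewActiveControllers();  // on each leader change that elects a leader
            System.out.println(metrics.operationsStarted());     // 1
            System.out.println(metrics.operationsTimedOut());    // 1
            System.out.println(metrics.newActiveControllers());  // 1
            metrics.close();
        }
    }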
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 768fcb2574b..c2c066418b9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.image.writer.ImageReWriter;
 import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -35,14 +36,17 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.fault.FaultHandlerException;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.snapshot.Snapshots;
 import org.slf4j.Logger;
 
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -69,28 +73,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         private Time time = Time.SYSTEM;
         private LogContext logContext = null;
         private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
-        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
-            private volatile long lastAppliedOffset = -1L;
-
-            @Override
-            public void updateBatchProcessingTime(long elapsedNs) { }
-
-            @Override
-            public void updateBatchSize(int size) { }
-
-            @Override
-            public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
-                this.lastAppliedOffset = provenance.lastContainedOffset();
-            }
-
-            @Override
-            public long lastAppliedOffset() {
-                return lastAppliedOffset;
-            }
-
-            @Override
-            public void close() throws Exception { }
-        };
+        private MetadataLoaderMetrics metrics = null;
         private Supplier<OptionalLong> highWaterMarkAccessor = null;
 
         public Builder setNodeId(int nodeId) {
@@ -113,13 +96,13 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
             return this;
         }
 
-        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) {
-            this.metrics = metrics;
+        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
+            this.highWaterMarkAccessor = highWaterMarkAccessor;
             return this;
         }
 
-        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
-            this.highWaterMarkAccessor = highWaterMarkAccessor;
+        public Builder setMetrics(MetadataLoaderMetrics metrics) {
+            this.metrics = metrics;
             return this;
         }
 
@@ -130,6 +113,12 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
             if (highWaterMarkAccessor == null) {
                 throw new RuntimeException("You must set the high water mark accessor.");
             }
+            if (metrics == null) {
+                metrics = new MetadataLoaderMetrics(Optional.empty(),
+                    __ -> { },
+                    __ -> { },
+                    new AtomicReference<>(MetadataProvenance.EMPTY));
+            }
             return new MetadataLoader(
                 time,
                 logContext,
@@ -221,6 +210,11 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                 new ShutdownEvent());
     }
 
+    // VisibleForTesting
+    MetadataLoaderMetrics metrics() {
+        return metrics;
+    }
+
     private boolean stillNeedToCatchUp(String where, long offset) {
         if (!catchingUp) {
             log.trace("{}: we are not in the initial catching up state.", where);
@@ -349,6 +343,9 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                     }
                 }
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                if (delta.featuresDelta() != null) {
+                    metrics.setCurrentMetadataVersion(image.features().metadataVersion());
+                }
                 if (uninitializedPublishers.isEmpty()) {
                     scheduleInitializeNewPublishers(0);
                 }
@@ -406,7 +403,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
         MetadataProvenance provenance =
                 new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
         long elapsedNs = time.nanoseconds() - startNs;
-        metrics.updateBatchProcessingTime(elapsedNs);
+        metrics.updateBatchProcessingTimeNs(elapsedNs);
         return new LogDeltaManifest(provenance,
                 currentLeaderAndEpoch,
                 numBatches,
@@ -418,24 +415,30 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
     public void handleLoadSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
         eventQueue.append(() -> {
             try {
+                long numLoaded = metrics.incrementHandleLoadSnapshotCount();
+                String snapshotName = Snapshots.filenameFromSnapshotId(reader.snapshotId());
+                log.info("handleLoadSnapshot({}): incrementing HandleLoadSnapshotCount to {}.",
+                    snapshotName, numLoaded);
                 MetadataDelta delta = new MetadataDelta.Builder().
                         setImage(image).
                         build();
                 SnapshotManifest manifest = loadSnapshot(delta, reader);
-                log.info("handleLoadSnapshot: generated a metadata delta from a snapshot at offset {} " +
-                        "in {} us.", manifest.provenance().lastContainedOffset(),
+                log.info("handleLoadSnapshot({}): generated a metadata delta between offset {} " +
+                        "and this snapshot in {} us.", snapshotName,
+                        image.provenance().lastContainedOffset(),
                         NANOSECONDS.toMicros(manifest.elapsedNs()));
                 try {
                     image = delta.apply(manifest.provenance());
                 } catch (Throwable e) {
                     faultHandler.handleFault("Error generating new metadata image from " +
-                            "snapshot at offset " + reader.lastContainedLogOffset(), e);
+                            "snapshot " + snapshotName, e);
                     return;
                 }
                 if (stillNeedToCatchUp("handleLoadSnapshot", manifest.provenance().lastContainedOffset())) {
                     return;
                 }
-                log.info("handleLoadSnapshot: publishing new snapshot image with provenance {}.", image.provenance());
+                log.info("handleLoadSnapshot({}): publishing new snapshot image to {} publisher(s).",
+                        snapshotName, publishers.size());
                 for (MetadataPublisher publisher : publishers.values()) {
                     try {
                         publisher.onMetadataUpdate(delta, image, manifest);
@@ -446,6 +449,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
                     }
                 }
                 metrics.updateLastAppliedImageProvenance(image.provenance());
+                metrics.setCurrentMetadataVersion(image.features().metadataVersion());
                 if (uninitializedPublishers.isEmpty()) {
                     scheduleInitializeNewPublishers(0);
                 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
deleted file mode 100644
index 654bc9dd505..00000000000
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.image.loader;
-
-import org.apache.kafka.image.MetadataProvenance;
-
-
-/**
- * An interface for the metadata loader metrics.
- */
-public interface MetadataLoaderMetrics extends AutoCloseable {
-    /**
-     * Update the batch processing time histogram.
-     */
-    void updateBatchProcessingTime(long elapsedNs);
-
-    /**
-     * Update the batch size histogram.
-     */
-    void updateBatchSize(int size);
-
-    /**
-     * Set the provenance of the last image which has been processed by all publishers.
-     */
-    void updateLastAppliedImageProvenance(MetadataProvenance provenance);
-
-    /**
-     * Retrieve the last offset which has been processed by all publishers.
-     */
-    long lastAppliedOffset();
-}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
new file mode 100644
index 00000000000..351ac4fc5c3
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetrics.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * These are the metrics which are managed by the MetadataLoader class.
+ */
+public final class MetadataLoaderMetrics implements AutoCloseable {
+    private final static MetricName CURRENT_METADATA_VERSION = getMetricName(
+        "MetadataLoader", "CurrentMetadataVersion");
+    private final static MetricName HANDLE_LOAD_SNAPSHOT_COUNT = getMetricName(
+        "MetadataLoader", "HandleLoadSnapshotCount");
+
+    private final Optional<MetricsRegistry> registry;
+    private final AtomicReference<MetadataVersion> currentMetadataVersion =
+            new AtomicReference<>(MetadataVersion.MINIMUM_KRAFT_VERSION);
+    private final AtomicLong handleLoadSnapshotCount = new AtomicLong(0);
+    private final Consumer<Long> batchProcessingTimeNsUpdater;
+    private final Consumer<Integer> batchSizesUpdater;
+    private final AtomicReference<MetadataProvenance> lastAppliedProvenance;
+
+    /**
+     * Create a new MetadataLoaderMetrics object.
+     *
+     * @param registry                      The metrics registry, or Optional.empty if this is a
+     *                                      test and we don't have one.
+     * @param batchProcessingTimeNsUpdater  Updates the batch processing time histogram.
+     * @param batchSizesUpdater             Updates the batch sizes histogram.
+     */
+    public MetadataLoaderMetrics(
+        Optional<MetricsRegistry> registry,
+        Consumer<Long> batchProcessingTimeNsUpdater,
+        Consumer<Integer> batchSizesUpdater,
+        AtomicReference<MetadataProvenance> lastAppliedProvenance
+    ) {
+        this.registry = registry;
+        this.batchProcessingTimeNsUpdater = batchProcessingTimeNsUpdater;
+        this.batchSizesUpdater = batchSizesUpdater;
+        this.lastAppliedProvenance = lastAppliedProvenance;
+        registry.ifPresent(r -> r.newGauge(CURRENT_METADATA_VERSION, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return Integer.valueOf(currentMetadataVersion().featureLevel());
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(HANDLE_LOAD_SNAPSHOT_COUNT, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return handleLoadSnapshotCount();
+            }
+        }));
+    }
+
+    /**
+     * Update the batch processing time histogram.
+     */
+    public void updateBatchProcessingTimeNs(long elapsedNs) {
+        batchProcessingTimeNsUpdater.accept(elapsedNs);
+    }
+
+    /**
+     * Update the batch size histogram.
+     */
+    public void updateBatchSize(int size) {
+        batchSizesUpdater.accept(size);
+    }
+
+    /**
+     * Set the provenance of the last image which has been processed by all publishers.
+     */
+    public void updateLastAppliedImageProvenance(MetadataProvenance lastAppliedProvenance) {
+        this.lastAppliedProvenance.set(lastAppliedProvenance);
+    }
+
+    /**
+     * Retrieve the last offset which has been processed by all publishers.
+    public long lastAppliedOffset() {
+        return this.lastAppliedProvenance.get().lastContainedOffset();
+    }
+
+    public void setCurrentMetadataVersion(MetadataVersion metadataVersion) {
+        this.currentMetadataVersion.set(metadataVersion);
+    }
+
+    public MetadataVersion currentMetadataVersion() {
+        return this.currentMetadataVersion.get();
+    }
+
+    public long incrementHandleLoadSnapshotCount() {
+        return this.handleLoadSnapshotCount.incrementAndGet();
+    }
+
+    public long handleLoadSnapshotCount() {
+        return this.handleLoadSnapshotCount.get();
+    }
+
+    @Override
+    public void close() {
+        registry.ifPresent(r -> Arrays.asList(
+            CURRENT_METADATA_VERSION,
+            HANDLE_LOAD_SNAPSHOT_COUNT
+        ).forEach(r::removeMetric));
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+    }
+}
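
For orientation (not part of this commit): a minimal sketch of constructing
and exercising the new class, mirroring the registry-less default that
MetadataLoader.Builder.build() uses above. MetricsSketch is an illustrative
name.

    import java.util.Optional;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.kafka.image.MetadataProvenance;
    import org.apache.kafka.image.loader.metrics.MetadataLoaderMetrics;
    import org.apache.kafka.server.common.MetadataVersion;

    public final class MetricsSketch {
        public static void main(String[] args) {
            MetadataLoaderMetrics metrics = new MetadataLoaderMetrics(
                Optional.empty(),  // no Yammer registry: gauge registration is skipped
                __ -> { },         // batch processing time updater (no-op)
                __ -> { },         // batch size updater (no-op)
                new AtomicReference<>(MetadataProvenance.EMPTY));
            metrics.setCurrentMetadataVersion(MetadataVersion.MINIMUM_KRAFT_VERSION);
            System.out.println(metrics.incrementHandleLoadSnapshotCount());  // 1
            System.out.println(metrics.lastAppliedOffset());  // expected -1 for EMPTY
            metrics.close();
        }
    }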
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
index ec46bcc06fa..8ab224f91f2 100644
--- a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.image.publisher;
 
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.image.MetadataImage;
 import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics;
 import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.image.writer.RaftSnapshotWriter;
 import org.apache.kafka.raft.RaftClient;
@@ -43,9 +45,16 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
     private final static int DEFAULT_BATCH_SIZE = 1024;
 
     public static class Builder {
+        private Time time = Time.SYSTEM;
         private int nodeId = 0;
         private RaftClient<ApiMessageAndVersion> raftClient = null;
         private int batchSize = DEFAULT_BATCH_SIZE;
+        private SnapshotEmitterMetrics metrics = null;
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
 
         public Builder setNodeId(int nodeId) {
             this.nodeId = nodeId;
@@ -62,11 +71,21 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
             return this;
         }
 
+        public Builder setMetrics(SnapshotEmitterMetrics metrics) {
+            this.metrics = metrics;
+            return this;
+        }
+
         public SnapshotEmitter build() {
             if (raftClient == null) throw new RuntimeException("You must set the raftClient.");
-            return new SnapshotEmitter(nodeId,
+            if (metrics == null) metrics = new SnapshotEmitterMetrics(
+                    Optional.empty(),
+                    time);
+            return new SnapshotEmitter(time,
+                    nodeId,
                     raftClient,
-                    batchSize);
+                    batchSize,
+                    metrics);
         }
     }
 
@@ -75,6 +94,11 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
      */
     private final Logger log;
 
+    /**
+     * The clock object.
+     */
+    private final Time time;
+
     /**
      * The RaftClient to use.
      */
@@ -85,14 +109,27 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
      */
     private final int batchSize;
 
+    /**
+     * The metrics to use.
+     */
+    private final SnapshotEmitterMetrics metrics;
+
     private SnapshotEmitter(
-            int nodeId,
-            RaftClient<ApiMessageAndVersion> raftClient,
-            int batchSize
+        Time time,
+        int nodeId,
+        RaftClient<ApiMessageAndVersion> raftClient,
+        int batchSize,
+        SnapshotEmitterMetrics metrics
     ) {
+        this.time = time;
         this.log = new LogContext("[SnapshotEmitter id=" + nodeId + "] ").logger(SnapshotEmitter.class);
         this.raftClient = raftClient;
         this.batchSize = batchSize;
+        this.metrics = metrics;
+    }
+
+    SnapshotEmitterMetrics metrics() {
+        return metrics;
     }
 
     @Override
@@ -112,6 +149,9 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
                     setMetadataVersion(image.features().metadataVersion()).
                     build());
             writer.close(true);
+            metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());
+            metrics.setLatestSnapshotGeneratedBytes(writer.frozenSize().getAsLong());
+            log.info("Successfully wrote {}", provenance.snapshotName());
         } catch (Throwable e) {
             log.error("Encountered error while writing {}", provenance.snapshotName(), e);
             throw e;
@@ -119,6 +159,5 @@ public class SnapshotEmitter implements SnapshotGenerator.Emitter {
             Utils.closeQuietly(writer, "RaftSnapshotWriter");
             Utils.closeQuietly(snapshotWriter.get(), "SnapshotWriter");
         }
-        log.info("Successfully wrote {}", provenance.snapshotName());
     }
 }
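
For orientation (not part of this commit): a hedged sketch of wiring the
builder additions above; raftClient stands in for a real
RaftClient<ApiMessageAndVersion> and is not constructed here.

    // given: RaftClient<ApiMessageAndVersion> raftClient (placeholder)
    SnapshotEmitter emitter = new SnapshotEmitter.Builder().
            setNodeId(0).
            setTime(Time.SYSTEM).       // defaults to Time.SYSTEM if unset
            setRaftClient(raftClient).  // required; build() throws without it
            build();                    // metrics defaults to a registry-less instance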
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
new file mode 100644
index 00000000000..5e59942c960
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetrics.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaYammerMetrics;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These are the metrics which are managed by the SnapshotEmitter class.
+ */
+public final class SnapshotEmitterMetrics implements AutoCloseable {
+    private final static MetricName LATEST_SNAPSHOT_GENERATED_BYTES = getMetricName(
+        "SnapshotEmitter", "LatestSnapshotGeneratedBytes");
+    private final static MetricName LATEST_SNAPSHOT_GENERATED_AGE_MS = getMetricName(
+        "SnapshotEmitter", "LatestSnapshotGeneratedAgeMs");
+
+    private final Optional<MetricsRegistry> registry;
+    private final Time time;
+    private final AtomicLong latestSnapshotGeneratedBytes;
+    private final AtomicLong latestSnapshotGeneratedTimeMs;
+
+    /**
+     * Create a new SnapshotEmitterMetrics object.
+     *
+     * @param registry  The metrics registry, or Optional.empty if this is a test and we don't have one.
+     */
+    public SnapshotEmitterMetrics(
+        Optional<MetricsRegistry> registry,
+        Time time
+    ) {
+        this.registry = registry;
+        this.time = time;
+        this.latestSnapshotGeneratedBytes = new AtomicLong(0L);
+        this.latestSnapshotGeneratedTimeMs = new AtomicLong(time.milliseconds());
+        registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_BYTES, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return latestSnapshotGeneratedBytes();
+            }
+        }));
+        registry.ifPresent(r -> r.newGauge(LATEST_SNAPSHOT_GENERATED_AGE_MS, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return latestSnapshotGeneratedAgeMs();
+            }
+        }));
+    }
+
+    public void setLatestSnapshotGeneratedBytes(long value) {
+        this.latestSnapshotGeneratedBytes.set(value);
+    }
+
+    public long latestSnapshotGeneratedBytes() {
+        return this.latestSnapshotGeneratedBytes.get();
+    }
+
+    public void setLatestSnapshotGeneratedTimeMs(long timeMs) {
+        this.latestSnapshotGeneratedTimeMs.set(timeMs);
+    }
+
+    public long latestSnapshotGeneratedTimeMs() {
+        return this.latestSnapshotGeneratedTimeMs.get();
+    }
+
+    public long latestSnapshotGeneratedAgeMs() {
+        return time.milliseconds() - this.latestSnapshotGeneratedTimeMs.get();
+    }
+
+    @Override
+    public void close() {
+        registry.ifPresent(r -> Arrays.asList(
+            LATEST_SNAPSHOT_GENERATED_BYTES,
+            LATEST_SNAPSHOT_GENERATED_AGE_MS
+        ).forEach(r::removeMetric));
+    }
+
+    private static MetricName getMetricName(String type, String name) {
+        return KafkaYammerMetrics.getMetricName("kafka.server", type, name);
+    }
+}
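
For orientation (not part of this commit): a minimal sketch of the age-gauge
semantics, assuming the MockTime test clock from the clients test utilities.
AgeSketch is an illustrative name.

    import java.util.Optional;
    import org.apache.kafka.common.utils.MockTime;
    import org.apache.kafka.image.publisher.metrics.SnapshotEmitterMetrics;

    public final class AgeSketch {
        public static void main(String[] args) {
            MockTime time = new MockTime();
            SnapshotEmitterMetrics metrics = new SnapshotEmitterMetrics(Optional.empty(), time);
            metrics.setLatestSnapshotGeneratedBytes(4096L);
            metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());
            time.sleep(30_000);
            // The age gauge is computed on read: now minus the recorded generation time.
            System.out.println(metrics.latestSnapshotGeneratedAgeMs());  // 30000
            metrics.close();
        }
    }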
diff --git a/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java b/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java
index e1faad90dc3..eec5cd90c76 100644
--- a/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/image/writer/RaftSnapshotWriter.java
@@ -22,6 +22,7 @@ import org.apache.kafka.snapshot.SnapshotWriter;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.OptionalLong;
 
 
 /**
@@ -31,6 +32,7 @@ public class RaftSnapshotWriter implements ImageWriter {
     private final SnapshotWriter<ApiMessageAndVersion> snapshotWriter;
     private final int batchSize;
     private List<ApiMessageAndVersion> records;
+    private OptionalLong frozenSize = OptionalLong.empty();
 
     public RaftSnapshotWriter(
         SnapshotWriter<ApiMessageAndVersion> snapshotWriter,
@@ -59,11 +61,18 @@ public class RaftSnapshotWriter implements ImageWriter {
                 if (!records.isEmpty()) {
                     snapshotWriter.append(records);
                 }
-                snapshotWriter.freeze();
+                frozenSize = OptionalLong.of(snapshotWriter.freeze());
             }
         } finally {
             records = null;
             snapshotWriter.close();
         }
     }
+
+    /**
+     * @return the frozen size of the snapshot, or OptionalLong.empty if the snapshot was not frozen.
+     */
+    public OptionalLong frozenSize() {
+        return frozenSize;
+    }
 }
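
For orientation (not part of this commit): the frozenSize() contract that
SnapshotEmitter consumes above, sketched as a fragment; snapshotWriter stands
in for a real SnapshotWriter<ApiMessageAndVersion>.

    // given: SnapshotWriter<ApiMessageAndVersion> snapshotWriter (placeholder)
    RaftSnapshotWriter writer = new RaftSnapshotWriter(snapshotWriter, 1024);
    // Empty until close(true) freezes the snapshot and records its size.
    assert !writer.frozenSize().isPresent();
    writer.close(true);
    long snapshotBytes = writer.frozenSize().getAsLong();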
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
new file mode 100644
index 00000000000..aec2a4a513f
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerIntegrationTestUtils.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.Listener;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.ListenerCollection;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Utility functions for use in QuorumController integration tests.
+ */
+public class QuorumControllerIntegrationTestUtils {
+    private final static Logger log = LoggerFactory.getLogger(QuorumControllerIntegrationTestUtils.class);
+
+    BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
+        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
+    }
+
+    /**
+     * Create a broker features collection for use in a registration request. We only set MV here.
+     *
+     * @param minVersion    The minimum supported MV.
+     * @param maxVersion    The maximum supported MV.
+     */
+    static BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
+        MetadataVersion minVersion,
+        MetadataVersion maxVersion
+    ) {
+        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
+        features.add(new BrokerRegistrationRequestData.Feature()
+                         .setName(MetadataVersion.FEATURE_NAME)
+                         .setMinSupportedVersion(minVersion.featureLevel())
+                         .setMaxSupportedVersion(maxVersion.featureLevel()));
+        return features;
+    }
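+
+    // Illustrative call (values mirror the registration helper below):
+    //   brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0)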
+
+    /**
+     * Register the given number of brokers.
+     *
+     * @param controller    The active controller.
+     * @param numBrokers    The number of brokers to register. We will start at 0 and increment.
+     *
+     * @return              A map from broker IDs to broker epochs.
+     */
+    static Map<Integer, Long> registerBrokersAndUnfence(
+        QuorumController controller,
+        int numBrokers
+    ) throws Exception {
+        Map<Integer, Long> brokerEpochs = new HashMap<>();
+        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
+            BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
+                new BrokerRegistrationRequestData()
+                    .setBrokerId(brokerId)
+                    .setRack(null)
+                    .setClusterId(controller.clusterId())
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
+                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
+                    .setListeners(new ListenerCollection(
+                        Arrays.asList(
+                            new Listener()
+                                .setName("PLAINTEXT")
+                                .setHost("localhost")
+                                .setPort(9092 + brokerId)
+                            ).iterator()
+                        )
+                    )
+            ).get();
+            brokerEpochs.put(brokerId, reply.epoch());
+
+            // Send heartbeat to unfence
+            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000L)
+            ).get();
+        }
+
+        return brokerEpochs;
+    }
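+
+    // Illustrative usage sketch: register brokers 0..2 on the active controller and
+    // keep the returned epochs for the heartbeats that follow.
+    //   Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(controller, 3);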
+
+    /**
+     * Send broker heartbeats for the provided brokers.
+     *
+     * @param controller    The active controller.
+     * @param brokers       The broker IDs to send heartbeats for.
+     * @param brokerEpochs  A map from broker ID to broker epoch.
+     */
+    static void sendBrokerHeartbeat(
+        QuorumController controller,
+        List<Integer> brokers,
+        Map<Integer, Long> brokerEpochs
+    ) throws Exception {
+        if (brokers.isEmpty()) {
+            return;
+        }
+        for (Integer brokerId : brokers) {
+            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
+                new BrokerHeartbeatRequestData()
+                    .setWantFence(false)
+                    .setBrokerEpoch(brokerEpochs.get(brokerId))
+                    .setBrokerId(brokerId)
+                    .setCurrentMetadataOffset(100000)
+            ).get();
+            assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
+        }
+    }
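+
+    // Illustrative usage sketch, reusing the epochs returned by registerBrokersAndUnfence():
+    //   sendBrokerHeartbeat(controller, Arrays.asList(0, 1, 2), brokerEpochs);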
+
+    /**
+     * Create some topics directly on the controller.
+     *
+     * @param controller            The active controller.
+     * @param prefix                The prefix to use for topic names.
+     * @param numTopics             The number of topics to create.
+     * @param replicationFactor     The replication factor to use.
+     */
+    static void createTopics(
+        QuorumController controller,
+        String prefix,
+        int numTopics,
+        int replicationFactor
+    ) throws Exception {
+        HashSet<String> describable = new HashSet<>();
+        for (int i = 0; i < numTopics; i++) {
+            describable.add(prefix + i);
+        }
+        CreateTopicsRequestData request = new CreateTopicsRequestData();
+        for (int i = 0; i < numTopics; i++) {
+            request.topics().add(
+                new CreatableTopic().
+                    setName(prefix + i).
+                    setNumPartitions(-1).
+                    setReplicationFactor((short) replicationFactor));
+        }
+        CreateTopicsResponseData response =
+            controller.createTopics(ANONYMOUS_CONTEXT, request, describable).get();
+        for (int i = 0; i < numTopics; i++) {
+            CreatableTopicResult result = response.topics().find(prefix + i);
+            assertEquals((short) 0, result.errorCode());
+        }
+    }
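+
+    // Illustrative usage sketch: create foo0, foo1 and foo2, each with replication factor 2.
+    //   createTopics(controller, "foo", 3, 2);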
+
+    /**
+     * Add an event to the controller event queue that will pause it temporarily.
+     *
+     * @param controller    The controller.
+     * @return              The latch that can be used to unpause the controller.
+     */
+    public static CountDownLatch pause(QuorumController controller) {
+        final CountDownLatch latch = new CountDownLatch(1);
+        controller.appendControlEvent("pause", () -> {
+            try {
+                latch.await();
+            } catch (InterruptedException e) {
+                log.info("Interrupted while waiting for unpause.", e);
+            }
+        });
+        return latch;
+    }
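+
+    // Illustrative usage sketch: block the event queue, enqueue work that should
+    // queue up or time out, then release the queue.
+    //   CountDownLatch latch = pause(controller);
+    //   ... enqueue operations here ...
+    //   latch.countDown();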
+
+    /**
+     * Force the current controller to renounce.
+     *
+     * @param controller    The controller.
+     */
+    static void forceRenounce(QuorumController controller) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        controller.appendControlEvent("forceRenounce", () -> {
+            controller.renounce();
+            future.complete(null);
+        });
+        future.get();
+    }
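+
+    // Illustrative usage sketch: trigger a failover so that a different controller
+    // becomes active.
+    //   forceRenounce(controlEnv.activeController());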
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
new file mode 100644
index 00000000000..949d63fa375
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metalog.LocalLogManagerTestEnv;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.forceRenounce;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class QuorumControllerMetricsIntegrationTest {
+    private final static Logger log = LoggerFactory.getLogger(QuorumControllerMetricsIntegrationTest.class);
+
+    static class MockControllerMetrics extends QuorumControllerMetrics {
+        final AtomicBoolean closed = new AtomicBoolean(false);
+
+        MockControllerMetrics() {
+            super(Optional.empty(), Time.SYSTEM, true);
+        }
+
+        @Override
+        public void close() {
+            super.close();
+            closed.set(true);
+        }
+    }
+
+    /**
+     * Test that closing the QuorumController closes the metrics object.
+     */
+    @Test
+    public void testClosingQuorumControllerClosesMetrics() throws Throwable {
+        MockControllerMetrics metrics = new MockControllerMetrics();
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                setControllerBuilderInitializer(controllerBuilder -> {
+                    controllerBuilder.setMetrics(metrics);
+                }).
+                build()
+        ) {
+            assertEquals(1, controlEnv.activeController().controllerMetrics().newActiveControllers());
+        }
+        assertTrue(metrics.closed.get(), "metrics were not closed");
+    }
+
+    /**
+     * Test that failing over to a new controller increments NewActiveControllersCount on both the
+     * active and inactive controllers.
+     */
+    @Test
+    public void testFailingOverIncrementsNewActiveControllerCount() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            controlEnv.activeController(); // wait for a controller to become active.
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                for (QuorumController controller : controlEnv.controllers()) {
+                    assertEquals(1, controller.controllerMetrics().newActiveControllers());
+                }
+            });
+            forceRenounce(controlEnv.activeController());
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                for (QuorumController controller : controlEnv.controllers()) {
+                    assertEquals(2, controller.controllerMetrics().newActiveControllers());
+                }
+            });
+        }
+    }
+
+    /**
+     * Test the heartbeat and general operation timeout metrics.
+     * These are incremented on the active controller only.
+     */
+    @Test
+    public void testTimeoutMetrics() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+            assertEquals(0L, active.controllerMetrics().timedOutHeartbeats());
+            assertEquals(0L, active.controllerMetrics().operationsTimedOut());
+
+            // We pause the controller so that the heartbeat event will definitely be expired
+            // rather than processed.
+            CountDownLatch latch = pause(active);
+            ControllerRequestContext expiredTimeoutContext = new ControllerRequestContext(
+                new RequestHeaderData(),
+                KafkaPrincipal.ANONYMOUS,
+                OptionalLong.of(active.time().nanoseconds()));
+            CompletableFuture<BrokerHeartbeatReply> replyFuture =
+                active.processBrokerHeartbeat(expiredTimeoutContext,
+                    new BrokerHeartbeatRequestData()
+                        .setWantFence(false)
+                        .setBrokerEpoch(brokerEpochs.get(0))
+                        .setBrokerId(0)
+                        .setCurrentMetadataOffset(100000));
+            latch.countDown(); // Unpause the controller.
+            assertEquals(TimeoutException.class,
+                assertThrows(ExecutionException.class, () -> replyFuture.get()).
+                    getCause().getClass());
+            assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+            assertEquals(1L, active.controllerMetrics().operationsTimedOut());
+
+            // Inject a new timed out operation.
+            CountDownLatch latch2 = pause(active);
+            active.appendControlEventWithDeadline("fakeTimeoutOperation",
+                () -> { },
+                active.time().nanoseconds());
+            latch2.countDown();
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                // The fake timeout increments operationsTimedOut but not timedOutHeartbeats.
+                assertEquals(1L, active.controllerMetrics().timedOutHeartbeats());
+                assertEquals(2L, active.controllerMetrics().operationsTimedOut());
+            });
+            for (QuorumController controller : controlEnv.controllers()) {
+                // Inactive controllers don't set these metrics.
+                if (!controller.isActive()) {
+                    assertEquals(false, controller.controllerMetrics().active());
+                    assertEquals(0L, controller.controllerMetrics().timedOutHeartbeats());
+                    assertEquals(0L, controller.controllerMetrics().operationsTimedOut());
+                }
+            }
+        }
+    }
+
+    /**
+     * Test the event queue operations started metric.
+     */
+    @Test
+    public void testEventQueueOperationsStartedMetric() throws Throwable {
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
+                                                build();
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
+                                                     build()
+        ) {
+            QuorumController active = controlEnv.activeController();
+            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+
+            // Test that a new operation increments operationsStarted. We retry this if needed
+            // to handle the case where another operation is performed in between loading
+            // expectedOperationsStarted and running the new control event.
+            TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                long expectedOperationsStarted = active.controllerMetrics().operationsStarted() + 1;
+                CompletableFuture<Long> actualOperationsStarted = new CompletableFuture<>();
+                active.appendControlEvent("checkOperationsStarted", () -> {
+                    actualOperationsStarted.complete(active.controllerMetrics().operationsStarted());
+                });
+                assertEquals(expectedOperationsStarted, actualOperationsStarted.get());
+            });
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index be39b9f74ee..261d3c91b28 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -88,7 +87,6 @@ import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
-import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -124,6 +122,10 @@ import static org.apache.kafka.controller.ConfigurationControlManagerTest.BROKER
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.SCHEMA;
 import static org.apache.kafka.controller.ConfigurationControlManagerTest.entry;
 import static org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEXT;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.brokerFeatures;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.pause;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.registerBrokersAndUnfence;
+import static org.apache.kafka.controller.QuorumControllerIntegrationTestUtils.sendBrokerHeartbeat;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -140,39 +142,6 @@ public class QuorumControllerTest {
     static final BootstrapMetadata SIMPLE_BOOTSTRAP = BootstrapMetadata.
             fromVersion(MetadataVersion.IBP_3_3_IV3, "test-provided bootstrap");
 
-    static class MockControllerMetrics extends QuorumControllerMetrics {
-        final AtomicBoolean closed = new AtomicBoolean(false);
-
-        MockControllerMetrics() {
-            super(Optional.empty(), Time.SYSTEM, false);
-        }
-
-        @Override
-        public void close() {
-            super.close();
-            closed.set(true);
-        }
-    }
-
-    /**
-     * Test creating a new QuorumController and closing it.
-     */
-    @Test
-    public void testCreateAndClose() throws Throwable {
-        MockControllerMetrics metrics = new MockControllerMetrics();
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setMetrics(metrics);
-                }).
-                build()
-        ) {
-        }
-        assertTrue(metrics.closed.get(), "metrics were not closed");
-    }
-
     /**
      * Test setting some configuration values and reading them back.
      */
@@ -610,22 +579,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures() {
-        return brokerFeatures(MetadataVersion.MINIMUM_KRAFT_VERSION, MetadataVersion.latest());
-    }
-
-    private BrokerRegistrationRequestData.FeatureCollection brokerFeatures(
-        MetadataVersion minVersion,
-        MetadataVersion maxVersion
-    ) {
-        BrokerRegistrationRequestData.FeatureCollection features = new BrokerRegistrationRequestData.FeatureCollection();
-        features.add(new BrokerRegistrationRequestData.Feature()
-            .setName(MetadataVersion.FEATURE_NAME)
-            .setMinSupportedVersion(minVersion.featureLevel())
-            .setMaxSupportedVersion(maxVersion.featureLevel()));
-        return features;
-    }
-
     private RegisterBrokerRecord.BrokerFeatureCollection registrationFeatures(
         MetadataVersion minVersion,
         MetadataVersion maxVersion
@@ -782,7 +735,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
-            CountDownLatch countDownLatch = controller.pause();
+            CountDownLatch countDownLatch = pause(controller);
             long now = controller.time().nanoseconds();
             ControllerRequestContext context0 = new ControllerRequestContext(
                 new RequestHeaderData(), KafkaPrincipal.ANONYMOUS, OptionalLong.of(now));
@@ -846,7 +799,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController controller = controlEnv.activeController();
-            CountDownLatch countDownLatch = controller.pause();
+            CountDownLatch countDownLatch = pause(controller);
             CompletableFuture<CreateTopicsResponseData> createFuture =
                 controller.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
                     setTimeoutMs(120000), Collections.emptySet());
@@ -891,7 +844,7 @@ public class QuorumControllerTest {
         ) {
             QuorumController controller = controlEnv.activeController();
 
-            Map<Integer, Long> brokerEpochs = registerBrokers(controller, numBrokers);
+            Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(controller, numBrokers);
 
             // Create a lot of partitions
             List<CreatableReplicaAssignment> partitions = IntStream
@@ -980,62 +933,6 @@ public class QuorumControllerTest {
         }
     }
 
-    private Map<Integer, Long> registerBrokers(QuorumController controller, int numBrokers) throws Exception {
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        for (int brokerId = 0; brokerId < numBrokers; brokerId++) {
-            BrokerRegistrationReply reply = controller.registerBroker(ANONYMOUS_CONTEXT,
-                new BrokerRegistrationRequestData()
-                    .setBrokerId(brokerId)
-                    .setRack(null)
-                    .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_6_IV0))
-                    .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
-                    .setListeners(
-                        new ListenerCollection(
-                            Arrays.asList(
-                                new Listener()
-                                .setName("PLAINTEXT")
-                                .setHost("localhost")
-                                .setPort(9092 + brokerId)
-                                ).iterator()
-                            )
-                        )
-                    ).get();
-            brokerEpochs.put(brokerId, reply.epoch());
-
-            // Send heartbeat to unfence
-            controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
-                new BrokerHeartbeatRequestData()
-                    .setWantFence(false)
-                    .setBrokerEpoch(brokerEpochs.get(brokerId))
-                    .setBrokerId(brokerId)
-                    .setCurrentMetadataOffset(100000L)
-            ).get();
-        }
-
-        return brokerEpochs;
-    }
-
-    private void sendBrokerHeartbeat(
-        QuorumController controller,
-        List<Integer> brokers,
-        Map<Integer, Long> brokerEpochs
-    ) throws Exception {
-        if (brokers.isEmpty()) {
-            return;
-        }
-        for (Integer brokerId : brokers) {
-            BrokerHeartbeatReply reply = controller.processBrokerHeartbeat(ANONYMOUS_CONTEXT,
-                new BrokerHeartbeatRequestData()
-                    .setWantFence(false)
-                    .setBrokerEpoch(brokerEpochs.get(brokerId))
-                    .setBrokerId(brokerId)
-                    .setCurrentMetadataOffset(100000)
-            ).get();
-            assertEquals(new BrokerHeartbeatReply(true, false, false, false), reply);
-        }
-    }
-
     @Test
     public void testConfigResourceExistenceChecker() throws Throwable {
         try (
@@ -1048,7 +945,7 @@ public class QuorumControllerTest {
                 build()
         ) {
             QuorumController active = controlEnv.activeController();
-            registerBrokers(active, 5);
+            registerBrokersAndUnfence(active, 5);
             active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().
                 setTopics(new CreatableTopicCollection(Collections.singleton(
                     new CreatableTopic().setName("foo").
@@ -1508,7 +1405,8 @@ public class QuorumControllerTest {
 
         featureControl = getActivationRecords(MetadataVersion.IBP_3_4_IV0, Optional.empty(), true);
         assertEquals(MetadataVersion.IBP_3_4_IV0, featureControl.metadataVersion());
-        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());    }
+        assertEquals(ZkMigrationState.PRE_MIGRATION, featureControl.zkMigrationState());
+    }
 
     @Test
     public void testMigrationsEnabledForOldBootstrapMetadataVersion() throws Exception {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
index bec023020a4..9258577a0dd 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/metrics/QuorumControllerMetricsTest.java
@@ -29,52 +29,39 @@ import java.util.Optional;
 
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class QuorumControllerMetricsTest {
-    @Test
-    public void testMetricNamesNotInMigrationState() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testMetricNames(boolean inMigration) {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
         try {
-            try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
-                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
-                    new HashSet<>(Arrays.asList(
-                        "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
-                        "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
-                        "kafka.controller:type=KafkaController,name=ActiveControllerCount",
-                        "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"
-                    )));
-            }
-            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
-                    Collections.emptySet());
-        } finally {
-            registry.shutdown();
-        }
-    }
-
-    @Test
-    public void testMetricNamesInMigrationState() {
-        MetricsRegistry registry = new MetricsRegistry();
-        MockTime time = new MockTime();
-        try {
-            try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
-                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
-                    new HashSet<>(Arrays.asList(
-                        "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
-                        "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
-                        "kafka.controller:type=KafkaController,name=ActiveControllerCount",
-                        "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
-                        "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
-                        "kafka.controller:type=KafkaController,name=ZKWriteBehindLag"
-                        )));
+            try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(
+                    Optional.of(registry),
+                    time,
+                    inMigration)) {
+                HashSet<String> expected = new HashSet<>(Arrays.asList(
+                    "kafka.controller:type=ControllerEventManager,name=EventQueueProcessingTimeMs",
+                    "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs",
+                    "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+                    "kafka.controller:type=KafkaController,name=EventQueueOperationsStartedCount",
+                    "kafka.controller:type=KafkaController,name=EventQueueOperationsTimedOutCount",
+                    "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+                    "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+                    "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+                    "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+                    "kafka.controller:type=KafkaController,name=NewActiveControllersCount",
+                    "kafka.controller:type=KafkaController,name=TimedOutBrokerHeartbeatCount"
+                ));
+                if (inMigration) {
+                    expected.add("kafka.controller:type=KafkaController,name=ZKWriteBehindLag");
+                }
+                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller", expected);
             }
             ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.controller",
                     Collections.emptySet());
@@ -112,10 +99,23 @@ public class QuorumControllerMetricsTest {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
         time.sleep(1000);
-        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, false)) {
+        try (QuorumControllerMetrics metrics = new QuorumControllerMetrics(Optional.of(registry), time, true)) {
             metrics.setLastAppliedRecordOffset(100);
             metrics.setLastAppliedRecordTimestamp(500);
             metrics.setLastCommittedRecordOffset(50);
+            metrics.updateDualWriteOffset(40L);
+            for (int i = 0; i < 2; i++) {
+                metrics.incrementTimedOutHeartbeats();
+            }
+            for (int i = 0; i < 3; i++) {
+                metrics.incrementOperationsStarted();
+            }
+            for (int i = 0; i < 4; i++) {
+                metrics.incrementOperationsTimedOut();
+            }
+            for (int i = 0; i < 5; i++) {
+                metrics.incrementNewActiveControllers();
+            }
 
             @SuppressWarnings("unchecked")
             Gauge<Long> lastAppliedRecordOffset = (Gauge<Long>) registry
@@ -140,6 +140,36 @@ public class QuorumControllerMetricsTest {
                 .allMetrics()
                 .get(metricName("KafkaController", 
"LastCommittedRecordOffset"));
             assertEquals(50, lastCommittedRecordOffset.value());
+
+            @SuppressWarnings("unchecked")
+            Gauge<Long> zkWriteBehindLag = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", "ZKWriteBehindLag"));
+            assertEquals(10L, zkWriteBehindLag.value());
+
+            @SuppressWarnings("unchecked")
+            Gauge<Long> timedOutBrokerHeartbeats = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"TimedOutBrokerHeartbeatCount"));
+            assertEquals(2L, timedOutBrokerHeartbeats.value());
+
+            @SuppressWarnings("unchecked")
+            Gauge<Long> operationsStarted = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"EventQueueOperationsStartedCount"));
+            assertEquals(3L, operationsStarted.value());
+
+            @SuppressWarnings("unchecked")
+            Gauge<Long> operationsTimedOut = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"EventQueueOperationsTimedOutCount"));
+            assertEquals(4L, operationsTimedOut.value());
+
+            @SuppressWarnings("unchecked")
+            Gauge<Long> newActiveControllers = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"NewActiveControllersCount"));
+            assertEquals(5L, newActiveControllers.value());
         } finally {
             registry.shutdown();
         }
diff --git a/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
new file mode 100644
index 00000000000..d76b1c81f21
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/FakeSnapshotWriter.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.SnapshotWriter;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+
+public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
+    private final OffsetAndEpoch snapshotId;
+    private List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
+    private boolean frozen = false;
+    private boolean closed = false;
+
+    public List<List<ApiMessageAndVersion>> batches() {
+        List<List<ApiMessageAndVersion>> result = new ArrayList<>();
+        for (List<ApiMessageAndVersion> batch : batches) {
+            result.add(Collections.unmodifiableList(batch));
+        }
+        return Collections.unmodifiableList(result);
+    }
+
+    public FakeSnapshotWriter() {
+        this(new OffsetAndEpoch(100L, 10));
+    }
+
+    public FakeSnapshotWriter(OffsetAndEpoch snapshotId) {
+        this.snapshotId = snapshotId;
+    }
+
+    @Override
+    public OffsetAndEpoch snapshotId() {
+        return snapshotId;
+    }
+
+    @Override
+    public long lastContainedLogOffset() {
+        return snapshotId().offset() - 1;
+    }
+
+    @Override
+    public int lastContainedLogEpoch() {
+        return snapshotId().epoch();
+    }
+
+    @Override
+    public boolean isFrozen() {
+        return frozen;
+    }
+
+    @Override
+    public void append(List<ApiMessageAndVersion> batch) {
+        if (frozen) {
+            throw new IllegalStateException("Append not supported. Snapshot is 
already frozen.");
+        }
+        batches.add(batch);
+    }
+
+    @Override
+    public long freeze() {
+        frozen = true;
+        return batches.size() * 100;
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+    }
+
+    public boolean isClosed() {
+        return closed;
+    }
+}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index 1eefa15a8c3..372700b1fb6 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -37,6 +37,7 @@ import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.snapshot.SnapshotReader;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -55,6 +56,7 @@ import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_5_IV0;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -251,6 +253,13 @@ public class MetadataLoaderTest {
                     )
                 );
                 loader.handleLoadSnapshot(snapshotReader);
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    assertEquals(1L, loader.metrics().handleLoadSnapshotCount());
+                });
+            } else {
+                TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
+                    assertEquals(0L, loader.metrics().handleLoadSnapshotCount());
+                });
             }
             loader.waitForAllEventsToBeHandled();
             if (sameObject) {
@@ -328,6 +337,8 @@ public class MetadataLoaderTest {
             assertEquals(300L, loader.lastAppliedOffset());
             assertEquals(new SnapshotManifest(new MetadataProvenance(300, 100, 4000), 3000000L),
                 publishers.get(0).latestSnapshotManifest);
+            assertEquals(MetadataVersion.MINIMUM_KRAFT_VERSION,
+                loader.metrics().currentMetadataVersion());
         }
         assertTrue(publishers.get(0).closed);
         assertEquals(MetadataVersion.IBP_3_0_IV1,
@@ -587,14 +598,27 @@ public class MetadataLoaderTest {
 
             loadTestSnapshot(loader, 200);
             assertEquals(200L, loader.lastAppliedOffset());
+            assertEquals(IBP_3_3_IV1.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
             assertFalse(publishers.get(0).latestDelta.image().isEmpty());
 
             loadTestSnapshot2(loader, 400);
             assertEquals(400L, loader.lastAppliedOffset());
+            assertEquals(IBP_3_3_IV2.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
 
             // Make sure the topic in the initial snapshot was overwritten by loading the new snapshot.
             assertFalse(publishers.get(0).latestImage.topics().topicsByName().containsKey("foo"));
             assertTrue(publishers.get(0).latestImage.topics().topicsByName().containsKey("bar"));
+
+            loader.handleCommit(new MockBatchReader(500, asList(
+                MockBatchReader.newBatch(500, 100, asList(
+                    new ApiMessageAndVersion(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(IBP_3_5_IV0.featureLevel()), (short) 0))))));
+            loader.waitForAllEventsToBeHandled();
+            assertEquals(IBP_3_5_IV0.featureLevel(),
+                loader.metrics().currentMetadataVersion().featureLevel());
         }
         faultHandler.maybeRethrowFirstException();
     }
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
new file mode 100644
index 00000000000..5e3c6c6f571
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/metrics/MetadataLoaderMetricsTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.apache.kafka.image.MetadataProvenance;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+public class MetadataLoaderMetricsTest {
+    private static class FakeMetadataLoaderMetrics implements AutoCloseable {
+        final AtomicLong batchProcessingTimeNs = new AtomicLong(0L);
+        final AtomicInteger batchSize = new AtomicInteger(0);
+        final AtomicReference<MetadataProvenance> provenance =
+            new AtomicReference<>(MetadataProvenance.EMPTY);
+        final MetadataLoaderMetrics metrics;
+
+        FakeMetadataLoaderMetrics() {
+            this(Optional.empty());
+        }
+
+        FakeMetadataLoaderMetrics(MetricsRegistry registry) {
+            this(Optional.of(registry));
+        }
+
+        FakeMetadataLoaderMetrics(Optional<MetricsRegistry> registry) {
+            metrics = new MetadataLoaderMetrics(
+                registry,
+                n -> batchProcessingTimeNs.set(n),
+                n -> batchSize.set(n),
+                provenance);
+        }
+
+        @Override
+        public void close() {
+            metrics.close();
+        }
+    }
+
+    @Test
+    public void testMetricNames() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+                ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+                    new HashSet<>(Arrays.asList(
+                        "kafka.server:type=MetadataLoader,name=CurrentMetadataVersion",
+                        "kafka.server:type=MetadataLoader,name=HandleLoadSnapshotCount"
+                    )));
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+                    Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
+    @Test
+    public void testUpdateBatchProcessingTimeNs() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateBatchProcessingTimeNs(123L);
+            assertEquals(123L, fakeMetrics.batchProcessingTimeNs.get());
+        }
+    }
+
+    @Test
+    public void testUpdateBatchSize() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+            fakeMetrics.metrics.updateBatchSize(50);
+            assertEquals(50, fakeMetrics.batchSize.get());
+        }
+    }
+
+    @Test
+    public void testUpdateLastAppliedImageProvenance() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+            MetadataProvenance provenance = new MetadataProvenance(1L, 2, 3L);
+            fakeMetrics.metrics.updateLastAppliedImageProvenance(provenance);
+            assertEquals(provenance, fakeMetrics.provenance.get());
+        }
+    }
+
+    @Test
+    public void testManagedMetrics() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (FakeMetadataLoaderMetrics fakeMetrics = new FakeMetadataLoaderMetrics(registry)) {
+                fakeMetrics.metrics.setCurrentMetadataVersion(IBP_3_3_IV2);
+                fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+                fakeMetrics.metrics.incrementHandleLoadSnapshotCount();
+
+                @SuppressWarnings("unchecked")
+                Gauge<Integer> currentMetadataVersion = (Gauge<Integer>) registry
+                    .allMetrics()
+                    .get(metricName("MetadataLoader", "CurrentMetadataVersion"));
+                assertEquals(IBP_3_3_IV2.featureLevel(),
+                    currentMetadataVersion.value().shortValue());
+
+                @SuppressWarnings("unchecked")
+                Gauge<Long> loadSnapshotCount = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("MetadataLoader", 
"HandleLoadSnapshotCount"));
+                assertEquals(2L, loadSnapshotCount.value().longValue());
+            }
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(registry, "kafka.server",
+                Collections.emptySet());
+        } finally {
+            registry.shutdown();
+        }
+    }
+
+    private static MetricName metricName(String type, String name) {
+        String mBeanName = String.format("kafka.server:type=%s,name=%s", type, 
name);
+        return new MetricName("kafka.server", type, name, null, mBeanName);
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index ca72aa058ef..be4285f6bbf 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.image.publisher;
 
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.image.FakeSnapshotWriter;
 import org.apache.kafka.image.MetadataImageTest;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
@@ -26,7 +28,6 @@ import org.apache.kafka.snapshot.SnapshotWriter;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Optional;
@@ -43,7 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 @Timeout(value = 40)
 public class SnapshotEmitterTest {
     static class MockRaftClient implements RaftClient<ApiMessageAndVersion> {
-        TreeMap<OffsetAndEpoch, MockSnapshotWriter> writers = new TreeMap<>();
+        TreeMap<OffsetAndEpoch, FakeSnapshotWriter> writers = new TreeMap<>();
 
         @Override
         public void initialize() {
@@ -103,7 +104,7 @@ public class SnapshotEmitterTest {
             if (writers.containsKey(snapshotId)) {
                 return Optional.empty();
             }
-            MockSnapshotWriter writer = new MockSnapshotWriter(snapshotId);
+            FakeSnapshotWriter writer = new FakeSnapshotWriter(snapshotId);
             writers.put(snapshotId, writer);
             return Optional.of(writer);
         }
@@ -124,72 +125,24 @@ public class SnapshotEmitterTest {
         }
     }
 
-    static class MockSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
-        private final OffsetAndEpoch snapshotId;
-        private boolean frozen = false;
-        private boolean closed = false;
-        private final List<List<ApiMessageAndVersion>> batches;
-
-        MockSnapshotWriter(OffsetAndEpoch snapshotId) {
-            this.snapshotId = snapshotId;
-            this.batches = new ArrayList<>();
-        }
-
-        @Override
-        public OffsetAndEpoch snapshotId() {
-            return snapshotId;
-        }
-
-        @Override
-        public long lastContainedLogOffset() {
-            return snapshotId.offset() - 1;
-        }
-
-        @Override
-        public int lastContainedLogEpoch() {
-            return snapshotId.epoch();
-        }
-
-        @Override
-        public boolean isFrozen() {
-            return frozen;
-        }
-
-        @Override
-        public void append(List<ApiMessageAndVersion> records) {
-            batches.add(records);
-        }
-
-        List<List<ApiMessageAndVersion>> batches() {
-            List<List<ApiMessageAndVersion>> results = new ArrayList<>();
-            batches.forEach(batch -> results.add(new ArrayList<>(batch)));
-            return results;
-        }
-
-        @Override
-        public void freeze() {
-            frozen = true;
-        }
-
-        @Override
-        public void close() {
-            closed = true;
-        }
-
-        boolean isClosed() {
-            return closed;
-        }
-    }
-
     @Test
     public void testEmit() throws Exception {
         MockRaftClient mockRaftClient = new MockRaftClient();
+        MockTime time = new MockTime(0, 10000L, 20000L);
         SnapshotEmitter emitter = new SnapshotEmitter.Builder().
+            setTime(time).
             setBatchSize(2).
             setRaftClient(mockRaftClient).
             build();
+        assertEquals(0L, emitter.metrics().latestSnapshotGeneratedAgeMs());
+        assertEquals(0L, emitter.metrics().latestSnapshotGeneratedBytes());
+        time.sleep(30000L);
+        assertEquals(30000L, emitter.metrics().latestSnapshotGeneratedAgeMs());
+        assertEquals(0L, emitter.metrics().latestSnapshotGeneratedBytes());
         emitter.maybeEmit(MetadataImageTest.IMAGE1);
-        MockSnapshotWriter writer = mockRaftClient.writers.get(
+        assertEquals(0L, emitter.metrics().latestSnapshotGeneratedAgeMs());
+        assertEquals(1400L, emitter.metrics().latestSnapshotGeneratedBytes());
+        FakeSnapshotWriter writer = mockRaftClient.writers.get(
                 MetadataImageTest.IMAGE1.provenance().snapshotId());
         assertNotNull(writer);
         assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().offset(),
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetricsTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetricsTest.java
new file mode 100644
index 00000000000..9f79631eef3
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/metrics/SnapshotEmitterMetricsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher.metrics;
+
+import com.yammer.metrics.core.Gauge;
+import com.yammer.metrics.core.MetricName;
+import com.yammer.metrics.core.MetricsRegistry;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Optional;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.controller.metrics.ControllerMetricsTestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class SnapshotEmitterMetricsTest {
+    private static final Logger log = LoggerFactory.getLogger(SnapshotEmitterMetricsTest.class);
+
+    static class SnapshotEmitterMetricsTestContext implements AutoCloseable {
+        final MetricsRegistry registry;
+        final MockTime time;
+        final SnapshotEmitterMetrics metrics;
+
+        SnapshotEmitterMetricsTestContext() {
+            this.registry = new MetricsRegistry();
+            this.time = new MockTime(0, 10000L, 0L);
+            this.metrics = new SnapshotEmitterMetrics(Optional.of(registry), time);
+        }
+
+        @SuppressWarnings("unchecked") // suppress warning about Gauge typecast
+        long readLongGauge(String name) {
+            MetricName metricName = new MetricName(
+                "kafka.server",
+                "SnapshotEmitter",
+                name,
+                null,
+                "kafka.server:type=SnapshotEmitter,name=" + name
+            );
+            return ((Gauge<Long>) registry.allMetrics().get(metricName)).value();
+        }
+
+        @Override
+        public void close() {
+            try {
+                registry.shutdown();
+            } catch (Exception e) {
+                log.error("Error closing registry", e);
+            }
+        }
+    }
+
+    @Test
+    public void testMetricNames() {
+        try (SnapshotEmitterMetricsTestContext ctx = new SnapshotEmitterMetricsTestContext()) {
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(ctx.registry, "kafka.server:",
+                new HashSet<>(Arrays.asList(
+                    "kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes",
+                    "kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedAgeMs"
+                )));
+            ctx.metrics.close();
+            ControllerMetricsTestUtils.assertMetricsForTypeEqual(ctx.registry, "KafkaController",
+                    Collections.emptySet());
+        }
+    }
+
+    @Test
+    public void testLatestSnapshotGeneratedBytesMetric() {
+        try (SnapshotEmitterMetricsTestContext ctx = new SnapshotEmitterMetricsTestContext()) {
+            assertEquals(0L, ctx.metrics.latestSnapshotGeneratedBytes());
+            ctx.metrics.setLatestSnapshotGeneratedBytes(12345L);
+            assertEquals(12345L, ctx.metrics.latestSnapshotGeneratedBytes());
+            assertEquals(12345L, ctx.readLongGauge("LatestSnapshotGeneratedBytes"));
+        }
+    }
+
+    @Test
+    public void testLatestSnapshotGeneratedAgeMsMetric() {
+        try (SnapshotEmitterMetricsTestContext ctx = new SnapshotEmitterMetricsTestContext()) {
+            assertEquals(10000L, ctx.metrics.latestSnapshotGeneratedTimeMs());
+            assertEquals(0L, ctx.metrics.latestSnapshotGeneratedAgeMs());
+            ctx.time.sleep(20000L);
+            assertEquals(10000L, ctx.metrics.latestSnapshotGeneratedTimeMs());
+            assertEquals(20000L, ctx.metrics.latestSnapshotGeneratedAgeMs());
+            assertEquals(20000L, ctx.readLongGauge("LatestSnapshotGeneratedAgeMs"));
+        }
+    }
+}
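The readLongGauge helper above looks gauges up through the Yammer registry's allMetrics() map. For readers unfamiliar with that API, here is a minimal, self-contained sketch (not part of the commit) of registering a Gauge<Long> and reading it back the same way:

    import com.yammer.metrics.core.Gauge;
    import com.yammer.metrics.core.MetricName;
    import com.yammer.metrics.core.MetricsRegistry;

    public class GaugeReadSketch {
        public static void main(String[] args) {
            MetricsRegistry registry = new MetricsRegistry();
            // The fifth constructor argument is the explicit MBean name, which is
            // what makes the metric appear as kafka.server:type=...,name=...
            MetricName name = new MetricName("kafka.server", "SnapshotEmitter",
                "LatestSnapshotGeneratedBytes", null,
                "kafka.server:type=SnapshotEmitter,name=LatestSnapshotGeneratedBytes");
            registry.newGauge(name, new Gauge<Long>() {
                @Override
                public Long value() {
                    return 12345L;
                }
            });
            // Look the gauge up by its MetricName and read the current value.
            @SuppressWarnings("unchecked")
            Gauge<Long> gauge = (Gauge<Long>) registry.allMetrics().get(name);
            System.out.println(gauge.value());   // prints 12345
            registry.shutdown();
        }
    }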
diff --git a/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java b/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java
index 4cdacfe7e1e..0137aeb0772 100644
--- a/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/writer/RaftSnapshotWriterTest.java
@@ -17,15 +17,11 @@
 
 package org.apache.kafka.image.writer;
 
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.snapshot.SnapshotWriter;
+import org.apache.kafka.image.FakeSnapshotWriter;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 
 import static java.util.Collections.emptyList;
 import static org.apache.kafka.metadata.RecordTestUtils.testRecord;
@@ -36,70 +32,29 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 
 @Timeout(value = 40)
 public class RaftSnapshotWriterTest {
-    static class MockSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
-        boolean frozen = false;
-        boolean closed = false;
-        List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
-
-        @Override
-        public OffsetAndEpoch snapshotId() {
-            return new OffsetAndEpoch(100L, 10);
-        }
-
-        @Override
-        public long lastContainedLogOffset() {
-            return snapshotId().offset();
-        }
-
-        @Override
-        public int lastContainedLogEpoch() {
-            return snapshotId().epoch();
-        }
-
-        @Override
-        public boolean isFrozen() {
-            return frozen;
-        }
-
-        @Override
-        public void append(List<ApiMessageAndVersion> batch) {
-            batches.add(batch);
-        }
-
-        @Override
-        public void freeze() {
-            frozen = true;
-        }
-
-        @Override
-        public void close() {
-            closed = true;
-        }
-    }
-
     @Test
     public void testFreezeAndClose() {
-        MockSnapshotWriter snapshotWriter = new MockSnapshotWriter();
+        FakeSnapshotWriter snapshotWriter = new FakeSnapshotWriter();
         RaftSnapshotWriter writer = new RaftSnapshotWriter(snapshotWriter, 2);
         writer.write(testRecord(0));
         writer.write(testRecord(1));
         writer.write(testRecord(2));
         writer.close(true);
-        assertTrue(snapshotWriter.frozen);
-        assertTrue(snapshotWriter.closed);
+        assertTrue(snapshotWriter.isFrozen());
+        assertTrue(snapshotWriter.isClosed());
         assertEquals(Arrays.asList(
                 Arrays.asList(testRecord(0), testRecord(1)),
-                Arrays.asList(testRecord(2))), snapshotWriter.batches);
+                Arrays.asList(testRecord(2))), snapshotWriter.batches());
     }
 
     @Test
     public void testCloseWithoutFreeze() {
-        MockSnapshotWriter snapshotWriter = new MockSnapshotWriter();
+        FakeSnapshotWriter snapshotWriter = new FakeSnapshotWriter();
         RaftSnapshotWriter writer = new RaftSnapshotWriter(snapshotWriter, 2);
         writer.write(testRecord(0));
         writer.close();
-        assertFalse(snapshotWriter.frozen);
-        assertTrue(snapshotWriter.closed);
-        assertEquals(emptyList(), snapshotWriter.batches);
+        assertFalse(snapshotWriter.isFrozen());
+        assertTrue(snapshotWriter.isClosed());
+        assertEquals(emptyList(), snapshotWriter.batches());
     }
 }
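The per-test MockSnapshotWriter deleted above is superseded by the shared FakeSnapshotWriter this commit adds under org.apache.kafka.image. Judging from the removed mock and the assertions that remain, its shape is roughly the following sketch (accessor names come from the test assertions; everything else is inferred, and the committed class may differ in detail):

    // Sketch only: inferred from the removed MockSnapshotWriter and the
    // updated assertions; not the committed FakeSnapshotWriter source.
    public class FakeSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
        private final List<List<ApiMessageAndVersion>> batches = new ArrayList<>();
        private boolean frozen = false;
        private boolean closed = false;

        public List<List<ApiMessageAndVersion>> batches() { return batches; }
        public boolean isClosed() { return closed; }

        @Override
        public OffsetAndEpoch snapshotId() { return new OffsetAndEpoch(100L, 10); }
        @Override
        public long lastContainedLogOffset() { return snapshotId().offset(); }
        @Override
        public int lastContainedLogEpoch() { return snapshotId().epoch(); }
        @Override
        public boolean isFrozen() { return frozen; }
        @Override
        public void append(List<ApiMessageAndVersion> batch) { batches.add(batch); }
        @Override
        public long freeze() {
            frozen = true;
            return 0L;   // size is irrelevant for these tests
        }
        @Override
        public void close() { closed = true; }
    }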
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
index badefd321ed..535c176c728 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotWriter.java
@@ -35,7 +35,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
     private final FileChannel channel;
     private final OffsetAndEpoch snapshotId;
     private final Optional<ReplicatedLog> replicatedLog;
-    private boolean frozen = false;
+    private long frozenSize;
 
     private FileRawSnapshotWriter(
         Path tempSnapshotPath,
@@ -47,6 +47,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
         this.channel = channel;
         this.snapshotId = snapshotId;
         this.replicatedLog = replicatedLog;
+        this.frozenSize = -1L;
     }
 
     @Override
@@ -56,6 +57,9 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
 
     @Override
     public long sizeInBytes() {
+        if (frozenSize >= 0) {
+            return frozenSize;
+        }
         try {
             return channel.size();
         } catch (IOException e) {
@@ -99,7 +103,7 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
 
     @Override
     public boolean isFrozen() {
-        return frozen;
+        return frozenSize >= 0;
     }
 
     @Override
@@ -107,8 +111,8 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
         try {
             checkIfFrozen("Freeze");
 
+            frozenSize = channel.size();
             channel.close();
-            frozen = true;
 
             if (!tempSnapshotPath.toFile().setReadOnly()) {
                 throw new IllegalStateException(String.format("Unable to set 
file (%s) as read-only", tempSnapshotPath));
@@ -148,12 +152,12 @@ public final class FileRawSnapshotWriter implements RawSnapshotWriter {
             "FileRawSnapshotWriter(path=%s, snapshotId=%s, frozen=%s)",
             tempSnapshotPath,
             snapshotId,
-            frozen
+            isFrozen()
         );
     }
 
     void checkIfFrozen(String operation) {
-        if (frozen) {
+        if (isFrozen()) {
             throw new IllegalStateException(
                 String.format(
                     "%s is not supported. Snapshot is already frozen: id = %s; 
temp path = %s",
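The motivation for replacing the boolean with frozenSize: freeze() closes the FileChannel, so channel.size() can no longer answer sizeInBytes() afterwards, yet the new snapshot metrics need the size of an already-frozen snapshot. Capturing the size just before the channel closes and using -1 as the "not frozen" sentinel solves both. The idiom in isolation (illustrative only, detached from the real class):

    // A single long field encodes both "not yet frozen" (-1) and the
    // final frozen size, replacing a separate boolean flag.
    class FrozenSizeSentinel {
        private long frozenSize = -1L;

        boolean isFrozen() {
            return frozenSize >= 0;
        }

        void freeze(long sizeAtFreeze) {
            if (isFrozen()) {
                throw new IllegalStateException("Already frozen");
            }
            frozenSize = sizeAtFreeze;   // capture before the underlying channel closes
        }

        long sizeInBytes(long liveSize) {
            // After freeze the live source is gone, so return the captured size.
            return isFrozen() ? frozenSize : liveSize;
        }
    }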
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index eeacf608a9f..ae6202426b8 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -192,11 +192,12 @@ final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
     }
 
     @Override
-    public void freeze() {
+    public long freeze() {
         finalizeSnapshotWithFooter();
         appendBatches(accumulator.drain());
         snapshot.freeze();
         accumulator.close();
+        return snapshot.sizeInBytes();
     }
 
     @Override
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
index 537335c058a..244cc5478f4 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java
@@ -71,8 +71,10 @@ public interface SnapshotWriter<T> extends AutoCloseable {
      * Freezes the snapshot by flushing all pending writes and marking it as immutable.
      *
      * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot
+     *
+     * @return  The size of the snapshot in bytes.
      */
-    void freeze();
+    long freeze();
 
     /**
      * Closes the snapshot writer.
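Returning the size from freeze() lets the caller record it without reopening the frozen file. A hedged sketch of how an emitter might wire this into the new gauges (setLatestSnapshotGeneratedBytes appears in the tests earlier in this diff; the time setter and the surrounding method are assumptions, not the commit's actual SnapshotEmitter code):

    // Hypothetical caller, not the commit's SnapshotEmitter implementation;
    // Time is org.apache.kafka.common.utils.Time.
    void emitSnapshot(SnapshotWriter<ApiMessageAndVersion> writer,
                      SnapshotEmitterMetrics metrics,
                      Time time) {
        long sizeInBytes = writer.freeze();   // freeze() now reports the frozen size
        metrics.setLatestSnapshotGeneratedBytes(sizeInBytes);
        // Assumed setter: records when the snapshot finished, which would back
        // the LatestSnapshotGeneratedAgeMs gauge.
        metrics.setLatestSnapshotGeneratedTimeMs(time.milliseconds());
    }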
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index bc1d0ca21fc..f5b1ea15c3d 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -48,6 +48,7 @@ import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.OptionalInt;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -1832,12 +1833,12 @@ final public class KafkaRaftClientSnapshotTest {
     private final static class MemorySnapshotWriter implements RawSnapshotWriter {
         private final OffsetAndEpoch snapshotId;
         private ByteBuffer data;
-        private boolean frozen;
+        private AtomicLong frozenPosition;
 
         public MemorySnapshotWriter(OffsetAndEpoch snapshotId) {
             this.snapshotId = snapshotId;
             this.data = ByteBuffer.allocate(0);
-            this.frozen = false;
+            this.frozenPosition = new AtomicLong(-1L);
         }
 
         @Override
@@ -1847,16 +1848,13 @@ final public class KafkaRaftClientSnapshotTest {
 
         @Override
         public long sizeInBytes() {
-            if (frozen) {
-                throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
-            }
-
-            return data.position();
+            long position = frozenPosition.get();
+            return (position < 0) ? data.position() : position;
         }
 
         @Override
         public void append(UnalignedMemoryRecords records) {
-            if (frozen) {
+            if (isFrozen()) {
                 throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
             }
             append(records.buffer());
@@ -1864,7 +1862,7 @@ final public class KafkaRaftClientSnapshotTest {
 
         @Override
         public void append(MemoryRecords records) {
-            if (frozen) {
+            if (isFrozen()) {
                 throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
             }
             append(records.buffer());
@@ -1885,16 +1883,14 @@ final public class KafkaRaftClientSnapshotTest {
 
         @Override
         public boolean isFrozen() {
-            return frozen;
+            return frozenPosition.get() >= 0;
         }
 
         @Override
         public void freeze() {
-            if (frozen) {
+            if (!frozenPosition.compareAndSet(-1L, data.position())) {
                 throw new RuntimeException("Snapshot is already frozen " + 
snapshotId);
             }
-
-            frozen = true;
             data.flip();
         }
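MemorySnapshotWriter applies the same sentinel trick as FileRawSnapshotWriter, but with an AtomicLong so the not-frozen-to-frozen transition is a single atomic step and a double freeze is caught without extra locking. The idiom in isolation (values illustrative):

    // compareAndSet only succeeds for the first caller that sees -1, so the
    // position is recorded exactly once and any later freeze fails loudly.
    AtomicLong frozenPosition = new AtomicLong(-1L);

    long position = 42L;   // stand-in for data.position()
    if (!frozenPosition.compareAndSet(-1L, position)) {
        throw new RuntimeException("Snapshot is already frozen");
    }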
 
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
index 0b5cc66a0c1..103eb3781c6 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/MockRawSnapshotWriter.java
@@ -46,7 +46,7 @@ public final class MockRawSnapshotWriter implements RawSnapshotWriter {
 
     @Override
     public long sizeInBytes() {
-        ensureNotFrozenOrClosed();
+        ensureOpen();
         return data.position();
     }
 
