This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7d1b0926fa KAFKA-13883: Implement NoOpRecord and metadata metrics 
(#12183)
7d1b0926fa is described below

commit 7d1b0926fab104311a505826831ddc45864e9a6a
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jun 1 10:48:24 2022 -0700

    KAFKA-13883: Implement NoOpRecord and metadata metrics (#12183)
    
    Implement NoOpRecord as described in KIP-835. This is controlled by the new
    metadata.max.idle.interval.ms configuration.
    
    The KRaft controller schedules an event to write NoOpRecord to the metadata 
log if the metadata
    version supports this feature. This event is scheduled at the interval 
defined in
    metadata.max.idle.interval.ms. Brokers and controllers were improved to 
ignore the NoOpRecord when
    replaying the metadata log.
    
    This PR also adds four new metrics to the KafkaController metric group, as 
described in KIP-835.
    
    Finally, there are some small fixes to leader recovery. This PR fixes a bug 
where metadata version
    3.3-IV1 was not marked as changing the metadata. It also changes the 
ReplicationControlManager to
    accept a metadata version supplier to determine if the leader recovery 
state is supported.
    
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../org/apache/kafka/common/metrics/Gauge.java     |   1 +
 .../src/main/scala/kafka/server/BrokerServer.scala |  14 +-
 .../main/scala/kafka/server/ControllerServer.scala |   7 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  12 +-
 .../server/metadata/BrokerMetadataListener.scala   |  10 +-
 .../server/metadata/BrokerServerMetrics.scala      |  88 +++++++++
 .../server/metadata/BrokerServerMetricsTest.scala  |  88 +++++++++
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  15 ++
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   1 +
 .../metadata/BrokerMetadataListenerTest.scala      | 127 ++++++++-----
 .../apache/kafka/controller/ControllerMetrics.java |  12 ++
 .../apache/kafka/controller/QuorumController.java  | 209 ++++++++++++++++-----
 .../kafka/controller/QuorumControllerMetrics.java  |  85 ++++++++-
 .../controller/ReplicationControlManager.java      |  60 +++---
 .../java/org/apache/kafka/image/MetadataDelta.java |   5 +
 .../main/resources/common/metadata/NoOpRecord.json |  23 +++
 .../controller/FeatureControlManagerTest.java      |   2 +-
 .../kafka/controller/MockControllerMetrics.java    |  58 ++++--
 .../controller/QuorumControllerMetricsTest.java    |  49 ++++-
 .../kafka/controller/QuorumControllerTest.java     |  50 +++++
 .../kafka/controller/QuorumControllerTestEnv.java  |  17 +-
 .../controller/ReplicationControlManagerTest.java  |   2 +
 .../org/apache/kafka/metadata/RecordTestUtils.java |  16 +-
 .../kafka/raft/internals/KafkaRaftMetrics.java     |  43 +++--
 .../kafka/server/common/MetadataVersion.java       |  15 +-
 .../kafka/server/common/MetadataVersionTest.java   |  16 +-
 26 files changed, 826 insertions(+), 199 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java 
b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
index 647942b3d0..d71bbd853d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.metrics;
 /**
  * A gauge metric is an instantaneous reading of a particular value.
  */
+@FunctionalInterface
 public interface Gauge<T> extends MetricValueProvider<T> {
 
     /**
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index b0b102762d..b62d118096 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -31,6 +31,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
 import kafka.server.KafkaRaftServer.ControllerRole
+import kafka.server.metadata.BrokerServerMetrics
 import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, 
BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, 
SnapshotWriterBuilder}
 import kafka.utils.{CoreUtils, KafkaScheduler}
 import org.apache.kafka.common.feature.SupportedVersionRange
@@ -319,11 +320,14 @@ class BrokerServer(
         ))
       }
 
-      metadataListener = new BrokerMetadataListener(config.nodeId,
-                                                    time,
-                                                    threadNamePrefix,
-                                                    
config.metadataSnapshotMaxNewRecordBytes,
-                                                    metadataSnapshotter)
+      metadataListener = new BrokerMetadataListener(
+        config.nodeId,
+        time,
+        threadNamePrefix,
+        config.metadataSnapshotMaxNewRecordBytes,
+        metadataSnapshotter,
+        BrokerServerMetrics(metrics)
+      )
 
       val networkListeners = new ListenerCollection()
       config.effectiveAdvertisedListeners.foreach { ep =>
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7a2913e9cf..67b3f0276d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -175,6 +174,8 @@ class ControllerServer(
           OptionalLong.empty()
         }
 
+        val maxIdleIntervalNs = 
config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
+
         new QuorumController.Builder(config.nodeId, metaProperties.clusterId).
           setTime(time).
           setThreadNamePrefix(threadNamePrefixAsString).
@@ -183,12 +184,12 @@ class ControllerServer(
           setQuorumFeatures(quorumFeatures).
           setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
           setDefaultNumPartitions(config.numPartitions.intValue()).
-          
setIsLeaderRecoverySupported(bootstrapMetadata.metadataVersion().isAtLeast(IBP_3_2_IV0)).
           
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
             TimeUnit.MILLISECONDS)).
           
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
           setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
-          setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
+          setMaxIdleIntervalNs(maxIdleIntervalNs).
+          setMetrics(new 
QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)).
           setCreateTopicPolicy(createTopicPolicy.asJava).
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8588828d4f..eb545cea96 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,6 +18,7 @@
 package kafka.server
 
 import java.util
+import java.util.concurrent.TimeUnit
 import java.util.{Collections, Locale, Properties}
 
 import kafka.cluster.EndPoint
@@ -81,6 +82,7 @@ object Defaults {
   val BrokerHeartbeatIntervalMs = 2000
   val BrokerSessionTimeoutMs = 9000
   val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
+  val MetadataMaxIdleIntervalMs = 500
 
   /** KRaft mode configs */
   val EmptyNodeId: Int = -1
@@ -402,6 +404,7 @@ object KafkaConfig {
   val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
   val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
   val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
+  val MetadataMaxIdleIntervalMsProp = "metadata.max.idle.interval.ms"
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameProp = "authorizer.class.name"
@@ -712,6 +715,9 @@ object KafkaConfig {
   val MetadataLogDirDoc = "This configuration determines where we put the 
metadata log for clusters in KRaft mode. " +
     "If it is not set, the metadata log is placed in the first log directory 
from log.dirs."
   val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of 
bytes in the log between the latest snapshot and the high-watermark needed 
before generating a new snapshot."
+  val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often 
the active " +
+    "controller should write no-op records to the metadata partition. If the 
value is 0, no-op records " +
+    s"are not appended to the metadata partition. The default value is 
${Defaults.MetadataMaxIdleIntervalMs}";
   val ControllerListenerNamesDoc = "A comma-separated list of the names of the 
listeners used by the controller. This is required " +
     "if running in KRaft mode. When communicating with the controller quorum, 
the broker will always use the first listener in this list.\n " +
     "Note: The ZK-based controller should not set this configuration."
@@ -1148,6 +1154,7 @@ object KafkaConfig {
       .define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 
60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
       .define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, 
null, HIGH, MetadataMaxRetentionBytesDoc)
       .define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours 
* 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
+      .define(MetadataMaxIdleIntervalMsProp, INT, 
Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, 
MetadataMaxIdleIntervalMsDoc)
 
       /************* Authorizer Configuration ***********/
       .define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName, 
new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
@@ -1643,7 +1650,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
   def metadataRetentionBytes = 
getLong(KafkaConfig.MetadataMaxRetentionBytesProp)
   def metadataRetentionMillis = 
getLong(KafkaConfig.MetadataMaxRetentionMillisProp)
 
-
   def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
   def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
   val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
@@ -1661,6 +1667,10 @@ class KafkaConfig private(doLog: Boolean, val props: 
java.util.Map[_, _], dynami
 
   /************* Metadata Configuration ***********/
   val metadataSnapshotMaxNewRecordBytes = 
getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
+  val metadataMaxIdleIntervalNs: Option[Long] = {
+    val value = 
TimeUnit.NANOSECONDS.convert(getInt(KafkaConfig.MetadataMaxIdleIntervalMsProp).toLong,
 TimeUnit.MILLISECONDS)
+    if (value > 0) Some(value) else None
+  }
 
   /************* Authorizer Configuration ***********/
   def createNewAuthorizer(): Option[Authorizer] = {
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index e8819e9c5b..fa0bc52d7a 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -39,7 +39,8 @@ class BrokerMetadataListener(
   time: Time,
   threadNamePrefix: Option[String],
   val maxBytesBetweenSnapshots: Long,
-  val snapshotter: Option[MetadataSnapshotter]
+  val snapshotter: Option[MetadataSnapshotter],
+  brokerMetrics: BrokerServerMetrics
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
   private val logContext = new LogContext(s"[BrokerMetadataListener 
id=$brokerId] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -279,6 +280,10 @@ class BrokerMetadataListener(
       debug(s"Publishing new metadata delta $delta at offset 
${_image.highestOffsetAndEpoch().offset}.")
     }
     publisher.publish(delta, _image)
+
+    // Update the metrics since the publisher handled the latest image
+    brokerMetrics.lastAppliedRecordOffset.set(_highestOffset)
+    brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp)
   }
 
   override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
@@ -289,8 +294,9 @@ class BrokerMetadataListener(
     eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
   }
 
-  class ShutdownEvent() extends EventQueue.FailureLoggingEvent(log) {
+  class ShutdownEvent extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
+      brokerMetrics.close()
       removeMetric(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
       removeMetric(BrokerMetadataListener.MetadataBatchSizes)
     }
diff --git 
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala 
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
new file mode 100644
index 0000000000..0db6f0071c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Gauge
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.MetricConfig
+
+final class BrokerServerMetrics private (metrics: Metrics) extends 
AutoCloseable {
+  import BrokerServerMetrics._
+
+  val lastAppliedRecordOffset: AtomicLong = new AtomicLong(0)
+  val lastAppliedRecordTimestamp: AtomicLong = new AtomicLong(0)
+
+  val lastAppliedRecordOffsetName = metrics.metricName(
+    "last-applied-record-offset",
+    metricGroupName,
+    "The offset of the last record from the cluster metadata partition that 
was applied by the broker"
+  )
+
+  val lastAppliedRecordTimestampName = metrics.metricName(
+    "last-applied-record-timestamp",
+    metricGroupName,
+    "The timestamp of the last record from the cluster metadata partition that 
was applied by the broker"
+  )
+
+  val lastAppliedRecordLagMsName = metrics.metricName(
+    "last-applied-record-lag-ms",
+    metricGroupName,
+    "The difference between now and the timestamp of the last record from the 
cluster metadata partition that was applied by the broker"
+  )
+
+  addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
+    lastAppliedRecordOffset.get
+  }
+
+  addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
+    lastAppliedRecordTimestamp.get
+  }
+
+  addMetric(metrics, lastAppliedRecordLagMsName) { now =>
+    now - lastAppliedRecordTimestamp.get
+  }
+
+  override def close(): Unit = {
+    List(
+      lastAppliedRecordOffsetName,
+      lastAppliedRecordTimestampName,
+      lastAppliedRecordLagMsName
+    ).foreach(metrics.removeMetric)
+  }
+}
+
+
+final object BrokerServerMetrics {
+  private val metricGroupName = "broker-metadata-metrics"
+
+  private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => 
T): Unit = {
+    metrics.addMetric(name, new FuncGauge(func))
+  }
+
+  private final class FuncGauge[T](func: Long => T) extends Gauge[T] {
+    override def value(config: MetricConfig, now: Long): T = {
+      func(now)
+    }
+  }
+
+  def apply(metrics: Metrics): BrokerServerMetrics = {
+    new BrokerServerMetrics(metrics)
+  }
+}
diff --git 
a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala 
b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
new file mode 100644
index 0000000000..df114ef59e
--- /dev/null
+++ b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Collections
+import kafka.utils.TestUtils
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.MockTime
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+import scala.jdk.CollectionConverters._
+
+final class BrokerServerMetricsTest {
+  @Test
+  def testMetricsExported(): Unit = {
+    val metrics = new Metrics()
+    val expectedGroup = "broker-metadata-metrics"
+
+    // Metric description is not used for metric name equality
+    val expectedMetrics = Set(
+      new MetricName("last-applied-record-offset", expectedGroup, "", 
Collections.emptyMap()),
+      new MetricName("last-applied-record-timestamp", expectedGroup, "", 
Collections.emptyMap()),
+      new MetricName("last-applied-record-lag-ms", expectedGroup, "", 
Collections.emptyMap())
+    )
+     
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => 
name.group == expectedGroup }
+      assertEquals(3, metricsMap.size)
+      metricsMap.foreach { case (name, metric) =>
+        assertTrue(expectedMetrics.contains(name))
+      }
+    }
+
+    val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => 
name.group == expectedGroup }
+    assertEquals(0, metricsMap.size)
+  }
+
+  @Test
+  def testLastAppliedRecordOffset(): Unit = {
+    val metrics = new Metrics()
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      val offsetMetric = 
metrics.metrics().get(brokerMetrics.lastAppliedRecordOffsetName)
+      assertEquals(0, offsetMetric.metricValue.asInstanceOf[Long])
+
+      // Update metric value and check
+      val expectedValue = 1000
+      brokerMetrics.lastAppliedRecordOffset.set(expectedValue)
+      assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
+    }
+  }
+
+  @Test
+  def testLastAppliedRecordTimestamp(): Unit = {
+    val time = new MockTime()
+    val metrics = new Metrics(time)
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      time.sleep(1000)
+      val timestampMetric = 
metrics.metrics().get(brokerMetrics.lastAppliedRecordTimestampName)
+      val lagMetric = 
metrics.metrics().get(brokerMetrics.lastAppliedRecordLagMsName)
+
+      assertEquals(0, timestampMetric.metricValue.asInstanceOf[Long])
+      assertEquals(time.milliseconds, lagMetric.metricValue.asInstanceOf[Long])
+
+      // Update metric value and check
+      val timestamp = 500
+      brokerMetrics.lastAppliedRecordTimestamp.set(timestamp)
+      assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
+      assertEquals(time.milliseconds - timestamp, 
lagMetric.metricValue.asInstanceOf[Long])
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala 
b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index fb2ed99ef1..44a1e53f47 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -229,6 +229,21 @@ class MetricsTest extends KafkaServerTestHarness with 
Logging {
     assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=FencedBrokerCount"), 1)
   }
 
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("kraft"))
+  def testKRaftControllerMetrics(quorum: String): Unit = {
+    val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
+
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp"), 1)
+    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == 
"kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs"), 1)
+  }
+
   /**
    * Test that the metrics are created with the right name, 
testZooKeeperStateChangeRateMetrics
    * and testZooKeeperSessionStateMetric in ZooKeeperClientTest test the 
metrics behaviour.
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 36dff7a679..1dd0a9ebc8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -769,6 +769,7 @@ class KafkaConfigTest {
         case KafkaConfig.MetadataMaxRetentionBytesProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.MetadataMaxRetentionMillisProp => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
         case KafkaConfig.ControllerListenerNamesProp => // ignore string
+        case KafkaConfig.MetadataMaxIdleIntervalMsProp  => 
assertPropertyInvalid(baseProperties, name, "not_a_number")
 
         case KafkaConfig.AuthorizerClassNameProp => //ignore string
         case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
diff --git 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 93dc8bdd39..20df47a6a6 100644
--- 
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Optional}
 
 import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
PartitionRecord, RegisterBrokerRecord, TopicRecord}
+import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{Endpoint, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
@@ -34,6 +35,7 @@ import scala.jdk.CollectionConverters._
 
 class BrokerMetadataListenerTest {
   private def newBrokerMetadataListener(
+    metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
     snapshotter: Option[MetadataSnapshotter] = None,
     maxBytesBetweenSnapshots: Long = 1000000L,
   ): BrokerMetadataListener = {
@@ -42,7 +44,9 @@ class BrokerMetadataListenerTest {
       time = Time.SYSTEM,
       threadNamePrefix = None,
       maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
-      snapshotter = snapshotter)
+      snapshotter = snapshotter,
+      brokerMetrics = metrics
+    )
   }
 
   @Test
@@ -53,25 +57,42 @@ class BrokerMetadataListenerTest {
 
   @Test
   def testPublish(): Unit = {
-    val listener = newBrokerMetadataListener()
+    val metrics = BrokerServerMetrics(new Metrics())
+    val listener = newBrokerMetadataListener(metrics = metrics)
     try {
-      listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
-        util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
-          setBrokerId(0).
-          setBrokerEpoch(100L).
-          setFenced(false).
-          setRack(null).
-          setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 
0.toShort))))
+      val unfencedTimestamp = 300L
+      listener.handleCommit(
+        RecordTestUtils.mockBatchReader(
+          100,
+          unfencedTimestamp,
+          util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
+            setBrokerId(0).
+            setBrokerEpoch(100L).
+            setFenced(false).
+            setRack(null).
+            setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 
0.toShort))
+        )
+      )
       val imageRecords = listener.getImageRecords().get()
       assertEquals(0, imageRecords.size())
       assertEquals(100L, listener.highestMetadataOffset)
-      listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
-        util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
-          setBrokerId(1).
-          setBrokerEpoch(200L).
-          setFenced(true).
-          setRack(null).
-          setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 
0.toShort))))
+      assertEquals(0L, metrics.lastAppliedRecordOffset.get)
+      assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
+
+      val fencedTimestamp = 500L
+      val fencedLastOffset = 200L
+      listener.handleCommit(
+        RecordTestUtils.mockBatchReader(
+          fencedLastOffset,
+          fencedTimestamp,
+          util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
+            setBrokerId(1).
+            setBrokerEpoch(200L).
+            setFenced(true).
+            setRack(null).
+            setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 
0.toShort))
+        )
+      )
       listener.startPublishing(new MetadataPublisher {
         override def publish(delta: MetadataDelta, newImage: MetadataImage): 
Unit = {
           assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
@@ -87,6 +108,9 @@ class BrokerMetadataListenerTest {
 
         override def publishedOffset: Long = -1
       }).get()
+
+      assertEquals(fencedLastOffset, metrics.lastAppliedRecordOffset.get)
+      assertEquals(fencedTimestamp, metrics.lastAppliedRecordTimestamp.get)
     } finally {
       listener.close()
     }
@@ -136,15 +160,22 @@ class BrokerMetadataListenerTest {
   private def generateManyRecords(listener: BrokerMetadataListener,
                                   endOffset: Long): Unit = {
     (0 to 10000).foreach { _ =>
-      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
-        util.Arrays.asList(new ApiMessageAndVersion(new 
PartitionChangeRecord().
-          setPartitionId(0).
-          setTopicId(FOO_ID).
-          setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
-          new ApiMessageAndVersion(new PartitionChangeRecord().
-            setPartitionId(0).
-            setTopicId(FOO_ID).
-            setRemovingReplicas(Collections.emptyList()), 0.toShort))))
+      listener.handleCommit(
+        RecordTestUtils.mockBatchReader(
+          endOffset,
+          0,
+          util.Arrays.asList(
+            new ApiMessageAndVersion(new PartitionChangeRecord().
+              setPartitionId(0).
+              setTopicId(FOO_ID).
+              setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+            new ApiMessageAndVersion(new PartitionChangeRecord().
+              setPartitionId(0).
+              setTopicId(FOO_ID).
+              setRemovingReplicas(Collections.emptyList()), 0.toShort)
+          )
+        )
+      )
     }
     listener.getImageRecords().get()
   }
@@ -215,13 +246,18 @@ class BrokerMetadataListenerTest {
     endOffset: Long
   ): Unit = {
     brokerIds.foreach { brokerId =>
-      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
-        util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
-          setBrokerId(brokerId).
-          setBrokerEpoch(100L).
-          setFenced(false).
-          setRack(null).
-          setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + 
brokerId)), 0.toShort))))
+      listener.handleCommit(
+        RecordTestUtils.mockBatchReader(
+          endOffset,
+          0,
+          util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
+            setBrokerId(brokerId).
+            setBrokerEpoch(100L).
+            setFenced(false).
+            setRack(null).
+            setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + 
brokerId)), 0.toShort))
+        )
+      )
     }
   }
 
@@ -230,17 +266,22 @@ class BrokerMetadataListenerTest {
     replicas: Seq[Int],
     endOffset: Long
   ): Unit = {
-    listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
-      util.Arrays.asList(
-        new ApiMessageAndVersion(new TopicRecord().
-          setName("foo").
-          setTopicId(FOO_ID), 0.toShort),
-        new ApiMessageAndVersion(new PartitionRecord().
-          setPartitionId(0).
-          setTopicId(FOO_ID).
-          setIsr(replicas.map(Int.box).asJava).
-          setLeader(0).
-          setReplicas(replicas.map(Int.box).asJava), 0.toShort)))
+    listener.handleCommit(
+      RecordTestUtils.mockBatchReader(
+        endOffset,
+        0,
+        util.Arrays.asList(
+          new ApiMessageAndVersion(new TopicRecord().
+            setName("foo").
+            setTopicId(FOO_ID), 0.toShort),
+          new ApiMessageAndVersion(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setIsr(replicas.map(Int.box).asJava).
+            setLeader(0).
+            setReplicas(replicas.map(Int.box).asJava), 0.toShort)
+        )
+      )
     )
   }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java 
b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index fa03e058de..6b470664d6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -51,5 +51,17 @@ public interface ControllerMetrics extends AutoCloseable {
 
     int preferredReplicaImbalanceCount();
 
+    void setLastAppliedRecordOffset(long offset);
+
+    long lastAppliedRecordOffset();
+
+    void setLastCommittedRecordOffset(long offset);
+
+    long lastCommittedRecordOffset();
+
+    void setLastAppliedRecordTimestamp(long timestamp);
+
+    long lastAppliedRecordTimestamp();
+
     void close();
 }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 78cebd892c..7ca557dada 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.metadata.ClientQuotaRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.NoOpRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
 import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
@@ -70,24 +71,24 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.SnapshotGenerator.Section;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
 import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
 import org.apache.kafka.metadata.placement.ReplicaPlacer;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
-import org.apache.kafka.server.authorizer.AclCreateResult;
-import org.apache.kafka.server.authorizer.AclDeleteResult;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metadata.BrokerRegistrationReply;
-import org.apache.kafka.metadata.FinalizedControllerFeatures;
-import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -155,10 +156,10 @@ public final class QuorumController implements Controller 
{
         private QuorumFeatures quorumFeatures = null;
         private short defaultReplicationFactor = 3;
         private int defaultNumPartitions = 1;
-        private boolean isLeaderRecoverySupported = false;
         private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new 
Random());
         private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
         private OptionalLong leaderImbalanceCheckIntervalNs = 
OptionalLong.empty();
+        private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
         private long sessionTimeoutNs = 
ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
         private ControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = 
Optional.empty();
@@ -217,11 +218,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setIsLeaderRecoverySupported(boolean 
isLeaderRecoverySupported) {
-            this.isLeaderRecoverySupported = isLeaderRecoverySupported;
-            return this;
-        }
-
         public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
             this.replicaPlacer = replicaPlacer;
             return this;
@@ -237,6 +233,11 @@ public final class QuorumController implements Controller {
             return this;
         }
 
+        public Builder setMaxIdleIntervalNs(OptionalLong value) {
+            this.maxIdleIntervalNs = value;
+            return this;
+        }
+
         public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
             this.sessionTimeoutNs = sessionTimeoutNs;
             return this;
@@ -280,14 +281,13 @@ public final class QuorumController implements Controller 
{
         @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
             if (raftClient == null) {
-                throw new RuntimeException("You must set a raft client.");
-            }
-            if (bootstrapMetadata == null || 
bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
-                throw new RuntimeException("You must specify an initial 
metadata.version using the kafka-storage tool.");
-            }
-            if (quorumFeatures == null) {
-                throw new RuntimeException("You must specify the quorum 
features");
+                throw new IllegalStateException("You must set a raft client.");
+            } else if (bootstrapMetadata == null || 
bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+                throw new IllegalStateException("You must specify an initial 
metadata.version using the kafka-storage tool.");
+            } else if (quorumFeatures == null) {
+                throw new IllegalStateException("You must specify the quorum 
features");
             }
+
             if (threadNamePrefix == null) {
                 threadNamePrefix = String.format("Node%d_", nodeId);
             }
@@ -302,12 +302,30 @@ public final class QuorumController implements Controller 
{
             KafkaEventQueue queue = null;
             try {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix 
+ "QuorumController");
-                return new QuorumController(logContext, nodeId, clusterId, 
queue, time,
-                    configSchema, raftClient, quorumFeatures, 
defaultReplicationFactor,
-                    defaultNumPartitions, isLeaderRecoverySupported, 
replicaPlacer, snapshotMaxNewRecordBytes,
-                    leaderImbalanceCheckIntervalNs, sessionTimeoutNs, 
controllerMetrics,
-                    createTopicPolicy, alterConfigPolicy, 
configurationValidator, authorizer,
-                    staticConfig, bootstrapMetadata);
+                return new QuorumController(
+                    logContext,
+                    nodeId,
+                    clusterId,
+                    queue,
+                    time,
+                    configSchema,
+                    raftClient,
+                    quorumFeatures,
+                    defaultReplicationFactor,
+                    defaultNumPartitions,
+                    replicaPlacer,
+                    snapshotMaxNewRecordBytes,
+                    leaderImbalanceCheckIntervalNs,
+                    maxIdleIntervalNs,
+                    sessionTimeoutNs,
+                    controllerMetrics,
+                    createTopicPolicy,
+                    alterConfigPolicy,
+                    configurationValidator,
+                    authorizer,
+                    staticConfig,
+                    bootstrapMetadata
+                );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
                 throw e;
@@ -715,7 +733,7 @@ public final class QuorumController implements Controller {
                     offset = raftClient.scheduleAppend(controllerEpoch, 
result.records());
                 }
                 op.processBatchEndOffset(offset);
-                writeOffset = offset;
+                updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
                 for (ApiMessageAndVersion message : result.records()) {
                     replay(message.message(), Optional.empty(), offset);
@@ -776,7 +794,6 @@ public final class QuorumController implements Controller {
             appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() + 
"]", () -> {
                 try {
                     maybeCompleteAuthorizerInitialLoad();
-                    boolean isActiveController = curClaimEpoch != -1;
                     long processedRecordsSize = 0;
                     while (reader.hasNext()) {
                         Batch<ApiMessageAndVersion> batch = reader.next();
@@ -784,7 +801,7 @@ public final class QuorumController implements Controller {
                         int epoch = batch.epoch();
                         List<ApiMessageAndVersion> messages = batch.records();
 
-                        if (isActiveController) {
+                        if (isActiveController()) {
                             // If the controller is active, the records were 
already replayed,
                             // so we don't need to do it here.
                             log.debug("Completing purgatory items up to offset 
{} and epoch {}.", offset, epoch);
@@ -816,9 +833,7 @@ public final class QuorumController implements Controller {
                             }
                         }
 
-                        lastCommittedOffset = offset;
-                        lastCommittedEpoch = epoch;
-                        lastCommittedTimestamp = batch.appendTimestamp();
+                        updateLastCommittedState(offset, epoch, 
batch.appendTimestamp());
                         processedRecordsSize += batch.sizeInBytes();
                     }
 
@@ -833,8 +848,7 @@ public final class QuorumController implements Controller {
         public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> 
reader) {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", 
reader.snapshotId()), () -> {
                 try {
-                    boolean isActiveController = curClaimEpoch != -1;
-                    if (isActiveController) {
+                    if (isActiveController()) {
                         throw new IllegalStateException(
                             String.format(
                                 "Asked to load snapshot (%s) when it is the 
active controller (%s)",
@@ -877,10 +891,11 @@ public final class QuorumController implements Controller 
{
                             replay(messageAndVersion.message(), 
Optional.of(reader.snapshotId()), offset);
                         }
                     }
-
-                    lastCommittedOffset = reader.lastContainedLogOffset();
-                    lastCommittedEpoch = reader.lastContainedLogEpoch();
-                    lastCommittedTimestamp = 
reader.lastContainedLogTimestamp();
+                    updateLastCommittedState(
+                        reader.lastContainedLogOffset(),
+                        reader.lastContainedLogEpoch(),
+                        reader.lastContainedLogTimestamp()
+                    );
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
                     authorizer.ifPresent(a -> 
a.loadSnapshot(aclControlManager.idToAcl()));
                 } finally {
@@ -904,7 +919,7 @@ public final class QuorumController implements Controller {
 
                     curClaimEpoch = newEpoch;
                     controllerMetrics.setActive(true);
-                    writeOffset = lastCommittedOffset;
+                    updateWriteOffset(lastCommittedOffset);
                     clusterControl.activate();
 
                     // Check if we need to bootstrap metadata into the log. 
This must happen before we can
@@ -960,8 +975,11 @@ public final class QuorumController implements Controller {
                     // When becoming the active controller, schedule a leader 
rebalance if there are any topic partition
                     // with leader that is not the preferred leader.
                     maybeScheduleNextBalancePartitionLeaders();
+
+                    // When becoming leader schedule periodic write of the no 
op record
+                    maybeScheduleNextWriteNoOpRecord();
                 });
-            } else if (curClaimEpoch != -1) {
+            } else if (isActiveController()) {
                 appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> 
{
                     log.warn("Renouncing the leadership at oldEpoch {} due to 
a metadata " +
                              "log event. Reverting to last committed offset 
{}.", curClaimEpoch,
@@ -1010,6 +1028,38 @@ public final class QuorumController implements 
Controller {
         }
     }
 
+    private boolean isActiveController() {
+        return curClaimEpoch != -1;
+    }
+
+    private void updateWriteOffset(long offset) {
+        writeOffset = offset;
+        if (isActiveController()) {
+            controllerMetrics.setLastAppliedRecordOffset(writeOffset);
+            // This is not truly the append timestamp. The KRaft client 
doesn't expose the append time when scheduling a write.
+            // This is good enough because this is called right after the 
records were given to the KRaft client for appending and
+            // the default append linger for KRaft is 25ms.
+            
controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds());
+        } else {
+            // Change the last applied record metrics back to the last 
committed state. Inactive controllers report the last committed
+            // state while active controllers report the latest state which 
may include uncommitted data.
+            controllerMetrics.setLastAppliedRecordOffset(lastCommittedOffset);
+            
controllerMetrics.setLastAppliedRecordTimestamp(lastCommittedTimestamp);
+        }
+    }
+
+    private void updateLastCommittedState(long offset, int epoch, long 
timestamp) {
+        lastCommittedOffset = offset;
+        lastCommittedEpoch = epoch;
+        lastCommittedTimestamp = timestamp;
+
+        controllerMetrics.setLastCommittedRecordOffset(offset);
+        if (!isActiveController()) {
+            controllerMetrics.setLastAppliedRecordOffset(offset);
+            controllerMetrics.setLastAppliedRecordTimestamp(timestamp);
+        }
+    }
+
     private void renounce() {
         curClaimEpoch = -1;
         controllerMetrics.setActive(false);
@@ -1026,10 +1076,11 @@ public final class QuorumController implements 
Controller {
             raftClient.register(metaLogListener);
         }
 
-        writeOffset = -1;
+        updateWriteOffset(-1);
         clusterControl.deactivate();
         cancelMaybeFenceReplicas();
         cancelMaybeBalancePartitionLeaders();
+        cancelNextWriteNoOpRecord();
     }
 
     private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1130,6 +1181,53 @@ public final class QuorumController implements 
Controller {
         queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
     }
 
+    private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
+
+    private void maybeScheduleNextWriteNoOpRecord() {
+        if (!noOpRecordScheduled &&
+            maxIdleIntervalNs.isPresent() &&
+            featureControl.metadataVersion().isNoOpRecordSupported()) {
+
+            log.debug(
+                "Scheduling write event for {} because maxIdleIntervalNs ({}) 
and metadataVersion ({})",
+                WRITE_NO_OP_RECORD,
+                maxIdleIntervalNs.getAsLong(),
+                featureControl.metadataVersion()
+            );
+
+            ControllerWriteEvent<Void> event = new 
ControllerWriteEvent<>(WRITE_NO_OP_RECORD, () -> {
+                noOpRecordScheduled = false;
+                maybeScheduleNextWriteNoOpRecord();
+
+                return ControllerResult.of(
+                    Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), 
(short) 0)),
+                    null
+                );
+            });
+
+            long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
+            queue.scheduleDeferred(WRITE_NO_OP_RECORD, new 
EarliestDeadlineFunction(delayNs), event);
+            noOpRecordScheduled = true;
+        }
+    }
+
+    private void cancelNextWriteNoOpRecord() {
+        noOpRecordScheduled = false;
+        queue.cancelDeferred(WRITE_NO_OP_RECORD);
+    }
+
+    private void handleFeatureControlChange() {
+        // The feature control may have changed. On the active controller 
cancel or schedule noop
+        // record writes accordingly.
+        if (isActiveController()) {
+            if (featureControl.metadataVersion().isNoOpRecordSupported()) {
+                maybeScheduleNextWriteNoOpRecord();
+            } else {
+                cancelNextWriteNoOpRecord();
+            }
+        }
+    }
+
     @SuppressWarnings("unchecked")
     private void replay(ApiMessage message, Optional<OffsetAndEpoch> 
snapshotId, long offset) {
         try {
@@ -1164,6 +1262,8 @@ public final class QuorumController implements Controller 
{
                     break;
                 case FEATURE_LEVEL_RECORD:
                     featureControl.replay((FeatureLevelRecord) message);
+
+                    handleFeatureControlChange();
                     break;
                 case CLIENT_QUOTA_RECORD:
                     clientQuotaControlManager.replay((ClientQuotaRecord) 
message);
@@ -1177,6 +1277,9 @@ public final class QuorumController implements Controller 
{
                 case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
                     aclControlManager.replay((RemoveAccessControlEntryRecord) 
message, snapshotId);
                     break;
+                case NO_OP_RECORD:
+                    // NoOpRecord is an empty record and doesn't need to be 
replayed
+                    break;
                 default:
                     throw new RuntimeException("Unhandled record type " + 
type);
             }
@@ -1196,8 +1299,7 @@ public final class QuorumController implements Controller 
{
         if (newBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes &&
             snapshotGeneratorManager.generator == null
         ) {
-            boolean isActiveController = curClaimEpoch != -1;
-            if (!isActiveController) {
+            if (!isActiveController()) {
                 // The active controller creates in-memory snapshot every time 
an uncommitted
                 // batch gets appended. The in-active controller can be more 
efficient and only
                 // create an in-memory snapshot when needed.
@@ -1220,9 +1322,7 @@ public final class QuorumController implements Controller 
{
         snapshotRegistry.reset();
 
         newBytesSinceLastSnapshot = 0;
-        lastCommittedOffset = -1;
-        lastCommittedEpoch = -1;
-        lastCommittedTimestamp = -1;
+        updateLastCommittedState(-1, -1, -1);
     }
 
     private final LogContext logContext;
@@ -1385,6 +1485,11 @@ public final class QuorumController implements 
Controller {
      */
     private final OptionalLong leaderImbalanceCheckIntervalNs;
 
+    /**
+     * How long to delay between appending NoOpRecord to the log.
+     */
+    private final OptionalLong maxIdleIntervalNs;
+
     private enum ImbalanceSchedule {
         // The leader balancing operation has been scheduled
         SCHEDULED,
@@ -1399,6 +1504,11 @@ public final class QuorumController implements 
Controller {
      */
     private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
 
+    /**
+     * Tracks if a write of the NoOpRecord has been scheduled.
+     */
+    private boolean noOpRecordScheduled = false;
+
     private final BootstrapMetadata bootstrapMetadata;
 
     private QuorumController(LogContext logContext,
@@ -1411,10 +1521,10 @@ public final class QuorumController implements 
Controller {
                              QuorumFeatures quorumFeatures,
                              short defaultReplicationFactor,
                              int defaultNumPartitions,
-                             boolean isLeaderRecoverySupported,
                              ReplicaPlacer replicaPlacer,
                              long snapshotMaxNewRecordBytes,
                              OptionalLong leaderImbalanceCheckIntervalNs,
+                             OptionalLong maxIdleIntervalNs,
                              long sessionTimeoutNs,
                              ControllerMetrics controllerMetrics,
                              Optional<CreateTopicPolicy> createTopicPolicy,
@@ -1457,13 +1567,14 @@ public final class QuorumController implements 
Controller {
         this.producerIdControlManager = new 
ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
+        this.maxIdleIntervalNs = maxIdleIntervalNs;
         this.replicationControl = new ReplicationControlManager.Builder().
+            setMetadataVersion(() -> featureControl.metadataVersion()).
             setSnapshotRegistry(snapshotRegistry).
             setLogContext(logContext).
             setDefaultReplicationFactor(defaultReplicationFactor).
             setDefaultNumPartitions(defaultNumPartitions).
             
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
-            setIsLeaderRecoverySupported(isLeaderRecoverySupported).
             setConfigurationControl(configurationControl).
             setClusterControl(clusterControl).
             setControllerMetrics(controllerMetrics).
@@ -1476,8 +1587,8 @@ public final class QuorumController implements Controller 
{
         this.bootstrapMetadata = bootstrapMetadata;
         this.metaLogListener = new QuorumMetaLogListener();
         this.curClaimEpoch = -1;
-        this.writeOffset = -1L;
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
+        updateWriteOffset(-1);
 
         resetState();
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index d0280651aa..5abf0d9770 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -21,10 +21,12 @@ import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
 
 public final class QuorumControllerMetrics implements ControllerMetrics {
     private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
@@ -45,7 +47,15 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
         "KafkaController", "OfflinePartitionsCount");
     private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = 
getMetricName(
         "KafkaController", "PreferredReplicaImbalanceCount");
-    
+    private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
+        "KafkaController", "LastAppliedRecordOffset");
+    private final static MetricName LAST_COMMITTED_RECORD_OFFSET = 
getMetricName(
+        "KafkaController", "LastCommittedRecordOffset");
+    private final static MetricName LAST_APPLIED_RECORD_TIMESTAMP = 
getMetricName(
+        "KafkaController", "LastAppliedRecordTimestamp");
+    private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
+        "KafkaController", "LastAppliedRecordLagMs");
+
     private final MetricsRegistry registry;
     private volatile boolean active;
     private volatile int fencedBrokerCount;
@@ -54,6 +64,9 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     private volatile int globalPartitionCount;
     private volatile int offlinePartitionCount;
     private volatile int preferredReplicaImbalanceCount;
+    private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
+    private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
+    private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
     private final Gauge<Integer> activeControllerCount;
     private final Gauge<Integer> fencedBrokerCountGauge;
     private final Gauge<Integer> activeBrokerCountGauge;
@@ -61,10 +74,17 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
     private final Gauge<Integer> globalTopicCountGauge;
     private final Gauge<Integer> offlinePartitionCountGauge;
     private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
+    private final Gauge<Long> lastAppliedRecordOffsetGauge;
+    private final Gauge<Long> lastCommittedRecordOffsetGauge;
+    private final Gauge<Long> lastAppliedRecordTimestampGauge;
+    private final Gauge<Long> lastAppliedRecordLagMsGauge;
     private final Histogram eventQueueTime;
     private final Histogram eventQueueProcessingTime;
 
-    public QuorumControllerMetrics(MetricsRegistry registry) {
+    public QuorumControllerMetrics(
+        MetricsRegistry registry,
+        Time time
+    ) {
         this.registry = Objects.requireNonNull(registry);
         this.active = false;
         this.fencedBrokerCount = 0;
@@ -117,6 +137,30 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
                 return preferredReplicaImbalanceCount;
             }
         });
+        lastAppliedRecordOffsetGauge = 
registry.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return lastAppliedRecordOffset.get();
+            }
+        });
+        lastCommittedRecordOffsetGauge = 
registry.newGauge(LAST_COMMITTED_RECORD_OFFSET, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return lastCommittedRecordOffset.get();
+            }
+        });
+        lastAppliedRecordTimestampGauge = 
registry.newGauge(LAST_APPLIED_RECORD_TIMESTAMP, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return lastAppliedRecordTimestamp.get();
+            }
+        });
+        lastAppliedRecordLagMsGauge = 
registry.newGauge(LAST_APPLIED_RECORD_LAG_MS, new Gauge<Long>() {
+            @Override
+            public Long value() {
+                return time.milliseconds() - lastAppliedRecordTimestamp.get();
+            }
+        });
     }
 
     @Override
@@ -198,6 +242,36 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
         return this.preferredReplicaImbalanceCount;
     }
 
+    @Override
+    public void setLastAppliedRecordOffset(long offset) {
+        lastAppliedRecordOffset.set(offset);
+    }
+
+    @Override
+    public long lastAppliedRecordOffset() {
+        return lastAppliedRecordOffset.get();
+    }
+
+    @Override
+    public void setLastCommittedRecordOffset(long offset) {
+        lastCommittedRecordOffset.set(offset);
+    }
+
+    @Override
+    public long lastCommittedRecordOffset() {
+        return lastCommittedRecordOffset.get();
+    }
+
+    @Override
+    public void setLastAppliedRecordTimestamp(long timestamp) {
+        lastAppliedRecordTimestamp.set(timestamp);
+    }
+
+    @Override
+    public long lastAppliedRecordTimestamp() {
+        return lastAppliedRecordTimestamp.get();
+    }
+
     @Override
     public void close() {
         Arrays.asList(
@@ -207,7 +281,12 @@ public final class QuorumControllerMetrics implements 
ControllerMetrics {
             GLOBAL_TOPIC_COUNT,
             GLOBAL_PARTITION_COUNT,
             OFFLINE_PARTITION_COUNT,
-            
PREFERRED_REPLICA_IMBALANCE_COUNT).forEach(this.registry::removeMetric);
+            PREFERRED_REPLICA_IMBALANCE_COUNT,
+            LAST_APPLIED_RECORD_OFFSET,
+            LAST_COMMITTED_RECORD_OFFSET,
+            LAST_APPLIED_RECORD_TIMESTAMP,
+            LAST_APPLIED_RECORD_LAG_MS
+        ).forEach(registry::removeMetric);
     }
 
     private static MetricName getMetricName(String type, String name) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d0273bc02d..81ec253434 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -73,16 +73,17 @@ import 
org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.metadata.KafkaConfigSchema;
-import org.apache.kafka.metadata.placement.ClusterDescriber;
-import org.apache.kafka.metadata.placement.PlacementSpec;
-import org.apache.kafka.metadata.placement.UsableBroker;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.KafkaConfigSchema;
 import org.apache.kafka.metadata.LeaderRecoveryState;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.ClusterDescriber;
+import org.apache.kafka.metadata.placement.PlacementSpec;
+import org.apache.kafka.metadata.placement.UsableBroker;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
@@ -138,17 +139,22 @@ public class ReplicationControlManager {
     static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
 
     static class Builder {
+        private Supplier<MetadataVersion> metadataVersion = 
MetadataVersion::latest;
         private SnapshotRegistry snapshotRegistry = null;
         private LogContext logContext = null;
         private short defaultReplicationFactor = (short) 3;
         private int defaultNumPartitions = 1;
         private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
-        private boolean isLeaderRecoverySupported = true;
         private ConfigurationControlManager configurationControl = null;
         private ClusterControlManager clusterControl = null;
         private ControllerMetrics controllerMetrics = null;
         private Optional<CreateTopicPolicy> createTopicPolicy = 
Optional.empty();
 
+        Builder setMetadataVersion(Supplier<MetadataVersion> metadataVersion) {
+            this.metadataVersion = metadataVersion;
+            return this;
+        }
+
         Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
             this.snapshotRegistry = snapshotRegistry;
             return this;
@@ -174,11 +180,6 @@ public class ReplicationControlManager {
             return this;
         }
 
-        Builder setIsLeaderRecoverySupported(boolean 
isLeaderRecoverySupported) {
-            this.isLeaderRecoverySupported = isLeaderRecoverySupported;
-            return this;
-        }
-
         Builder setConfigurationControl(ConfigurationControlManager 
configurationControl) {
             this.configurationControl = configurationControl;
             return this;
@@ -201,22 +202,21 @@ public class ReplicationControlManager {
 
         ReplicationControlManager build() {
             if (configurationControl == null) {
-                throw new RuntimeException("You must specify 
configurationControl.");
-            }
-            if (clusterControl == null) {
-                throw new RuntimeException("You must specify clusterControl.");
-            }
-            if (controllerMetrics == null) {
-                throw new RuntimeException("You must specify 
controllerMetrics.");
+                throw new IllegalStateException("Configuration control must be 
set before building");
+            } else if (clusterControl == null) {
+                throw new IllegalStateException("Cluster controller must be 
set before building");
+            } else if (controllerMetrics == null) {
+                throw new IllegalStateException("Metrics must be set before 
building");
             }
             if (logContext == null) logContext = new LogContext();
             if (snapshotRegistry == null) snapshotRegistry = 
configurationControl.snapshotRegistry();
-            return new ReplicationControlManager(snapshotRegistry,
+            return new ReplicationControlManager(
+                metadataVersion,
+                snapshotRegistry,
                 logContext,
                 defaultReplicationFactor,
                 defaultNumPartitions,
                 maxElectionsPerImbalance,
-                isLeaderRecoverySupported,
                 configurationControl,
                 clusterControl,
                 controllerMetrics,
@@ -276,9 +276,9 @@ public class ReplicationControlManager {
     private final int defaultNumPartitions;
 
     /**
-     * If the cluster supports leader recovery state from KIP-704.
+     * Supplier of the metadata version (or IBP) for the cluster
      */
-    private final boolean isLeaderRecoverySupported;
+    private final Supplier<MetadataVersion> metadataVersion;
 
     /**
      * Maximum number of leader elections to perform during one partition 
leader balancing operation.
@@ -358,23 +358,23 @@ public class ReplicationControlManager {
     final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
 
     private ReplicationControlManager(
+        Supplier<MetadataVersion> metadataVersion,
         SnapshotRegistry snapshotRegistry,
         LogContext logContext,
         short defaultReplicationFactor,
         int defaultNumPartitions,
         int maxElectionsPerImbalance,
-        boolean isLeaderRecoverySupported,
         ConfigurationControlManager configurationControl,
         ClusterControlManager clusterControl,
         ControllerMetrics controllerMetrics,
         Optional<CreateTopicPolicy> createTopicPolicy
     ) {
+        this.metadataVersion = metadataVersion;
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(ReplicationControlManager.class);
         this.defaultReplicationFactor = defaultReplicationFactor;
         this.defaultNumPartitions = defaultNumPartitions;
         this.maxElectionsPerImbalance = maxElectionsPerImbalance;
-        this.isLeaderRecoverySupported = isLeaderRecoverySupported;
         this.configurationControl = configurationControl;
         this.controllerMetrics = controllerMetrics;
         this.createTopicPolicy = createTopicPolicy;
@@ -929,7 +929,7 @@ public class ReplicationControlManager {
                     topic.id,
                     partitionId,
                     r -> clusterControl.unfenced(r),
-                    isLeaderRecoverySupported);
+                    metadataVersion.get().isLeaderRecoverySupported());
                 if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
                     
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
                 }
@@ -1223,7 +1223,7 @@ public class ReplicationControlManager {
             topicId,
             partitionId,
             r -> clusterControl.unfenced(r),
-            isLeaderRecoverySupported);
+            metadataVersion.get().isLeaderRecoverySupported());
         builder.setElection(election);
         Optional<ApiMessageAndVersion> record = builder.build();
         if (!record.isPresent()) {
@@ -1339,7 +1339,7 @@ public class ReplicationControlManager {
                 topicPartition.topicId(),
                 topicPartition.partitionId(),
                 r -> clusterControl.unfenced(r),
-                isLeaderRecoverySupported
+                metadataVersion.get().isLeaderRecoverySupported()
             );
             builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
             builder.build().ifPresent(records::add);
@@ -1542,7 +1542,7 @@ public class ReplicationControlManager {
                 topicIdPart.topicId(),
                 topicIdPart.partitionId(),
                 isAcceptableLeader,
-                isLeaderRecoverySupported);
+                metadataVersion.get().isLeaderRecoverySupported());
             if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
                 builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
             }
@@ -1652,7 +1652,7 @@ public class ReplicationControlManager {
             tp.topicId(),
             tp.partitionId(),
             r -> clusterControl.unfenced(r),
-            isLeaderRecoverySupported);
+            metadataVersion.get().isLeaderRecoverySupported());
         if 
(configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
             builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
         }
@@ -1704,7 +1704,7 @@ public class ReplicationControlManager {
             tp.topicId(),
             tp.partitionId(),
             r -> clusterControl.unfenced(r),
-            isLeaderRecoverySupported);
+            metadataVersion.get().isLeaderRecoverySupported());
         if (!reassignment.merged().equals(currentReplicas)) {
             builder.setTargetReplicas(reassignment.merged());
         }
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java 
b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index f40ae9ae7f..01455e360c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -216,6 +216,11 @@ public final class MetadataDelta {
             case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
                 replay((RemoveAccessControlEntryRecord) record);
                 break;
+            case NO_OP_RECORD:
+                /* NoOpRecord is an empty record and doesn't need to be 
replayed beyond
+                 * updating the highest offset and epoch.
+                 */
+                break;
             default:
                 throw new RuntimeException("Unknown metadata record type " + 
type);
         }
diff --git a/metadata/src/main/resources/common/metadata/NoOpRecord.json 
b/metadata/src/main/resources/common/metadata/NoOpRecord.json
new file mode 100644
index 0000000000..88b907f8cc
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/NoOpRecord.json
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 20,
+  "type": "metadata",
+  "name": "NoOpRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": []
+}
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index ec1fcdeb0a..02b1493548 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -257,7 +257,7 @@ public class FeatureControlManagerTest {
         FeatureControlManager manager = new FeatureControlManager(logContext, 
features, snapshotRegistry);
         ControllerResult<Map<String, ApiError>> result = 
manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
         RecordTestUtils.replayAll(manager, result.records());
-        assertEquals(manager.metadataVersion(), MetadataVersion.latest());
+        assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0);
 
         result = manager.updateFeatures(
             Collections.singletonMap(MetadataVersion.FEATURE_NAME, 
MetadataVersion.IBP_3_2_IV0.featureLevel()),
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java 
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 0120f15295..5991fcc34f 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -18,24 +18,18 @@
 package org.apache.kafka.controller;
 
 public final class MockControllerMetrics implements ControllerMetrics {
-    private volatile boolean active;
-    private volatile int fencedBrokers;
-    private volatile int activeBrokers;
-    private volatile int topics;
-    private volatile int partitions;
-    private volatile int offlinePartitions;
-    private volatile int preferredReplicaImbalances;
-    private volatile boolean closed = false;
+    private volatile boolean active = false;
+    private volatile int fencedBrokers = 0;
+    private volatile int activeBrokers = 0;
+    private volatile int topics = 0;
+    private volatile int partitions = 0;
+    private volatile int offlinePartitions = 0;
+    private volatile int preferredReplicaImbalances = 0;
+    private volatile long lastAppliedRecordOffset = 0;
+    private volatile long lastCommittedRecordOffset = 0;
+    private volatile long lastAppliedRecordTimestamp = 0;
 
-    public MockControllerMetrics() {
-        this.active = false;
-        this.fencedBrokers = 0;
-        this.activeBrokers = 0;
-        this.topics = 0;
-        this.partitions = 0;
-        this.offlinePartitions = 0;
-        this.preferredReplicaImbalances = 0;
-    }
+    private volatile boolean closed = false;
 
     @Override
     public void setActive(boolean active) {
@@ -117,6 +111,36 @@ public final class MockControllerMetrics implements 
ControllerMetrics {
         return this.preferredReplicaImbalances;
     }
 
+    @Override
+    public void setLastAppliedRecordOffset(long offset) {
+        lastAppliedRecordOffset = offset;
+    }
+
+    @Override
+    public long lastAppliedRecordOffset() {
+        return lastAppliedRecordOffset;
+    }
+
+    @Override
+    public void setLastCommittedRecordOffset(long offset) {
+        lastCommittedRecordOffset = offset;
+    }
+
+    @Override
+    public long lastCommittedRecordOffset() {
+        return lastCommittedRecordOffset;
+    }
+
+    @Override
+    public void setLastAppliedRecordTimestamp(long timestamp) {
+        lastAppliedRecordTimestamp = timestamp;
+    }
+
+    @Override
+    public long lastAppliedRecordTimestamp() {
+        return lastAppliedRecordTimestamp;
+    }
+
     @Override
     public void close() {
         closed = true;
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
index 4b0afb52c6..21ac02f49e 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.controller;
 
+import com.yammer.metrics.core.Gauge;
 import com.yammer.metrics.core.Histogram;
 import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
 import java.util.Set;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,7 +39,10 @@ public class QuorumControllerMetricsTest {
             "GlobalTopicCount",
             "GlobalPartitionCount",
             "OfflinePartitionsCount",
-            "PreferredReplicaImbalanceCount");
+            "PreferredReplicaImbalanceCount",
+            "LastAppliedRecordOffset",
+            "LastAppliedRecordTimestamp",
+            "LastAppliedRecordLagMs");
         assertMetricsCreatedAndRemovedUponClose(expectedType, 
expectedMetricNames);
     }
 
@@ -53,8 +58,9 @@ public class QuorumControllerMetricsTest {
     @Test
     public void testUpdateEventQueueTime() {
         MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
         try {
-            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry)) {
+            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry, time)) {
                 quorumControllerMetrics.updateEventQueueTime(1000);
                 assertMetricHistogram(registry, 
metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
             }
@@ -66,8 +72,9 @@ public class QuorumControllerMetricsTest {
     @Test
     public void testUpdateEventQueueProcessingTime() {
         MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
         try {
-            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry)) {
+            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry, time)) {
                 quorumControllerMetrics.updateEventQueueProcessingTime(1000);
                 assertMetricHistogram(registry, 
metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
             }
@@ -76,10 +83,44 @@ public class QuorumControllerMetricsTest {
         }
     }
 
+    @Test
+    public void testLastAppliedRecordMetrics() {
+        MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
+        time.sleep(1000);
+        try {
+            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry, time)) {
+                quorumControllerMetrics.setLastAppliedRecordOffset(100);
+                quorumControllerMetrics.setLastAppliedRecordTimestamp(500);
+
+                @SuppressWarnings("unchecked")
+                Gauge<Long> lastAppliedRecordOffset = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"LastAppliedRecordOffset"));
+                assertEquals(100, lastAppliedRecordOffset.value());
+
+                @SuppressWarnings("unchecked")
+                Gauge<Long> lastAppliedRecordTimestamp = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"LastAppliedRecordTimestamp"));
+                assertEquals(500, lastAppliedRecordTimestamp.value());
+
+                @SuppressWarnings("unchecked")
+                Gauge<Long> lastAppliedRecordLagMs = (Gauge<Long>) registry
+                    .allMetrics()
+                    .get(metricName("KafkaController", 
"LastAppliedRecordLagMs"));
+                assertEquals(time.milliseconds() - 500, 
lastAppliedRecordLagMs.value());
+            }
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     private static void assertMetricsCreatedAndRemovedUponClose(String 
expectedType, Set<String> expectedMetricNames) {
         MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
         try {
-            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry)) {
+            try (QuorumControllerMetrics quorumControllerMetrics = new 
QuorumControllerMetrics(registry, time)) {
                 assertMetricsCreated(registry, expectedMetricNames, 
expectedType);
             }
             assertMetricsRemoved(registry, expectedMetricNames, expectedType);
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index e51b38e783..eef7215070 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.Spliterator;
@@ -84,6 +85,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.metalog.LocalLogManager;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -423,6 +425,54 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testNoOpRecordWriteAfterTimeout() throws Throwable {
+        long maxIdleIntervalNs = 1_000;
+        long maxReplicationDelayMs = 60_000;
+        try (
+            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, 
Optional.empty());
+            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
+                logEnv,
+                builder -> {
+                    builder.setConfigSchema(SCHEMA)
+                        
.setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
+                }
+            );
+        ) {
+            ListenerCollection listeners = new ListenerCollection();
+            listeners.add(new 
Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+            QuorumController active = controlEnv.activeController();
+
+            LocalLogManager localLogManager = logEnv
+                .logManagers()
+                .stream()
+                .filter(logManager -> 
logManager.nodeId().equals(OptionalInt.of(active.nodeId())))
+                .findAny()
+                .get();
+            TestUtils.waitForCondition(
+                () -> localLogManager.highWatermark().isPresent(),
+                maxReplicationDelayMs,
+                "High watermark was not established"
+            );
+
+
+            final long firstHighWatermark = 
localLogManager.highWatermark().getAsLong();
+            TestUtils.waitForCondition(
+                () -> localLogManager.highWatermark().getAsLong() > 
firstHighWatermark,
+                maxReplicationDelayMs,
+                "Active controller didn't write NoOpRecord the first time"
+            );
+
+            // Do it again to make sure that we are not counting the leader 
change record
+            final long secondHighWatermark = 
localLogManager.highWatermark().getAsLong();
+            TestUtils.waitForCondition(
+                () -> localLogManager.highWatermark().getAsLong() > 
secondHighWatermark,
+                maxReplicationDelayMs,
+                "Active controller didn't write NoOpRecord the second time"
+            );
+        }
+    }
+
     @Test
     public void testUnregisterBroker() throws Throwable {
         try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, 
Optional.empty())) {
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index af9cd218b4..e1481b9fe5 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -19,8 +19,15 @@ package org.apache.kafka.controller;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.controller.QuorumController.Builder;
@@ -31,14 +38,6 @@ import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.OptionalInt;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
 public class QuorumControllerTestEnv implements AutoCloseable {
     private static final Logger log =
         LoggerFactory.getLogger(QuorumControllerTestEnv.class);
@@ -70,11 +69,11 @@ public class QuorumControllerTestEnv implements 
AutoCloseable {
                 QuorumController.Builder builder = new 
QuorumController.Builder(i, logEnv.clusterId());
                 builder.setRaftClient(logEnv.logManagers().get(i));
                 
builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
+                
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
                 builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, 
QuorumFeatures.defaultFeatureMap(), nodeIds));
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, 
TimeUnit.MILLISECONDS));
                 });
-                
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
                 builderConsumer.accept(builder);
                 this.controllers.add(builder.build());
             }
diff --git 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d7141a76dd..abc90912a1 100644
--- 
a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -76,6 +76,7 @@ import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
 import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.Test;
@@ -164,6 +165,7 @@ public class ReplicationControlManagerTest {
 
         ReplicationControlTestContext(Optional<CreateTopicPolicy> 
createTopicPolicy) {
             this.replicationControl = new ReplicationControlManager.Builder().
+                setMetadataVersion(() -> MetadataVersion.IBP_3_3_IV1).
                 setSnapshotRegistry(snapshotRegistry).
                 setLogContext(logContext).
                 setMaxElectionsPerImbalance(Integer.MAX_VALUE).
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java 
b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 3fdeea8dbe..f5a8da5f8a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -205,12 +205,16 @@ public class RecordTestUtils {
     /**
      * Create a batch reader for testing.
      *
-     * @param lastOffset    The last offset of the given list of records.
-     * @param records       The records.
-     * @return              A batch reader which will return the given records.
+     * @param lastOffset the last offset of the given list of records
+     * @param appendTimestamp the append timestamp for the batches created
+     * @param records the records
+     * @return a batch reader which will return the given records
      */
-    public static BatchReader<ApiMessageAndVersion>
-            mockBatchReader(long lastOffset, List<ApiMessageAndVersion> 
records) {
+    public static BatchReader<ApiMessageAndVersion> mockBatchReader(
+        long lastOffset,
+        long appendTimestamp,
+        List<ApiMessageAndVersion> records
+    ) {
         List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
         long offset = lastOffset - records.size() + 1;
         Iterator<ApiMessageAndVersion> iterator = records.iterator();
@@ -218,7 +222,7 @@ public class RecordTestUtils {
         assertTrue(iterator.hasNext()); // At least one record is required
         while (true) {
             if (!iterator.hasNext() || curRecords.size() >= 2) {
-                batches.add(Batch.data(offset, 0, 0, sizeInBytes(curRecords), 
curRecords));
+                batches.add(Batch.data(offset, 0, appendTimestamp, 
sizeInBytes(curRecords), curRecords));
                 if (!iterator.hasNext()) {
                     break;
                 }
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index c7ffcfb0b6..84748bd330 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.WindowedSum;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.QuorumState;
 
+import java.util.Arrays;
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
 
@@ -34,11 +35,11 @@ public class KafkaRaftMetrics implements AutoCloseable {
 
     private final Metrics metrics;
 
-    private OffsetAndEpoch logEndOffset;
-    private int numUnknownVoterConnections;
-    private OptionalLong electionStartMs;
-    private OptionalLong pollStartMs;
-    private OptionalLong pollEndMs;
+    private volatile OffsetAndEpoch logEndOffset;
+    private volatile int numUnknownVoterConnections;
+    private volatile OptionalLong electionStartMs;
+    private volatile OptionalLong pollStartMs;
+    private volatile OptionalLong pollEndMs;
 
     private final MetricName currentLeaderIdMetricName;
     private final MetricName currentVotedIdMetricName;
@@ -186,19 +187,23 @@ public class KafkaRaftMetrics implements AutoCloseable {
 
     @Override
     public void close() {
-        metrics.removeMetric(currentLeaderIdMetricName);
-        metrics.removeMetric(currentVotedIdMetricName);
-        metrics.removeMetric(currentEpochMetricName);
-        metrics.removeMetric(currentStateMetricName);
-        metrics.removeMetric(highWatermarkMetricName);
-        metrics.removeMetric(logEndOffsetMetricName);
-        metrics.removeMetric(logEndEpochMetricName);
-        metrics.removeMetric(numUnknownVoterConnectionsMetricName);
-
-        metrics.removeSensor(commitTimeSensor.name());
-        metrics.removeSensor(electionTimeSensor.name());
-        metrics.removeSensor(fetchRecordsSensor.name());
-        metrics.removeSensor(appendRecordsSensor.name());
-        metrics.removeSensor(pollIdleSensor.name());
+        Arrays.asList(
+            currentLeaderIdMetricName,
+            currentVotedIdMetricName,
+            currentEpochMetricName,
+            currentStateMetricName,
+            highWatermarkMetricName,
+            logEndOffsetMetricName,
+            logEndEpochMetricName,
+            numUnknownVoterConnectionsMetricName
+        ).forEach(metrics::removeMetric);
+
+        Arrays.asList(
+            commitTimeSensor.name(),
+            electionTimeSensor.name(),
+            fetchRecordsSensor.name(),
+            appendRecordsSensor.name(),
+            pollIdleSensor.name()
+        ).forEach(metrics::removeSensor);
     }
 }
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 7f3c12122e..920c60300d 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -150,10 +150,13 @@ public enum MetadataVersion {
     IBP_3_1_IV0(3, "3.1", "IV0", false),
 
     // Support for leader recovery for unclean leader election (KIP-704)
-    IBP_3_2_IV0(4, "3.2", "IV0", false),
+    IBP_3_2_IV0(4, "3.2", "IV0", true),
 
     // Support for metadata.version feature flag and Removes min_version_level 
from the finalized version range that is written to ZooKeeper (KIP-778)
-    IBP_3_3_IV0(5, "3.3", "IV0", false);
+    IBP_3_3_IV0(5, "3.3", "IV0", false),
+
+    // Support NoOpRecord for the cluster metadata log (KIP-835)
+    IBP_3_3_IV1(6, "3.3", "IV1", true);
 
     public static final String FEATURE_NAME = "metadata.version";
 
@@ -211,6 +214,14 @@ public enum MetadataVersion {
         return this.isAtLeast(IBP_3_0_IV0);
     }
 
+    public boolean isLeaderRecoverySupported() {
+        return this.isAtLeast(IBP_3_2_IV0);
+    }
+
+    public boolean isNoOpRecordSupported() {
+        return this.isAtLeast(IBP_3_3_IV1);
+    }
+
     public boolean isKRaftSupported() {
         return this.featureLevel > 0;
     }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index bbce1b6ae4..8649586be9 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -60,6 +60,7 @@ import static 
org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
 import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -188,8 +189,9 @@ class MetadataVersionTest {
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
         assertEquals(IBP_3_2_IV0, 
MetadataVersion.fromVersionString("3.2-IV0"));
 
-        assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3"));
+        assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3"));
         assertEquals(IBP_3_3_IV0, 
MetadataVersion.fromVersionString("3.3-IV0"));
+        assertEquals(IBP_3_3_IV1, 
MetadataVersion.fromVersionString("3.3-IV1"));
     }
 
     @Test
@@ -232,7 +234,9 @@ class MetadataVersionTest {
         assertEquals("3.0", IBP_3_0_IV0.shortVersion());
         assertEquals("3.0", IBP_3_0_IV1.shortVersion());
         assertEquals("3.1", IBP_3_1_IV0.shortVersion());
+        assertEquals("3.2", IBP_3_2_IV0.shortVersion());
         assertEquals("3.3", IBP_3_3_IV0.shortVersion());
+        assertEquals("3.3", IBP_3_3_IV1.shortVersion());
     }
 
     @Test
@@ -266,6 +270,7 @@ class MetadataVersionTest {
         assertEquals("3.1-IV0", IBP_3_1_IV0.version());
         assertEquals("3.2-IV0", IBP_3_2_IV0.version());
         assertEquals("3.3-IV0", IBP_3_3_IV0.version());
+        assertEquals("3.3-IV1", IBP_3_3_IV1.version());
     }
 
     @Test
@@ -280,13 +285,14 @@ class MetadataVersionTest {
     @Test
     public void testMetadataChanged() {
         assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_2_IV0));
-        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_1_IV0));
-        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_0_IV1));
-        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_0_IV0));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_1_IV0));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_0_IV1));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_3_0_IV0));
         assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, 
IBP_2_8_IV1));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_3_IV1, 
IBP_3_3_IV0));
 
         // Check that argument order doesn't matter
-        assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_0_IV0, 
IBP_3_2_IV0));
+        assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_0_IV0, 
IBP_3_2_IV0));
         assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_2_8_IV1, 
IBP_3_2_IV0));
     }
 

Reply via email to