This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7d1b0926fa KAFKA-13883: Implement NoOpRecord and metadata metrics (#12183)
7d1b0926fa is described below
commit 7d1b0926fab104311a505826831ddc45864e9a6a
Author: José Armando García Sancio <[email protected]>
AuthorDate: Wed Jun 1 10:48:24 2022 -0700
KAFKA-13883: Implement NoOpRecord and metadata metrics (#12183)
Implement NoOpRecord as described in KIP-835. This is controlled by the new
metadata.max.idle.interval.ms configuration.

The KRaft controller schedules an event to write NoOpRecord to the metadata
log if the metadata version supports this feature. This event is scheduled at
the interval defined in metadata.max.idle.interval.ms. Brokers and controllers
were improved to ignore the NoOpRecord when replaying the metadata log.
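
For illustration, the new knob lives in the broker/controller configuration
(server.properties); the value below is simply the default added by this
patch, and setting it to 0 disables the no-op writes entirely:

    # Append a NoOpRecord when the metadata log has been idle for 500 ms (the default).
    # A value of 0 means no-op records are never appended.
    metadata.max.idle.interval.ms=500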

This PR also adds four new metrics to the KafkaController metric group, as
described in KIP-835.
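
As a rough sketch (not part of this patch), the new gauges can be read over
JMX like any other KafkaController metric. The probe class below is
hypothetical; the MBean name matches the ones asserted in MetricsTest, and a
Yammer gauge should expose its reading through the "Value" attribute:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // Hypothetical probe, assuming it runs inside (or is attached to) a controller JVM.
    public class ControllerMetricsProbe {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName lagMs = new ObjectName(
                "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs");
            System.out.println("lag ms = " + server.getAttribute(lagMs, "Value"));
        }
    }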

Finally, there are some small fixes to leader recovery. This PR fixes a bug
where metadata version 3.3-IV1 was not marked as changing the metadata. It
also changes the ReplicationControlManager to accept a metadata version
supplier to determine if the leader recovery state is supported.
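
Schematically, the manager now re-evaluates the check against the current
metadata version on every use instead of a boolean captured at startup. The
helper method below is made up for illustration; the field and the
isLeaderRecoverySupported() call are as they appear in the diff:

    // Before: private final boolean isLeaderRecoverySupported;  (fixed at construction)
    private final Supplier<MetadataVersion> metadataVersion;

    // Hypothetical helper: consulted on every use, so feature upgrades take effect immediately.
    boolean leaderRecoverySupported() {
        return metadataVersion.get().isLeaderRecoverySupported();
    }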
Reviewers: Colin P. McCabe <[email protected]>
---
.../org/apache/kafka/common/metrics/Gauge.java | 1 +
.../src/main/scala/kafka/server/BrokerServer.scala | 14 +-
.../main/scala/kafka/server/ControllerServer.scala | 7 +-
core/src/main/scala/kafka/server/KafkaConfig.scala | 12 +-
.../server/metadata/BrokerMetadataListener.scala | 10 +-
.../server/metadata/BrokerServerMetrics.scala | 88 +++++++++
.../server/metadata/BrokerServerMetricsTest.scala | 88 +++++++++
.../scala/unit/kafka/metrics/MetricsTest.scala | 15 ++
.../scala/unit/kafka/server/KafkaConfigTest.scala | 1 +
.../metadata/BrokerMetadataListenerTest.scala | 127 ++++++++-----
.../apache/kafka/controller/ControllerMetrics.java | 12 ++
.../apache/kafka/controller/QuorumController.java | 209 ++++++++++++++++-----
.../kafka/controller/QuorumControllerMetrics.java | 85 ++++++++-
.../controller/ReplicationControlManager.java | 60 +++---
.../java/org/apache/kafka/image/MetadataDelta.java | 5 +
.../main/resources/common/metadata/NoOpRecord.json | 23 +++
.../controller/FeatureControlManagerTest.java | 2 +-
.../kafka/controller/MockControllerMetrics.java | 58 ++++--
.../controller/QuorumControllerMetricsTest.java | 49 ++++-
.../kafka/controller/QuorumControllerTest.java | 50 +++++
.../kafka/controller/QuorumControllerTestEnv.java | 17 +-
.../controller/ReplicationControlManagerTest.java | 2 +
.../org/apache/kafka/metadata/RecordTestUtils.java | 16 +-
.../kafka/raft/internals/KafkaRaftMetrics.java | 43 +++--
.../kafka/server/common/MetadataVersion.java | 15 +-
.../kafka/server/common/MetadataVersionTest.java | 16 +-
26 files changed, 826 insertions(+), 199 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
index 647942b3d0..d71bbd853d 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Gauge.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.metrics;
/**
* A gauge metric is an instantaneous reading of a particular value.
*/
+@FunctionalInterface
public interface Gauge<T> extends MetricValueProvider<T> {
/**
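
With @FunctionalInterface in place, a Gauge can now be supplied as a lambda,
since value(MetricConfig, long) is its single abstract method. A minimal
sketch, with a made-up metric name and a hypothetical AtomicLong:

    AtomicLong lastAppliedOffset = new AtomicLong(0);
    metrics.addMetric(
        metrics.metricName("last-applied-record-offset", "broker-metadata-metrics"),
        (Gauge<Long>) (config, now) -> lastAppliedOffset.get()
    );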
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b0b102762d..b62d118096 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -31,6 +31,7 @@ import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.raft.RaftManager
import kafka.security.CredentialProvider
import kafka.server.KafkaRaftServer.ControllerRole
+import kafka.server.metadata.BrokerServerMetrics
import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
import org.apache.kafka.common.feature.SupportedVersionRange
@@ -319,11 +320,14 @@ class BrokerServer(
))
}
- metadataListener = new BrokerMetadataListener(config.nodeId,
- time,
- threadNamePrefix,
- config.metadataSnapshotMaxNewRecordBytes,
- metadataSnapshotter)
+ metadataListener = new BrokerMetadataListener(
+ config.nodeId,
+ time,
+ threadNamePrefix,
+ config.metadataSnapshotMaxNewRecordBytes,
+ metadataSnapshotter,
+ BrokerServerMetrics(metrics)
+ )
val networkListeners = new ListenerCollection()
config.effectiveAdvertisedListeners.foreach { ep =>
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 7a2913e9cf..67b3f0276d 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.raft.RaftConfig
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.server.metrics.KafkaYammerMetrics
@@ -175,6 +174,8 @@ class ControllerServer(
OptionalLong.empty()
}
+ val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
+
new QuorumController.Builder(config.nodeId, metaProperties.clusterId).
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
@@ -183,12 +184,12 @@ class ControllerServer(
setQuorumFeatures(quorumFeatures).
setDefaultReplicationFactor(config.defaultReplicationFactor.toShort).
setDefaultNumPartitions(config.numPartitions.intValue()).
- setIsLeaderRecoverySupported(bootstrapMetadata.metadataVersion().isAtLeast(IBP_3_2_IV0)).
setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(), TimeUnit.MILLISECONDS)).
setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
- setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry())).
+ setMaxIdleIntervalNs(maxIdleIntervalNs).
+ setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)).
setCreateTopicPolicy(createTopicPolicy.asJava).
setAlterConfigPolicy(alterConfigPolicy.asJava).
setConfigurationValidator(new ControllerConfigurationValidator()).
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 8588828d4f..eb545cea96 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -18,6 +18,7 @@
package kafka.server
import java.util
+import java.util.concurrent.TimeUnit
import java.util.{Collections, Locale, Properties}
import kafka.cluster.EndPoint
@@ -81,6 +82,7 @@ object Defaults {
val BrokerHeartbeatIntervalMs = 2000
val BrokerSessionTimeoutMs = 9000
val MetadataSnapshotMaxNewRecordBytes = 20 * 1024 * 1024
+ val MetadataMaxIdleIntervalMs = 500
/** KRaft mode configs */
val EmptyNodeId: Int = -1
@@ -402,6 +404,7 @@ object KafkaConfig {
val MetadataMaxRetentionBytesProp = "metadata.max.retention.bytes"
val MetadataMaxRetentionMillisProp = "metadata.max.retention.ms"
val QuorumVotersProp = RaftConfig.QUORUM_VOTERS_CONFIG
+ val MetadataMaxIdleIntervalMsProp = "metadata.max.idle.interval.ms"
/************* Authorizer Configuration ***********/
val AuthorizerClassNameProp = "authorizer.class.name"
@@ -712,6 +715,9 @@ object KafkaConfig {
val MetadataLogDirDoc = "This configuration determines where we put the
metadata log for clusters in KRaft mode. " +
"If it is not set, the metadata log is placed in the first log directory
from log.dirs."
val MetadataSnapshotMaxNewRecordBytesDoc = "This is the maximum number of
bytes in the log between the latest snapshot and the high-watermark needed
before generating a new snapshot."
+ val MetadataMaxIdleIntervalMsDoc = "This configuration controls how often
the active " +
+ "controller should write no-op records to the metadata partition. If the
value is 0, no-op records " +
+ s"are not appended to the metadata partition. The default value is
${Defaults.MetadataMaxIdleIntervalMs}";
val ControllerListenerNamesDoc = "A comma-separated list of the names of the
listeners used by the controller. This is required " +
"if running in KRaft mode. When communicating with the controller quorum,
the broker will always use the first listener in this list.\n " +
"Note: The ZK-based controller should not set this configuration."
@@ -1148,6 +1154,7 @@ object KafkaConfig {
.define(MetadataLogSegmentMillisProp, LONG, Defaults.LogRollHours * 60 * 60 * 1000L, null, HIGH, MetadataLogSegmentMillisDoc)
.define(MetadataMaxRetentionBytesProp, LONG, Defaults.LogRetentionBytes, null, HIGH, MetadataMaxRetentionBytesDoc)
.define(MetadataMaxRetentionMillisProp, LONG, Defaults.LogRetentionHours * 60 * 60 * 1000L, null, HIGH, MetadataMaxRetentionMillisDoc)
+ .define(MetadataMaxIdleIntervalMsProp, INT, Defaults.MetadataMaxIdleIntervalMs, atLeast(0), LOW, MetadataMaxIdleIntervalMsDoc)
/************* Authorizer Configuration ***********/
.define(AuthorizerClassNameProp, STRING, Defaults.AuthorizerClassName,
new ConfigDef.NonNullValidator(), LOW, AuthorizerClassNameDoc)
@@ -1643,7 +1650,6 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
def metadataRetentionBytes = getLong(KafkaConfig.MetadataMaxRetentionBytesProp)
def metadataRetentionMillis = getLong(KafkaConfig.MetadataMaxRetentionMillisProp)
-
def numNetworkThreads = getInt(KafkaConfig.NumNetworkThreadsProp)
def backgroundThreads = getInt(KafkaConfig.BackgroundThreadsProp)
val queuedMaxRequests = getInt(KafkaConfig.QueuedMaxRequestsProp)
@@ -1661,6 +1667,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
/************* Metadata Configuration ***********/
val metadataSnapshotMaxNewRecordBytes = getLong(KafkaConfig.MetadataSnapshotMaxNewRecordBytesProp)
+ val metadataMaxIdleIntervalNs: Option[Long] = {
+ val value = TimeUnit.NANOSECONDS.convert(getInt(KafkaConfig.MetadataMaxIdleIntervalMsProp).toLong, TimeUnit.MILLISECONDS)
+ if (value > 0) Some(value) else None
+ }
/************* Authorizer Configuration ***********/
def createNewAuthorizer(): Option[Authorizer] = {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index e8819e9c5b..fa0bc52d7a 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -39,7 +39,8 @@ class BrokerMetadataListener(
time: Time,
threadNamePrefix: Option[String],
val maxBytesBetweenSnapshots: Long,
- val snapshotter: Option[MetadataSnapshotter]
+ val snapshotter: Option[MetadataSnapshotter],
+ brokerMetrics: BrokerServerMetrics
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val logContext = new LogContext(s"[BrokerMetadataListener
id=$brokerId] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -279,6 +280,10 @@ class BrokerMetadataListener(
debug(s"Publishing new metadata delta $delta at offset
${_image.highestOffsetAndEpoch().offset}.")
}
publisher.publish(delta, _image)
+
+ // Update the metrics since the publisher handled the latest image
+ brokerMetrics.lastAppliedRecordOffset.set(_highestOffset)
+ brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp)
}
override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
@@ -289,8 +294,9 @@ class BrokerMetadataListener(
eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
}
- class ShutdownEvent() extends EventQueue.FailureLoggingEvent(log) {
+ class ShutdownEvent extends EventQueue.FailureLoggingEvent(log) {
override def run(): Unit = {
+ brokerMetrics.close()
removeMetric(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
removeMetric(BrokerMetadataListener.MetadataBatchSizes)
}
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
new file mode 100644
index 0000000000..0db6f0071c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.concurrent.atomic.AtomicLong
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Gauge
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.metrics.MetricConfig
+
+final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable {
+ import BrokerServerMetrics._
+
+ val lastAppliedRecordOffset: AtomicLong = new AtomicLong(0)
+ val lastAppliedRecordTimestamp: AtomicLong = new AtomicLong(0)
+
+ val lastAppliedRecordOffsetName = metrics.metricName(
+ "last-applied-record-offset",
+ metricGroupName,
+ "The offset of the last record from the cluster metadata partition that was applied by the broker"
+ )
+
+ val lastAppliedRecordTimestampName = metrics.metricName(
+ "last-applied-record-timestamp",
+ metricGroupName,
+ "The timestamp of the last record from the cluster metadata partition that was applied by the broker"
+ )
+
+ val lastAppliedRecordLagMsName = metrics.metricName(
+ "last-applied-record-lag-ms",
+ metricGroupName,
+ "The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker"
+ )
+
+ addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
+ lastAppliedRecordOffset.get
+ }
+
+ addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
+ lastAppliedRecordTimestamp.get
+ }
+
+ addMetric(metrics, lastAppliedRecordLagMsName) { now =>
+ now - lastAppliedRecordTimestamp.get
+ }
+
+ override def close(): Unit = {
+ List(
+ lastAppliedRecordOffsetName,
+ lastAppliedRecordTimestampName,
+ lastAppliedRecordLagMsName
+ ).foreach(metrics.removeMetric)
+ }
+}
+
+
+final object BrokerServerMetrics {
+ private val metricGroupName = "broker-metadata-metrics"
+
+ private def addMetric[T](metrics: Metrics, name: MetricName)(func: Long => T): Unit = {
+ metrics.addMetric(name, new FuncGauge(func))
+ }
+
+ private final class FuncGauge[T](func: Long => T) extends Gauge[T] {
+ override def value(config: MetricConfig, now: Long): T = {
+ func(now)
+ }
+ }
+
+ def apply(metrics: Metrics): BrokerServerMetrics = {
+ new BrokerServerMetrics(metrics)
+ }
+}
diff --git a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
new file mode 100644
index 0000000000..df114ef59e
--- /dev/null
+++ b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.util.Collections
+import kafka.utils.TestUtils
+import org.apache.kafka.common.MetricName
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.utils.MockTime
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertTrue
+import org.junit.jupiter.api.Test
+import scala.jdk.CollectionConverters._
+
+final class BrokerServerMetricsTest {
+ @Test
+ def testMetricsExported(): Unit = {
+ val metrics = new Metrics()
+ val expectedGroup = "broker-metadata-metrics"
+
+ // Metric description is not used for metric name equality
+ val expectedMetrics = Set(
+ new MetricName("last-applied-record-offset", expectedGroup, "", Collections.emptyMap()),
+ new MetricName("last-applied-record-timestamp", expectedGroup, "", Collections.emptyMap()),
+ new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap())
+ )
+
+ TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+ val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => name.group == expectedGroup }
+ assertEquals(3, metricsMap.size)
+ metricsMap.foreach { case (name, metric) =>
+ assertTrue(expectedMetrics.contains(name))
+ }
+ }
+
+ val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => name.group == expectedGroup }
+ assertEquals(0, metricsMap.size)
+ }
+
+ @Test
+ def testLastAppliedRecordOffset(): Unit = {
+ val metrics = new Metrics()
+ TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+ val offsetMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordOffsetName)
+ assertEquals(0, offsetMetric.metricValue.asInstanceOf[Long])
+
+ // Update metric value and check
+ val expectedValue = 1000
+ brokerMetrics.lastAppliedRecordOffset.set(expectedValue)
+ assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
+ }
+ }
+
+ @Test
+ def testLastAppliedRecordTimestamp(): Unit = {
+ val time = new MockTime()
+ val metrics = new Metrics(time)
+ TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+ time.sleep(1000)
+ val timestampMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordTimestampName)
+ val lagMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordLagMsName)
+
+ assertEquals(0, timestampMetric.metricValue.asInstanceOf[Long])
+ assertEquals(time.milliseconds, lagMetric.metricValue.asInstanceOf[Long])
+
+ // Update metric value and check
+ val timestamp = 500
+ brokerMetrics.lastAppliedRecordTimestamp.set(timestamp)
+ assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
+ assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])
+ }
+ }
+}
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index fb2ed99ef1..44a1e53f47 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -229,6 +229,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=FencedBrokerCount"), 1)
}
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("kraft"))
+ def testKRaftControllerMetrics(quorum: String): Unit = {
+ val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
+
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp"), 1)
+ assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs"), 1)
+ }
+
/**
* Test that the metrics are created with the right name, testZooKeeperStateChangeRateMetrics
* and testZooKeeperSessionStateMetric in ZooKeeperClientTest test the metrics behaviour.
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 36dff7a679..1dd0a9ebc8 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -769,6 +769,7 @@ class KafkaConfigTest {
case KafkaConfig.MetadataMaxRetentionBytesProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.MetadataMaxRetentionMillisProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.ControllerListenerNamesProp => // ignore string
+ case KafkaConfig.MetadataMaxIdleIntervalMsProp => assertPropertyInvalid(baseProperties, name, "not_a_number")
case KafkaConfig.AuthorizerClassNameProp => //ignore string
case KafkaConfig.CreateTopicPolicyClassNameProp => //ignore string
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 93dc8bdd39..20df47a6a6 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Optional}
import org.apache.kafka.common.metadata.{PartitionChangeRecord, PartitionRecord, RegisterBrokerRecord, TopicRecord}
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.{Endpoint, Uuid}
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
@@ -34,6 +35,7 @@ import scala.jdk.CollectionConverters._
class BrokerMetadataListenerTest {
private def newBrokerMetadataListener(
+ metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
snapshotter: Option[MetadataSnapshotter] = None,
maxBytesBetweenSnapshots: Long = 1000000L,
): BrokerMetadataListener = {
@@ -42,7 +44,9 @@ class BrokerMetadataListenerTest {
time = Time.SYSTEM,
threadNamePrefix = None,
maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
- snapshotter = snapshotter)
+ snapshotter = snapshotter,
+ brokerMetrics = metrics
+ )
}
@Test
@@ -53,25 +57,42 @@ class BrokerMetadataListenerTest {
@Test
def testPublish(): Unit = {
- val listener = newBrokerMetadataListener()
+ val metrics = BrokerServerMetrics(new Metrics())
+ val listener = newBrokerMetadataListener(metrics = metrics)
try {
- listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
- util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
- setBrokerId(0).
- setBrokerEpoch(100L).
- setFenced(false).
- setRack(null).
- setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")),
0.toShort))))
+ val unfencedTimestamp = 300L
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ 100,
+ unfencedTimestamp,
+ util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+ setBrokerId(0).
+ setBrokerEpoch(100L).
+ setFenced(false).
+ setRack(null).
+ setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")),
0.toShort))
+ )
+ )
val imageRecords = listener.getImageRecords().get()
assertEquals(0, imageRecords.size())
assertEquals(100L, listener.highestMetadataOffset)
- listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
- util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
- setBrokerId(1).
- setBrokerEpoch(200L).
- setFenced(true).
- setRack(null).
- setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")),
0.toShort))))
+ assertEquals(0L, metrics.lastAppliedRecordOffset.get)
+ assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
+
+ val fencedTimestamp = 500L
+ val fencedLastOffset = 200L
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ fencedLastOffset,
+ fencedTimestamp,
+ util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+ setBrokerId(1).
+ setBrokerEpoch(200L).
+ setFenced(true).
+ setRack(null).
+ setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")),
0.toShort))
+ )
+ )
listener.startPublishing(new MetadataPublisher {
override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
assertEquals(200L, newImage.highestOffsetAndEpoch().offset)
@@ -87,6 +108,9 @@ class BrokerMetadataListenerTest {
override def publishedOffset: Long = -1
}).get()
+
+ assertEquals(fencedLastOffset, metrics.lastAppliedRecordOffset.get)
+ assertEquals(fencedTimestamp, metrics.lastAppliedRecordTimestamp.get)
} finally {
listener.close()
}
@@ -136,15 +160,22 @@ class BrokerMetadataListenerTest {
private def generateManyRecords(listener: BrokerMetadataListener, endOffset: Long): Unit = {
(0 to 10000).foreach { _ =>
- listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
- util.Arrays.asList(new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
- new ApiMessageAndVersion(new PartitionChangeRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setRemovingReplicas(Collections.emptyList()), 0.toShort))))
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ endOffset,
+ 0,
+ util.Arrays.asList(
+ new ApiMessageAndVersion(new PartitionChangeRecord().
+ setPartitionId(0).
+ setTopicId(FOO_ID).
+ setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+ new ApiMessageAndVersion(new PartitionChangeRecord().
+ setPartitionId(0).
+ setTopicId(FOO_ID).
+ setRemovingReplicas(Collections.emptyList()), 0.toShort)
+ )
+ )
+ )
}
listener.getImageRecords().get()
}
@@ -215,13 +246,18 @@ class BrokerMetadataListenerTest {
endOffset: Long
): Unit = {
brokerIds.foreach { brokerId =>
- listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
- util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
- setBrokerId(brokerId).
- setBrokerEpoch(100L).
- setFenced(false).
- setRack(null).
- setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" +
brokerId)), 0.toShort))))
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ endOffset,
+ 0,
+ util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+ setBrokerId(brokerId).
+ setBrokerEpoch(100L).
+ setFenced(false).
+ setRack(null).
+ setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" +
brokerId)), 0.toShort))
+ )
+ )
}
}
@@ -230,17 +266,22 @@ class BrokerMetadataListenerTest {
replicas: Seq[Int],
endOffset: Long
): Unit = {
- listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
- util.Arrays.asList(
- new ApiMessageAndVersion(new TopicRecord().
- setName("foo").
- setTopicId(FOO_ID), 0.toShort),
- new ApiMessageAndVersion(new PartitionRecord().
- setPartitionId(0).
- setTopicId(FOO_ID).
- setIsr(replicas.map(Int.box).asJava).
- setLeader(0).
- setReplicas(replicas.map(Int.box).asJava), 0.toShort)))
+ listener.handleCommit(
+ RecordTestUtils.mockBatchReader(
+ endOffset,
+ 0,
+ util.Arrays.asList(
+ new ApiMessageAndVersion(new TopicRecord().
+ setName("foo").
+ setTopicId(FOO_ID), 0.toShort),
+ new ApiMessageAndVersion(new PartitionRecord().
+ setPartitionId(0).
+ setTopicId(FOO_ID).
+ setIsr(replicas.map(Int.box).asJava).
+ setLeader(0).
+ setReplicas(replicas.map(Int.box).asJava), 0.toShort)
+ )
+ )
)
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index fa03e058de..6b470664d6 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -51,5 +51,17 @@ public interface ControllerMetrics extends AutoCloseable {
int preferredReplicaImbalanceCount();
+ void setLastAppliedRecordOffset(long offset);
+
+ long lastAppliedRecordOffset();
+
+ void setLastCommittedRecordOffset(long offset);
+
+ long lastCommittedRecordOffset();
+
+ void setLastAppliedRecordTimestamp(long timestamp);
+
+ long lastAppliedRecordTimestamp();
+
void close();
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 78cebd892c..7ca557dada 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.metadata.ClientQuotaRecord;
import org.apache.kafka.common.metadata.FeatureLevelRecord;
import org.apache.kafka.common.metadata.FenceBrokerRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.NoOpRecord;
import org.apache.kafka.common.metadata.PartitionChangeRecord;
import org.apache.kafka.common.metadata.PartitionRecord;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
@@ -70,24 +71,24 @@ import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.SnapshotGenerator.Section;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FinalizedControllerFeatures;
import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
import org.apache.kafka.metadata.placement.ReplicaPlacer;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
-import org.apache.kafka.server.authorizer.AclCreateResult;
-import org.apache.kafka.server.authorizer.AclDeleteResult;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.metadata.BrokerHeartbeatReply;
-import org.apache.kafka.metadata.BrokerRegistrationReply;
-import org.apache.kafka.metadata.FinalizedControllerFeatures;
-import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
+import org.apache.kafka.queue.EventQueue;
import org.apache.kafka.queue.KafkaEventQueue;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
import org.apache.kafka.raft.LeaderAndEpoch;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.authorizer.AclCreateResult;
+import org.apache.kafka.server.authorizer.AclDeleteResult;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.CreateTopicPolicy;
@@ -155,10 +156,10 @@ public final class QuorumController implements Controller {
private QuorumFeatures quorumFeatures = null;
private short defaultReplicationFactor = 3;
private int defaultNumPartitions = 1;
- private boolean isLeaderRecoverySupported = false;
private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
+ private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
private ControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy = Optional.empty();
@@ -217,11 +218,6 @@ public final class QuorumController implements Controller {
return this;
}
- public Builder setIsLeaderRecoverySupported(boolean isLeaderRecoverySupported) {
- this.isLeaderRecoverySupported = isLeaderRecoverySupported;
- return this;
- }
-
public Builder setReplicaPlacer(ReplicaPlacer replicaPlacer) {
this.replicaPlacer = replicaPlacer;
return this;
@@ -237,6 +233,11 @@ public final class QuorumController implements Controller {
return this;
}
+ public Builder setMaxIdleIntervalNs(OptionalLong value) {
+ this.maxIdleIntervalNs = value;
+ return this;
+ }
+
public Builder setSessionTimeoutNs(long sessionTimeoutNs) {
this.sessionTimeoutNs = sessionTimeoutNs;
return this;
@@ -280,14 +281,13 @@ public final class QuorumController implements Controller {
@SuppressWarnings("unchecked")
public QuorumController build() throws Exception {
if (raftClient == null) {
- throw new RuntimeException("You must set a raft client.");
- }
- if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
- throw new RuntimeException("You must specify an initial metadata.version using the kafka-storage tool.");
- }
- if (quorumFeatures == null) {
- throw new RuntimeException("You must specify the quorum features");
+ throw new IllegalStateException("You must set a raft client.");
+ } else if (bootstrapMetadata == null || bootstrapMetadata.metadataVersion().equals(MetadataVersion.UNINITIALIZED)) {
+ throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
+ } else if (quorumFeatures == null) {
+ throw new IllegalStateException("You must specify the quorum features");
}
+
if (threadNamePrefix == null) {
threadNamePrefix = String.format("Node%d_", nodeId);
}
@@ -302,12 +302,30 @@ public final class QuorumController implements Controller {
KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
- return new QuorumController(logContext, nodeId, clusterId, queue, time,
- configSchema, raftClient, quorumFeatures, defaultReplicationFactor,
- defaultNumPartitions, isLeaderRecoverySupported, replicaPlacer, snapshotMaxNewRecordBytes,
- leaderImbalanceCheckIntervalNs, sessionTimeoutNs, controllerMetrics,
- createTopicPolicy, alterConfigPolicy, configurationValidator, authorizer,
- staticConfig, bootstrapMetadata);
+ return new QuorumController(
+ logContext,
+ nodeId,
+ clusterId,
+ queue,
+ time,
+ configSchema,
+ raftClient,
+ quorumFeatures,
+ defaultReplicationFactor,
+ defaultNumPartitions,
+ replicaPlacer,
+ snapshotMaxNewRecordBytes,
+ leaderImbalanceCheckIntervalNs,
+ maxIdleIntervalNs,
+ sessionTimeoutNs,
+ controllerMetrics,
+ createTopicPolicy,
+ alterConfigPolicy,
+ configurationValidator,
+ authorizer,
+ staticConfig,
+ bootstrapMetadata
+ );
} catch (Exception e) {
Utils.closeQuietly(queue, "event queue");
throw e;
@@ -715,7 +733,7 @@ public final class QuorumController implements Controller {
offset = raftClient.scheduleAppend(controllerEpoch, result.records());
}
op.processBatchEndOffset(offset);
- writeOffset = offset;
+ updateWriteOffset(offset);
resultAndOffset = ControllerResultAndOffset.of(offset, result);
for (ApiMessageAndVersion message : result.records()) {
replay(message.message(), Optional.empty(), offset);
@@ -776,7 +794,6 @@ public final class QuorumController implements Controller {
appendRaftEvent("handleCommit[baseOffset=" + reader.baseOffset() +
"]", () -> {
try {
maybeCompleteAuthorizerInitialLoad();
- boolean isActiveController = curClaimEpoch != -1;
long processedRecordsSize = 0;
while (reader.hasNext()) {
Batch<ApiMessageAndVersion> batch = reader.next();
@@ -784,7 +801,7 @@ public final class QuorumController implements Controller {
int epoch = batch.epoch();
List<ApiMessageAndVersion> messages = batch.records();
- if (isActiveController) {
+ if (isActiveController()) {
// If the controller is active, the records were already replayed,
// so we don't need to do it here.
log.debug("Completing purgatory items up to offset {} and epoch {}.", offset, epoch);
@@ -816,9 +833,7 @@ public final class QuorumController implements Controller {
}
}
- lastCommittedOffset = offset;
- lastCommittedEpoch = epoch;
- lastCommittedTimestamp = batch.appendTimestamp();
+ updateLastCommittedState(offset, epoch, batch.appendTimestamp());
processedRecordsSize += batch.sizeInBytes();
}
@@ -833,8 +848,7 @@ public final class QuorumController implements Controller {
public void handleSnapshot(SnapshotReader<ApiMessageAndVersion>
reader) {
appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]",
reader.snapshotId()), () -> {
try {
- boolean isActiveController = curClaimEpoch != -1;
- if (isActiveController) {
+ if (isActiveController()) {
throw new IllegalStateException(
String.format(
"Asked to load snapshot (%s) when it is the
active controller (%s)",
@@ -877,10 +891,11 @@ public final class QuorumController implements Controller {
replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
}
}
-
- lastCommittedOffset = reader.lastContainedLogOffset();
- lastCommittedEpoch = reader.lastContainedLogEpoch();
- lastCommittedTimestamp = reader.lastContainedLogTimestamp();
+ updateLastCommittedState(
+ reader.lastContainedLogOffset(),
+ reader.lastContainedLogEpoch(),
+ reader.lastContainedLogTimestamp()
+ );
snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
} finally {
@@ -904,7 +919,7 @@ public final class QuorumController implements Controller {
curClaimEpoch = newEpoch;
controllerMetrics.setActive(true);
- writeOffset = lastCommittedOffset;
+ updateWriteOffset(lastCommittedOffset);
clusterControl.activate();
// Check if we need to bootstrap metadata into the log. This must happen before we can
@@ -960,8 +975,11 @@ public final class QuorumController implements Controller {
// When becoming the active controller, schedule a leader rebalance if there are any topic partition
// with leader that is not the preferred leader.
maybeScheduleNextBalancePartitionLeaders();
+
+ // When becoming leader, schedule the periodic write of the no-op record
+ maybeScheduleNextWriteNoOpRecord();
});
- } else if (curClaimEpoch != -1) {
+ } else if (isActiveController()) {
appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () ->
{
log.warn("Renouncing the leadership at oldEpoch {} due to
a metadata " +
"log event. Reverting to last committed offset
{}.", curClaimEpoch,
@@ -1010,6 +1028,38 @@ public final class QuorumController implements Controller {
}
}
+ private boolean isActiveController() {
+ return curClaimEpoch != -1;
+ }
+
+ private void updateWriteOffset(long offset) {
+ writeOffset = offset;
+ if (isActiveController()) {
+ controllerMetrics.setLastAppliedRecordOffset(writeOffset);
+ // This is not truly the append timestamp. The KRaft client doesn't expose the append time when scheduling a write.
+ // This is good enough because this is called right after the records were given to the KRaft client for appending and
+ // the default append linger for KRaft is 25ms.
+ controllerMetrics.setLastAppliedRecordTimestamp(time.milliseconds());
+ } else {
+ // Change the last applied record metrics back to the last committed state. Inactive controllers report the last committed
+ // state while active controllers report the latest state which may include uncommitted data.
+ controllerMetrics.setLastAppliedRecordOffset(lastCommittedOffset);
+ controllerMetrics.setLastAppliedRecordTimestamp(lastCommittedTimestamp);
+ }
+ }
+
+ private void updateLastCommittedState(long offset, int epoch, long timestamp) {
+ lastCommittedOffset = offset;
+ lastCommittedEpoch = epoch;
+ lastCommittedTimestamp = timestamp;
+
+ controllerMetrics.setLastCommittedRecordOffset(offset);
+ if (!isActiveController()) {
+ controllerMetrics.setLastAppliedRecordOffset(offset);
+ controllerMetrics.setLastAppliedRecordTimestamp(timestamp);
+ }
+ }
+
private void renounce() {
curClaimEpoch = -1;
controllerMetrics.setActive(false);
@@ -1026,10 +1076,11 @@ public final class QuorumController implements Controller {
raftClient.register(metaLogListener);
}
- writeOffset = -1;
+ updateWriteOffset(-1);
clusterControl.deactivate();
cancelMaybeFenceReplicas();
cancelMaybeBalancePartitionLeaders();
+ cancelNextWriteNoOpRecord();
}
private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1130,6 +1181,53 @@ public final class QuorumController implements Controller {
queue.cancelDeferred(MAYBE_BALANCE_PARTITION_LEADERS);
}
+ private static final String WRITE_NO_OP_RECORD = "writeNoOpRecord";
+
+ private void maybeScheduleNextWriteNoOpRecord() {
+ if (!noOpRecordScheduled &&
+ maxIdleIntervalNs.isPresent() &&
+ featureControl.metadataVersion().isNoOpRecordSupported()) {
+
+ log.debug(
+ "Scheduling write event for {} because maxIdleIntervalNs ({}) and metadataVersion ({})",
+ WRITE_NO_OP_RECORD,
+ maxIdleIntervalNs.getAsLong(),
+ featureControl.metadataVersion()
+ );
+
+ ControllerWriteEvent<Void> event = new ControllerWriteEvent<>(WRITE_NO_OP_RECORD, () -> {
+ noOpRecordScheduled = false;
+ maybeScheduleNextWriteNoOpRecord();
+
+ return ControllerResult.of(
+ Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)),
+ null
+ );
+ });
+
+ long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong();
+ queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EarliestDeadlineFunction(delayNs), event);
+ noOpRecordScheduled = true;
+ }
+ }
+
+ private void cancelNextWriteNoOpRecord() {
+ noOpRecordScheduled = false;
+ queue.cancelDeferred(WRITE_NO_OP_RECORD);
+ }
+
+ private void handleFeatureControlChange() {
+ // The feature control may have changed. On the active controller, cancel or schedule no-op
+ // record writes accordingly.
+ if (isActiveController()) {
+ if (featureControl.metadataVersion().isNoOpRecordSupported()) {
+ maybeScheduleNextWriteNoOpRecord();
+ } else {
+ cancelNextWriteNoOpRecord();
+ }
+ }
+ }
+
@SuppressWarnings("unchecked")
private void replay(ApiMessage message, Optional<OffsetAndEpoch>
snapshotId, long offset) {
try {
@@ -1164,6 +1262,8 @@ public final class QuorumController implements Controller {
break;
case FEATURE_LEVEL_RECORD:
featureControl.replay((FeatureLevelRecord) message);
+
+ handleFeatureControlChange();
break;
case CLIENT_QUOTA_RECORD:
clientQuotaControlManager.replay((ClientQuotaRecord) message);
@@ -1177,6 +1277,9 @@ public final class QuorumController implements Controller {
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
break;
+ case NO_OP_RECORD:
+ // NoOpRecord is an empty record and doesn't need to be replayed
+ break;
default:
throw new RuntimeException("Unhandled record type " +
type);
}
@@ -1196,8 +1299,7 @@ public final class QuorumController implements Controller {
if (newBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes && snapshotGeneratorManager.generator == null
) {
- boolean isActiveController = curClaimEpoch != -1;
- if (!isActiveController) {
+ if (!isActiveController()) {
// The active controller creates in-memory snapshot every time an uncommitted
// batch gets appended. The in-active controller can be more efficient and only
// create an in-memory snapshot when needed.
@@ -1220,9 +1322,7 @@ public final class QuorumController implements Controller {
snapshotRegistry.reset();
newBytesSinceLastSnapshot = 0;
- lastCommittedOffset = -1;
- lastCommittedEpoch = -1;
- lastCommittedTimestamp = -1;
+ updateLastCommittedState(-1, -1, -1);
}
private final LogContext logContext;
@@ -1385,6 +1485,11 @@ public final class QuorumController implements Controller {
*/
private final OptionalLong leaderImbalanceCheckIntervalNs;
+ /**
+ * How long to delay between appending NoOpRecord to the log.
+ */
+ private final OptionalLong maxIdleIntervalNs;
+
private enum ImbalanceSchedule {
// The leader balancing operation has been scheduled
SCHEDULED,
@@ -1399,6 +1504,11 @@ public final class QuorumController implements Controller {
*/
private ImbalanceSchedule imbalancedScheduled = ImbalanceSchedule.DEFERRED;
+ /**
+ * Tracks if a write of the NoOpRecord has been scheduled.
+ */
+ private boolean noOpRecordScheduled = false;
+
private final BootstrapMetadata bootstrapMetadata;
private QuorumController(LogContext logContext,
@@ -1411,10 +1521,10 @@ public final class QuorumController implements Controller {
QuorumFeatures quorumFeatures,
short defaultReplicationFactor,
int defaultNumPartitions,
- boolean isLeaderRecoverySupported,
ReplicaPlacer replicaPlacer,
long snapshotMaxNewRecordBytes,
OptionalLong leaderImbalanceCheckIntervalNs,
+ OptionalLong maxIdleIntervalNs,
long sessionTimeoutNs,
ControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy,
@@ -1457,13 +1567,14 @@ public final class QuorumController implements Controller {
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
+ this.maxIdleIntervalNs = maxIdleIntervalNs;
this.replicationControl = new ReplicationControlManager.Builder().
+ setMetadataVersion(() -> featureControl.metadataVersion()).
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
setDefaultReplicationFactor(defaultReplicationFactor).
setDefaultNumPartitions(defaultNumPartitions).
setMaxElectionsPerImbalance(ReplicationControlManager.MAX_ELECTIONS_PER_IMBALANCE).
- setIsLeaderRecoverySupported(isLeaderRecoverySupported).
setConfigurationControl(configurationControl).
setClusterControl(clusterControl).
setControllerMetrics(controllerMetrics).
@@ -1476,8 +1587,8 @@ public final class QuorumController implements Controller {
this.bootstrapMetadata = bootstrapMetadata;
this.metaLogListener = new QuorumMetaLogListener();
this.curClaimEpoch = -1;
- this.writeOffset = -1L;
this.needToCompleteAuthorizerLoad = authorizer.isPresent();
+ updateWriteOffset(-1);
resetState();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index d0280651aa..5abf0d9770 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -21,10 +21,12 @@ import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
+import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.metrics.KafkaYammerMetrics;
import java.util.Arrays;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
public final class QuorumControllerMetrics implements ControllerMetrics {
private final static MetricName ACTIVE_CONTROLLER_COUNT = getMetricName(
@@ -45,7 +47,15 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"KafkaController", "OfflinePartitionsCount");
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
-
+ private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
+ "KafkaController", "LastAppliedRecordOffset");
+ private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
+ "KafkaController", "LastCommittedRecordOffset");
+ private final static MetricName LAST_APPLIED_RECORD_TIMESTAMP = getMetricName(
+ "KafkaController", "LastAppliedRecordTimestamp");
+ private final static MetricName LAST_APPLIED_RECORD_LAG_MS = getMetricName(
+ "KafkaController", "LastAppliedRecordLagMs");
+
private final MetricsRegistry registry;
private volatile boolean active;
private volatile int fencedBrokerCount;
@@ -54,6 +64,9 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
+ private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
+ private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
+ private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> fencedBrokerCountGauge;
private final Gauge<Integer> activeBrokerCountGauge;
@@ -61,10 +74,17 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
private final Gauge<Integer> globalTopicCountGauge;
private final Gauge<Integer> offlinePartitionCountGauge;
private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
+ private final Gauge<Long> lastAppliedRecordOffsetGauge;
+ private final Gauge<Long> lastCommittedRecordOffsetGauge;
+ private final Gauge<Long> lastAppliedRecordTimestampGauge;
+ private final Gauge<Long> lastAppliedRecordLagMsGauge;
private final Histogram eventQueueTime;
private final Histogram eventQueueProcessingTime;
- public QuorumControllerMetrics(MetricsRegistry registry) {
+ public QuorumControllerMetrics(
+ MetricsRegistry registry,
+ Time time
+ ) {
this.registry = Objects.requireNonNull(registry);
this.active = false;
this.fencedBrokerCount = 0;
@@ -117,6 +137,30 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
return preferredReplicaImbalanceCount;
}
});
+ lastAppliedRecordOffsetGauge = registry.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ return lastAppliedRecordOffset.get();
+ }
+ });
+ lastCommittedRecordOffsetGauge = registry.newGauge(LAST_COMMITTED_RECORD_OFFSET, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ return lastCommittedRecordOffset.get();
+ }
+ });
+ lastAppliedRecordTimestampGauge = registry.newGauge(LAST_APPLIED_RECORD_TIMESTAMP, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ return lastAppliedRecordTimestamp.get();
+ }
+ });
+ lastAppliedRecordLagMsGauge = registry.newGauge(LAST_APPLIED_RECORD_LAG_MS, new Gauge<Long>() {
+ @Override
+ public Long value() {
+ return time.milliseconds() - lastAppliedRecordTimestamp.get();
+ }
+ });
}
@Override
@@ -198,6 +242,36 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
return this.preferredReplicaImbalanceCount;
}
+ @Override
+ public void setLastAppliedRecordOffset(long offset) {
+ lastAppliedRecordOffset.set(offset);
+ }
+
+ @Override
+ public long lastAppliedRecordOffset() {
+ return lastAppliedRecordOffset.get();
+ }
+
+ @Override
+ public void setLastCommittedRecordOffset(long offset) {
+ lastCommittedRecordOffset.set(offset);
+ }
+
+ @Override
+ public long lastCommittedRecordOffset() {
+ return lastCommittedRecordOffset.get();
+ }
+
+ @Override
+ public void setLastAppliedRecordTimestamp(long timestamp) {
+ lastAppliedRecordTimestamp.set(timestamp);
+ }
+
+ @Override
+ public long lastAppliedRecordTimestamp() {
+ return lastAppliedRecordTimestamp.get();
+ }
+
@Override
public void close() {
Arrays.asList(
@@ -207,7 +281,12 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
GLOBAL_TOPIC_COUNT,
GLOBAL_PARTITION_COUNT,
OFFLINE_PARTITION_COUNT,
- PREFERRED_REPLICA_IMBALANCE_COUNT).forEach(this.registry::removeMetric);
+ PREFERRED_REPLICA_IMBALANCE_COUNT,
+ LAST_APPLIED_RECORD_OFFSET,
+ LAST_COMMITTED_RECORD_OFFSET,
+ LAST_APPLIED_RECORD_TIMESTAMP,
+ LAST_APPLIED_RECORD_LAG_MS
+ ).forEach(registry::removeMetric);
}
private static MetricName getMetricName(String type, String name) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index d0273bc02d..81ec253434 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -73,16 +73,17 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.metadata.KafkaConfigSchema;
-import org.apache.kafka.metadata.placement.ClusterDescriber;
-import org.apache.kafka.metadata.placement.PlacementSpec;
-import org.apache.kafka.metadata.placement.UsableBroker;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.KafkaConfigSchema;
import org.apache.kafka.metadata.LeaderRecoveryState;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.Replicas;
+import org.apache.kafka.metadata.placement.ClusterDescriber;
+import org.apache.kafka.metadata.placement.PlacementSpec;
+import org.apache.kafka.metadata.placement.UsableBroker;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
@@ -138,17 +139,22 @@ public class ReplicationControlManager {
static final int MAX_ELECTIONS_PER_IMBALANCE = 1_000;
static class Builder {
+ private Supplier<MetadataVersion> metadataVersion =
MetadataVersion::latest;
private SnapshotRegistry snapshotRegistry = null;
private LogContext logContext = null;
private short defaultReplicationFactor = (short) 3;
private int defaultNumPartitions = 1;
private int maxElectionsPerImbalance = MAX_ELECTIONS_PER_IMBALANCE;
- private boolean isLeaderRecoverySupported = true;
private ConfigurationControlManager configurationControl = null;
private ClusterControlManager clusterControl = null;
private ControllerMetrics controllerMetrics = null;
private Optional<CreateTopicPolicy> createTopicPolicy =
Optional.empty();
+ Builder setMetadataVersion(Supplier<MetadataVersion> metadataVersion) {
+ this.metadataVersion = metadataVersion;
+ return this;
+ }
+
Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
return this;
@@ -174,11 +180,6 @@ public class ReplicationControlManager {
return this;
}
- Builder setIsLeaderRecoverySupported(boolean isLeaderRecoverySupported) {
- this.isLeaderRecoverySupported = isLeaderRecoverySupported;
- return this;
- }
-
Builder setConfigurationControl(ConfigurationControlManager configurationControl) {
this.configurationControl = configurationControl;
return this;
@@ -201,22 +202,21 @@ public class ReplicationControlManager {
ReplicationControlManager build() {
if (configurationControl == null) {
- throw new RuntimeException("You must specify configurationControl.");
- }
- if (clusterControl == null) {
- throw new RuntimeException("You must specify clusterControl.");
- }
- if (controllerMetrics == null) {
- throw new RuntimeException("You must specify controllerMetrics.");
+ throw new IllegalStateException("Configuration control must be set before building");
+ } else if (clusterControl == null) {
+ throw new IllegalStateException("Cluster controller must be set before building");
+ } else if (controllerMetrics == null) {
+ throw new IllegalStateException("Metrics must be set before building");
}
if (logContext == null) logContext = new LogContext();
if (snapshotRegistry == null) snapshotRegistry = configurationControl.snapshotRegistry();
- return new ReplicationControlManager(snapshotRegistry,
+ return new ReplicationControlManager(
+ metadataVersion,
+ snapshotRegistry,
logContext,
defaultReplicationFactor,
defaultNumPartitions,
maxElectionsPerImbalance,
- isLeaderRecoverySupported,
configurationControl,
clusterControl,
controllerMetrics,
@@ -276,9 +276,9 @@ public class ReplicationControlManager {
private final int defaultNumPartitions;
/**
- * If the cluster supports leader recovery state from KIP-704.
+ * Metadata version (or IBP) for the cluster
*/
- private final boolean isLeaderRecoverySupported;
+ private final Supplier<MetadataVersion> metadataVersion;
/**
* Maximum number of leader elections to perform during one partition leader balancing operation.
@@ -358,23 +358,23 @@ public class ReplicationControlManager {
final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
private ReplicationControlManager(
+ Supplier<MetadataVersion> metadataVersion,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
short defaultReplicationFactor,
int defaultNumPartitions,
int maxElectionsPerImbalance,
- boolean isLeaderRecoverySupported,
ConfigurationControlManager configurationControl,
ClusterControlManager clusterControl,
ControllerMetrics controllerMetrics,
Optional<CreateTopicPolicy> createTopicPolicy
) {
+ this.metadataVersion = metadataVersion;
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(ReplicationControlManager.class);
this.defaultReplicationFactor = defaultReplicationFactor;
this.defaultNumPartitions = defaultNumPartitions;
this.maxElectionsPerImbalance = maxElectionsPerImbalance;
- this.isLeaderRecoverySupported = isLeaderRecoverySupported;
this.configurationControl = configurationControl;
this.controllerMetrics = controllerMetrics;
this.createTopicPolicy = createTopicPolicy;
@@ -929,7 +929,7 @@ public class ReplicationControlManager {
topic.id,
partitionId,
r -> clusterControl.unfenced(r),
- isLeaderRecoverySupported);
+ metadataVersion.get().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicData.name())) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1223,7 +1223,7 @@ public class ReplicationControlManager {
topicId,
partitionId,
r -> clusterControl.unfenced(r),
- isLeaderRecoverySupported);
+ metadataVersion.get().isLeaderRecoverySupported());
builder.setElection(election);
Optional<ApiMessageAndVersion> record = builder.build();
if (!record.isPresent()) {
@@ -1339,7 +1339,7 @@ public class ReplicationControlManager {
topicPartition.topicId(),
topicPartition.partitionId(),
r -> clusterControl.unfenced(r),
- isLeaderRecoverySupported
+ metadataVersion.get().isLeaderRecoverySupported()
);
builder.setElection(PartitionChangeBuilder.Election.PREFERRED);
builder.build().ifPresent(records::add);
@@ -1542,7 +1542,7 @@ public class ReplicationControlManager {
topicIdPart.topicId(),
topicIdPart.partitionId(),
isAcceptableLeader,
- isLeaderRecoverySupported);
+ metadataVersion.get().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topic.name)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1652,7 +1652,7 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
r -> clusterControl.unfenced(r),
- isLeaderRecoverySupported);
+ metadataVersion.get().isLeaderRecoverySupported());
if (configurationControl.uncleanLeaderElectionEnabledForTopic(topicName)) {
builder.setElection(PartitionChangeBuilder.Election.UNCLEAN);
}
@@ -1704,7 +1704,7 @@ public class ReplicationControlManager {
tp.topicId(),
tp.partitionId(),
r -> clusterControl.unfenced(r),
- isLeaderRecoverySupported);
+ metadataVersion.get().isLeaderRecoverySupported());
if (!reassignment.merged().equals(currentReplicas)) {
builder.setTargetReplicas(reassignment.merged());
}
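
The change above swaps a boolean captured at construction time for a Supplier<MetadataVersion>, so every election path re-reads the current metadata version when it builds a PartitionChangeBuilder. A minimal sketch of the pattern, outside of Kafka's actual classes (LeaderElectionPolicy and leaderRecoveryEnabled are hypothetical names):

    import java.util.function.Supplier;
    import org.apache.kafka.server.common.MetadataVersion;

    // Hypothetical illustration of the supplier pattern used above.
    class LeaderElectionPolicy {
        private final Supplier<MetadataVersion> metadataVersion;

        LeaderElectionPolicy(Supplier<MetadataVersion> metadataVersion) {
            this.metadataVersion = metadataVersion;
        }

        boolean leaderRecoveryEnabled() {
            // Re-evaluated on every call, so a metadata.version upgrade
            // takes effect without rebuilding the manager.
            return metadataVersion.get().isLeaderRecoverySupported();
        }
    }

The same supplier can be shared with other control managers, which is why the builder defaults to MetadataVersion::latest rather than a fixed constant.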
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index f40ae9ae7f..01455e360c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -216,6 +216,11 @@ public final class MetadataDelta {
case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
replay((RemoveAccessControlEntryRecord) record);
break;
+ case NO_OP_RECORD:
+ /* NoOpRecord is an empty record and doesn't need to be replayed beyond
+ * updating the highest offset and epoch.
+ */
+ break;
default:
throw new RuntimeException("Unknown metadata record type " + type);
}
diff --git a/metadata/src/main/resources/common/metadata/NoOpRecord.json b/metadata/src/main/resources/common/metadata/NoOpRecord.json
new file mode 100644
index 0000000000..88b907f8cc
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/NoOpRecord.json
@@ -0,0 +1,23 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "apiKey": 20,
+ "type": "metadata",
+ "name": "NoOpRecord",
+ "validVersions": "0",
+ "flexibleVersions": "0+",
+ "fields": []
+}
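
Since the schema above declares no fields, the generated record carries no payload; replaying it only advances the replica's last-applied offset and epoch. A hedged sketch of wrapping one for the metadata log, assuming the message generator produces org.apache.kafka.common.metadata.NoOpRecord from this JSON:

    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.metadata.NoOpRecord;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    class NoOpRecordSketch {
        static List<ApiMessageAndVersion> noOpBatch() {
            // Version 0 matches "validVersions": "0" in the schema above.
            return Collections.singletonList(
                new ApiMessageAndVersion(new NoOpRecord(), (short) 0));
        }
    }

The active controller replicates such a batch through Raft; brokers and standby controllers skip it during replay, as the MetadataDelta change above shows.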
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index ec1fcdeb0a..02b1493548 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -257,7 +257,7 @@ public class FeatureControlManagerTest {
FeatureControlManager manager = new FeatureControlManager(logContext, features, snapshotRegistry);
ControllerResult<Map<String, ApiError>> result = manager.initializeMetadataVersion(MetadataVersion.IBP_3_3_IV0.featureLevel());
RecordTestUtils.replayAll(manager, result.records());
- assertEquals(manager.metadataVersion(), MetadataVersion.latest());
+ assertEquals(manager.metadataVersion(), MetadataVersion.IBP_3_3_IV0);
result = manager.updateFeatures(
Collections.singletonMap(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_3_2_IV0.featureLevel()),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 0120f15295..5991fcc34f 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -18,24 +18,18 @@
package org.apache.kafka.controller;
public final class MockControllerMetrics implements ControllerMetrics {
- private volatile boolean active;
- private volatile int fencedBrokers;
- private volatile int activeBrokers;
- private volatile int topics;
- private volatile int partitions;
- private volatile int offlinePartitions;
- private volatile int preferredReplicaImbalances;
- private volatile boolean closed = false;
+ private volatile boolean active = false;
+ private volatile int fencedBrokers = 0;
+ private volatile int activeBrokers = 0;
+ private volatile int topics = 0;
+ private volatile int partitions = 0;
+ private volatile int offlinePartitions = 0;
+ private volatile int preferredReplicaImbalances = 0;
+ private volatile long lastAppliedRecordOffset = 0;
+ private volatile long lastCommittedRecordOffset = 0;
+ private volatile long lastAppliedRecordTimestamp = 0;
- public MockControllerMetrics() {
- this.active = false;
- this.fencedBrokers = 0;
- this.activeBrokers = 0;
- this.topics = 0;
- this.partitions = 0;
- this.offlinePartitions = 0;
- this.preferredReplicaImbalances = 0;
- }
+ private volatile boolean closed = false;
@Override
public void setActive(boolean active) {
@@ -117,6 +111,36 @@ public final class MockControllerMetrics implements ControllerMetrics {
return this.preferredReplicaImbalances;
}
+ @Override
+ public void setLastAppliedRecordOffset(long offset) {
+ lastAppliedRecordOffset = offset;
+ }
+
+ @Override
+ public long lastAppliedRecordOffset() {
+ return lastAppliedRecordOffset;
+ }
+
+ @Override
+ public void setLastCommittedRecordOffset(long offset) {
+ lastCommittedRecordOffset = offset;
+ }
+
+ @Override
+ public long lastCommittedRecordOffset() {
+ return lastCommittedRecordOffset;
+ }
+
+ @Override
+ public void setLastAppliedRecordTimestamp(long timestamp) {
+ lastAppliedRecordTimestamp = timestamp;
+ }
+
+ @Override
+ public long lastAppliedRecordTimestamp() {
+ return lastAppliedRecordTimestamp;
+ }
+
@Override
public void close() {
closed = true;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
index 4b0afb52c6..21ac02f49e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -17,10 +17,12 @@
package org.apache.kafka.controller;
+import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.Histogram;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.Set;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -37,7 +39,10 @@ public class QuorumControllerMetricsTest {
"GlobalTopicCount",
"GlobalPartitionCount",
"OfflinePartitionsCount",
- "PreferredReplicaImbalanceCount");
+ "PreferredReplicaImbalanceCount",
+ "LastAppliedRecordOffset",
+ "LastAppliedRecordTimestamp",
+ "LastAppliedRecordLagMs");
assertMetricsCreatedAndRemovedUponClose(expectedType, expectedMetricNames);
}
@@ -53,8 +58,9 @@ public class QuorumControllerMetricsTest {
@Test
public void testUpdateEventQueueTime() {
MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
try {
- try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
+ try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
quorumControllerMetrics.updateEventQueueTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
}
@@ -66,8 +72,9 @@ public class QuorumControllerMetricsTest {
@Test
public void testUpdateEventQueueProcessingTime() {
MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
try {
- try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
+ try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
quorumControllerMetrics.updateEventQueueProcessingTime(1000);
assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
}
@@ -76,10 +83,44 @@ public class QuorumControllerMetricsTest {
}
}
+ @Test
+ public void testLastAppliedRecordMetrics() {
+ MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
+ time.sleep(1000);
+ try {
+ try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
+ quorumControllerMetrics.setLastAppliedRecordOffset(100);
+ quorumControllerMetrics.setLastAppliedRecordTimestamp(500);
+
+ @SuppressWarnings("unchecked")
+ Gauge<Long> lastAppliedRecordOffset = (Gauge<Long>) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "LastAppliedRecordOffset"));
+ assertEquals(100, lastAppliedRecordOffset.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge<Long> lastAppliedRecordTimestamp = (Gauge<Long>) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "LastAppliedRecordTimestamp"));
+ assertEquals(500, lastAppliedRecordTimestamp.value());
+
+ @SuppressWarnings("unchecked")
+ Gauge<Long> lastAppliedRecordLagMs = (Gauge<Long>) registry
+ .allMetrics()
+ .get(metricName("KafkaController", "LastAppliedRecordLagMs"));
+ assertEquals(time.milliseconds() - 500, lastAppliedRecordLagMs.value());
+ }
+ } finally {
+ registry.shutdown();
+ }
+ }
+
private static void assertMetricsCreatedAndRemovedUponClose(String expectedType, Set<String> expectedMetricNames) {
MetricsRegistry registry = new MetricsRegistry();
+ MockTime time = new MockTime();
try {
- try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
+ try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
assertMetricsCreated(registry, expectedMetricNames, expectedType);
}
assertMetricsRemoved(registry, expectedMetricNames, expectedType);
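
The lag assertion in testLastAppliedRecordMetrics implies that LastAppliedRecordLagMs is computed at read time rather than stored. The registration code in QuorumControllerMetrics is not part of this excerpt, so the following is only a sketch of how such a derived gauge can be built with the same Yammer API the test uses (LastAppliedLagSketch is a hypothetical class):

    import com.yammer.metrics.core.Gauge;
    import com.yammer.metrics.core.MetricName;
    import com.yammer.metrics.core.MetricsRegistry;
    import org.apache.kafka.common.utils.Time;

    class LastAppliedLagSketch {
        private volatile long lastAppliedRecordTimestamp = 0;

        void register(MetricsRegistry registry, Time time, MetricName name) {
            registry.newGauge(name, new Gauge<Long>() {
                @Override
                public Long value() {
                    // Derived from the wall clock on each read, which is why
                    // the test expects time.milliseconds() - 500.
                    return time.milliseconds() - lastAppliedRecordTimestamp;
                }
            });
        }
    }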
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index e51b38e783..eef7215070 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;
import java.util.Spliterator;
@@ -84,6 +85,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde;
import org.apache.kafka.metadata.PartitionRegistration;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
+import org.apache.kafka.metalog.LocalLogManager;
import org.apache.kafka.metalog.LocalLogManagerTestEnv;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -423,6 +425,54 @@ public class QuorumControllerTest {
}
}
+ @Test
+ public void testNoOpRecordWriteAfterTimeout() throws Throwable {
+ long maxIdleIntervalNs = 1_000;
+ long maxReplicationDelayMs = 60_000;
+ try (
+ LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty());
+ QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(
+ logEnv,
+ builder -> {
+ builder.setConfigSchema(SCHEMA)
+ .setMaxIdleIntervalNs(OptionalLong.of(maxIdleIntervalNs));
+ }
+ );
+ ) {
+ ListenerCollection listeners = new ListenerCollection();
+ listeners.add(new Listener().setName("PLAINTEXT").setHost("localhost").setPort(9092));
+ QuorumController active = controlEnv.activeController();
+
+ LocalLogManager localLogManager = logEnv
+ .logManagers()
+ .stream()
+ .filter(logManager -> logManager.nodeId().equals(OptionalInt.of(active.nodeId())))
+ .findAny()
+ .get();
+ TestUtils.waitForCondition(
+ () -> localLogManager.highWatermark().isPresent(),
+ maxReplicationDelayMs,
+ "High watermark was not established"
+ );
+
+
+ final long firstHighWatermark = localLogManager.highWatermark().getAsLong();
+ TestUtils.waitForCondition(
+ () -> localLogManager.highWatermark().getAsLong() > firstHighWatermark,
+ maxReplicationDelayMs,
+ "Active controller didn't write NoOpRecord the first time"
+ );
+
+ // Do it again to make sure that we are not counting the leader change record
+ final long secondHighWatermark = localLogManager.highWatermark().getAsLong();
+ TestUtils.waitForCondition(
+ () -> localLogManager.highWatermark().getAsLong() > secondHighWatermark,
+ maxReplicationDelayMs,
+ "Active controller didn't write NoOpRecord the second time"
+ );
+ }
+ }
+
@Test
public void testUnregisterBroker() throws Throwable {
try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(1, Optional.empty())) {
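
testNoOpRecordWriteAfterTimeout never appends metadata itself: it relies on the idle timer described in the commit message to advance the high watermark, and it waits for two separate advances so the first one cannot be mistaken for the leader-change record written at election. As a hedged illustration of the general shape of such a timer (the real controller drives this through its own event queue, and appendNoOpRecord below is a hypothetical helper, not Kafka API):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    class IdleNoOpWriterSketch {
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

        void start(long maxIdleIntervalNs) {
            scheduler.scheduleAtFixedRate(() -> {
                // The real controller also checks that the metadata version
                // supports NoOpRecord before scheduling the write.
                appendNoOpRecord();
            }, maxIdleIntervalNs, maxIdleIntervalNs, TimeUnit.NANOSECONDS);
        }

        private void appendNoOpRecord() {
            // Placeholder: in Kafka this appends a NoOpRecord batch via Raft.
        }
    }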
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index af9cd218b4..e1481b9fe5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -19,8 +19,15 @@ package org.apache.kafka.controller;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.controller.QuorumController.Builder;
@@ -31,14 +38,6 @@ import org.apache.kafka.test.TestUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.OptionalInt;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
public class QuorumControllerTestEnv implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(QuorumControllerTestEnv.class);
@@ -70,11 +69,11 @@ public class QuorumControllerTestEnv implements AutoCloseable {
QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(i));
builder.setBootstrapMetadata(BootstrapMetadata.create(metadataVersion));
+ builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
sessionTimeoutMillis.ifPresent(timeout -> {
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
});
- builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
builderConsumer.accept(builder);
this.controllers.add(builder.build());
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index d7141a76dd..abc90912a1 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -76,6 +76,7 @@ import org.apache.kafka.metadata.Replicas;
import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
import org.apache.kafka.metadata.placement.UsableBroker;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.policy.CreateTopicPolicy;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Test;
@@ -164,6 +165,7 @@ public class ReplicationControlManagerTest {
ReplicationControlTestContext(Optional<CreateTopicPolicy> createTopicPolicy) {
this.replicationControl = new ReplicationControlManager.Builder().
+ setMetadataVersion(() -> MetadataVersion.IBP_3_3_IV1).
setSnapshotRegistry(snapshotRegistry).
setLogContext(logContext).
setMaxElectionsPerImbalance(Integer.MAX_VALUE).
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index 3fdeea8dbe..f5a8da5f8a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -205,12 +205,16 @@ public class RecordTestUtils {
/**
* Create a batch reader for testing.
*
- * @param lastOffset The last offset of the given list of records.
- * @param records The records.
- * @return A batch reader which will return the given records.
+ * @param lastOffset the last offset of the given list of records
+ * @param appendTimestamp the append timestamp for the batches created
+ * @param records the records
+ * @return a batch reader which will return the given records
*/
- public static BatchReader<ApiMessageAndVersion>
- mockBatchReader(long lastOffset, List<ApiMessageAndVersion> records) {
+ public static BatchReader<ApiMessageAndVersion> mockBatchReader(
+ long lastOffset,
+ long appendTimestamp,
+ List<ApiMessageAndVersion> records
+ ) {
List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
long offset = lastOffset - records.size() + 1;
Iterator<ApiMessageAndVersion> iterator = records.iterator();
@@ -218,7 +222,7 @@ public class RecordTestUtils {
assertTrue(iterator.hasNext()); // At least one record is required
while (true) {
if (!iterator.hasNext() || curRecords.size() >= 2) {
- batches.add(Batch.data(offset, 0, 0, sizeInBytes(curRecords), curRecords));
+ batches.add(Batch.data(offset, 0, appendTimestamp, sizeInBytes(curRecords), curRecords));
if (!iterator.hasNext()) {
break;
}
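
With the extra parameter, test callers now pass the batch append timestamp explicitly instead of the previous hard-coded zero. A hedged usage sketch (MockBatchReaderSketch is hypothetical, and the NoOpRecord payload is just a convenient placeholder):

    import java.util.Collections;
    import org.apache.kafka.common.metadata.NoOpRecord;
    import org.apache.kafka.metadata.RecordTestUtils;
    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.server.common.ApiMessageAndVersion;

    class MockBatchReaderSketch {
        static BatchReader<ApiMessageAndVersion> singleRecordReader(long appendTimestamp) {
            // lastOffset = 0 for a one-record batch; the timestamp now flows
            // into Batch.data(...) as shown in the hunk above.
            return RecordTestUtils.mockBatchReader(0, appendTimestamp,
                Collections.singletonList(
                    new ApiMessageAndVersion(new NoOpRecord(), (short) 0)));
        }
    }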
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
index c7ffcfb0b6..84748bd330 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KafkaRaftMetrics.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.WindowedSum;
import org.apache.kafka.raft.OffsetAndEpoch;
import org.apache.kafka.raft.QuorumState;
+import java.util.Arrays;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
@@ -34,11 +35,11 @@ public class KafkaRaftMetrics implements AutoCloseable {
private final Metrics metrics;
- private OffsetAndEpoch logEndOffset;
- private int numUnknownVoterConnections;
- private OptionalLong electionStartMs;
- private OptionalLong pollStartMs;
- private OptionalLong pollEndMs;
+ private volatile OffsetAndEpoch logEndOffset;
+ private volatile int numUnknownVoterConnections;
+ private volatile OptionalLong electionStartMs;
+ private volatile OptionalLong pollStartMs;
+ private volatile OptionalLong pollEndMs;
private final MetricName currentLeaderIdMetricName;
private final MetricName currentVotedIdMetricName;
@@ -186,19 +187,23 @@ public class KafkaRaftMetrics implements AutoCloseable {
@Override
public void close() {
- metrics.removeMetric(currentLeaderIdMetricName);
- metrics.removeMetric(currentVotedIdMetricName);
- metrics.removeMetric(currentEpochMetricName);
- metrics.removeMetric(currentStateMetricName);
- metrics.removeMetric(highWatermarkMetricName);
- metrics.removeMetric(logEndOffsetMetricName);
- metrics.removeMetric(logEndEpochMetricName);
- metrics.removeMetric(numUnknownVoterConnectionsMetricName);
-
- metrics.removeSensor(commitTimeSensor.name());
- metrics.removeSensor(electionTimeSensor.name());
- metrics.removeSensor(fetchRecordsSensor.name());
- metrics.removeSensor(appendRecordsSensor.name());
- metrics.removeSensor(pollIdleSensor.name());
+ Arrays.asList(
+ currentLeaderIdMetricName,
+ currentVotedIdMetricName,
+ currentEpochMetricName,
+ currentStateMetricName,
+ highWatermarkMetricName,
+ logEndOffsetMetricName,
+ logEndEpochMetricName,
+ numUnknownVoterConnectionsMetricName
+ ).forEach(metrics::removeMetric);
+
+ Arrays.asList(
+ commitTimeSensor.name(),
+ electionTimeSensor.name(),
+ fetchRecordsSensor.name(),
+ appendRecordsSensor.name(),
+ pollIdleSensor.name()
+ ).forEach(metrics::removeSensor);
}
}
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 7f3c12122e..920c60300d 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -150,10 +150,13 @@ public enum MetadataVersion {
IBP_3_1_IV0(3, "3.1", "IV0", false),
// Support for leader recovery for unclean leader election (KIP-704)
- IBP_3_2_IV0(4, "3.2", "IV0", false),
+ IBP_3_2_IV0(4, "3.2", "IV0", true),
// Support for the metadata.version feature flag, and removal of min_version_level from the finalized version range that is written to ZooKeeper (KIP-778)
- IBP_3_3_IV0(5, "3.3", "IV0", false);
+ IBP_3_3_IV0(5, "3.3", "IV0", false),
+
+ // Support NoOpRecord for the cluster metadata log (KIP-835)
+ IBP_3_3_IV1(6, "3.3", "IV1", true);
public static final String FEATURE_NAME = "metadata.version";
@@ -211,6 +214,14 @@ public enum MetadataVersion {
return this.isAtLeast(IBP_3_0_IV0);
}
+ public boolean isLeaderRecoverySupported() {
+ return this.isAtLeast(IBP_3_2_IV0);
+ }
+
+ public boolean isNoOpRecordSupported() {
+ return this.isAtLeast(IBP_3_3_IV1);
+ }
+
public boolean isKRaftSupported() {
return this.featureLevel > 0;
}
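
The two predicates added above give call sites one place to gate feature behavior on the feature level, instead of threading booleans through constructors. A small usage sketch grounded in the methods shown (FeatureGateSketch is a hypothetical class):

    import org.apache.kafka.server.common.MetadataVersion;

    class FeatureGateSketch {
        static void describe(MetadataVersion version) {
            if (version.isLeaderRecoverySupported()) {
                System.out.println(version + " may emit leader recovery state (KIP-704)");
            }
            if (version.isNoOpRecordSupported()) {
                System.out.println(version + " may write NoOpRecord (KIP-835)");
            }
        }
    }

For example, describe(MetadataVersion.IBP_3_2_IV0) reports leader recovery support but not NoOpRecord support, matching the enum constants above.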
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index bbce1b6ae4..8649586be9 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -60,6 +60,7 @@ import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -188,8 +189,9 @@ class MetadataVersionTest {
assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
- assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3"));
+ assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3"));
assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
+ assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3-IV1"));
}
@Test
@@ -232,7 +234,9 @@ class MetadataVersionTest {
assertEquals("3.0", IBP_3_0_IV0.shortVersion());
assertEquals("3.0", IBP_3_0_IV1.shortVersion());
assertEquals("3.1", IBP_3_1_IV0.shortVersion());
+ assertEquals("3.2", IBP_3_2_IV0.shortVersion());
assertEquals("3.3", IBP_3_3_IV0.shortVersion());
+ assertEquals("3.3", IBP_3_3_IV1.shortVersion());
}
@Test
@@ -266,6 +270,7 @@ class MetadataVersionTest {
assertEquals("3.1-IV0", IBP_3_1_IV0.version());
assertEquals("3.2-IV0", IBP_3_2_IV0.version());
assertEquals("3.3-IV0", IBP_3_3_IV0.version());
+ assertEquals("3.3-IV1", IBP_3_3_IV1.version());
}
@Test
@@ -280,13 +285,14 @@ class MetadataVersionTest {
@Test
public void testMetadataChanged() {
assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_2_IV0));
- assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_1_IV0));
- assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV1));
- assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV0));
+ assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_1_IV0));
+ assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV1));
+ assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_3_0_IV0));
assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_2_IV0, IBP_2_8_IV1));
+ assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_3_IV1, IBP_3_3_IV0));
// Check that argument order doesn't matter
- assertFalse(MetadataVersion.checkIfMetadataChanged(IBP_3_0_IV0, IBP_3_2_IV0));
+ assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_3_0_IV0, IBP_3_2_IV0));
assertTrue(MetadataVersion.checkIfMetadataChanged(IBP_2_8_IV1, IBP_3_2_IV0));
}