This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d3f911d MINOR: the broker should use
metadata.log.max.record.bytes.between.snapshots (#10990)
d3f911d is described below
commit d3f911d5b2baf90a39b23764292482db48164e84
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Jul 9 12:00:26 2021 -0700
MINOR: the broker should use
metadata.log.max.record.bytes.between.snapshots (#10990)
The broker should trigger a snapshot once
metadata.log.max.record.bytes.between.snapshots has been exceeded.
Reviewers: Jason Gustafson <[email protected]>
---
build.gradle | 1 +
.../src/main/scala/kafka/server/BrokerServer.scala | 38 +++++-
.../server/metadata/BrokerMetadataListener.scala | 89 ++++++++++----
.../metadata/BrokerMetadataSnapshotter.scala | 113 ++++++++++++++++++
.../server/metadata/MetadataSnapshotter.scala | 40 +++++++
.../metadata/BrokerMetadataListenerTest.scala | 130 ++++++++++++++++++++-
.../metadata/BrokerMetadataSnapshotterTest.scala | 103 ++++++++++++++++
.../apache/kafka/metadata/MetadataRecordSerde.java | 1 +
.../org/apache/kafka/image/MetadataImageTest.java | 6 +-
.../org/apache/kafka/metadata/RecordTestUtils.java | 15 ++-
10 files changed, 498 insertions(+), 38 deletions(-)
diff --git a/build.gradle b/build.gradle
index b589525..5f41112 100644
--- a/build.gradle
+++ b/build.gradle
@@ -837,6 +837,7 @@ project(':core') {
testImplementation project(':clients').sourceSets.test.output
testImplementation project(':metadata').sourceSets.test.output
+ testImplementation project(':raft').sourceSets.test.output
testImplementation libs.bcpkix
testImplementation libs.mockitoCore
testImplementation libs.easymock
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 39ca0ce..a29e759 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -31,9 +31,9 @@ import kafka.metrics.KafkaYammerMetrics
import kafka.network.SocketServer
import kafka.raft.RaftManager
import kafka.security.CredentialProvider
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher,
ClientQuotaMetadataManager, KRaftMetadataCache}
+import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher,
BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache,
SnapshotWriterBuilder}
import kafka.utils.{CoreUtils, KafkaScheduler}
-//import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME,
TRANSACTION_STATE_TOPIC_NAME}
+import org.apache.kafka.snapshot.SnapshotWriter
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import
org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener,
ListenerCollection}
import org.apache.kafka.common.metrics.Metrics
@@ -44,13 +44,29 @@ import
org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
import org.apache.kafka.common.{ClusterResource, Endpoint}
import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.raft.{RaftClient, RaftConfig}
import org.apache.kafka.raft.RaftConfig.AddressSpec
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.ApiMessageAndVersion
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
+import scala.compat.java8.OptionConverters._
+
+
+/**
+ * A SnapshotWriterBuilder that obtains its snapshot writers from the given Raft client.
+ *
+ * @param raftClient The Raft client whose createSnapshot method supplies the writers.
+ */
+class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
+    extends SnapshotWriterBuilder {
+  override def build(committedOffset: Long,
+                     committedEpoch: Int,
+                     lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion] = {
+    val maybeWriter =
+      raftClient.createSnapshot(committedOffset, committedEpoch, lastContainedLogTime).asScala
+    maybeWriter match {
+      case Some(writer) => writer
+      case None =>
+        // The client handed back no writer for this offset and epoch.
+        throw new RuntimeException("A snapshot already exists with " +
+          s"committedOffset=${committedOffset}, committedEpoch=${committedEpoch}, " +
+          s"lastContainedLogTime=${lastContainedLogTime}")
+    }
+  }
+}
/**
* A Kafka broker that runs in KRaft (Kafka Raft) mode.
@@ -129,6 +145,8 @@ class BrokerServer(
val clusterId: String = metaProps.clusterId
+ var metadataSnapshotter: BrokerMetadataSnapshotter = null
+
var metadataListener: BrokerMetadataListener = null
var metadataPublisher: BrokerMetadataPublisher = null
@@ -271,7 +289,15 @@ class BrokerServer(
ConfigType.Topic -> new TopicConfigHandler(logManager, config,
quotaManagers, None),
ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
- metadataListener = new BrokerMetadataListener(config.nodeId, time,
threadNamePrefix)
+ metadataSnapshotter = new BrokerMetadataSnapshotter(config.nodeId,
+ time,
+ threadNamePrefix,
+ new
BrokerSnapshotWriterBuilder(raftManager.client))
+ metadataListener = new BrokerMetadataListener(config.nodeId,
+ time,
+ threadNamePrefix,
+
config.metadataSnapshotMaxNewRecordBytes,
+ metadataSnapshotter)
val networkListeners = new ListenerCollection()
config.advertisedListeners.foreach { ep =>
@@ -408,10 +434,12 @@ class BrokerServer(
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
-
if (metadataListener != null) {
CoreUtils.swallow(metadataListener.close(), this)
}
+ if (metadataSnapshotter != null) {
+ CoreUtils.swallow(metadataSnapshotter.close(), this)
+ }
if (transactionCoordinator != null)
CoreUtils.swallow(transactionCoordinator.shutdown(), this)
if (groupCoordinator != null)
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 5a04219..9369384 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -37,7 +37,9 @@ object BrokerMetadataListener {
class BrokerMetadataListener(
val brokerId: Int,
time: Time,
- threadNamePrefix: Option[String]
+ threadNamePrefix: Option[String],
+ val maxBytesBetweenSnapshots: Long,
+ val snapshotter: MetadataSnapshotter
) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
private val logContext = new LogContext(s"[BrokerMetadataListener
id=${brokerId}] ")
private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -56,7 +58,17 @@ class BrokerMetadataListener(
/**
* The highest metadata offset that we've seen. Written only from the event
queue thread.
*/
- @volatile private var _highestMetadataOffset = -1L
+ @volatile var _highestMetadataOffset = -1L
+
+ /**
+ * The highest metadata log epoch that we've seen. Written only from the
event queue thread.
+ */
+ private var _highestEpoch = -1
+
+ /**
+ * The highest metadata log time that we've seen. Written only from the
event queue thread.
+ */
+ private var _highestTimestamp = -1L
/**
* The current broker metadata image. Accessed only from the event queue
thread.
@@ -70,11 +82,18 @@ class BrokerMetadataListener(
/**
* The object to use to publish new metadata changes, or None if this
listener has not
- * been activated yet.
+ * been activated yet. Accessed only from the event queue thread.
*/
private var _publisher: Option[MetadataPublisher] = None
/**
+ * The number of bytes of records that we have read since the last snapshot
we took.
+ * This does not include records we read from a snapshot.
+ * Accessed only from the event queue thread.
+ */
+ private var _bytesSinceLastSnapshot: Long = 0L
+
+ /**
* The event queue which runs this listener.
*/
val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""))
@@ -102,10 +121,23 @@ class BrokerMetadataListener(
} finally {
reader.close()
}
- maybePublish(results.highestMetadataOffset)
+ _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+ _publisher.foreach(publish(_, results.highestMetadataOffset))
+ if (shouldSnapshot()) {
+ if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
+ _highestEpoch,
+ _highestTimestamp,
+ _delta.apply())) {
+ _bytesSinceLastSnapshot = 0L
+ }
+ }
}
}
+ /**
+ * Returns true once the bytes read since the last snapshot have reached
+ * maxBytesBetweenSnapshots (the comparison is inclusive).
+ */
+ private def shouldSnapshot(): Boolean = {
+ _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+ }
+
/**
* Handle metadata snapshots
*/
@@ -126,17 +158,18 @@ class BrokerMetadataListener(
} finally {
reader.close()
}
- maybePublish(results.highestMetadataOffset)
+ _publisher.foreach(publish(_, results.highestMetadataOffset))
}
}
case class BatchLoadResults(numBatches: Int,
numRecords: Int,
elapsedUs: Long,
+ numBytes: Long,
highestMetadataOffset: Long) {
override def toString(): String = {
- s"${numBatches} batch(es) with ${numRecords} record(s) ending at offset
" +
- s"${highestMetadataOffset} in ${elapsedUs} microseconds"
+ s"${numBatches} batch(es) with ${numRecords} record(s) in ${numBytes}
bytes " +
+ s"ending at offset ${highestMetadataOffset} in ${elapsedUs}
microseconds"
}
}
@@ -145,12 +178,12 @@ class BrokerMetadataListener(
val startTimeNs = time.nanoseconds()
var numBatches = 0
var numRecords = 0
- var newHighestMetadataOffset = _highestMetadataOffset
+ var batch: Batch[ApiMessageAndVersion] = null
+ var numBytes = 0L
while (iterator.hasNext()) {
- val batch = iterator.next()
+ batch = iterator.next()
var index = 0
batch.records().forEach { messageAndVersion =>
- newHighestMetadataOffset = batch.lastOffset()
if (isTraceEnabled) {
trace("Metadata batch %d: processing [%d/%d]:
%s.".format(batch.lastOffset, index + 1,
batch.records().size(), messageAndVersion.message().toString()))
@@ -159,14 +192,22 @@ class BrokerMetadataListener(
numRecords += 1
index += 1
}
+ numBytes = numBytes + batch.sizeInBytes()
metadataBatchSizeHist.update(batch.records().size())
numBatches = numBatches + 1
}
- _highestMetadataOffset = newHighestMetadataOffset
+ val newHighestMetadataOffset = if (batch == null) {
+ _highestMetadataOffset
+ } else {
+ _highestMetadataOffset = batch.lastOffset()
+ _highestEpoch = batch.epoch()
+ _highestTimestamp = batch.appendTimestamp()
+ batch.lastOffset()
+ }
val endTimeNs = time.nanoseconds()
val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs,
TimeUnit.NANOSECONDS)
batchProcessingTimeHist.update(elapsedUs)
- BatchLoadResults(numBatches, numRecords, elapsedUs,
newHighestMetadataOffset)
+ BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes,
newHighestMetadataOffset)
}
def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] =
{
@@ -183,28 +224,26 @@ class BrokerMetadataListener(
_publisher = Some(publisher)
log.info(s"Starting to publish metadata events at offset
${_highestMetadataOffset}.")
try {
- maybePublish(_highestMetadataOffset)
+ publish(publisher, _highestMetadataOffset)
future.complete(null)
} catch {
- case e: Throwable => future.completeExceptionally(e)
+ case e: Throwable =>
+ future.completeExceptionally(e)
+ throw e
}
}
}
- private def maybePublish(newHighestMetadataOffset: Long): Unit = {
- _publisher match {
- case None => // Nothing to do
- case Some(publisher) => {
- val delta = _delta
- _image = _delta.apply()
- _delta = new MetadataDelta(_image)
- publisher.publish(newHighestMetadataOffset, delta, _image)
- }
- }
+  /**
+   * Applies the pending delta to produce a new image, starts a fresh delta on top of
+   * that image, and hands the applied delta and the new image to the publisher.
+   *
+   * @param publisher                 The publisher to notify.
+   * @param newHighestMetadataOffset  The offset passed through to the publisher.
+   */
+  private def publish(publisher: MetadataPublisher,
+                      newHighestMetadataOffset: Long): Unit = {
+    val appliedDelta = _delta
+    val freshImage = appliedDelta.apply()
+    _image = freshImage
+    // Later records accumulate in a new delta based on the image just produced.
+    _delta = new MetadataDelta(freshImage)
+    publisher.publish(newHighestMetadataOffset, appliedDelta, freshImage)
}
override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
- // TODO: cache leaderAndEpoch so we can use the epoch in broker-initiated
snapshots.
+ // Nothing to do.
}
override def beginShutdown(): Unit = {
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
new file mode 100644
index 0000000..db41e0c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.concurrent.RejectedExecutionException
+
+import kafka.utils.Logging
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
+
+
+/**
+ * A factory for the SnapshotWriter that a new snapshot is written through.
+ */
+trait SnapshotWriterBuilder {
+ /**
+ * Build a writer for a new snapshot.
+ *
+ * @param committedOffset The committed offset to create the snapshot at.
+ * @param committedEpoch The committed epoch to create the snapshot at.
+ * @param lastContainedLogTime The log time associated with the snapshot.
+ *
+ * @return The writer to append the snapshot records to.
+ */
+ def build(committedOffset: Long,
+ committedEpoch: Int,
+ lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+}
+
+/**
+ * Writes metadata snapshots on its own event queue, accepting at most one snapshot
+ * at a time.
+ *
+ * @param brokerId The broker id; used in the log prefix.
+ * @param time The time object handed to the event queue.
+ * @param threadNamePrefix Optional prefix handed to the event queue (the empty string if None).
+ * @param writerBuilder Builds the SnapshotWriter for each snapshot that is started.
+ */
+class BrokerMetadataSnapshotter(
+ brokerId: Int,
+ val time: Time,
+ threadNamePrefix: Option[String],
+ writerBuilder: SnapshotWriterBuilder
+) extends Logging with MetadataSnapshotter {
+ private val logContext = new LogContext(s"[BrokerMetadataSnapshotter
id=${brokerId}] ")
+ logIdent = logContext.logPrefix()
+
+ /**
+ * The offset of the snapshot in progress, or -1 if there isn't one.
+ * Accessed only under the object lock.
+ */
+ private var _currentSnapshotOffset = -1L
+
+ /**
+ * The event queue which runs this snapshotter's CreateSnapshotEvent and ShutdownEvent.
+ */
+ val eventQueue = new KafkaEventQueue(time, logContext,
threadNamePrefix.getOrElse(""))
+
+ /**
+ * Start a snapshot unless one is already in progress. Runs under the object lock.
+ * When a snapshot is started, the writer is built on the calling thread and the
+ * actual writing is queued as a CreateSnapshotEvent.
+ *
+ * @return True if a CreateSnapshotEvent was queued; false if a snapshot was already
+ * in progress.
+ */
+ override def maybeStartSnapshot(committedOffset: Long,
+ committedEpoch: Int,
+ lastContainedLogTime: Long,
+ image: MetadataImage): Boolean =
synchronized {
+ if (_currentSnapshotOffset == -1L) {
+ // If build throws, the in-progress marker below is never set.
+ val writer = writerBuilder.build(committedOffset, committedEpoch,
lastContainedLogTime)
+ _currentSnapshotOffset = committedOffset
+ info(s"Creating a new snapshot at offset ${committedOffset}...")
+ eventQueue.append(new CreateSnapshotEvent(image, writer))
+ true
+ } else {
+ warn(s"Declining to create a new snapshot at offset ${committedOffset}
because " +
+ s"there is already a snapshot in progress at offset
${_currentSnapshotOffset}")
+ false
+ }
+ }
+
+ /**
+ * The queued event that writes the image through the writer, freezes it, and then
+ * closes the writer and clears the in-progress marker.
+ */
+ class CreateSnapshotEvent(image: MetadataImage,
+ writer: SnapshotWriter[ApiMessageAndVersion])
+ extends EventQueue.Event {
+ override def run(): Unit = {
+ try {
+ image.write(writer.append(_))
+ writer.freeze()
+ } finally {
+ // Close the writer whether or not the write succeeded.
+ try {
+ writer.close()
+ } finally {
+ // Clear the marker even if close() throws, so later snapshots are not blocked.
+ BrokerMetadataSnapshotter.this.synchronized {
+ _currentSnapshotOffset = -1L
+ }
+ }
+ }
+ }
+
+ // NOTE(review): this handler closes the writer but does not reset
+ // _currentSnapshotOffset; confirm that is acceptable when run() was never invoked.
+ override def handleException(e: Throwable): Unit = {
+ e match {
+ case _: RejectedExecutionException =>
+ info("Not processing CreateSnapshotEvent because the event queue is
closed.")
+ case _ => error("Unexpected error handling CreateSnapshotEvent", e)
+ }
+ writer.close()
+ }
+ }
+
+ /**
+ * Begin shutting down the event queue, passing it a ShutdownEvent.
+ */
+ def beginShutdown(): Unit = {
+ eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
+ }
+
+ /**
+ * The event given to the queue at shutdown; its run method does nothing.
+ */
+ class ShutdownEvent() extends EventQueue.Event {
+ override def run(): Unit = {
+ }
+ }
+
+ /**
+ * Begin shutdown and then close the event queue.
+ */
+ def close(): Unit = {
+ beginShutdown()
+ eventQueue.close()
+ }
+}
diff --git
a/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
new file mode 100644
index 0000000..9b377a6
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import org.apache.kafka.image.MetadataImage
+
+
+/**
+ * Handles creating snapshots.
+ */
+trait MetadataSnapshotter {
+ /**
+ * If there is no other snapshot being written out, start writing out a snapshot.
+ *
+ * @param committedOffset The highest metadata log offset of the snapshot.
+ * @param committedEpoch The highest metadata log epoch of the snapshot.
+ * @param lastContainedLogTime The highest time contained in the snapshot.
+ * @param image The metadata image to write out.
+ *
+ * @return True if we will write out a new snapshot; false otherwise.
+ */
+ def maybeStartSnapshot(committedOffset: Long,
+ committedEpoch: Int,
+ lastContainedLogTime: Long,
+ image: MetadataImage): Boolean
+}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index a19f67d..81a22e1 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -18,28 +18,31 @@
package kafka.server.metadata
import java.util
+import java.util.concurrent.atomic.AtomicReference
import java.util.{Collections, Optional}
import org.apache.kafka.common.{Endpoint, Uuid}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord
+import org.apache.kafka.common.metadata.{PartitionChangeRecord,
PartitionRecord, RegisterBrokerRecord, TopicRecord}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils,
VersionRange}
import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
class BrokerMetadataListenerTest {
@Test
def testCreateAndClose(): Unit = {
- val listener = new BrokerMetadataListener(0, Time.SYSTEM, None)
+ val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
+ (_, _, _, _) => throw new UnsupportedOperationException())
listener.close()
}
@Test
def testPublish(): Unit = {
- val listener = new BrokerMetadataListener(0, Time.SYSTEM, None)
+ val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
+ (_, _, _, _) => throw new UnsupportedOperationException())
try {
listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -77,5 +80,124 @@ class BrokerMetadataListenerTest {
listener.close()
}
}
+
+  /**
+   * A MetadataSnapshotter test double. It accepts a snapshot only while
+   * activeSnapshotOffset is -1, checks that the offset, epoch and log time never move
+   * backwards between accepted snapshots, and remembers the last accepted image.
+   *
+   * The first assertion failure (or any other Throwable) is stored in `failure` so
+   * that the test thread can check it; it is not rethrown to the caller.
+   */
+  class MockMetadataSnapshotter extends MetadataSnapshotter {
+    var image = MetadataImage.EMPTY
+    val failure = new AtomicReference[Throwable](null)
+    var activeSnapshotOffset = -1L
+    var prevCommittedOffset = -1L
+    var prevCommittedEpoch = -1
+    var prevLastContainedLogTime = -1L
+
+    /**
+     * @return True if the snapshot was accepted; false if one is already active or if
+     *         validation failed (in which case the failure is recorded).
+     */
+    override def maybeStartSnapshot(committedOffset: Long,
+                                    committedEpoch: Int,
+                                    lastContainedLogTime: Long,
+                                    newImage: MetadataImage): Boolean = {
+      try {
+        if (activeSnapshotOffset == -1L) {
+          assertTrue(prevCommittedOffset <= committedOffset)
+          assertTrue(prevCommittedEpoch <= committedEpoch)
+          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
+          prevCommittedOffset = committedOffset
+          prevCommittedEpoch = committedEpoch
+          prevLastContainedLogTime = lastContainedLogTime
+          image = newImage
+          activeSnapshotOffset = committedOffset
+          true
+        } else {
+          false
+        }
+      } catch {
+        case t: Throwable =>
+          // Keep only the first failure. No snapshot was started, so report false
+          // rather than leaking compareAndSet's result as the return value.
+          failure.compareAndSet(null, t)
+          false
+      }
+    }
+  }
+
+ /**
+ * A MetadataPublisher test double that remembers the most recently published image
+ * and ignores the offset and delta.
+ */
+ class MockMetadataPublisher extends MetadataPublisher {
+ var image = MetadataImage.EMPTY
+
+ override def publish(newHighestMetadataOffset: Long,
+ delta: MetadataDelta,
+ newImage: MetadataImage): Unit = {
+ image = newImage
+ }
+ }
+
+ private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+
+  /**
+   * Feeds the listener 10001 commits, each ending at endOffset and holding two
+   * PartitionChangeRecords for partition 0 of FOO_ID, then waits on
+   * listener.getImageRecords() before returning.
+   */
+  private def generateManyRecords(listener: BrokerMetadataListener,
+                                  endOffset: Long): Unit = {
+    for (_ <- 0 to 10000) {
+      val setRemoving = new ApiMessageAndVersion(new PartitionChangeRecord().
+        setPartitionId(0).
+        setTopicId(FOO_ID).
+        setRemovingReplicas(Collections.singletonList(1)), 0.toShort)
+      val clearRemoving = new ApiMessageAndVersion(new PartitionChangeRecord().
+        setPartitionId(0).
+        setTopicId(FOO_ID).
+        setRemovingReplicas(Collections.emptyList()), 0.toShort)
+      val reader = RecordTestUtils.mockBatchReader(endOffset,
+        util.Arrays.asList(setRemoving, clearRemoving))
+      listener.handleCommit(reader)
+    }
+    listener.getImageRecords().get()
+  }
+
+ /**
+ * Drives a BrokerMetadataListener whose snapshot threshold is 1000 bytes and checks,
+ * through MockMetadataSnapshotter, when snapshots are requested: before and after a
+ * publisher is attached, and while the snapshotter is declining new snapshots.
+ */
+ @Test
+ def testCreateSnapshot(): Unit = {
+ val snapshotter = new MockMetadataSnapshotter()
+ val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L,
snapshotter)
+ try {
+ // Register brokers 0 through 3, each in a commit ending at offset 100.
+ (0 to 3).foreach {
+ id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
+ util.Arrays.asList(new ApiMessageAndVersion(new
RegisterBrokerRecord().
+ setBrokerId(id).
+ setBrokerEpoch(100L).
+ setFenced(false).
+ setRack(null).
+ setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)),
1))))
+ }
+ // Create topic "foo" with a single partition in a commit ending at offset 200.
+ listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
+ util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
+ setName("foo").
+ setTopicId(FOO_ID), 1.toShort),
+ new ApiMessageAndVersion(new PartitionRecord().
+ setPartitionId(0).
+ setTopicId(FOO_ID).
+ setIsr(util.Arrays.asList(0, 1, 2)).
+ setLeader(0).
+ setReplicas(util.Arrays.asList(0, 1, 2)).
+ setRemovingReplicas(util.Arrays.asList(0, 1, 2)).
+ setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort))))
+ listener.getImageRecords().get()
+ assertEquals(200L, listener.highestMetadataOffset())
+
+ // Check that we generate at least one snapshot once we see enough records.
+ assertEquals(-1L, snapshotter.prevCommittedOffset)
+ generateManyRecords(listener, 1000L);
+ assertEquals(1000L, snapshotter.prevCommittedOffset)
+ assertEquals(1000L, snapshotter.activeSnapshotOffset)
+ // Mark the mock's snapshot as finished so it will accept another one.
+ snapshotter.activeSnapshotOffset = -1L
+
+ // Test creating a new snapshot after publishing it.
+ val publisher = new MockMetadataPublisher()
+ listener.startPublishing(publisher).get()
+ generateManyRecords(listener, 2000L);
+ listener.getImageRecords().get()
+ assertEquals(2000L, snapshotter.activeSnapshotOffset)
+ assertEquals(2000L, snapshotter.prevCommittedOffset)
+
+ // Test how we handle the snapshotter returning false.
+ generateManyRecords(listener, 3000L);
+ assertEquals(2000L, snapshotter.activeSnapshotOffset)
+ generateManyRecords(listener, 4000L);
+ assertEquals(2000L, snapshotter.activeSnapshotOffset)
+ snapshotter.activeSnapshotOffset = -1L
+ generateManyRecords(listener, 5000L);
+ assertEquals(5000L, snapshotter.activeSnapshotOffset)
+ // No assertion inside the mock snapshotter may have failed along the way.
+ assertEquals(null, snapshotter.failure.get())
+ } finally {
+ listener.close()
+ }
+ }
+
}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
new file mode 100644
index 0000000..b030e87
--- /dev/null
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.nio.ByteBuffer
+import java.util.Optional
+import java.util.concurrent.{CompletableFuture, CountDownLatch}
+
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.protocol.ByteBufferAccessor
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
+import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.queue.EventQueue
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.{MockRawSnapshotWriter, SnapshotWriter}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+
+/**
+ * Unit tests for BrokerMetadataSnapshotter.
+ */
+class BrokerMetadataSnapshotterTest {
+ /**
+ * A snapshotter that never starts a snapshot can be created and closed; the writer
+ * builder passed here throws if it is ever called.
+ */
+ @Test
+ def testCreateAndClose(): Unit = {
+ val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
+ (_, _, _) => throw new RuntimeException("unimplemented"))
+ snapshotter.close()
+ }
+
+ /**
+ * A SnapshotWriterBuilder whose writers are backed by a MockRawSnapshotWriter. The
+ * buffer handed to consumeSnapshotBuffer is decoded back into a MetadataImage,
+ * which completes the `image` future.
+ */
+ class MockSnapshotWriterBuilder extends SnapshotWriterBuilder {
+ var image = new CompletableFuture[MetadataImage]
+
+ override def build(committedOffset: Long,
+ committedEpoch: Int,
+ lastContainedLogTime: Long):
SnapshotWriter[ApiMessageAndVersion] = {
+ val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
+ SnapshotWriter.createWithHeader(
+ () => Optional.of(new MockRawSnapshotWriter(offsetAndEpoch,
consumeSnapshotBuffer)),
+ 1024,
+ MemoryPool.NONE,
+ Time.SYSTEM,
+ lastContainedLogTime,
+ CompressionType.NONE,
+ MetadataRecordSerde.INSTANCE
+ ).get();
+ }
+
+ // NOTE(review): presumably invoked by MockRawSnapshotWriter once the snapshot is
+ // frozen - confirm against that class.
+ def consumeSnapshotBuffer(buffer: ByteBuffer): Unit = {
+ val delta = new MetadataDelta(MetadataImage.EMPTY)
+ val memoryRecords = MemoryRecords.readableRecords(buffer)
+ val batchIterator = memoryRecords.batchIterator()
+ while (batchIterator.hasNext) {
+ val batch = batchIterator.next()
+ // Control batches are skipped; only the remaining batches are replayed.
+ if (!batch.isControlBatch()) {
+ batch.forEach(record => {
+ val recordBuffer = record.value().duplicate()
+ val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
+ new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
+ delta.replay(messageAndVersion.message())
+ })
+ }
+ }
+ image.complete(delta.apply())
+ }
+ }
+
+ /**
+ * An event that blocks in run() until its latch is counted down.
+ */
+ class BlockingEvent extends EventQueue.Event {
+ val latch = new CountDownLatch(1)
+ override def run(): Unit = latch.await()
+ }
+
+ /**
+ * While a blocking event is queued ahead of the first snapshot's event, a second
+ * snapshot request is declined; after the latch is released, the image decoded from
+ * the written snapshot equals the first image.
+ */
+ @Test
+ def testCreateSnapshot(): Unit = {
+ val writerBuilder = new MockSnapshotWriterBuilder()
+ val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
writerBuilder)
+ try {
+ val blockingEvent = new BlockingEvent()
+ snapshotter.eventQueue.append(blockingEvent)
+ assertTrue(snapshotter.maybeStartSnapshot(123L, 12, 10000L,
MetadataImageTest.IMAGE1))
+ assertFalse(snapshotter.maybeStartSnapshot(124L, 12, 11000L,
MetadataImageTest.IMAGE2))
+ blockingEvent.latch.countDown()
+ assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
+ } finally {
+ snapshotter.close()
+ }
+ }
+}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
index 4a4d0ee..7964fed 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.server.common.serialization.AbstractApiMessageSerde;
public class MetadataRecordSerde extends AbstractApiMessageSerde {
+ public static final MetadataRecordSerde INSTANCE = new
MetadataRecordSerde();
@Override
public ApiMessage apiMessageFor(short apiKey) {
diff --git
a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 6d52d34..2ee05bb 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -26,11 +26,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Timeout(value = 40)
public class MetadataImageTest {
- private final static MetadataImage IMAGE1;
+ public final static MetadataImage IMAGE1;
- private final static MetadataDelta DELTA1;
+ public final static MetadataDelta DELTA1;
- private final static MetadataImage IMAGE2;
+ public final static MetadataImage IMAGE2;
static {
IMAGE1 = new MetadataImage(FeaturesImageTest.IMAGE1,
diff --git
a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index ffc1777..7bedde3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.metadata;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.raft.Batch;
import org.apache.kafka.raft.BatchReader;
@@ -36,6 +37,7 @@ import java.util.List;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
@@ -161,9 +163,10 @@ public class RecordTestUtils {
long offset = lastOffset - records.size() + 1;
Iterator<ApiMessageAndVersion> iterator = records.iterator();
List<ApiMessageAndVersion> curRecords = new ArrayList<>();
+ assertTrue(iterator.hasNext()); // At least one record is required
while (true) {
if (!iterator.hasNext() || curRecords.size() >= 2) {
- batches.add(Batch.data(offset, 0, 0, 0, curRecords));
+ batches.add(Batch.data(offset, 0, 0, sizeInBytes(curRecords),
curRecords));
if (!iterator.hasNext()) {
break;
}
@@ -174,4 +177,14 @@ public class RecordTestUtils {
}
return MemoryBatchReader.of(batches, __ -> { });
}
+
+
+    /**
+     * Sums the serialized size of each record, as reported by
+     * {@code MetadataRecordSerde.INSTANCE.recordSize}, using a fresh serialization
+     * cache for every record.
+     *
+     * @param records the records to measure
+     * @return the total of the per-record sizes
+     */
+    private static int sizeInBytes(List<ApiMessageAndVersion> records) {
+        return records.stream()
+            .mapToInt(record ->
+                MetadataRecordSerde.INSTANCE.recordSize(record, new ObjectSerializationCache()))
+            .sum();
+    }
}