This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d3f911d  MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots (#10990)
d3f911d is described below

commit d3f911d5b2baf90a39b23764292482db48164e84
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Fri Jul 9 12:00:26 2021 -0700

    MINOR: the broker should use metadata.log.max.record.bytes.between.snapshots (#10990)
    
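    The broker should trigger a snapshot once the number of metadata log
    record bytes it has read since its last snapshot reaches
    metadata.log.max.record.bytes.between.snapshots. Bytes loaded from an
    existing snapshot are not counted, and at most one snapshot is written
    at a time, on the event queue of the new BrokerMetadataSnapshotter.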
    
    Reviewers: Jason Gustafson <[email protected]>
---
 build.gradle                                       |   1 +
 .../src/main/scala/kafka/server/BrokerServer.scala |  38 +++++-
 .../server/metadata/BrokerMetadataListener.scala   |  89 ++++++++++----
 .../metadata/BrokerMetadataSnapshotter.scala       | 113 ++++++++++++++++++
 .../server/metadata/MetadataSnapshotter.scala      |  40 +++++++
 .../metadata/BrokerMetadataListenerTest.scala      | 130 ++++++++++++++++++++-
 .../metadata/BrokerMetadataSnapshotterTest.scala   | 103 ++++++++++++++++
 .../apache/kafka/metadata/MetadataRecordSerde.java |   1 +
 .../org/apache/kafka/image/MetadataImageTest.java  |   6 +-
 .../org/apache/kafka/metadata/RecordTestUtils.java |  15 ++-
 10 files changed, 498 insertions(+), 38 deletions(-)

diff --git a/build.gradle b/build.gradle
index b589525..5f41112 100644
--- a/build.gradle
+++ b/build.gradle
@@ -837,6 +837,7 @@ project(':core') {
 
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':metadata').sourceSets.test.output
+    testImplementation project(':raft').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation libs.easymock
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 39ca0ce..a29e759 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -31,9 +31,9 @@ import kafka.metrics.KafkaYammerMetrics
 import kafka.network.SocketServer
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
-import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, 
ClientQuotaMetadataManager, KRaftMetadataCache}
+import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, 
BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, 
SnapshotWriterBuilder}
 import kafka.utils.{CoreUtils, KafkaScheduler}
-//import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, 
TRANSACTION_STATE_TOPIC_NAME}
+import org.apache.kafka.snapshot.SnapshotWriter
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import 
org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, 
ListenerCollection}
 import org.apache.kafka.common.metrics.Metrics
@@ -44,13 +44,29 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
-import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.raft.{RaftClient, RaftConfig}
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
+import scala.compat.java8.OptionConverters._
+
+
+class BrokerSnapshotWriterBuilder(raftClient: RaftClient[ApiMessageAndVersion])
+    extends SnapshotWriterBuilder {
+  override def build(committedOffset: Long,
+                     committedEpoch: Int,
+                     lastContainedLogTime: Long): 
SnapshotWriter[ApiMessageAndVersion] = {
+    raftClient.createSnapshot(committedOffset, committedEpoch, 
lastContainedLogTime).
+        asScala.getOrElse(
+      throw new RuntimeException("A snapshot already exists with " +
+        s"committedOffset=${committedOffset}, 
committedEpoch=${committedEpoch}, " +
+        s"lastContainedLogTime=${lastContainedLogTime}")
+    )
+  }
+}
 
 /**
  * A Kafka broker that runs in KRaft (Kafka Raft) mode.
@@ -129,6 +145,8 @@ class BrokerServer(
 
   val clusterId: String = metaProps.clusterId
 
+  var metadataSnapshotter: BrokerMetadataSnapshotter = null
+
   var metadataListener: BrokerMetadataListener = null
 
   var metadataPublisher: BrokerMetadataPublisher = null
@@ -271,7 +289,15 @@ class BrokerServer(
         ConfigType.Topic -> new TopicConfigHandler(logManager, config, 
quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
-      metadataListener = new BrokerMetadataListener(config.nodeId, time, 
threadNamePrefix)
+      metadataSnapshotter = new BrokerMetadataSnapshotter(config.nodeId,
+                                                          time,
+                                                          threadNamePrefix,
+                                                          new 
BrokerSnapshotWriterBuilder(raftManager.client))
+      metadataListener = new BrokerMetadataListener(config.nodeId,
+                                                    time,
+                                                    threadNamePrefix,
+                                                    
config.metadataSnapshotMaxNewRecordBytes,
+                                                    metadataSnapshotter)
 
       val networkListeners = new ListenerCollection()
       config.advertisedListeners.foreach { ep =>
@@ -408,10 +434,12 @@ class BrokerServer(
       if (controlPlaneRequestProcessor != null)
         CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
       CoreUtils.swallow(authorizer.foreach(_.close()), this)
-
       if (metadataListener !=  null) {
         CoreUtils.swallow(metadataListener.close(), this)
       }
+      if (metadataSnapshotter !=  null) {
+        CoreUtils.swallow(metadataSnapshotter.close(), this)
+      }
       if (transactionCoordinator != null)
         CoreUtils.swallow(transactionCoordinator.shutdown(), this)
       if (groupCoordinator != null)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 5a04219..9369384 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -37,7 +37,9 @@ object BrokerMetadataListener {
 class BrokerMetadataListener(
   val brokerId: Int,
   time: Time,
-  threadNamePrefix: Option[String]
+  threadNamePrefix: Option[String],
+  val maxBytesBetweenSnapshots: Long,
+  val snapshotter: MetadataSnapshotter
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
   private val logContext = new LogContext(s"[BrokerMetadataListener 
id=${brokerId}] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -56,7 +58,17 @@ class BrokerMetadataListener(
   /**
    * The highest metadata offset that we've seen.  Written only from the event 
queue thread.
    */
-  @volatile private var _highestMetadataOffset = -1L
+  @volatile var _highestMetadataOffset = -1L
+
+  /**
+   * The highest metadata log epoch that we've seen. Written only from the 
event queue thread.
+   */
+  private var _highestEpoch = -1
+
+  /**
+   * The highest metadata log time that we've seen. Written only from the 
event queue thread.
+   */
+  private var _highestTimestamp = -1L
 
   /**
    * The current broker metadata image. Accessed only from the event queue 
thread.
@@ -70,11 +82,18 @@ class BrokerMetadataListener(
 
   /**
    * The object to use to publish new metadata changes, or None if this 
listener has not
-   * been activated yet.
+   * been activated yet. Accessed only from the event queue thread.
    */
   private var _publisher: Option[MetadataPublisher] = None
 
   /**
+   * The number of bytes of records that we have read  since the last snapshot 
we took.
+   * This does not include records we read from a snapshot.
+   * Accessed only from the event queue thread.
+   */
+  private var _bytesSinceLastSnapshot: Long = 0L
+
+  /**
    * The event queue which runs this listener.
    */
   val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
@@ -102,10 +121,23 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      maybePublish(results.highestMetadataOffset)
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      _publisher.foreach(publish(_, results.highestMetadataOffset))
+      if (shouldSnapshot()) {
+        if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
+                                           _highestEpoch,
+                                           _highestTimestamp,
+                                           _delta.apply())) {
+          _bytesSinceLastSnapshot = 0L
+        }
+      }
     }
   }
 
+  private def shouldSnapshot(): Boolean = {
+    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+  }
+
   /**
    * Handle metadata snapshots
    */
@@ -126,17 +158,18 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      maybePublish(results.highestMetadataOffset)
+      _publisher.foreach(publish(_, results.highestMetadataOffset))
     }
   }
 
   case class BatchLoadResults(numBatches: Int,
                               numRecords: Int,
                               elapsedUs: Long,
+                              numBytes: Long,
                               highestMetadataOffset: Long) {
     override def toString(): String = {
-      s"${numBatches} batch(es) with ${numRecords} record(s) ending at offset 
" +
-      s"${highestMetadataOffset} in ${elapsedUs} microseconds"
+      s"${numBatches} batch(es) with ${numRecords} record(s) in ${numBytes} 
bytes " +
+        s"ending at offset ${highestMetadataOffset} in ${elapsedUs} 
microseconds"
     }
   }
 
@@ -145,12 +178,12 @@ class BrokerMetadataListener(
     val startTimeNs = time.nanoseconds()
     var numBatches = 0
     var numRecords = 0
-    var newHighestMetadataOffset = _highestMetadataOffset
+    var batch: Batch[ApiMessageAndVersion] = null
+    var numBytes = 0L
     while (iterator.hasNext()) {
-      val batch = iterator.next()
+      batch = iterator.next()
       var index = 0
       batch.records().forEach { messageAndVersion =>
-        newHighestMetadataOffset = batch.lastOffset()
         if (isTraceEnabled) {
           trace("Metadata batch %d: processing [%d/%d]: 
%s.".format(batch.lastOffset, index + 1,
             batch.records().size(), messageAndVersion.message().toString()))
@@ -159,14 +192,22 @@ class BrokerMetadataListener(
         numRecords += 1
         index += 1
       }
+      numBytes = numBytes + batch.sizeInBytes()
       metadataBatchSizeHist.update(batch.records().size())
       numBatches = numBatches + 1
     }
-    _highestMetadataOffset = newHighestMetadataOffset
+    val newHighestMetadataOffset = if (batch == null) {
+      _highestMetadataOffset
+    } else {
+      _highestMetadataOffset = batch.lastOffset()
+      _highestEpoch = batch.epoch()
+      _highestTimestamp = batch.appendTimestamp()
+      batch.lastOffset()
+    }
     val endTimeNs = time.nanoseconds()
     val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, 
TimeUnit.NANOSECONDS)
     batchProcessingTimeHist.update(elapsedUs)
-    BatchLoadResults(numBatches, numRecords, elapsedUs, 
newHighestMetadataOffset)
+    BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes, 
newHighestMetadataOffset)
   }
 
   def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = 
{
@@ -183,28 +224,26 @@ class BrokerMetadataListener(
       _publisher = Some(publisher)
       log.info(s"Starting to publish metadata events at offset 
${_highestMetadataOffset}.")
       try {
-        maybePublish(_highestMetadataOffset)
+        publish(publisher, _highestMetadataOffset)
         future.complete(null)
       } catch {
-        case e: Throwable => future.completeExceptionally(e)
+        case e: Throwable =>
+          future.completeExceptionally(e)
+          throw e
       }
     }
   }
 
-  private def maybePublish(newHighestMetadataOffset: Long): Unit = {
-    _publisher match {
-      case None => // Nothing to do
-      case Some(publisher) => {
-        val delta = _delta
-        _image = _delta.apply()
-        _delta = new MetadataDelta(_image)
-        publisher.publish(newHighestMetadataOffset, delta, _image)
-      }
-    }
+  private def publish(publisher: MetadataPublisher,
+                      newHighestMetadataOffset: Long): Unit = {
+    val delta = _delta
+    _image = _delta.apply()
+    _delta = new MetadataDelta(_image)
+    publisher.publish(newHighestMetadataOffset, delta, _image)
   }
 
   override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
-    // TODO: cache leaderAndEpoch so we can use the epoch in broker-initiated 
snapshots.
+    // Nothing to do.
   }
 
   override def beginShutdown(): Unit = {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
new file mode 100644
index 0000000..db41e0c
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataSnapshotter.scala
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import java.util.concurrent.RejectedExecutionException
+
+import kafka.utils.Logging
+import org.apache.kafka.image.MetadataImage
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.SnapshotWriter
+
+
+trait SnapshotWriterBuilder {
+  def build(committedOffset: Long,
+            committedEpoch: Int,
+            lastContainedLogTime: Long): SnapshotWriter[ApiMessageAndVersion]
+}
+
+class BrokerMetadataSnapshotter(
+  brokerId: Int,
+  val time: Time,
+  threadNamePrefix: Option[String],
+  writerBuilder: SnapshotWriterBuilder
+) extends Logging with MetadataSnapshotter {
+  private val logContext = new LogContext(s"[BrokerMetadataSnapshotter 
id=${brokerId}] ")
+  logIdent = logContext.logPrefix()
+
+  /**
+   * The offset of the snapshot in progress, or -1 if there isn't one. 
Accessed only under
+   * the object lock.
+   */
+  private var _currentSnapshotOffset = -1L
+
+  /**
+   * The event queue which runs this listener.
+   */
+  val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
+
+  override def maybeStartSnapshot(committedOffset: Long,
+                                  committedEpoch: Int,
+                                  lastContainedLogTime: Long,
+                                  image: MetadataImage): Boolean = 
synchronized {
+    if (_currentSnapshotOffset == -1L) {
+      val writer = writerBuilder.build(committedOffset, committedEpoch, 
lastContainedLogTime)
+      _currentSnapshotOffset = committedOffset
+      info(s"Creating a new snapshot at offset ${committedOffset}...")
+      eventQueue.append(new CreateSnapshotEvent(image, writer))
+      true
+    } else {
+      warn(s"Declining to create a new snapshot at offset ${committedOffset} 
because " +
+           s"there is already a snapshot in progress at offset 
${_currentSnapshotOffset}")
+      false
+    }
+  }
+
+  class CreateSnapshotEvent(image: MetadataImage,
+                            writer: SnapshotWriter[ApiMessageAndVersion])
+        extends EventQueue.Event {
+    override def run(): Unit = {
+      try {
+        image.write(writer.append(_))
+        writer.freeze()
+      } finally {
+        try {
+          writer.close()
+        } finally {
+          BrokerMetadataSnapshotter.this.synchronized {
+            _currentSnapshotOffset = -1L
+          }
+        }
+      }
+    }
+
+    override def handleException(e: Throwable): Unit = {
+      e match {
+        case _: RejectedExecutionException => 
+          info("Not processing CreateSnapshotEvent because the event queue is 
closed.")
+        case _ => error("Unexpected error handling CreateSnapshotEvent", e)
+      }
+      writer.close()
+    }
+  }
+
+  def beginShutdown(): Unit = {
+    eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
+  }
+
+  class ShutdownEvent() extends EventQueue.Event {
+    override def run(): Unit = {
+    }
+  }
+
+  def close(): Unit = {
+    beginShutdown()
+    eventQueue.close()
+  }
+}
diff --git a/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
new file mode 100644
index 0000000..9b377a6
--- /dev/null
+++ b/core/src/main/scala/kafka/server/metadata/MetadataSnapshotter.scala
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server.metadata
+
+import org.apache.kafka.image.MetadataImage
+
+
+/**
+ * Handles creating snapshots.
+ */
+trait MetadataSnapshotter {
+  /**
+   * If there is no other snapshot being written out, start writing out a 
snapshot.
+   *
+   * @param committedOffset       The highest metadata log offset of the 
snapshot.
+   * @param committedEpoch        The highest metadata log epoch of the 
snapshot.
+   * @param lastContainedLogTime  The highest time contained in the snapshot.
+   * @param image                 The metadata image to write out.
+   *
+   * @return                      True if we will write out a new snapshot; 
false otherwise.
+   */
+  def maybeStartSnapshot(committedOffset: Long,
+                         committedEpoch: Int,
+                         lastContainedLogTime: Long,
+                         image: MetadataImage): Boolean
+}
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index a19f67d..81a22e1 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -18,28 +18,31 @@
 package kafka.server.metadata
 
 import java.util
+import java.util.concurrent.atomic.AtomicReference
 import java.util.{Collections, Optional}
 
 import org.apache.kafka.common.{Endpoint, Uuid}
-import org.apache.kafka.common.metadata.RegisterBrokerRecord
+import org.apache.kafka.common.metadata.{PartitionChangeRecord, 
PartitionRecord, RegisterBrokerRecord, TopicRecord}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, 
VersionRange}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
 
 class BrokerMetadataListenerTest {
   @Test
   def testCreateAndClose(): Unit = {
-    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None)
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
+      (_, _, _, _) => throw new UnsupportedOperationException())
     listener.close()
   }
 
   @Test
   def testPublish(): Unit = {
-    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None)
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
+      (_, _, _, _) => throw new UnsupportedOperationException())
     try {
       listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
         util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -77,5 +80,124 @@ class BrokerMetadataListenerTest {
       listener.close()
     }
   }
+
+  class MockMetadataSnapshotter extends MetadataSnapshotter {
+    var image = MetadataImage.EMPTY
+    val failure = new AtomicReference[Throwable](null)
+    var activeSnapshotOffset = -1L
+    var prevCommittedOffset = -1L
+    var prevCommittedEpoch = -1
+    var prevLastContainedLogTime = -1L
+
+    override def maybeStartSnapshot(committedOffset: Long,
+                                    committedEpoch: Int,
+                                    lastContainedLogTime: Long,
+                                    newImage: MetadataImage): Boolean = {
+      try {
+        if (activeSnapshotOffset == -1L) {
+          assertTrue(prevCommittedOffset <= committedOffset)
+          assertTrue(prevCommittedEpoch <= committedEpoch)
+          assertTrue(prevLastContainedLogTime <= lastContainedLogTime)
+          prevCommittedOffset = committedOffset
+          prevCommittedEpoch = committedEpoch
+          prevLastContainedLogTime = lastContainedLogTime
+          image = newImage
+          activeSnapshotOffset = committedOffset
+          true
+        } else {
+          false
+        }
+      } catch {
+        case t: Throwable => failure.compareAndSet(null, t)
+      }
+    }
+  }
+
+  class MockMetadataPublisher extends MetadataPublisher {
+    var image = MetadataImage.EMPTY
+
+    override def publish(newHighestMetadataOffset: Long,
+                         delta: MetadataDelta,
+                         newImage: MetadataImage): Unit = {
+      image = newImage
+    }
+  }
+
+  private val FOO_ID = Uuid.fromString("jj1G9utnTuCegi_gpnRgYw")
+
+  private def generateManyRecords(listener: BrokerMetadataListener,
+                                  endOffset: Long): Unit = {
+    (0 to 10000).foreach { _ =>
+      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+        util.Arrays.asList(new ApiMessageAndVersion(new 
PartitionChangeRecord().
+          setPartitionId(0).
+          setTopicId(FOO_ID).
+          setRemovingReplicas(Collections.singletonList(1)), 0.toShort),
+          new ApiMessageAndVersion(new PartitionChangeRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setRemovingReplicas(Collections.emptyList()), 0.toShort))))
+    }
+    listener.getImageRecords().get()
+  }
+
+  @Test
+  def testCreateSnapshot(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, 
snapshotter)
+    try {
+      (0 to 3).foreach {
+        id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
+            util.Arrays.asList(new ApiMessageAndVersion(new 
RegisterBrokerRecord().
+              setBrokerId(id).
+              setBrokerEpoch(100L).
+              setFenced(false).
+              setRack(null).
+              setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)), 
1))))
+      }
+      listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
+        util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
+            setName("foo").
+            setTopicId(FOO_ID), 1.toShort),
+          new ApiMessageAndVersion(new PartitionRecord().
+            setPartitionId(0).
+            setTopicId(FOO_ID).
+            setIsr(util.Arrays.asList(0, 1, 2)).
+            setLeader(0).
+            setReplicas(util.Arrays.asList(0, 1, 2)).
+            setRemovingReplicas(util.Arrays.asList(0, 1, 2)).
+            setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort))))
+      listener.getImageRecords().get()
+      assertEquals(200L, listener.highestMetadataOffset())
+
+      // Check that we generate at least one snapshot once we see enough 
records.
+      assertEquals(-1L, snapshotter.prevCommittedOffset)
+      generateManyRecords(listener, 1000L);
+      assertEquals(1000L, snapshotter.prevCommittedOffset)
+      assertEquals(1000L, snapshotter.activeSnapshotOffset)
+      snapshotter.activeSnapshotOffset = -1L
+
+      // Test creating a new snapshot after publishing it.
+      val publisher = new MockMetadataPublisher()
+      listener.startPublishing(publisher).get()
+      generateManyRecords(listener, 2000L);
+      listener.getImageRecords().get()
+      assertEquals(2000L, snapshotter.activeSnapshotOffset)
+      assertEquals(2000L, snapshotter.prevCommittedOffset)
+
+      // Test how we handle the snapshotter returning false.
+      generateManyRecords(listener, 3000L);
+      assertEquals(2000L, snapshotter.activeSnapshotOffset)
+      generateManyRecords(listener, 4000L);
+      assertEquals(2000L, snapshotter.activeSnapshotOffset)
+      snapshotter.activeSnapshotOffset = -1L
+      generateManyRecords(listener, 5000L);
+      assertEquals(5000L, snapshotter.activeSnapshotOffset)
+      assertEquals(null, snapshotter.failure.get())
+    } finally {
+      listener.close()
+    }
+  }
+
 }
 
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
new file mode 100644
index 0000000..b030e87
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataSnapshotterTest.scala
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import java.nio.ByteBuffer
+import java.util.Optional
+import java.util.concurrent.{CompletableFuture, CountDownLatch}
+
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.protocol.ByteBufferAccessor
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, MetadataImageTest}
+import org.apache.kafka.metadata.MetadataRecordSerde
+import org.apache.kafka.queue.EventQueue
+import org.apache.kafka.raft.OffsetAndEpoch
+import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.snapshot.{MockRawSnapshotWriter, SnapshotWriter}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
+import org.junit.jupiter.api.Test
+
+
+class BrokerMetadataSnapshotterTest {
+  @Test
+  def testCreateAndClose(): Unit = {
+    val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None,
+      (_, _, _) => throw new RuntimeException("unimplemented"))
+    snapshotter.close()
+  }
+
+  class MockSnapshotWriterBuilder extends SnapshotWriterBuilder {
+    var image = new CompletableFuture[MetadataImage]
+
+    override def build(committedOffset: Long,
+                       committedEpoch: Int,
+                       lastContainedLogTime: Long): 
SnapshotWriter[ApiMessageAndVersion] = {
+      val offsetAndEpoch = new OffsetAndEpoch(committedOffset, committedEpoch)
+      SnapshotWriter.createWithHeader(
+        () => Optional.of(new MockRawSnapshotWriter(offsetAndEpoch, 
consumeSnapshotBuffer)),
+        1024,
+        MemoryPool.NONE,
+        Time.SYSTEM,
+        lastContainedLogTime,
+        CompressionType.NONE,
+        MetadataRecordSerde.INSTANCE
+      ).get();
+    }
+
+    def consumeSnapshotBuffer(buffer: ByteBuffer): Unit = {
+      val delta = new MetadataDelta(MetadataImage.EMPTY)
+      val memoryRecords = MemoryRecords.readableRecords(buffer)
+      val batchIterator = memoryRecords.batchIterator()
+      while (batchIterator.hasNext) {
+        val batch = batchIterator.next()
+        if (!batch.isControlBatch()) {
+          batch.forEach(record => {
+            val recordBuffer = record.value().duplicate()
+            val messageAndVersion = MetadataRecordSerde.INSTANCE.read(
+              new ByteBufferAccessor(recordBuffer), recordBuffer.remaining())
+            delta.replay(messageAndVersion.message())
+          })
+        }
+      }
+      image.complete(delta.apply())
+    }
+  }
+
+  class BlockingEvent extends EventQueue.Event {
+    val latch = new CountDownLatch(1)
+    override def run(): Unit = latch.await()
+  }
+
+  @Test
+  def testCreateSnapshot(): Unit = {
+    val writerBuilder = new MockSnapshotWriterBuilder()
+    val snapshotter = new BrokerMetadataSnapshotter(0, Time.SYSTEM, None, 
writerBuilder)
+    try {
+      val blockingEvent = new BlockingEvent()
+      snapshotter.eventQueue.append(blockingEvent)
+      assertTrue(snapshotter.maybeStartSnapshot(123L, 12, 10000L, 
MetadataImageTest.IMAGE1))
+      assertFalse(snapshotter.maybeStartSnapshot(124L, 12, 11000L, 
MetadataImageTest.IMAGE2))
+      blockingEvent.latch.countDown()
+      assertEquals(MetadataImageTest.IMAGE1, writerBuilder.image.get())
+    } finally {
+      snapshotter.close()
+    }
+  }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
index 4a4d0ee..7964fed 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/MetadataRecordSerde.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.server.common.serialization.AbstractApiMessageSerde;
 
 public class MetadataRecordSerde extends AbstractApiMessageSerde {
+    public static final MetadataRecordSerde INSTANCE = new 
MetadataRecordSerde();
 
     @Override
     public ApiMessage apiMessageFor(short apiKey) {
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 6d52d34..2ee05bb 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -26,11 +26,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 
 @Timeout(value = 40)
 public class MetadataImageTest {
-    private final static MetadataImage IMAGE1;
+    public final static MetadataImage IMAGE1;
 
-    private final static MetadataDelta DELTA1;
+    public final static MetadataDelta DELTA1;
 
-    private final static MetadataImage IMAGE2;
+    public final static MetadataImage IMAGE2;
 
     static {
         IMAGE1 = new MetadataImage(FeaturesImageTest.IMAGE1,
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
index ffc1777..7bedde3 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/RecordTestUtils.java
@@ -19,6 +19,7 @@ package org.apache.kafka.metadata;
 
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.Message;
+import org.apache.kafka.common.protocol.ObjectSerializationCache;
 import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
@@ -36,6 +37,7 @@ import java.util.List;
 import java.util.Set;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 /**
@@ -161,9 +163,10 @@ public class RecordTestUtils {
         long offset = lastOffset - records.size() + 1;
         Iterator<ApiMessageAndVersion> iterator = records.iterator();
         List<ApiMessageAndVersion> curRecords = new ArrayList<>();
+        assertTrue(iterator.hasNext()); // At least one record is required
         while (true) {
             if (!iterator.hasNext() || curRecords.size() >= 2) {
-                batches.add(Batch.data(offset, 0, 0, 0, curRecords));
+                batches.add(Batch.data(offset, 0, 0, sizeInBytes(curRecords), 
curRecords));
                 if (!iterator.hasNext()) {
                     break;
                 }
@@ -174,4 +177,14 @@ public class RecordTestUtils {
         }
         return MemoryBatchReader.of(batches, __ -> { });
     }
+
+
+    private static int sizeInBytes(List<ApiMessageAndVersion> records) {
+        int size = 0;
+        for (ApiMessageAndVersion record : records) {
+            ObjectSerializationCache cache = new ObjectSerializationCache();
+            size += MetadataRecordSerde.INSTANCE.recordSize(record, cache);
+        }
+        return size;
+    }
 }

Reply via email to