cmccabe commented on a change in pull request #10931:
URL: https://github.com/apache/kafka/pull/10931#discussion_r663277869



##########
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -70,230 +58,188 @@ class BrokerMetadataListener(
    */
   @volatile private var _highestMetadataOffset = -1L
 
+  /**
+   * The current broker metadata image. Accessed only from the event queue 
thread.
+   */
+  private var _image = MetadataImage.EMPTY
+
+  /**
+   * The current metadata delta. Accessed only from the event queue thread.
+   */
+  private var _delta = new MetadataDelta(_image)
+
+  /**
+   * The object to use to publish new metadata changes, or None if this 
listener has not
+   * been activated yet.
+   */
+  private var _publisher: Option[MetadataPublisher] = None
+
+  /**
+   * The event queue which runs this listener.
+   */
   val eventQueue = new KafkaEventQueue(time, logContext, 
threadNamePrefix.getOrElse(""))
 
+  /**
+   * Returns the highest metadata-offset. Thread-safe.
+   */
   def highestMetadataOffset(): Long = _highestMetadataOffset
 
   /**
    * Handle new metadata records.
    */
-  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit = 
{
+  override def handleCommit(reader: BatchReader[ApiMessageAndVersion]): Unit =
     eventQueue.append(new HandleCommitsEvent(reader))
+
+  class HandleCommitsEvent(reader: BatchReader[ApiMessageAndVersion])
+      extends EventQueue.FailureLoggingEvent(log) {
+    override def run(): Unit = {
+      val results = try {
+        val loadResults = loadBatches(_delta, reader)
+        if (isDebugEnabled) {
+          debug(s"Loaded new commits: ${loadResults}")
+        }
+        loadResults
+      } finally {
+        reader.close()
+      }
+      maybePublish(results.highestMetadataOffset)
+    }
   }
 
   /**
    * Handle metadata snapshots
    */
-  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): 
Unit = {
-    // Loading snapshot on the broker is currently not supported.
-    reader.close();
-    throw new UnsupportedOperationException(s"Loading snapshot 
(${reader.snapshotId()}) is not supported")
-  }
-
-  // Visible for testing. It's useful to execute events synchronously in order
-  // to make tests deterministic. This object is responsible for closing the 
reader.
-  private[metadata] def execCommits(batchReader: 
BatchReader[ApiMessageAndVersion]): Unit = {
-    new HandleCommitsEvent(batchReader).run()
-  }
+  override def handleSnapshot(reader: SnapshotReader[ApiMessageAndVersion]): 
Unit =
+    eventQueue.append(new HandleSnapshotEvent(reader))
 
-  class HandleCommitsEvent(
-    reader: BatchReader[ApiMessageAndVersion]
-  ) extends EventQueue.FailureLoggingEvent(log) {
+  class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
+    extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
-      try {
-        while (reader.hasNext()) {
-          apply(reader.next())
-        }
+      val results = try {
+        info(s"Loading snapshot 
${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+        _delta = new MetadataDelta(_image) // Discard any previous deltas.
+        val loadResults = loadBatches(_delta, reader)
+        _delta.finishSnapshot()

Review comment:
       Yeah, I thought about a few other names. "Remove unreferenced", maybe? None of 
them seem great...
   
   I added some more JavaDoc to `MetadataDelta#finishSnapshot`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to