dengziming commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r895799683


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -244,6 +248,9 @@ class BrokerMetadataListener(
       _publisher = Some(publisher)
       log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
       try {
+        if (shouldSnapshot()) {

Review Comment:
   1. Yes, a listener (here, BrokerMetadataListener) will potentially receive a 
handleCommit or a handleSnapshot every time the metadata log changes (for 
example, when a topic is created). When a broker starts up, it will not start 
publishing until it has fetched all the metadata logs. I call this the 
"catching-up period", because the broker needs to connect to the controller 
and fetch all metadata logs.
   2. We used to create a snapshot whenever the listener detected a metadata 
version change or _bytesSinceLastSnapshot was large enough.
   3. I changed the logic here so that no snapshot is created during the 
catching-up period. So when publishing starts, which marks the end of the 
catching-up period, we should check whether a snapshot is necessary.
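
   The three points above can be sketched as a small standalone model. This is 
only an illustration, not the real BrokerMetadataListener: the class 
`ListenerSketch`, its fields, and the `versionChanged` flag are hypothetical 
names, and I am assuming here that the snapshot check covers both the byte 
threshold and a pending metadata version change.

```scala
// Hypothetical, simplified model of the snapshot gating described above.
// It is NOT the actual kafka.server.metadata.BrokerMetadataListener.
final class ListenerSketch(maxBytesBetweenSnapshots: Long) {
  private var publishing = false              // false during the catching-up period
  private var bytesSinceLastSnapshot = 0L
  private var metadataVersionChanged = false  // assumed to be remembered until a snapshot is taken
  private var highestOffset = -1L
  var lastSnapshotOffset: Long = -1L          // -1 means no snapshot has been started

  // Assumption: a snapshot is wanted on a version change or once enough bytes accumulate.
  private def shouldSnapshot: Boolean =
    metadataVersionChanged || bytesSinceLastSnapshot >= maxBytesBetweenSnapshots

  private def maybeStartSnapshot(): Unit = {
    lastSnapshotOffset = highestOffset
    bytesSinceLastSnapshot = 0L
    metadataVersionChanged = false
  }

  // Called for every committed batch; snapshots only once publishing has started.
  def handleCommit(offset: Long, numBytes: Long, versionChanged: Boolean): Unit = {
    highestOffset = offset
    bytesSinceLastSnapshot += numBytes
    metadataVersionChanged = metadataVersionChanged || versionChanged
    if (publishing && shouldSnapshot) maybeStartSnapshot()
  }

  // End of the catching-up period: check for a snapshot that was deferred.
  def startPublishing(): Unit = {
    publishing = true
    if (shouldSnapshot) maybeStartSnapshot()
  }
}
```

   In this model, a version change committed before `startPublishing()` does 
not start a snapshot; the deferred snapshot starts when publishing begins.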



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,30 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
 
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (_publisher.nonEmpty && shouldSnapshot()) {
+        maybeStartSnapshot()
       }
 
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)

Review Comment:
   I think changing the name is a little overkill here; we may do many other 
things besides resetting the delta.



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -240,6 +244,40 @@ class BrokerMetadataListenerTest {
     }
   }
 
+  @Test
+  def testNotSnapshotBeforePublishing(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.getImageRecords().get()
+    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot before starting publishing")
+  }
+
+  @Test
+  def testSnapshotWhenStarting(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+    assertEquals(100L, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {

Review Comment:
   This seems to be tested already by `testNotSnapshotBeforePublishing`, though 
I may be misremembering. I will recheck it later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
