dengziming commented on code in PR #12265: URL: https://github.com/apache/kafka/pull/12265#discussion_r895799683
##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -244,6 +248,9 @@ class BrokerMetadataListener(
       _publisher = Some(publisher)
       log.info(s"Starting to publish metadata events at offset $highestMetadataOffset.")
       try {
+        if (shouldSnapshot()) {

Review Comment:
   1. Yes. A listener (here, `BrokerMetadataListener`) may receive a `handleCommit` or a `handleSnapshot` call whenever the metadata log changes (for example, when a topic is created). When a broker starts up, it does not start publishing until it has fetched all the metadata logs. I call this the "catching-up period", because the broker needs to connect to the controller and fetch all metadata logs.
   2. We used to create a snapshot when we detected a metadata version change or when `_bytesSinceLastSnapshot` was large enough.
   3. I changed the logic here so that no snapshot is created during the catching-up period. So when publishing starts, which marks the end of the catching-up period, we should check whether a snapshot is necessary.
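The gating described in points 2 and 3 can be sketched as follows. This is a simplified, self-contained model, not the real `BrokerMetadataListener`: `ListenerSketch`, `lastSnapshotOffset`, and the `versionChanged` parameter are hypothetical names, and the sketch assumes that `shouldSnapshot` covers both the byte threshold and a metadata version change.

```scala
// Hypothetical, simplified model of the snapshot gating discussed above.
// None of these names are the real Kafka classes or fields.
final class ListenerSketch(maxBytesBetweenSnapshots: Long) {
  // False during the catching-up period, true once publishing has started.
  private var publishing = false
  private var bytesSinceLastSnapshot = 0L
  private var metadataVersionChanged = false
  private var highestOffset = -1L

  // Offset at which the most recent snapshot was started, or -1 if none.
  var lastSnapshotOffset: Long = -1L

  // Assumption: a version change or enough accumulated bytes warrants a snapshot.
  private def shouldSnapshot: Boolean =
    metadataVersionChanged || bytesSinceLastSnapshot >= maxBytesBetweenSnapshots

  private def maybeStartSnapshot(): Unit = {
    lastSnapshotOffset = highestOffset
    bytesSinceLastSnapshot = 0L
    metadataVersionChanged = false
  }

  // Called for every committed batch, both while catching up and afterwards.
  def handleCommit(offset: Long, numBytes: Long, versionChanged: Boolean): Unit = {
    highestOffset = offset
    bytesSinceLastSnapshot += numBytes
    metadataVersionChanged = metadataVersionChanged || versionChanged
    // While catching up we only accumulate state; no snapshot is started.
    if (publishing && shouldSnapshot) maybeStartSnapshot()
  }

  // End of the catching-up period: re-check whether a snapshot is now due.
  def startPublishing(): Unit = {
    publishing = true
    if (shouldSnapshot) maybeStartSnapshot()
  }
}
```

In this model, a version change committed at offset 100 before `startPublishing()` leaves `lastSnapshotOffset` at -1, and the snapshot is started at offset 100 once `startPublishing()` runs. That mirrors the behavior the new tests in this PR check for.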
##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,30 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (_publisher.nonEmpty && shouldSnapshot()) {
+        maybeStartSnapshot()
       }
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)

Review Comment:
   I think changing the name is a bit of overkill here; we may do many other things besides resetting the delta.

##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -240,6 +244,40 @@ class BrokerMetadataListenerTest {
     }
   }
+
+  @Test
+  def testNotSnapshotBeforePublishing(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.getImageRecords().get()
+    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot before starting publishing")
+  }
+
+  @Test
+  def testSnapshotWhenStarting(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+    assertEquals(100L, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {

Review Comment:
   This seems to be covered by `testNotSnapshotBeforePublishing`, or I may have forgotten about it. I will recheck it later.