divijvaidya commented on code in PR #12265:
URL: https://github.com/apache/kafka/pull/12265#discussion_r892101409


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,30 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
 
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (_publisher.nonEmpty && shouldSnapshot()) {
+        maybeStartSnapshot()
       }
 
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)
     }
   }
 
   private def shouldSnapshot(): Boolean = {
-    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots
+    _bytesSinceLastSnapshot >= maxBytesBetweenSnapshots || metadataVersionChanged()

Review Comment:
   nit
   
   personally I prefer to use parentheses for explicit grouping, so that the reader doesn't have to guess the precedence order amongst binary operators. It also avoids inadvertent bugs.
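   
   For illustration, a minimal sketch of the parenthesized form (same names as in the diff above; only grouping is added, behavior is unchanged):
   
   ```scala
   private def shouldSnapshot(): Boolean = {
     // Parentheses make the intended grouping explicit, even though `||`
     // already binds more loosely than `>=` in Scala.
     (_bytesSinceLastSnapshot >= maxBytesBetweenSnapshots) || metadataVersionChanged()
   }
   ```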



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -117,26 +117,30 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _publisher.foreach(publish)
 
-      // If we detected a change in metadata.version, generate a local snapshot
-      val metadataVersionChanged = Option(_delta.featuresDelta()).exists { featuresDelta =>
-        featuresDelta.metadataVersionChange().isPresent
+      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+      if (_publisher.nonEmpty && shouldSnapshot()) {
+        maybeStartSnapshot()
       }
 
-      snapshotter.foreach { snapshotter =>
-        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
-        if (shouldSnapshot() || metadataVersionChanged) {
-          if (snapshotter.maybeStartSnapshot(_highestTimestamp, _delta.apply())) {
-            _bytesSinceLastSnapshot = 0L
-          }
-        }
-      }
+      _publisher.foreach(publish)

Review Comment:
   could we rename `publish` to `publishAndResetDelta`? This will make it explicit that the publish method resets the state set by `loadBatches`.
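   
   A minimal sketch of the rename at the call site (assuming the method signature is otherwise unchanged):
   
   ```scala
   // Renamed from `publish` so the delta-reset side effect is visible to the reader.
   _publisher.foreach(publishAndResetDelta)
   ```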



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -244,6 +248,9 @@ class BrokerMetadataListener(
       _publisher = Some(publisher)
       log.info(s"Starting to publish metadata events at offset 
$highestMetadataOffset.")
       try {
+        if (shouldSnapshot()) {

Review Comment:
   For my better understanding here, please let me know if my assumptions are correct:
   
   A listener can potentially receive a `handleCommit` or a `handleSnapshot` even before it has started publishing, because there is a window of time between when a metadata listener is registered and when it starts publishing. Yes?
   
   In such cases (when the listener has not started publishing), `handleCommit` will create a snapshot if it detects a metadata version change.
   
   Then why do we need to create a snapshot on start publishing? Isn't it guaranteed that there would have been a `handleCommit` prior to this which would already have created the snapshot?
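   
   A sketch of the ordering this question assumes, using the helpers from the test file below (the sequencing is my assumption, not confirmed behavior):
   
   ```scala
   val snapshotter = new MockMetadataSnapshotter()
   val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
     maxBytesBetweenSnapshots = 1000L)
   
   // A commit carrying a metadata.version change arrives before publishing starts.
   updateFeature(listener, feature = MetadataVersion.FEATURE_NAME,
     MetadataVersion.latest.featureLevel(), 100L)
   
   // Publishing starts later; the question is whether the snapshot taken here
   // is redundant given the handleCommit above.
   listener.startPublishing(new MockMetadataPublisher()).get()
   ```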



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala:
##########
@@ -240,6 +244,40 @@ class BrokerMetadataListenerTest {
     }
   }
 
+  @Test
+  def testNotSnapshotBeforePublishing(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.getImageRecords().get()
+    assertEquals(-1L, snapshotter.activeSnapshotOffset, "We won't generate snapshot before starting publishing")
+  }
+
+  @Test
+  def testSnapshotWhenStarting(): Unit = {
+    val snapshotter = new MockMetadataSnapshotter()
+    val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
+      maxBytesBetweenSnapshots = 1000L)
+
+    updateFeature(listener, feature = MetadataVersion.FEATURE_NAME, MetadataVersion.latest.featureLevel(), 100L)
+    listener.startPublishing(new MockMetadataPublisher()).get()
+    assertEquals(100L, snapshotter.activeSnapshotOffset, "We should try to generate snapshot when starting publishing")
+  }
+
+  @Test
+  def testSnapshotAfterMetadataVersionChange(): Unit = {

Review Comment:
   could we also add a negative test here which would have failed before we fixed the bug where `publish` was resetting `_delta`, and hence we were taking a snapshot on every commit? Perhaps a test that validates that snapshots do not occur more often than expected.
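   
   A hedged sketch of such a negative test, reusing the helpers visible in this file (the second, "unrelated" commit is left as a placeholder, since the exact helper to drive it is an assumption):
   
   ```scala
   @Test
   def testSnapshotNotRepeatedOnEveryCommit(): Unit = {
     val snapshotter = new MockMetadataSnapshotter()
     val listener = newBrokerMetadataListener(snapshotter = Some(snapshotter),
       maxBytesBetweenSnapshots = 1000L)
   
     // One metadata.version change, then publishing starts: one snapshot expected.
     updateFeature(listener, feature = MetadataVersion.FEATURE_NAME,
       MetadataVersion.latest.featureLevel(), 100L)
     listener.startPublishing(new MockMetadataPublisher()).get()
     val firstSnapshotOffset = snapshotter.activeSnapshotOffset
   
     // A further small commit that neither changes metadata.version nor crosses
     // maxBytesBetweenSnapshots should not start another snapshot.
     // applySmallCommit(listener, 200L)  // hypothetical helper, not in this PR
     assertEquals(firstSnapshotOffset, snapshotter.activeSnapshotOffset,
       "We should not generate a snapshot on every commit")
   }
   ```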



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
