This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5ef962b  KAFKA-13056; Do not rely on broker for snapshots if controller is co-resident (#11013)
5ef962b is described below

commit 5ef962ba064efd5e073310aa39ab6a9b6ae8b7c3
Author: Jason Gustafson <[email protected]>
AuthorDate: Sat Jul 10 10:47:46 2021 -0700

    KAFKA-13056; Do not rely on broker for snapshots if controller is co-resident (#11013)
    
    When a node is serving as both broker and controller, we should only rely on the controller to write new snapshots.
    
    Reviewers: Luke Chen <[email protected]>, Colin P. McCabe <[email protected]>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  22 +++--
 .../server/metadata/BrokerMetadataListener.scala   |  19 ++--
 .../metadata/BrokerMetadataListenerTest.scala      | 101 ++++++++++++++-------
 3 files changed, 94 insertions(+), 48 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index a29e759..16856f8 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaYammerMetrics
 import kafka.network.SocketServer
 import kafka.raft.RaftManager
 import kafka.security.CredentialProvider
+import kafka.server.KafkaRaftServer.ControllerRole
 import kafka.server.metadata.{BrokerMetadataListener, BrokerMetadataPublisher, BrokerMetadataSnapshotter, ClientQuotaMetadataManager, KRaftMetadataCache, SnapshotWriterBuilder}
 import kafka.utils.{CoreUtils, KafkaScheduler}
 import org.apache.kafka.snapshot.SnapshotWriter
@@ -145,7 +146,7 @@ class BrokerServer(
 
   val clusterId: String = metaProps.clusterId
 
-  var metadataSnapshotter: BrokerMetadataSnapshotter = null
+  var metadataSnapshotter: Option[BrokerMetadataSnapshotter] = None
 
   var metadataListener: BrokerMetadataListener = null
 
@@ -289,10 +290,16 @@ class BrokerServer(
         ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers, None),
         ConfigType.Broker -> new BrokerConfigHandler(config, quotaManagers))
 
-      metadataSnapshotter = new BrokerMetadataSnapshotter(config.nodeId,
-                                                          time,
-                                                          threadNamePrefix,
-                                                          new BrokerSnapshotWriterBuilder(raftManager.client))
+      if (!config.processRoles.contains(ControllerRole)) {
+        // If no controller is defined, we rely on the broker to generate snapshots.
+        metadataSnapshotter = Some(new BrokerMetadataSnapshotter(
+          config.nodeId,
+          time,
+          threadNamePrefix,
+          new BrokerSnapshotWriterBuilder(raftManager.client)
+        ))
+      }
+
       metadataListener = new BrokerMetadataListener(config.nodeId,
                                                     time,
                                                     threadNamePrefix,
@@ -437,9 +444,8 @@ class BrokerServer(
       if (metadataListener !=  null) {
         CoreUtils.swallow(metadataListener.close(), this)
       }
-      if (metadataSnapshotter !=  null) {
-        CoreUtils.swallow(metadataSnapshotter.close(), this)
-      }
+      metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
+
       if (transactionCoordinator != null)
         CoreUtils.swallow(transactionCoordinator.shutdown(), this)
       if (groupCoordinator != null)
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 9369384..c4c027a 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -39,7 +39,7 @@ class BrokerMetadataListener(
   time: Time,
   threadNamePrefix: Option[String],
   val maxBytesBetweenSnapshots: Long,
-  val snapshotter: MetadataSnapshotter
+  val snapshotter: Option[MetadataSnapshotter]
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
   private val logContext = new LogContext(s"[BrokerMetadataListener id=${brokerId}] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -121,14 +121,17 @@ class BrokerMetadataListener(
       } finally {
         reader.close()
       }
-      _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
       _publisher.foreach(publish(_, results.highestMetadataOffset))
-      if (shouldSnapshot()) {
-        if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
-                                           _highestEpoch,
-                                           _highestTimestamp,
-                                           _delta.apply())) {
-          _bytesSinceLastSnapshot = 0L
+
+      snapshotter.foreach { snapshotter =>
+        _bytesSinceLastSnapshot = _bytesSinceLastSnapshot + results.numBytes
+        if (shouldSnapshot()) {
+          if (snapshotter.maybeStartSnapshot(results.highestMetadataOffset,
+            _highestEpoch,
+            _highestTimestamp,
+            _delta.apply())) {
+            _bytesSinceLastSnapshot = 0L
+          }
         }
       }
     }
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 81a22e1..735c779 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -30,19 +30,20 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
+import scala.jdk.CollectionConverters._
 
 class BrokerMetadataListenerTest {
   @Test
   def testCreateAndClose(): Unit = {
     val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
-      (_, _, _, _) => throw new UnsupportedOperationException())
+      snapshotter = None)
     listener.close()
   }
 
   @Test
   def testPublish(): Unit = {
     val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000000L,
-      (_, _, _, _) => throw new UnsupportedOperationException())
+      snapshotter = None)
     try {
       listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
         util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
@@ -50,7 +51,7 @@ class BrokerMetadataListenerTest {
           setBrokerEpoch(100L).
           setFenced(false).
           setRack(null).
-          setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 1))));
+          setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CSg")), 0.toShort))))
       val imageRecords = listener.getImageRecords().get()
       assertEquals(0, imageRecords.size())
       assertEquals(100L, listener.highestMetadataOffset())
@@ -60,7 +61,7 @@ class BrokerMetadataListenerTest {
           setBrokerEpoch(200L).
           setFenced(true).
           setRack(null).
-          setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 1))));
+          setIncarnationId(Uuid.fromString("QkOQtNKVTYatADcaJ28xDg")), 0.toShort))))
       listener.startPublishing(new MetadataPublisher {
         override def publish(newHighestMetadataOffset: Long,
                              delta: MetadataDelta,
@@ -142,37 +143,39 @@ class BrokerMetadataListenerTest {
   }
 
   @Test
+  def testHandleCommitsWithNoSnapshotterDefined(): Unit = {
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L,
+      snapshotter = None)
+    try {
+      val brokerIds = 0 to 3
+
+      registerBrokers(listener, brokerIds, endOffset = 100L)
+      createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
+      listener.getImageRecords().get()
+      assertEquals(200L, listener.highestMetadataOffset())
+
+      generateManyRecords(listener, endOffset = 1000L)
+      assertEquals(1000L, listener.highestMetadataOffset())
+    } finally {
+      listener.close()
+    }
+  }
+
+  @Test
   def testCreateSnapshot(): Unit = {
     val snapshotter = new MockMetadataSnapshotter()
-    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, snapshotter)
+    val listener = new BrokerMetadataListener(0, Time.SYSTEM, None, 1000L, Some(snapshotter))
     try {
-      (0 to 3).foreach {
-        id => listener.handleCommit(RecordTestUtils.mockBatchReader(100L,
-            util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
-              setBrokerId(id).
-              setBrokerEpoch(100L).
-              setFenced(false).
-              setRack(null).
-              setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + id)), 1))))
-      }
-      listener.handleCommit(RecordTestUtils.mockBatchReader(200L,
-        util.Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
-            setName("foo").
-            setTopicId(FOO_ID), 1.toShort),
-          new ApiMessageAndVersion(new PartitionRecord().
-            setPartitionId(0).
-            setTopicId(FOO_ID).
-            setIsr(util.Arrays.asList(0, 1, 2)).
-            setLeader(0).
-            setReplicas(util.Arrays.asList(0, 1, 2)).
-            setRemovingReplicas(util.Arrays.asList(0, 1, 2)).
-            setAddingReplicas(util.Arrays.asList(0, 1, 2)), 1.toShort))))
+      val brokerIds = 0 to 3
+
+      registerBrokers(listener, brokerIds, endOffset = 100L)
+      createTopicWithOnePartition(listener, replicas = brokerIds, endOffset = 200L)
       listener.getImageRecords().get()
       assertEquals(200L, listener.highestMetadataOffset())
 
       // Check that we generate at least one snapshot once we see enough records.
       assertEquals(-1L, snapshotter.prevCommittedOffset)
-      generateManyRecords(listener, 1000L);
+      generateManyRecords(listener, 1000L)
       assertEquals(1000L, snapshotter.prevCommittedOffset)
       assertEquals(1000L, snapshotter.activeSnapshotOffset)
       snapshotter.activeSnapshotOffset = -1L
@@ -180,18 +183,18 @@ class BrokerMetadataListenerTest {
       // Test creating a new snapshot after publishing it.
       val publisher = new MockMetadataPublisher()
       listener.startPublishing(publisher).get()
-      generateManyRecords(listener, 2000L);
+      generateManyRecords(listener, 2000L)
       listener.getImageRecords().get()
       assertEquals(2000L, snapshotter.activeSnapshotOffset)
       assertEquals(2000L, snapshotter.prevCommittedOffset)
 
       // Test how we handle the snapshotter returning false.
-      generateManyRecords(listener, 3000L);
+      generateManyRecords(listener, 3000L)
       assertEquals(2000L, snapshotter.activeSnapshotOffset)
-      generateManyRecords(listener, 4000L);
+      generateManyRecords(listener, 4000L)
       assertEquals(2000L, snapshotter.activeSnapshotOffset)
       snapshotter.activeSnapshotOffset = -1L
-      generateManyRecords(listener, 5000L);
+      generateManyRecords(listener, 5000L)
       assertEquals(5000L, snapshotter.activeSnapshotOffset)
       assertEquals(null, snapshotter.failure.get())
     } finally {
@@ -199,5 +202,39 @@ class BrokerMetadataListenerTest {
     }
   }
 
-}
+  private def registerBrokers(
+    listener: BrokerMetadataListener,
+    brokerIds: Iterable[Int],
+    endOffset: Long
+  ): Unit = {
+    brokerIds.foreach { brokerId =>
+      listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+        util.Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+          setBrokerId(brokerId).
+          setBrokerEpoch(100L).
+          setFenced(false).
+          setRack(null).
+          setIncarnationId(Uuid.fromString("GFBwlTcpQUuLYQ2ig05CS" + brokerId)), 0.toShort))))
+    }
+  }
 
+  private def createTopicWithOnePartition(
+    listener: BrokerMetadataListener,
+    replicas: Seq[Int],
+    endOffset: Long
+  ): Unit = {
+    listener.handleCommit(RecordTestUtils.mockBatchReader(endOffset,
+      util.Arrays.asList(
+        new ApiMessageAndVersion(new TopicRecord().
+          setName("foo").
+          setTopicId(FOO_ID), 0.toShort),
+        new ApiMessageAndVersion(new PartitionRecord().
+          setPartitionId(0).
+          setTopicId(FOO_ID).
+          setIsr(replicas.map(Int.box).asJava).
+          setLeader(0).
+          setReplicas(replicas.map(Int.box).asJava), 0.toShort)))
+    )
+  }
+
+}

Reply via email to