jsancio commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r656432494



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -359,10 +359,14 @@ class BrokerServer(
       // Start other services that we've delayed starting, in the appropriate 
order.
       replicaManager.startup()
       replicaManager.startHighWatermarkCheckPointThread()
-      groupCoordinator.startup(() => 
metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
-        getOrElse(config.offsetsTopicPartitions))
-      transactionCoordinator.startup(() => 
metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
-        getOrElse(config.transactionTopicPartitions))
+      groupCoordinator.startup(() => {
+        val curPartitions = 
metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME)
+        if (curPartitions > 0) curPartitions else  
config.offsetsTopicPartitions
+      })
+      transactionCoordinator.startup(() => {
+        val curPartitions = 
metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME)
+        if (curPartitions > 0) curPartitions else  
config.transactionTopicPartitions
+      })

Review comment:
       For these two `startup` calls, you can pass the lambda using this block syntax:
   ```
   startup { () =>
   }
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1164,7 +1164,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     var unauthorizedForCreateTopics = Set[String]()
 
     if (authorizedTopics.nonEmpty) {
-      val nonExistingTopics = 
metadataCache.getNonExistingTopics(authorizedTopics)
+      val nonExistingTopics = 
authorizedTopics.filter(!metadataCache.contains(_))

Review comment:
       ```suggestion
         val nonExistingTopics = 
authorizedTopics.filterNot(metadataCache.contains)
   ```

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -151,17 +154,32 @@ class ReplicaManagerTest {
     TestUtils.assertNoNonDaemonThreads(this.getClass.getName)
   }
 
+  private def mockGetAliveBrokerFunctions(cache: MetadataCache, aliveBrokers: 
Seq[Node]): Unit = {
+    
Mockito.when(cache.hasAliveBroker(ArgumentMatchers.anyInt())).thenAnswer(new 
Answer[Boolean]() {
+      override def answer(invocation: InvocationOnMock): Boolean = {
+        
aliveBrokers.map(_.id()).contains(invocation.getArguments()(0).asInstanceOf[Int])
+      }
+    })
+    Mockito.when(cache.getAliveBrokerNode(ArgumentMatchers.anyInt(), 
ArgumentMatchers.any[String])).
+      thenAnswer(new Answer[Option[Node]]() {
+        override def answer(invocation: InvocationOnMock): Option[Node] = {
+          aliveBrokers.find(node => node.id == 
invocation.getArguments()(0).asInstanceOf[Integer])
+        }
+      })
+    
Mockito.when(cache.getAliveBrokerNodes(ArgumentMatchers.any[String])).thenReturn(aliveBrokers)
+  }
+
   @Test
   def testClearPurgatoryOnBecomingFollower(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+
+  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)

Review comment:
       The indentation seems off: `val props` is now indented two spaces 
instead of four, and a blank line was added above it.

##########
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##########
@@ -1352,8 +1370,7 @@ class ReplicaManagerTest {
       Optional.of(1))
     val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, 
None, timeout = 10)
     assertNull(fetchResult.get)
-
-    Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
+    
Mockito.when(replicaManager.metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true)

Review comment:
       ```suggestion
       
Mockito.when(metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true)
   ```

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2886,7 +2885,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       sendResponseCallback(error)(partitionErrors)
     } else {
       val partitions = if (electionRequest.data.topicPartitions == null) {
-        metadataCache.getAllPartitions()
+        
metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions(_))

Review comment:
       ```suggestion
           
metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions)
   ```

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -62,17 +46,22 @@ trait MetadataCache {
 
   def getAllTopics(): collection.Set[String]
 
-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]
 
-  def getNonExistingTopics(topics: collection.Set[String]): 
collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean
 
-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]
 
-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]
 
   def getPartitionInfo(topic: String, partitionId: Int): 
Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
 
-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given 
topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
       The nice thing about using `Option[Int]` is that it forces the caller to 
explicitly handle the case where the topic does not exist (`None`, or `0` with 
this change). For example, some callers convert `None` to `0`, while others 
convert `None` to a configured default.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to