jsancio commented on a change in pull request #10887: URL: https://github.com/apache/kafka/pull/10887#discussion_r656432494
########## File path: core/src/main/scala/kafka/server/BrokerServer.scala ########## @@ -359,10 +359,14 @@ class BrokerServer( // Start other services that we've delayed starting, in the appropriate order. replicaManager.startup() replicaManager.startHighWatermarkCheckPointThread() - groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME). - getOrElse(config.offsetsTopicPartitions)) - transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME). - getOrElse(config.transactionTopicPartitions)) + groupCoordinator.startup(() => { + val curPartitions = metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME) + if (curPartitions > 0) curPartitions else config.offsetsTopicPartitions + }) + transactionCoordinator.startup(() => { + val curPartitions = metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME) + if (curPartitions > 0) curPartitions else config.transactionTopicPartitions + }) Review comment: For these two `startup` calls you can use this syntax: ``` startup { () => } ``` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -1164,7 +1164,7 @@ class KafkaApis(val requestChannel: RequestChannel, var unauthorizedForCreateTopics = Set[String]() if (authorizedTopics.nonEmpty) { - val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics) + val nonExistingTopics = authorizedTopics.filter(!metadataCache.contains(_)) Review comment: ```suggestion val nonExistingTopics = authorizedTopics.filterNot(metadataCache.contains) ``` ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -151,17 +154,32 @@ class ReplicaManagerTest { TestUtils.assertNoNonDaemonThreads(this.getClass.getName) } + private def mockGetAliveBrokerFunctions(cache: MetadataCache, aliveBrokers: Seq[Node]): Unit = { + Mockito.when(cache.hasAliveBroker(ArgumentMatchers.anyInt())).thenAnswer(new Answer[Boolean]() { + override def answer(invocation: InvocationOnMock): Boolean = { + aliveBrokers.map(_.id()).contains(invocation.getArguments()(0).asInstanceOf[Int]) + } + }) + Mockito.when(cache.getAliveBrokerNode(ArgumentMatchers.anyInt(), ArgumentMatchers.any[String])). + thenAnswer(new Answer[Option[Node]]() { + override def answer(invocation: InvocationOnMock): Option[Node] = { + aliveBrokers.find(node => node.id == invocation.getArguments()(0).asInstanceOf[Integer]) + } + }) + Mockito.when(cache.getAliveBrokerNodes(ArgumentMatchers.any[String])).thenReturn(aliveBrokers) + } + @Test def testClearPurgatoryOnBecomingFollower(): Unit = { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) Review comment: The indentation seems off. ########## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ########## @@ -1352,8 +1370,7 @@ class ReplicaManagerTest { Optional.of(1)) val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10) assertNull(fetchResult.get) - - Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true) + Mockito.when(replicaManager.metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true) Review comment: ```suggestion Mockito.when(metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true) ``` ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2886,7 +2885,7 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponseCallback(error)(partitionErrors) } else { val partitions = if (electionRequest.data.topicPartitions == null) { - metadataCache.getAllPartitions() + metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions(_)) Review comment: ```suggestion metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions) ``` ########## File path: core/src/main/scala/kafka/server/MetadataCache.scala ########## @@ -62,17 +46,22 @@ trait MetadataCache { def getAllTopics(): collection.Set[String] - def getAllPartitions(): collection.Set[TopicPartition] + def getTopicPartitions(topicName: String): collection.Set[TopicPartition] - def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + def hasAliveBroker(brokerId: Int): Boolean - def getAliveBroker(brokerId: Int): Option[MetadataBroker] + def getAliveBrokers(): Iterable[BrokerMetadata] - def getAliveBrokers: collection.Seq[MetadataBroker] + def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node] + + def getAliveBrokerNodes(listenerName: String): Iterable[Node] def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] - def numPartitions(topic: String): Option[Int] + /** + * Return the number of partitions in the given topic, or 0 if the given topic does not exist. + */ + def numPartitions(topic: String): Int Review comment: The nice thing about using `Option[Int]` is that it forces the caller to handle the `None` or `0` case. For example, in some cases the caller converts the `None` to `0`. In other cases it converts the `None` case to some configuration. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org