[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660935556

## File path: core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala ##
@@ -0,0 +1,406 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.admin.BrokerMetadata
+
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+import kafka.cluster.{Broker, EndPoint}
+import kafka.api._
+import kafka.controller.StateChangeLogger
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+import kafka.utils.Implicits._
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataPartitionState
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+
+/**
+ * A cache for the state (e.g., current leader) of each partition. This cache is updated through
+ * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
+ */
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
OK

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660928423

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -1721,8 +1720,10 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
-          val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
-            .brokerEndPoint(config.interBrokerListenerName)
+          val leaderNode = metadataCache.getAliveBrokerNode(
+            partition.leaderReplicaIdOpt.getOrElse(-1), config.interBrokerListenerName.value()).

Review comment:
Hmm... `getAliveBrokerNode` returns `None` or `Some`, never `noNode`. Anyway, I don't feel strongly about this, so I just used the version you suggested. I suppose it's one less place where we use -1 as a sentinel value (although honestly, there are enough of these places that -1 will never be a valid node value in Kafka, I think.)
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660928423

## File path: core/src/main/scala/kafka/server/ReplicaManager.scala ##
@@ -1721,8 +1720,10 @@ class ReplicaManager(val config: KafkaConfig,
       } else {
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
-          val leader = metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get
-            .brokerEndPoint(config.interBrokerListenerName)
+          val leaderNode = metadataCache.getAliveBrokerNode(
+            partition.leaderReplicaIdOpt.getOrElse(-1), config.interBrokerListenerName.value()).

Review comment:
`getAliveBrokerNode` returns `None` when a broker ID that is not alive is passed. The conversion to `NoNode` happens in `ReplicaManager`, not in the metadata cache. I guess I don't feel strongly about it, though, if you want a different version of this...?
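To illustrate the split described in this thread, here is a minimal, self-contained sketch in which the cache-side lookup returns an `Option` and the sentinel conversion happens only in the caller. `SketchNode`, `AliveBrokerLookupSketch`, `leaderNodeOrNoNode`, the `NoNode` value and the sample data are all hypothetical stand-ins for illustration; they are not the Kafka classes or the code in the PR.

```scala
// Hypothetical stand-in for org.apache.kafka.common.Node; not the real class.
final case class SketchNode(id: Int, host: String, port: Int)

object AliveBrokerLookupSketch {
  // Illustrative sentinel used by the caller when no live broker is found.
  val NoNode: SketchNode = SketchNode(-1, "", -1)

  // Assumed cache contents: broker id -> listener name -> node.
  private val aliveBrokers: Map[Int, Map[String, SketchNode]] = Map(
    1 -> Map("INTERNAL" -> SketchNode(1, "broker1", 9092))
  )

  // Cache-side lookup: answers with Some or None, never with a sentinel.
  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[SketchNode] =
    aliveBrokers.get(brokerId).flatMap(_.get(listenerName))

  // Caller-side conversion: the Option collapses to the sentinel only here,
  // so no -1 broker id has to be passed into the cache.
  def leaderNodeOrNoNode(leaderIdOpt: Option[Int], listenerName: String): SketchNode =
    leaderIdOpt.flatMap(getAliveBrokerNode(_, listenerName)).getOrElse(NoNode)
}
```

Chaining `flatMap` on the optional leader id is one way to avoid passing `-1` into the lookup; whether that matches the version suggested in the review is an assumption.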
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660925631

## File path: core/src/main/scala/kafka/server/MetadataCache.scala ##
@@ -62,17 +46,22 @@ trait MetadataCache {

   def getAllTopics(): collection.Set[String]

-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]

-  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean

-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]

-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]

Review comment:
Yes, let's just use `ListenerName`.
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r660911200

## File path: core/src/main/scala/kafka/server/MetadataCache.scala ##
@@ -62,17 +46,22 @@ trait MetadataCache {

   def getAllTopics(): collection.Set[String]

-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]

-  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean

-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]

-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]

   def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]

-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
OK, I will just convert it back to an `Option[Int]`, then. Edit: that is more consistent with some of the other functions that return `Option` as well.
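The two signatures debated here can be compared side by side in a small sketch. `NumPartitionsSketch`, `numPartitionsOrZero` and the sample topic data are hypothetical and exist only for illustration; the real `MetadataCache` implementations are not shown in this thread.

```scala
object NumPartitionsSketch {
  // Assumed sample data: topic name -> partition count.
  private val partitionCounts: Map[String, Int] = Map("orders" -> 3)

  // Option-returning form: a missing topic is None, distinct from any count.
  def numPartitions(topic: String): Option[Int] =
    partitionCounts.get(topic)

  // Int-returning form from the diff: a missing topic collapses to 0.
  def numPartitionsOrZero(topic: String): Int =
    numPartitions(topic).getOrElse(0)
}
```

With the `Option` form, a caller that wants a plain number can still recover the `Int` behaviour with `getOrElse(0)`, while the `Int` form cannot express "topic does not exist" separately from a count.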
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657531976

## File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala ##
@@ -1352,8 +1370,7 @@ class ReplicaManagerTest {
         Optional.of(1))
       val fetchResult = sendConsumerFetch(replicaManager, tp0, partitionData, None, timeout = 10)
       assertNull(fetchResult.get)
-
-      Mockito.when(replicaManager.metadataCache.contains(tp0)).thenReturn(true)
+      Mockito.when(replicaManager.metadataCache.contains(ArgumentMatchers.eq(tp0))).thenReturn(true)

Review comment:
Your proposed change doesn't compile because `metadataCache` is not defined in this scope.
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657531588

## File path: core/src/main/scala/kafka/server/MetadataCache.scala ##
@@ -62,17 +46,22 @@ trait MetadataCache {

   def getAllTopics(): collection.Set[String]

-  def getAllPartitions(): collection.Set[TopicPartition]
+  def getTopicPartitions(topicName: String): collection.Set[TopicPartition]

-  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+  def hasAliveBroker(brokerId: Int): Boolean

-  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+  def getAliveBrokers(): Iterable[BrokerMetadata]

-  def getAliveBrokers: collection.Seq[MetadataBroker]
+  def getAliveBrokerNode(brokerId: Int, listenerName: String): Option[Node]
+
+  def getAliveBrokerNodes(listenerName: String): Iterable[Node]

   def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]

-  def numPartitions(topic: String): Option[Int]
+  /**
+   * Return the number of partitions in the given topic, or 0 if the given topic does not exist.
+   */
+  def numPartitions(topic: String): Int

Review comment:
It just seems awkward because typically when you ask "how many partitions?" the answer is not Some or None, but a number.
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657530812

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -2886,7 +2885,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         sendResponseCallback(error)(partitionErrors)
       } else {
         val partitions = if (electionRequest.data.topicPartitions == null) {
-          metadataCache.getAllPartitions()
+          metadataCache.getAllTopics().flatMap(metadataCache.getTopicPartitions(_))

Review comment:
ok
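The replacement in this hunk rebuilds the full partition set from two narrower calls. A minimal sketch of that composition, assuming a simple in-memory map, is below. `TopicPartitionsSketch`, `TP`, `allPartitions` and the sample data are hypothetical; only the names `getAllTopics` and `getTopicPartitions` come from the diff.

```scala
object TopicPartitionsSketch {
  // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
  final case class TP(topic: String, partition: Int)

  // Assumed sample data: topic name -> its partitions.
  private val partitionsByTopic: Map[String, Set[TP]] = Map(
    "a" -> Set(TP("a", 0), TP("a", 1)),
    "b" -> Set(TP("b", 0))
  )

  def getAllTopics(): Set[String] = partitionsByTopic.keySet

  // Unknown topics yield an empty set rather than an error (an assumption).
  def getTopicPartitions(topicName: String): Set[TP] =
    partitionsByTopic.getOrElse(topicName, Set.empty)

  // Same shape as the new line in the diff: one set per topic, flattened.
  def allPartitions(): Set[TP] =
    getAllTopics().flatMap(getTopicPartitions(_))
}
```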
[GitHub] [kafka] cmccabe commented on a change in pull request #10887: MINOR: Refactor the MetadataCache interface
cmccabe commented on a change in pull request #10887:
URL: https://github.com/apache/kafka/pull/10887#discussion_r657530674

## File path: core/src/main/scala/kafka/server/KafkaApis.scala ##
@@ -1164,7 +1164,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     var unauthorizedForCreateTopics = Set[String]()
     if (authorizedTopics.nonEmpty) {
-      val nonExistingTopics = metadataCache.getNonExistingTopics(authorizedTopics)
+      val nonExistingTopics = authorizedTopics.filter(!metadataCache.contains(_))

Review comment:
ok
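The hunk above replaces a dedicated `getNonExistingTopics` method with a filter over `contains`. The following sketch shows the same pattern against an assumed in-memory set of known topics. `NonExistingTopicsSketch`, `knownTopics` and `nonExistingTopics` are hypothetical names; the real `contains` consults the metadata cache.

```scala
object NonExistingTopicsSketch {
  // Assumed sample data standing in for the cache's topic set.
  private val knownTopics: Set[String] = Set("orders", "payments")

  def contains(topic: String): Boolean = knownTopics.contains(topic)

  // Same shape as the new line in the diff: keep only topics the cache lacks.
  def nonExistingTopics(authorizedTopics: Set[String]): Set[String] =
    authorizedTopics.filter(!contains(_))
}
```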