[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571253465 ## File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala ## @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package kafka.server.metadata + +import kafka.api.LeaderAndIsr +import kafka.controller.StateChangeLogger +import kafka.server.MetadataCache +import kafka.utils.CoreUtils.inLock +import kafka.utils.Logging +import org.apache.kafka.common.internals.Topic +import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic} +import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} +import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} + +import java.util +import java.util.Collections +import java.util.concurrent.locks.ReentrantLock +import scala.collection.{Seq, Set, mutable} +import scala.jdk.CollectionConverters._ + +object RaftMetadataCache { + def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, partitionId: Int): Boolean = { +partitionStates.get(topic).exists { infos => + infos.remove(partitionId) + if (infos.isEmpty) partitionStates.remove(topic) + true +} + } + + def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]], + topic: String, + partitionId: Int, + stateInfo: UpdateMetadataPartitionState): Unit = { +val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty) +infos(partitionId) = stateInfo + } +} + + +class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging { + this.logIdent = s"[MetadataCache brokerId=$brokerId] " + + private val lock = new ReentrantLock() + + //this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock) + //replace the value with a completely new one. 
this means reads (which are not under any lock) need to grab + //the value of this var (into a val) ONCE and retain that read copy for the duration of their operation. + //multiple reads of this value risk getting different snapshots. + @volatile private var _currentImage: MetadataImage = new MetadataImage() + + private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None) + + // This method is the main hotspot when it comes to the performance of metadata requests, + // we should be careful about adding additional logic here. Relatedly, `brokers` is + // `List[Integer]` instead of `List[Int]` to avoid a collection copy. + // filterUnavailableEndpoints exists to support v0 MetadataResponses + private def maybeFilterAliveReplicas(image: MetadataImage, + brokers: java.util.List[Integer], + listenerName: ListenerName, + filterUnavailableEndpoints: Boolean): java.util.List[Integer] = { +if (!filterUnavailableEndpoints) { + brokers +} else { + val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size)) + for (brokerId <- brokers.asScala) { +if (hasAliveEndpoint(image, brokerId, listenerName)) + res.add(brokerId) + } + res +} + } + + def currentImage(): MetadataImage = _currentImage + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker. + // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below). + private def getPartitionMetadata(image: MetadataImage, topic: St
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: 
Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { +new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { +new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: KAFKA-12299 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571173506 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] Review comment: KAFKA-12299 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571163776 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -485,7 +494,7 @@ class MetadataCacheTest { @Test Review comment: This test is failing in Raft mode. I'll investigate. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571163226 ## File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala ## @@ -37,7 +36,15 @@ object MetadataBroker { endPoint.name() -> new Node(record.brokerId, endPoint.host, endPoint.port, record.rack) }.toMap, - true) + fenced = true) + } + + def apply(broker: Broker): MetadataBroker = { +new MetadataBroker(broker.id, broker.rack.orNull, + broker.endPoints.map { endpoint => +endpoint.listenerName.value -> new Node(broker.id, endpoint.host, endpoint.port, broker.rack.orNull) + }.toMap, + fenced = false) Review comment: I wanted to go ahead and have both implementations conform to MetadataBroker. One side effect is that we are exposing the fenced flag to ZK-based clusters. I've set it to false here since we don't have any notion of broker fencing in the ZK-based metadata. Is there any harm in this? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571161027 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -16,20 +16,20 @@ */ package kafka.server +import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.util -import java.util.Collections import util.Arrays.asList - -import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import java.util.Collections import scala.jdk.CollectionConverters._ class MetadataCacheTest { Review comment: Yeah, easy enough. I added `@ParameterizedTest` to MetadataCacheTest. One of the tests is failing with the Raft metadata cache, so I've left that one as ZK-only for now. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571145320 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: 
Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { Review comment: I was thinking it might be easier to refactor in the future if we only need to rename the factory method rather than change every `new Class` call site. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571140218 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: 
Int, listenerName: ListenerName): Option[Node] Review comment: Yeah, this just got pulled up from the class when I extracted the trait. I'll fix up these comments. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571139784 ## File path: core/src/main/scala/kafka/server/MetadataCache.scala ## @@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid} import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition +import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest} import org.apache.kafka.common.security.auth.SecurityProtocol +trait MetadataCache { + + // errorUnavailableEndpoints exists to support v0 MetadataResponses + def getTopicMetadata( +topics: collection.Set[String], +listenerName: ListenerName, +errorUnavailableEndpoints: Boolean = false, +errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic] + + def getAllTopics(): collection.Set[String] + + def getAllPartitions(): collection.Set[TopicPartition] + + def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String] + + def getAliveBroker(brokerId: Int): Option[MetadataBroker] + + def getAliveBrokers: collection.Seq[MetadataBroker] + + def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState] + + def numPartitions(topic: String): Option[Int] + + // if the leader is not known, return None; + // if the leader is known and corresponding node is available, return Some(node) + // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE) + def getPartitionLeaderEndpoint(topic: String, partitionId: 
Int, listenerName: ListenerName): Option[Node] + + def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node] + + def getControllerId: Option[Int] + + def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster + + // This method returns the deleted TopicPartitions received from UpdateMetadataRequest + def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition] + + def contains(topic: String): Boolean + + def contains(tp: TopicPartition): Boolean +} + +object MetadataCache { + def zkMetadataCache(brokerId: Int): ZkMetadataCache = { +new ZkMetadataCache(brokerId) + } + + def raftMetadataCache(brokerId: Int): RaftMetadataCache = { +new RaftMetadataCache(brokerId) + } +} + /** * A cache for the state (e.g., current leader) of each partition. This cache is updated through * UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously. */ -class MetadataCache(brokerId: Int) extends Logging { +class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging { Review comment: I left the ZK implementation in place since it's really the only production implementation for now. It also reduces the size of the diff for this change. I don't feel very strongly about it either way, so I'm happy to relocate it to a separate file This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r571138962 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -16,20 +16,20 @@ */ package kafka.server +import org.apache.kafka.common.{Node, TopicPartition, Uuid} + import java.util -import java.util.Collections import util.Arrays.asList - -import org.apache.kafka.common.{Node, TopicPartition, Uuid} import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.UpdateMetadataRequest import org.apache.kafka.common.security.auth.SecurityProtocol -import org.junit.jupiter.api.Test import org.junit.jupiter.api.Assertions._ +import org.junit.jupiter.api.Test +import java.util.Collections import scala.jdk.CollectionConverters._ class MetadataCacheTest { Review comment: Yeah, I think we can probably parameterize these tests, or possibly even use test templating similar to #9986. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -483,61 +475,4 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - @Test - def testGetClusterMetadataWithOfflineReplicas(): Unit = { Review comment: Hmm, I think this was an artifact of the merge, I'll restore this test ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -483,61 +475,4 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - @Test - def testGetClusterMetadataWithOfflineReplicas(): Unit = { -val cache = new MetadataCache(1) -val topic = "topic" -val topicPartition = new TopicPartition(topic, 0) -val securityProtocol = SecurityProtocol.PLAINTEXT -val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - -val brokers = Seq( - new UpdateMetadataBroker() -.setId(0) -.setRack("") -.setEndpoints(Seq(new UpdateMetadataEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setListener(listenerName.value)).asJava), - new UpdateMetadataBroker() -.setId(1) -.setEndpoints(Seq.empty.asJava) -) -val controllerEpoch = 1 -val leader = 1 -val leaderEpoch = 0 -val replicas = asList[Integer](0, 1) -val isr = asList[Integer](0, 1) -val offline = asList[Integer](1) -val partitionStates = Seq(new UpdateMetadataPartitionState() - .setTopicName(topic) - .setPartitionIndex(topicPartition.partition) - .setControllerEpoch(controllerEpoch) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setZkVersion(3) - .setReplicas(replicas) - .setOfflineReplicas(offline)) -val version = ApiKeys.UPDATE_METADATA.latestVersion -val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava, - brokers.asJava, 
Collections.emptyMap()).build() -cache.updateMetadata(15, updateMetadataRequest) - -val expectedNode0 = new Node(0, "foo", 9092) -val expectedNode1 = new Node(1, "", -1) - -val cluster = cache.getClusterMetadata("clusterId", listenerName) Review comment: Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because it's endpoints map is empty. @hachikuji @cmccabe is a change in metadata behavior, or does this test have bad assumptions ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -483,61 +475,4 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - @Test - def testGetClusterMetadataWithOfflineReplicas(): Unit = { -val cache = new MetadataCache(1) -val topic = "topic" -val topicPartition = new TopicPartition(topic, 0) -val securityProtocol = SecurityProtocol.PLAINTEXT -val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - -val brokers = Seq( - new UpdateMetadataBroker() -.setId(0) -.setRack("") -.setEndpoints(Seq(new UpdateMetadataEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setListener(listenerName.value)).asJava), - new UpdateMetadataBroker() -.setId(1) -.setEndpoints(Seq.empty.asJava) -) -val controllerEpoch = 1 -val leader = 1 -val leaderEpoch = 0 -val replicas = asList[Integer](0, 1) -val isr = asList[Integer](0, 1) -val offline = asList[Integer](1) -val partitionStates = Seq(new UpdateMetadataPartitionState() - .setTopicName(topic) - .setPartitionIndex(topicPartition.partition) - .setControllerEpoch(controllerEpoch) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setZkVersion(3) - .setReplicas(replicas) - .setOfflineReplicas(offline)) -val version = ApiKeys.UPDATE_METADATA.latestVersion -val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava, - 
brokers.asJava, Collections.emptyMap()).build() -cache.updateMetadata(15, updateMetadataRequest) - -val expectedNode0 = new Node(0, "foo", 9092) -val expectedNode1 = new Node(1, "", -1) - -val cluster = cache.getClusterMetadata("clusterId", listenerName) Review comment: Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because its endpoints map is empty. This leads to `cluster.leaderFor` on L534 retur
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -483,61 +475,4 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - @Test - def testGetClusterMetadataWithOfflineReplicas(): Unit = { -val cache = new MetadataCache(1) -val topic = "topic" -val topicPartition = new TopicPartition(topic, 0) -val securityProtocol = SecurityProtocol.PLAINTEXT -val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - -val brokers = Seq( - new UpdateMetadataBroker() -.setId(0) -.setRack("") -.setEndpoints(Seq(new UpdateMetadataEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setListener(listenerName.value)).asJava), - new UpdateMetadataBroker() -.setId(1) -.setEndpoints(Seq.empty.asJava) -) -val controllerEpoch = 1 -val leader = 1 -val leaderEpoch = 0 -val replicas = asList[Integer](0, 1) -val isr = asList[Integer](0, 1) -val offline = asList[Integer](1) -val partitionStates = Seq(new UpdateMetadataPartitionState() - .setTopicName(topic) - .setPartitionIndex(topicPartition.partition) - .setControllerEpoch(controllerEpoch) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setZkVersion(3) - .setReplicas(replicas) - .setOfflineReplicas(offline)) -val version = ApiKeys.UPDATE_METADATA.latestVersion -val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava, - brokers.asJava, Collections.emptyMap()).build() -cache.updateMetadata(15, updateMetadataRequest) - -val expectedNode0 = new Node(0, "foo", 9092) -val expectedNode1 = new Node(1, "", -1) - -val cluster = cache.getClusterMetadata("clusterId", listenerName) Review comment: Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage 
because its endpoints map is empty. This leads to `cluster.leaderFor` on L534 returning null. @hachikuji @cmccabe is this a change in metadata behavior, or does this test have bad assumptions? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -483,61 +475,4 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - @Test - def testGetClusterMetadataWithOfflineReplicas(): Unit = { -val cache = new MetadataCache(1) -val topic = "topic" -val topicPartition = new TopicPartition(topic, 0) -val securityProtocol = SecurityProtocol.PLAINTEXT -val listenerName = ListenerName.forSecurityProtocol(securityProtocol) - -val brokers = Seq( - new UpdateMetadataBroker() -.setId(0) -.setRack("") -.setEndpoints(Seq(new UpdateMetadataEndpoint() - .setHost("foo") - .setPort(9092) - .setSecurityProtocol(securityProtocol.id) - .setListener(listenerName.value)).asJava), - new UpdateMetadataBroker() -.setId(1) -.setEndpoints(Seq.empty.asJava) -) -val controllerEpoch = 1 -val leader = 1 -val leaderEpoch = 0 -val replicas = asList[Integer](0, 1) -val isr = asList[Integer](0, 1) -val offline = asList[Integer](1) -val partitionStates = Seq(new UpdateMetadataPartitionState() - .setTopicName(topic) - .setPartitionIndex(topicPartition.partition) - .setControllerEpoch(controllerEpoch) - .setLeader(leader) - .setLeaderEpoch(leaderEpoch) - .setIsr(isr) - .setZkVersion(3) - .setReplicas(replicas) - .setOfflineReplicas(offline)) -val version = ApiKeys.UPDATE_METADATA.latestVersion -val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava, - brokers.asJava, Collections.emptyMap()).build() -cache.updateMetadata(15, updateMetadataRequest) - -val expectedNode0 = new Node(0, "foo", 9092) -val expectedNode1 = new Node(1, "", -1) - -val cluster = cache.getClusterMetadata("clusterId", listenerName) Review comment: Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage 
because its endpoints map is empty. @hachikuji @cmccabe is this a change in metadata behavior, or does this test have bad assumptions? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata
mumrah commented on a change in pull request #10049: URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366 ## File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala ## @@ -483,61 +475,4 @@ class MetadataCacheTest { assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet) } - @Test - def testGetClusterMetadataWithOfflineReplicas(): Unit = { Review comment: Hmm, I think this was an artifact of the merge; I'll restore this test. This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org