[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571253465



##
File path: core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala
##
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.api.LeaderAndIsr
+import kafka.controller.StateChangeLogger
+import kafka.server.MetadataCache
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.Logging
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.MetadataResponseData.{MetadataResponsePartition, MetadataResponseTopic}
+import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
+import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataPartitionState}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.locks.ReentrantLock
+import scala.collection.{Seq, Set, mutable}
+import scala.jdk.CollectionConverters._
+
+object RaftMetadataCache {
+  def removePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                          topic: String, partitionId: Int): Boolean = {
+    partitionStates.get(topic).exists { infos =>
+      infos.remove(partitionId)
+      if (infos.isEmpty) partitionStates.remove(topic)
+      true
+    }
+  }
+
+  def addOrUpdatePartitionInfo(partitionStates: mutable.AnyRefMap[String, mutable.LongMap[UpdateMetadataPartitionState]],
+                               topic: String,
+                               partitionId: Int,
+                               stateInfo: UpdateMetadataPartitionState): Unit = {
+    val infos = partitionStates.getOrElseUpdate(topic, mutable.LongMap.empty)
+    infos(partitionId) = stateInfo
+  }
+}
+
+
+class RaftMetadataCache(val brokerId: Int) extends MetadataCache with Logging {
+  this.logIdent = s"[MetadataCache brokerId=$brokerId] "
+
+  private val lock = new ReentrantLock()
+
+  // this is the cache state. every MetadataImage instance is immutable, and updates (performed under a lock)
+  // replace the value with a completely new one. this means reads (which are not under any lock) need to grab
+  // the value of this var (into a val) ONCE and retain that read copy for the duration of their operation.
+  // multiple reads of this value risk getting different snapshots.
+  @volatile private var _currentImage: MetadataImage = new MetadataImage()
+
+  private val stateChangeLogger = new StateChangeLogger(brokerId, inControllerContext = false, None)
+
+  // This method is the main hotspot when it comes to the performance of metadata requests,
+  // we should be careful about adding additional logic here. Relatedly, `brokers` is
+  // `List[Integer]` instead of `List[Int]` to avoid a collection copy.
+  // filterUnavailableEndpoints exists to support v0 MetadataResponses
+  private def maybeFilterAliveReplicas(image: MetadataImage,
+                                       brokers: java.util.List[Integer],
+                                       listenerName: ListenerName,
+                                       filterUnavailableEndpoints: Boolean): java.util.List[Integer] = {
+    if (!filterUnavailableEndpoints) {
+      brokers
+    } else {
+      val res = new util.ArrayList[Integer](math.min(image.brokers.aliveBrokers().size, brokers.size))
+      for (brokerId <- brokers.asScala) {
+        if (hasAliveEndpoint(image, brokerId, listenerName))
+          res.add(brokerId)
+      }
+      res
+    }
+  }
+
+  def currentImage(): MetadataImage = _currentImage
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  // If errorUnavailableListeners=true, return LISTENER_NOT_FOUND if listener is missing on the broker.
+  // Otherwise, return LEADER_NOT_AVAILABLE for broker unavailable and missing listener (Metadata response v5 and below).
+  private def getPartitionMetadata(image: MetadataImage, topic: St
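
The snapshot-read discipline described in the comment on `_currentImage` boils down to the pattern below. This is a self-contained sketch; `Image` and `SnapshotCache` are illustrative stand-ins, not classes from the PR.

object SnapshotReadSketch {
  // Stand-in for MetadataImage: an immutable value that writers swap out wholesale.
  final case class Image(topics: Set[String])

  final class SnapshotCache {
    @volatile private var _currentImage: Image = Image(Set.empty)

    // Writers (under a lock in the real cache) publish a completely new image.
    def update(newImage: Image): Unit = _currentImage = newImage

    // Readers capture the volatile var ONCE and use that copy throughout;
    // reading it twice could mix data from two different snapshots.
    def topicCountAndNames(): (Int, Set[String]) = {
      val image = _currentImage
      (image.topics.size, image.topics)
    }
  }
}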

[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173703



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+    topics: collection.Set[String],
+    listenerName: ListenerName,
+    errorUnavailableEndpoints: Boolean = false,
+    errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   KAFKA-12299





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571173506



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+    topics: collection.Set[String],
+    listenerName: ListenerName,
+    errorUnavailableEndpoints: Boolean = false,
+    errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
Review comment:
   KAFKA-12299









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571163776



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -485,7 +494,7 @@ class MetadataCacheTest {
 
   @Test

Review comment:
   This test is failing in Raft mode. I'll investigate









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571163226



##
File path: core/src/main/scala/kafka/server/metadata/MetadataBrokers.scala
##
@@ -37,7 +36,15 @@ object MetadataBroker {
         endPoint.name() ->
           new Node(record.brokerId, endPoint.host, endPoint.port, record.rack)
       }.toMap,
-      true)
+      fenced = true)
+  }
+
+  def apply(broker: Broker): MetadataBroker = {
+    new MetadataBroker(broker.id, broker.rack.orNull,
+      broker.endPoints.map { endpoint =>
+        endpoint.listenerName.value -> new Node(broker.id, endpoint.host, endpoint.port, broker.rack.orNull)
+      }.toMap,
+      fenced = false)
Review comment:
   I wanted to go ahead and conform to MetadataBroker for both 
implementations. One side-effect is we are exposing the fenced flag to ZK-based 
clusters. I've set it to false here since we don't have any notion of broker 
fencing in the ZK-based metadata. Is there any harm in this? 
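
For illustration, the ZK-side conversion above produces a broker entry along these lines. The argument order follows the constructor call in the diff; the concrete values and the single-listener setup are assumptions, not code from the PR.

// Illustrative only: a ZK-registered broker with a single PLAINTEXT listener.
// `fenced` is hard-coded to false because ZK-based metadata has no notion of fencing.
val zkBroker = new MetadataBroker(
  1,                                                         // broker id
  null,                                                      // rack (orNull when absent)
  Map("PLAINTEXT" -> new Node(1, "localhost", 9092, null)),  // listener name -> Node
  fenced = false)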









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571161027



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
   Yea, easy enough. I added `@ParameterizedTest` to MetadataCacheTest. One test is failing with the Raft metadata cache, so I left that one ZK-only for now.
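
Roughly, the parameterization looks like the sketch below: a provider in the companion object (Scala emits a static forwarder for it) feeds both cache implementations into each test. This is a sketch that assumes the `MetadataCache` factory methods from this PR, not the exact test code.

package kafka.server

import java.util.stream.Stream
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource

object MetadataCacheTest {
  // @MethodSource resolves this through the static forwarder generated for the companion object.
  def cacheProvider(): Stream[MetadataCache] =
    Stream.of[MetadataCache](MetadataCache.zkMetadataCache(1), MetadataCache.raftMetadataCache(1))
}

class MetadataCacheTest {
  @ParameterizedTest
  @MethodSource(Array("cacheProvider"))
  def testGetAllTopics(cache: MetadataCache): Unit = {
    // exercise `cache` identically for the ZK and Raft implementations
  }
}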









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571145320



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+    topics: collection.Set[String],
+    listenerName: ListenerName,
+    errorUnavailableEndpoints: Boolean = false,
+    errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {

Review comment:
   I was thinking it might be easier to refactor in the future if we only need to rename the factory method rather than changing every `new Class` call site.
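
As a usage sketch of that design choice (the `config.brokerId` call site is hypothetical), swapping implementations later then only touches the factory:

// Hypothetical call site: if the implementation moves, only the factory method changes,
// not every `new ZkMetadataCache(...)` construction spread across the codebase.
val metadataCache: MetadataCache = MetadataCache.zkMetadataCache(config.brokerId)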









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571140218



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+    topics: collection.Set[String],
+    listenerName: ListenerName,
+    errorUnavailableEndpoints: Boolean = false,
+    errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]

Review comment:
   Yea, this just got pulled up from the class when I extracted the trait. 
I'll fix up these comments









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571139784



##
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##
@@ -34,16 +34,69 @@ import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataP
 import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition, Uuid}
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseTopic
 import org.apache.kafka.common.message.MetadataResponseData.MetadataResponsePartition
+import org.apache.kafka.common.message.{MetadataResponseData, UpdateMetadataRequestData}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.{MetadataResponse, UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 
+trait MetadataCache {
+
+  // errorUnavailableEndpoints exists to support v0 MetadataResponses
+  def getTopicMetadata(
+    topics: collection.Set[String],
+    listenerName: ListenerName,
+    errorUnavailableEndpoints: Boolean = false,
+    errorUnavailableListeners: Boolean = false): collection.Seq[MetadataResponseData.MetadataResponseTopic]
+
+  def getAllTopics(): collection.Set[String]
+
+  def getAllPartitions(): collection.Set[TopicPartition]
+
+  def getNonExistingTopics(topics: collection.Set[String]): collection.Set[String]
+
+  def getAliveBroker(brokerId: Int): Option[MetadataBroker]
+
+  def getAliveBrokers: collection.Seq[MetadataBroker]
+
+  def getPartitionInfo(topic: String, partitionId: Int): Option[UpdateMetadataRequestData.UpdateMetadataPartitionState]
+
+  def numPartitions(topic: String): Option[Int]
+
+  // if the leader is not known, return None;
+  // if the leader is known and corresponding node is available, return Some(node)
+  // if the leader is known but corresponding node with the listener name is not available, return Some(NO_NODE)
+  def getPartitionLeaderEndpoint(topic: String, partitionId: Int, listenerName: ListenerName): Option[Node]
+
+  def getPartitionReplicaEndpoints(tp: TopicPartition, listenerName: ListenerName): Map[Int, Node]
+
+  def getControllerId: Option[Int]
+
+  def getClusterMetadata(clusterId: String, listenerName: ListenerName): Cluster
+
+  // This method returns the deleted TopicPartitions received from UpdateMetadataRequest
+  def updateMetadata(correlationId: Int, request: UpdateMetadataRequest): collection.Seq[TopicPartition]
+
+  def contains(topic: String): Boolean
+
+  def contains(tp: TopicPartition): Boolean
+}
+
+object MetadataCache {
+  def zkMetadataCache(brokerId: Int): ZkMetadataCache = {
+    new ZkMetadataCache(brokerId)
+  }
+
+  def raftMetadataCache(brokerId: Int): RaftMetadataCache = {
+    new RaftMetadataCache(brokerId)
+  }
+}
+
 /**
  *  A cache for the state (e.g., current leader) of each partition. This cache is updated through
  *  UpdateMetadataRequest from the controller. Every broker maintains the same cache, asynchronously.
  */
-class MetadataCache(brokerId: Int) extends Logging {
+class ZkMetadataCache(brokerId: Int) extends MetadataCache with Logging {

Review comment:
   I left the ZK implementation in place since it's really the only 
production implementation for now. It also reduces the size of the diff for 
this change. I don't feel very strongly about it either way, so I'm happy to 
relocate it to a separate file









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r571138962



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -16,20 +16,20 @@
   */
 package kafka.server
 
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+
 import java.util
-import java.util.Collections
 import util.Arrays.asList
-
-import org.apache.kafka.common.{Node, TopicPartition, Uuid}
 import org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker, UpdateMetadataEndpoint, UpdateMetadataPartitionState}
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record.RecordBatch
 import org.apache.kafka.common.requests.UpdateMetadataRequest
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
 
+import java.util.Collections
 import scala.jdk.CollectionConverters._
 
 class MetadataCacheTest {

Review comment:
   Yea, I think we can probably do some parameterization thing or possibly 
even use test templating similar to #9986









[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-05 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
   Hmm, I think this was an artifact of the merge, I'll restore this test

##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)
Review comment:
   Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because its endpoints map is empty.
   
   @hachikuji @cmccabe is this a change in metadata behavior, or does this test have bad assumptions?

##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
   Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because its endpoints map is empty. This leads to `cluster.leaderFor` on L534 returning null.
   
   @hachikuji @cmccabe is this a change in metadata behavior, or does this test have bad assumptions?
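
   The `Cluster` lookup semantics behind that observation can be reproduced in isolation. The following is a self-contained sketch that models the broker missing for this listener by giving the partition a null leader; it is illustrative, not the cache's actual construction code.

import java.util.Collections
import org.apache.kafka.common.{Cluster, Node, PartitionInfo, TopicPartition}

val tp = new TopicPartition("topic", 0)
val node0 = new Node(0, "foo", 9092)
// Broker 1 exposes no endpoint for the listener, so it never becomes a Node here;
// the partition's leader is therefore unresolved (modelled as null).
val partitionInfo = new PartitionInfo(tp.topic, tp.partition, null, Array(node0), Array(node0))
val cluster = new Cluster("clusterId", Collections.singletonList(node0),
  Collections.singletonList(partitionInfo),
  Collections.emptySet[String](), Collections.emptySet[String]())

// The old ZK-based test expected a placeholder Node(1, "", -1); with no resolvable
// leader the lookup instead yields null.
assert(cluster.leaderFor(tp) == null)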

[GitHub] [kafka] mumrah commented on a change in pull request #10049: Refactor MetadataCache for Raft metadata

2021-02-04 Thread GitBox


mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570474971



##
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
   Since we're looking up the cluster by listener name here, we don't see the offline broker in the MetadataImage because its endpoints map is empty. This leads to `cluster.leaderFor` on L534 returning null.
   
   @hachikuji @cmccabe is this a change in metadata behavior, or does this test have bad assumptions?








