mumrah commented on a change in pull request #10049:
URL: https://github.com/apache/kafka/pull/10049#discussion_r570451366



##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {

Review comment:
       Hmm, I think this was an artifact of the merge; I'll restore this test.

##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, 
controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
       Since we're looking up the cluster by listener name here, we don't see 
the offline broker in the MetadataImage because its endpoints map is empty. 
   
   @hachikuji @cmccabe is this a change in metadata behavior, or does this test have 
bad assumptions?

##########
File path: core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
##########
@@ -483,61 +475,4 @@ class MetadataCacheTest {
     assertEquals(initialBrokerIds.toSet, aliveBrokersFromCache.map(_.id).toSet)
   }
 
-  @Test
-  def testGetClusterMetadataWithOfflineReplicas(): Unit = {
-    val cache = new MetadataCache(1)
-    val topic = "topic"
-    val topicPartition = new TopicPartition(topic, 0)
-    val securityProtocol = SecurityProtocol.PLAINTEXT
-    val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-
-    val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setRack("")
-        .setEndpoints(Seq(new UpdateMetadataEndpoint()
-          .setHost("foo")
-          .setPort(9092)
-          .setSecurityProtocol(securityProtocol.id)
-          .setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(Seq.empty.asJava)
-    )
-    val controllerEpoch = 1
-    val leader = 1
-    val leaderEpoch = 0
-    val replicas = asList[Integer](0, 1)
-    val isr = asList[Integer](0, 1)
-    val offline = asList[Integer](1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(topicPartition.partition)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(isr)
-      .setZkVersion(3)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offline))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, 
controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, Collections.emptyMap()).build()
-    cache.updateMetadata(15, updateMetadataRequest)
-
-    val expectedNode0 = new Node(0, "foo", 9092)
-    val expectedNode1 = new Node(1, "", -1)
-
-    val cluster = cache.getClusterMetadata("clusterId", listenerName)

Review comment:
       Since we're looking up the cluster by listener name here, we don't see 
the offline broker in the MetadataImage because its endpoints map is empty. 
This leads to `cluster.leaderFor` on L534 returning null
   
   @hachikuji @cmccabe is this a change in metadata behavior, or does this test have 
bad assumptions?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to