This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 27847e0c78a KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073)
27847e0c78a is described below

commit 27847e0c78a43f51cdf37d12253e683738d630a2
Author: Edoardo Comar <eco...@uk.ibm.com>
AuthorDate: Wed Jan 4 22:24:16 2023 +0000

    KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073)
    
    ZkMetadataCache.getClusterMetadata returns a Cluster object where the aliveNodes were
    missing their rack info.
    
    Problem: when ZkMetadataCache updates the metadataSnapshot, it includes the rack in
    aliveBrokers but not in aliveNodes.
    
    Trivial fix with matching assertion in existing unit test.
    
    Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)`
    is passed to `ClientQuotaCallback.updateClusterMetadata(...)`,
    so it is used not by clients, but by service providers.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +-
 core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
index d69785f90f6..235c15db674 100755
--- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
+++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala
@@ -365,7 +365,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea
         broker.endpoints.forEach { ep =>
           val listenerName = new ListenerName(ep.listener)
           endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol))
-          nodes.put(listenerName, new Node(broker.id, ep.host, ep.port))
+          nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack()))
         }
         aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
         aliveNodes(broker.id) = nodes.asScala
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index d92c76f7118..4d4df4243a6 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -596,7 +596,7 @@ class MetadataCacheTest {
     val brokers = Seq(
       new UpdateMetadataBroker()
         .setId(0)
-        .setRack("")
+        .setRack("r")
         .setEndpoints(Seq(new UpdateMetadataEndpoint()
           .setHost("foo")
           .setPort(9092)
@@ -627,7 +627,7 @@ class MetadataCacheTest {
       brokers.asJava, Collections.emptyMap()).build()
     MetadataCacheTest.updateCache(cache, updateMetadataRequest)
 
-    val expectedNode0 = new Node(0, "foo", 9092)
+    val expectedNode0 = new Node(0, "foo", 9092, "r")
     val expectedNode1 = new Node(1, "", -1)
 
     val cluster = cache.getClusterMetadata("clusterId", listenerName)
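
The commit message notes that the Cluster returned by MetadataCache.getClusterMetadata feeds
ClientQuotaCallback.updateClusterMetadata, i.e. broker-side quota plugins rather than clients.
As a rough illustration of why the rack matters there, the sketch below shows a quota callback
that groups the live nodes by rack. The class name RackAwareQuotaCallback and the grouping
policy are hypothetical; only the ClientQuotaCallback interface and the Cluster/Node accessors
are Kafka public API.

import java.util.{Collections, Map => JMap}

import org.apache.kafka.common.{Cluster, Node}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, ClientQuotaType}

import scala.jdk.CollectionConverters._

// Hypothetical example: groups the cluster's nodes by rack whenever the broker
// pushes new metadata. With this fix the Cluster passed in carries each node's
// rack (when one is configured); previously rack() was always null on this path.
class RackAwareQuotaCallback extends ClientQuotaCallback {

  @volatile private var nodesByRack: Map[String, Seq[Node]] = Map.empty

  override def updateClusterMetadata(cluster: Cluster): Boolean = {
    nodesByRack = cluster.nodes().asScala.toSeq.filter(_.hasRack).groupBy(_.rack)
    true // ask the broker to recompute quotas from the new metadata
  }

  // The remaining methods are minimal placeholders so the sketch compiles.
  override def quotaMetricTags(quotaType: ClientQuotaType, principal: KafkaPrincipal,
                               clientId: String): JMap[String, String] =
    Collections.singletonMap("client-id", clientId)

  override def quotaLimit(quotaType: ClientQuotaType,
                          metricTags: JMap[String, String]): java.lang.Double =
    java.lang.Double.valueOf(Double.MaxValue)

  override def updateQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity,
                           newValue: Double): Unit = ()

  override def removeQuota(quotaType: ClientQuotaType, quotaEntity: ClientQuotaEntity): Unit = ()

  override def quotaResetRequired(quotaType: ClientQuotaType): Boolean = false

  override def configure(configs: JMap[String, _]): Unit = ()

  override def close(): Unit = ()
}

A callback along these lines is plugged into a broker via the client.quota.callback.class
configuration; before this fix, the rack-based grouping above would have seen no racks at all
for ZK-backed clusters, since the Nodes in aliveNodes were built without their rack.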
