This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.4 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.4 by this push: new b857fdec873 KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) b857fdec873 is described below commit b857fdec87392127cb177bb95fd224836f28701a Author: Edoardo Comar <eco...@uk.ibm.com> AuthorDate: Wed Jan 4 22:24:16 2023 +0000 KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) ZkMetadataCache.getClusterMetadata returns a Cluster object where the aliveNodes were missing their rack info. Problem: when ZkMetadataCache updates the metadataSnapshot, it includes the rack in aliveBrokers but not in aliveNodes. Trivial fix with a matching assertion in the existing unit test. Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)` is passed to `ClientQuotaCallback.updateClusterMetadata(...)`, so it is used, not by clients but by service providers. Reviewers: Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +- core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d774cd41a5c..feaaf1c43f1 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -398,7 +398,7 @@ class ZkMetadataCache( broker.endpoints.forEach { ep => val listenerName = new ListenerName(ep.listener) endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) + nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack())) } aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = nodes.asScala diff --git 
a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index 7dadd5bf759..d2df68f6da9 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -595,7 +595,7 @@ class MetadataCacheTest { val brokers = Seq( new UpdateMetadataBroker() .setId(0) - .setRack("") + .setRack("r") .setEndpoints(Seq(new UpdateMetadataEndpoint() .setHost("foo") .setPort(9092) @@ -626,7 +626,7 @@ class MetadataCacheTest { brokers.asJava, Collections.emptyMap()).build() MetadataCacheTest.updateCache(cache, updateMetadataRequest) - val expectedNode0 = new Node(0, "foo", 9092) + val expectedNode0 = new Node(0, "foo", 9092, "r") val expectedNode1 = new Node(1, "", -1) val cluster = cache.getClusterMetadata("clusterId", listenerName)