This is an automated email from the ASF dual-hosted git repository. ijuma pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 27847e0c78a KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) 27847e0c78a is described below commit 27847e0c78a43f51cdf37d12253e683738d630a2 Author: Edoardo Comar <eco...@uk.ibm.com> AuthorDate: Wed Jan 4 22:24:16 2023 +0000 KAFKA-14571: Include rack info in ZkMetadataCache.getClusterMetadata (#13073) ZkMetadataCache.getClusterMetadata returns a Cluster object where the aliveNodes were missing their rack info. Problem: when ZkMetadataCache updates the metadataSnapshot, it includes the rack in aliveBrokers but not in aliveNodes. Trivial fix with matching assertion in existing unit test. Note that the Cluster object returned from `MetadataCache.getClusterMetadata(...)` is passed to `ClientQuotaCallback.updateClusterMetadata(...)`, so it is used, not by clients but by service providers. Reviewers: Ismael Juma <ism...@juma.me.uk> --- core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala | 2 +- core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala index d69785f90f6..235c15db674 100755 --- a/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala +++ b/core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala @@ -365,7 +365,7 @@ class ZkMetadataCache(brokerId: Int, metadataVersion: MetadataVersion, brokerFea broker.endpoints.forEach { ep => val listenerName = new ListenerName(ep.listener) endPoints += new EndPoint(ep.host, ep.port, listenerName, SecurityProtocol.forId(ep.securityProtocol)) - nodes.put(listenerName, new Node(broker.id, ep.host, ep.port)) + nodes.put(listenerName, new Node(broker.id, ep.host, ep.port, broker.rack())) } aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack)) aliveNodes(broker.id) = 
nodes.asScala diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index d92c76f7118..4d4df4243a6 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -596,7 +596,7 @@ class MetadataCacheTest { val brokers = Seq( new UpdateMetadataBroker() .setId(0) - .setRack("") + .setRack("r") .setEndpoints(Seq(new UpdateMetadataEndpoint() .setHost("foo") .setPort(9092) @@ -627,7 +627,7 @@ class MetadataCacheTest { brokers.asJava, Collections.emptyMap()).build() MetadataCacheTest.updateCache(cache, updateMetadataRequest) - val expectedNode0 = new Node(0, "foo", 9092) + val expectedNode0 = new Node(0, "foo", 9092, "r") val expectedNode1 = new Node(1, "", -1) val cluster = cache.getClusterMetadata("clusterId", listenerName)