This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 9425acb6b41 KAFKA-18578: Remove `UpdateMetadataRequest` from 
`MetadataCacheTest` (#18628)
9425acb6b41 is described below

commit 9425acb6b41dada0b991d495017cea895cc82dc8
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Jan 20 04:18:43 2025 +0800

    KAFKA-18578: Remove `UpdateMetadataRequest` from `MetadataCacheTest` 
(#18628)
    
    Reviewers: Ismael Juma <[email protected]>
---
 .../unit/kafka/server/MetadataCacheTest.scala      | 649 ++++++++-------------
 1 file changed, 238 insertions(+), 411 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala 
b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 9f91790619e..669210f8fcc 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,19 +16,17 @@
   */
 package kafka.server
 
-import kafka.server.metadata.{KRaftMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
 import 
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
-import 
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
 UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint, 
BrokerEndpointCollection}
 import org.apache.kafka.common.metadata._
 import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
+import org.apache.kafka.common.protocol.{ApiMessage, Errors}
 import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{AbstractControlRequest, 
UpdateMetadataRequest}
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{DirectoryId, Uuid}
-import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage, 
MetadataProvenance}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage, 
MetadataProvenance}
+import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.KRaftVersion
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
@@ -54,7 +52,7 @@ object MetadataCacheTest {
         val partialImage = new MetadataImage(
           new MetadataProvenance(100L, 10, 1000L, true),
           image.features(),
-          ClusterImage.EMPTY,
+          image.cluster(),
           image.topics(),
           image.configs(),
           image.clientQuotas(),
@@ -69,85 +67,6 @@ object MetadataCacheTest {
       case _ => throw new RuntimeException("Unsupported cache type")
     }
   }
-
-  def updateCache(cache: MetadataCache, request: UpdateMetadataRequest, 
records: Seq[ApiMessage] = List()): Unit = {
-    cache match {
-      case c: KRaftMetadataCache => {
-        // UpdateMetadataRequest always contains a full list of brokers, but 
may contain
-        // a partial list of partitions. Therefore, base our delta off a 
partial image that
-        // contains no brokers, but which contains the previous partitions.
-        val image = c.currentImage()
-        val partialImage = new MetadataImage(
-          new MetadataProvenance(100L, 10, 1000L, true),
-          image.features(),
-          ClusterImage.EMPTY,
-          image.topics(),
-          image.configs(),
-          image.clientQuotas(),
-          image.producerIds(),
-          image.acls(),
-          image.scram(),
-          image.delegationTokens())
-        val delta = new MetadataDelta.Builder().setImage(partialImage).build()
-
-        def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
-          val endpoints = new BrokerEndpointCollection()
-          broker.endpoints().forEach { e =>
-            endpoints.add(new BrokerEndpoint().
-              setName(e.listener()).
-              setHost(e.host()).
-              setPort(e.port()).
-              setSecurityProtocol(e.securityProtocol()))
-          }
-          val prevBroker = Option(image.cluster().broker(broker.id()))
-          // UpdateMetadataRequest doesn't contain all the broker registration 
fields, so get
-          // them from the previous registration if available.
-          val (epoch, incarnationId, fenced) = prevBroker match {
-            case None => (0L, Uuid.ZERO_UUID, false)
-            case Some(b) => (b.epoch(), b.incarnationId(), b.fenced())
-          }
-          new RegisterBrokerRecord().
-            setBrokerId(broker.id()).
-            setBrokerEpoch(epoch).
-            setIncarnationId(incarnationId).
-            setEndPoints(endpoints).
-            setRack(broker.rack()).
-            setFenced(fenced)
-        }
-        request.liveBrokers().iterator().asScala.foreach { brokerInfo =>
-          delta.replay(toRecord(brokerInfo))
-        }
-
-        def toRecords(topic: UpdateMetadataTopicState): Seq[ApiMessage] = {
-          val results = new mutable.ArrayBuffer[ApiMessage]()
-          results += new 
TopicRecord().setName(topic.topicName()).setTopicId(topic.topicId())
-          topic.partitionStates().forEach { partition =>
-            if (partition.leader() == LeaderAndIsr.LEADER_DURING_DELETE) {
-              results += new RemoveTopicRecord().setTopicId(topic.topicId())
-            } else {
-              results += new PartitionRecord().
-                setPartitionId(partition.partitionIndex()).
-                setTopicId(topic.topicId()).
-                setReplicas(partition.replicas()).
-                setIsr(partition.isr()).
-                setRemovingReplicas(Collections.emptyList()).
-                setAddingReplicas(Collections.emptyList()).
-                setLeader(partition.leader()).
-                setLeaderEpoch(partition.leaderEpoch()).
-                setPartitionEpoch(partition.zkVersion())
-            }
-          }
-          results
-        }
-        request.topicStates().forEach { topic =>
-          toRecords(topic).foreach(delta.replay)
-        }
-        records.foreach(delta.replay)
-        c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true)))
-      }
-      case _ => throw new RuntimeException("Unsupported cache type")
-    }
-  }
 }
 
 class MetadataCacheTest {
@@ -167,70 +86,59 @@ class MetadataCacheTest {
     val topic0 = "topic-0"
     val topic1 = "topic-1"
 
-    val zkVersion = 3
-    val controllerId = 2
-    val controllerEpoch = 1
+    val topicIds = new util.HashMap[String, Uuid]()
+    topicIds.put(topic0, Uuid.randomUuid())
+    topicIds.put(topic1, Uuid.randomUuid())
 
-    def endpoints(brokerId: Int): Seq[UpdateMetadataEndpoint] = {
+    def endpoints(brokerId: Int): BrokerEndpointCollection = {
       val host = s"foo-$brokerId"
-      Seq(
-        new UpdateMetadataEndpoint()
+      new BrokerEndpointCollection(Seq(
+        new BrokerEndpoint()
           .setHost(host)
           .setPort(9092)
           .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-          
.setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value),
-        new UpdateMetadataEndpoint()
+          
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value),
+        new BrokerEndpoint()
           .setHost(host)
           .setPort(9093)
           .setSecurityProtocol(SecurityProtocol.SSL.id)
-          
.setListener(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value)
-      )
+          
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value)
+      ).iterator.asJava)
     }
 
     val brokers = (0 to 4).map { brokerId =>
-      new UpdateMetadataBroker()
-        .setId(brokerId)
-        .setEndpoints(endpoints(brokerId).asJava)
+      new RegisterBrokerRecord()
+        .setBrokerId(brokerId)
+        .setEndPoints(endpoints(brokerId))
         .setRack("rack1")
     }
 
+    val topic0Record = new 
TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0))
+    val topic1Record = new 
TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
+
     val partitionStates = Seq(
-      new UpdateMetadataPartitionState()
-        .setTopicName(topic0)
-        .setPartitionIndex(0)
-        .setControllerEpoch(controllerEpoch)
+      new PartitionRecord()
+        .setTopicId(topicIds.get(topic0))
+        .setPartitionId(0)
         .setLeader(0)
         .setLeaderEpoch(0)
         .setIsr(asList(0, 1, 3))
-        .setZkVersion(zkVersion)
         .setReplicas(asList(0, 1, 3)),
-      new UpdateMetadataPartitionState()
-        .setTopicName(topic0)
-        .setPartitionIndex(1)
-        .setControllerEpoch(controllerEpoch)
+      new PartitionRecord()
+        .setTopicId(topicIds.get(topic0))
+        .setPartitionId(1)
         .setLeader(1)
         .setLeaderEpoch(1)
         .setIsr(asList(1, 0))
-        .setZkVersion(zkVersion)
         .setReplicas(asList(1, 2, 0, 4)),
-      new UpdateMetadataPartitionState()
-        .setTopicName(topic1)
-        .setPartitionIndex(0)
-        .setControllerEpoch(controllerEpoch)
+      new PartitionRecord()
+        .setTopicId(topicIds.get(topic1))
+        .setPartitionId(0)
         .setLeader(2)
         .setLeaderEpoch(2)
         .setIsr(asList(2, 1))
-        .setZkVersion(zkVersion)
         .setReplicas(asList(2, 1, 3)))
-
-    val topicIds = new util.HashMap[String, Uuid]()
-    topicIds.put(topic0, Uuid.randomUuid())
-    topicIds.put(topic1, Uuid.randomUuid())
-
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch, brokerEpoch,
-      partitionStates.asJava, brokers.asJava, topicIds).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+    MetadataCacheTest.updateCache(cache, brokers ++ Seq(topic0Record, 
topic1Record) ++ partitionStates)
 
     for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT, 
SecurityProtocol.SSL)) {
       val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
@@ -244,14 +152,14 @@ class MetadataCacheTest {
         assertEquals(topic, topicMetadata.name)
         assertEquals(topicIds.get(topic), topicMetadata.topicId())
 
-        val topicPartitionStates = partitionStates.filter { ps => ps.topicName 
== topic }
+        val topicPartitionStates = partitionStates.filter { ps => ps.topicId 
== topicIds.get(topic) }
         val partitionMetadatas = 
topicMetadata.partitions.asScala.sortBy(_.partitionIndex)
         assertEquals(topicPartitionStates.size, partitionMetadatas.size, 
s"Unexpected partition count for topic $topic")
 
         partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata, 
partitionId) =>
           assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
           assertEquals(partitionId, partitionMetadata.partitionIndex)
-          val partitionState = topicPartitionStates.find(_.partitionIndex == 
partitionId).getOrElse(
+          val partitionState = topicPartitionStates.find(_.partitionId == 
partitionId).getOrElse(
             fail(s"Unable to find partition state for partition $partitionId"))
           assertEquals(partitionState.leader, partitionMetadata.leaderId)
           assertEquals(partitionState.leaderEpoch, 
partitionMetadata.leaderEpoch)
@@ -271,18 +179,20 @@ class MetadataCacheTest {
   def getTopicMetadataPartitionLeaderNotAvailable(cache: MetadataCache): Unit 
= {
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Seq(new UpdateMetadataBroker()
-      .setId(0)
-      .setEndpoints(Seq(new UpdateMetadataEndpoint()
+    val brokers = Seq(new RegisterBrokerRecord()
+      .setBrokerId(0)
+      .setFenced(false)
+      .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
         .setHost("foo")
         .setPort(9092)
         .setSecurityProtocol(securityProtocol.id)
-        .setListener(listenerName.value)).asJava))
-    val metadataCacheBrokerId = 0
+        .setName(listenerName.value)
+      ).iterator.asJava)))
+
     // leader is not available. expect LEADER_NOT_AVAILABLE for any metadata 
version.
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
metadataCacheBrokerId, brokers, listenerName,
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
listenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
false)
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
metadataCacheBrokerId, brokers, listenerName,
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
listenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
true)
   }
 
@@ -293,66 +203,66 @@ class MetadataCacheTest {
     // return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and 
new versions respectively.
     val plaintextListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
     val sslListenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
-    val broker0Endpoints = Seq(
-      new UpdateMetadataEndpoint()
+    val broker0Endpoints = new BrokerEndpointCollection(Seq(
+      new BrokerEndpoint()
         .setHost("host0")
         .setPort(9092)
         .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-        .setListener(plaintextListenerName.value),
-      new UpdateMetadataEndpoint()
+        .setName(plaintextListenerName.value),
+      new BrokerEndpoint()
         .setHost("host0")
         .setPort(9093)
         .setSecurityProtocol(SecurityProtocol.SSL.id)
-        .setListener(sslListenerName.value))
-    val broker1Endpoints = Seq(new UpdateMetadataEndpoint()
-      .setHost("host1")
-      .setPort(9092)
-      .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-      .setListener(plaintextListenerName.value))
+        .setName(sslListenerName.value)
+    ).iterator.asJava)
+
+    val broker1Endpoints = new BrokerEndpointCollection(Seq(
+      new BrokerEndpoint()
+        .setHost("host1")
+        .setPort(9092)
+        .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+        .setName(plaintextListenerName.value)
+    ).iterator.asJava)
+
     val brokers = Seq(
-      new UpdateMetadataBroker()
-        .setId(0)
-        .setEndpoints(broker0Endpoints.asJava),
-      new UpdateMetadataBroker()
-        .setId(1)
-        .setEndpoints(broker1Endpoints.asJava))
-    val metadataCacheBrokerId = 0
+      new RegisterBrokerRecord()
+        .setBrokerId(0)
+        .setFenced(false)
+        .setEndPoints(broker0Endpoints),
+      new RegisterBrokerRecord()
+        .setBrokerId(1)
+        .setFenced(false)
+        .setEndPoints(broker1Endpoints))
+
     // leader available in cache but listener name not present. expect 
LISTENER_NOT_FOUND error for new metadata version
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
metadataCacheBrokerId, brokers, sslListenerName,
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
sslListenerName,
       leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
     // leader available in cache but listener name not present. expect 
LEADER_NOT_AVAILABLE error for old metadata version
-    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, 
metadataCacheBrokerId, brokers, sslListenerName,
+    verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers, 
sslListenerName,
       leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners = 
false)
   }
 
   private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache: 
MetadataCache,
-                                                                       
metadataCacheBrokerId: Int,
-                                                                       
brokers: Seq[UpdateMetadataBroker],
+                                                                       
brokers: Seq[RegisterBrokerRecord],
                                                                        
listenerName: ListenerName,
                                                                        leader: 
Int,
                                                                        
expectedError: Errors,
                                                                        
errorUnavailableListeners: Boolean): Unit = {
     val topic = "topic"
-
-    val zkVersion = 3
-    val controllerId = 2
-    val controllerEpoch = 1
+    val topicId = Uuid.randomUuid()
+    val topicRecords = Seq(new 
TopicRecord().setName(topic).setTopicId(topicId))
 
     val leaderEpoch = 1
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(0)
-      .setControllerEpoch(controllerEpoch)
-      .setLeader(leader)
-      .setLeaderEpoch(leaderEpoch)
-      .setIsr(asList(0))
-      .setZkVersion(zkVersion)
-      .setReplicas(asList(0)))
-
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch, brokerEpoch,
-      partitionStates.asJava, brokers.asJava, 
util.Collections.emptyMap()).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+    val partitionEpoch = 3
+    val partitionStates = Seq(new PartitionRecord()
+        .setTopicId(topicId)
+        .setPartitionId(0)
+        .setPartitionEpoch(partitionEpoch)
+        .setLeader(leader)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(asList(0))
+        .setReplicas(asList(0)))
+    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
 
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, 
errorUnavailableListeners = errorUnavailableListeners)
     assertEquals(1, topicMetadatas.size)
@@ -374,20 +284,26 @@ class MetadataCacheTest {
   @MethodSource(Array("cacheProvider"))
   def getTopicMetadataReplicaNotAvailable(cache: MetadataCache): Unit = {
     val topic = "topic"
+    val topicId = Uuid.randomUuid()
 
-    val zkVersion = 3
-    val controllerId = 2
-    val controllerEpoch = 1
+    val partitionEpoch = 3
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Seq(new UpdateMetadataBroker()
-      .setId(0)
-      .setEndpoints(Seq(new UpdateMetadataEndpoint()
+    val endPoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint()
         .setHost("foo")
         .setPort(9092)
         .setSecurityProtocol(securityProtocol.id)
-        .setListener(listenerName.value)).asJava))
+        .setName(listenerName.value)
+    ).iterator.asJava)
 
+    val brokers = Seq(new RegisterBrokerRecord()
+        .setBrokerId(0)
+        .setFenced(false)
+        .setEndPoints(endPoints))
+
+    val topicRecords = Seq(new TopicRecord()
+        .setName(topic)
+        .setTopicId(topicId))
     // replica 1 is not available
     val leader = 0
     val leaderEpoch = 0
@@ -395,20 +311,15 @@ class MetadataCacheTest {
     val isr = asList[Integer](0)
 
     val partitionStates = Seq(
-      new UpdateMetadataPartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(controllerEpoch)
+      new PartitionRecord()
+        .setTopicId(topicId)
+        .setPartitionId(0)
         .setLeader(leader)
         .setLeaderEpoch(leaderEpoch)
         .setIsr(isr)
-        .setZkVersion(zkVersion)
+        .setPartitionEpoch(partitionEpoch)
         .setReplicas(replicas))
-
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch, brokerEpoch,
-      partitionStates.asJava, brokers.asJava, 
util.Collections.emptyMap()).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
 
     // Validate errorUnavailableEndpoints = false
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, 
errorUnavailableEndpoints = false)
@@ -447,20 +358,27 @@ class MetadataCacheTest {
   @MethodSource(Array("cacheProvider"))
   def getTopicMetadataIsrNotAvailable(cache: MetadataCache): Unit = {
     val topic = "topic"
+    val topicId = Uuid.randomUuid()
 
-    val zkVersion = 3
-    val controllerId = 2
-    val controllerEpoch = 1
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Seq(new UpdateMetadataBroker()
-      .setId(0)
-      .setRack("rack1")
-      .setEndpoints(Seq(new UpdateMetadataEndpoint()
+
+    val endpoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint()
         .setHost("foo")
         .setPort(9092)
         .setSecurityProtocol(securityProtocol.id)
-        .setListener(listenerName.value)).asJava))
+        .setName(listenerName.value)
+    ).iterator.asJava)
+
+    val brokers = Seq(new RegisterBrokerRecord()
+      .setBrokerId(0)
+      .setRack("rack1")
+      .setFenced(false)
+      .setEndPoints(endpoints))
+
+    val topicRecords = Seq(new TopicRecord()
+      .setName(topic)
+      .setTopicId(topicId))
 
     // replica 1 is not available
     val leader = 0
@@ -468,20 +386,14 @@ class MetadataCacheTest {
     val replicas = asList[Integer](0)
     val isr = asList[Integer](0, 1)
 
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(0)
-      .setControllerEpoch(controllerEpoch)
+    val partitionStates = Seq(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(0)
       .setLeader(leader)
       .setLeaderEpoch(leaderEpoch)
       .setIsr(isr)
-      .setZkVersion(zkVersion)
       .setReplicas(replicas))
-
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch, brokerEpoch,
-      partitionStates.asJava, brokers.asJava, 
util.Collections.emptyMap()).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
 
     // Validate errorUnavailableEndpoints = false
     val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName, 
errorUnavailableEndpoints = false)
@@ -520,33 +432,33 @@ class MetadataCacheTest {
   @MethodSource(Array("cacheProvider"))
   def getTopicMetadataWithNonSupportedSecurityProtocol(cache: MetadataCache): 
Unit = {
     val topic = "topic"
+    val topicId = Uuid.randomUuid()
     val securityProtocol = SecurityProtocol.PLAINTEXT
-    val brokers = Seq(new UpdateMetadataBroker()
-      .setId(0)
+
+    val brokers = new RegisterBrokerRecord()
+      .setBrokerId(0)
       .setRack("")
-      .setEndpoints(Seq(new UpdateMetadataEndpoint()
+      .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
         .setHost("foo")
         .setPort(9092)
         .setSecurityProtocol(securityProtocol.id)
-        
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava))
-    val controllerEpoch = 1
+        .setName(ListenerName.forSecurityProtocol(securityProtocol).value)
+      ).iterator.asJava))
+
+    val topicRecord = new TopicRecord().setName(topic).setTopicId(topicId)
+
     val leader = 0
     val leaderEpoch = 0
     val replicas = asList[Integer](0)
     val isr = asList[Integer](0, 1)
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(0)
-      .setControllerEpoch(controllerEpoch)
+    val partitionStates = Seq(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(0)
       .setLeader(leader)
       .setLeaderEpoch(leaderEpoch)
       .setIsr(isr)
-      .setZkVersion(3)
       .setReplicas(replicas))
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2, 
controllerEpoch, brokerEpoch, partitionStates.asJava,
-      brokers.asJava, util.Collections.emptyMap()).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+    MetadataCacheTest.updateCache(cache, Seq(brokers, topicRecord) ++ 
partitionStates)
 
     val topicMetadata = cache.getTopicMetadata(Set(topic), 
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
     assertEquals(1, topicMetadata.size)
@@ -558,37 +470,37 @@ class MetadataCacheTest {
   @MethodSource(Array("cacheProvider"))
   def getAliveBrokersShouldNotBeMutatedByUpdateCache(cache: MetadataCache): 
Unit = {
     val topic = "topic"
+    val topicId = Uuid.randomUuid()
+    val topicRecords = Seq(new 
TopicRecord().setName(topic).setTopicId(topicId))
 
     def updateCache(brokerIds: Seq[Int]): Unit = {
       val brokers = brokerIds.map { brokerId =>
         val securityProtocol = SecurityProtocol.PLAINTEXT
-        new UpdateMetadataBroker()
-          .setId(brokerId)
+        new RegisterBrokerRecord()
+          .setBrokerId(brokerId)
           .setRack("")
-          .setEndpoints(Seq(new UpdateMetadataEndpoint()
+          .setFenced(false)
+          .setBrokerEpoch(brokerEpoch)
+          .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
             .setHost("foo")
             .setPort(9092)
             .setSecurityProtocol(securityProtocol.id)
-            
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)
+            .setName(ListenerName.forSecurityProtocol(securityProtocol).value)
+          ).iterator.asJava))
       }
-      val controllerEpoch = 1
       val leader = 0
       val leaderEpoch = 0
       val replicas = asList[Integer](0)
       val isr = asList[Integer](0, 1)
-      val partitionStates = Seq(new UpdateMetadataPartitionState()
-        .setTopicName(topic)
-        .setPartitionIndex(0)
-        .setControllerEpoch(controllerEpoch)
+      val partitionStates = Seq(new PartitionRecord()
+        .setTopicId(topicId)
+        .setPartitionId(0)
         .setLeader(leader)
         .setLeaderEpoch(leaderEpoch)
         .setIsr(isr)
-        .setZkVersion(3)
         .setReplicas(replicas))
-      val version = ApiKeys.UPDATE_METADATA.latestVersion
-      val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
2, controllerEpoch, brokerEpoch, partitionStates.asJava,
-        brokers.asJava, util.Collections.emptyMap()).build()
-      MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+
+      MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
     }
 
     val initialBrokerIds = (0 to 2)
@@ -704,8 +616,6 @@ class MetadataCacheTest {
   def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
     val metadataCache = MetadataCache.kRaftMetadataCache(0, () => 
KRaftVersion.KRAFT_VERSION_0)
 
-    val controllerId = 2
-    val controllerEpoch = 1
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
     val topic0 = "test0"
@@ -761,23 +671,32 @@ class MetadataCacheTest {
         .setPartitionEpoch(11)
         .setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
     )
-
+    new BrokerEndpointCollection()
     val brokers = Seq(
-      new UpdateMetadataBroker().setId(0).setEndpoints(Seq(new 
UpdateMetadataEndpoint().setHost("foo0").setPort(9092).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker().setId(1).setEndpoints(Seq(new 
UpdateMetadataEndpoint().setHost("foo1").setPort(9093).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker().setId(2).setEndpoints(Seq(new 
UpdateMetadataEndpoint().setHost("foo2").setPort(9094).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
-      new UpdateMetadataBroker().setId(3).setEndpoints(Seq(new 
UpdateMetadataEndpoint().setHost("foo3").setPort(9095).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(0)
+        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo0").setPort(9092)
+          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+        ).iterator.asJava)),
+      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(1)
+        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo1").setPort(9093)
+          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+        ).iterator.asJava)),
+      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(2)
+        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo2").setPort(9094)
+          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+        ).iterator.asJava)),
+      new 
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(3)
+        .setEndPoints(new BrokerEndpointCollection(Seq(new 
BrokerEndpoint().setHost("foo3").setPort(9095)
+          .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+        ).iterator.asJava)),
     )
 
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch, brokerEpoch,
-      List[UpdateMetadataPartitionState]().asJava, brokers.asJava, 
topicIds).build()
     var recordSeq = Seq[ApiMessage](
       new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)),
       new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
     )
     recordSeq = recordSeq ++ partitionMap.values.toSeq
-    MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest, 
recordSeq)
+    MetadataCacheTest.updateCache(metadataCache, brokers ++ recordSeq)
 
     def checkTopicMetadata(topic: String, partitionIds: Set[Int], partitions: 
mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = {
       partitions.foreach(partition => {
@@ -867,42 +786,38 @@ class MetadataCacheTest {
   @MethodSource(Array("cacheProvider"))
   def testGetPartitionInfo(cache: MetadataCache): Unit = {
     val topic = "topic"
+    val topicId = Uuid.randomUuid()
     val partitionIndex = 0
-    val controllerEpoch = 1
     val leader = 0
     val leaderEpoch = 0
     val isr = asList[Integer](2, 3, 0)
-    val zkVersion = 3
     val replicas = asList[Integer](2, 3, 0, 1, 4)
-    val offlineReplicas = asList[Integer](0)
 
-    val partitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName(topic)
-      .setPartitionIndex(partitionIndex)
-      .setControllerEpoch(controllerEpoch)
+    val topicRecords = Seq(new 
TopicRecord().setName(topic).setTopicId(topicId))
+
+    val partitionStates = Seq(new PartitionRecord()
+      .setTopicId(topicId)
+      .setPartitionId(partitionIndex)
       .setLeader(leader)
       .setLeaderEpoch(leaderEpoch)
       .setIsr(isr)
-      .setZkVersion(zkVersion)
-      .setReplicas(replicas)
-      .setOfflineReplicas(offlineReplicas))
-
-    val version = ApiKeys.UPDATE_METADATA.latestVersion
+      .setReplicas(replicas))
 
-    val controllerId = 2
     val securityProtocol = SecurityProtocol.PLAINTEXT
     val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
-    val brokers = Seq(new UpdateMetadataBroker()
-      .setId(0)
+    val brokers = Seq(new RegisterBrokerRecord()
+      .setBrokerId(0)
+      .setBrokerEpoch(brokerEpoch)
       .setRack("rack1")
-      .setEndpoints(Seq(new UpdateMetadataEndpoint()
-        .setHost("foo")
-        .setPort(9092)
-        .setSecurityProtocol(securityProtocol.id)
-        .setListener(listenerName.value)).asJava))
-    val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 
controllerId, controllerEpoch, brokerEpoch,
-      partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(), 
false, AbstractControlRequest.Type.UNKNOWN).build()
-    MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+      .setEndPoints(new BrokerEndpointCollection(
+        Seq(new BrokerEndpoint()
+          .setHost("foo")
+          .setPort(9092)
+          .setSecurityProtocol(securityProtocol.id)
+          .setName(listenerName.value)
+        ).iterator.asJava)))
+
+    MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++ 
partitionStates)
 
     val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
     assertEquals(topic, partitionState.topicName())
@@ -911,60 +826,9 @@ class MetadataCacheTest {
     assertEquals(leader, partitionState.leader())
     assertEquals(leaderEpoch, partitionState.leaderEpoch())
     assertEquals(isr, partitionState.isr())
-    assertEquals(zkVersion, partitionState.zkVersion())
     assertEquals(replicas, partitionState.replicas())
   }
 
-  def setupInitialAndFullMetadata(): (
-    Map[String, Uuid], mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]],
-    Map[String, Uuid], Seq[UpdateMetadataPartitionState]
-  ) = {
-    def addTopic(
-      name: String,
-      partitions: Int,
-      topicStates: mutable.AnyRefMap[String, 
mutable.LongMap[UpdateMetadataPartitionState]]
-    ): Unit = {
-      val partitionMap = mutable.LongMap.empty[UpdateMetadataPartitionState]
-      for (i <- 0 until partitions) {
-        partitionMap.put(i, new UpdateMetadataPartitionState()
-          .setTopicName(name)
-          .setPartitionIndex(i)
-          .setControllerEpoch(2)
-          .setLeader(0)
-          .setLeaderEpoch(10)
-          .setIsr(asList(0, 1))
-          .setZkVersion(10)
-          .setReplicas(asList(0, 1, 2)))
-      }
-      topicStates.put(name, partitionMap)
-    }
-
-    val initialTopicStates = mutable.AnyRefMap.empty[String, 
mutable.LongMap[UpdateMetadataPartitionState]]
-    addTopic("test-topic-1", 3, initialTopicStates)
-    addTopic("test-topic-2", 3, initialTopicStates)
-
-    val initialTopicIds = Map(
-      "test-topic-1" -> Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg"),
-      "test-topic-2" -> Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")
-    )
-
-    val newTopicIds = Map(
-      "different-topic" -> Uuid.fromString("DraFMNOJQOC5maTb1vtZ8Q")
-    )
-
-    val newPartitionStates = Seq(new UpdateMetadataPartitionState()
-      .setTopicName("different-topic")
-      .setPartitionIndex(0)
-      .setControllerEpoch(42)
-      .setLeader(0)
-      .setLeaderEpoch(10)
-      .setIsr(asList[Integer](0, 1, 2))
-      .setZkVersion(1)
-      .setReplicas(asList[Integer](0, 1, 2)))
-
-    (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates)
-  }
-
   @Test
   def testGetOfflineReplicasConsidersDirAssignment(): Unit = {
     case class Broker(id: Int, dirs: util.List[Uuid])
@@ -1012,125 +876,88 @@ class MetadataCacheTest {
 
   val fooTopicName: String = "foo"
   val fooTopicId: Uuid = Uuid.fromString("HDceyWK0Ry-j3XLR8DvvGA")
-  val oldFooPart0 = new UpdateMetadataPartitionState().
-    setTopicName(fooTopicName).
-    setPartitionIndex(0).
-    setControllerEpoch(oldRequestControllerEpoch).
+  val oldFooPart0 = new PartitionRecord().
+    setTopicId(fooTopicId).
+    setPartitionId(0).
     setLeader(4).
     setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val newFooPart0 = new UpdateMetadataPartitionState().
-    setTopicName(fooTopicName).
-    setPartitionIndex(0).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
+  val newFooPart0 = new PartitionRecord().
+    setTopicId(fooTopicId).
+    setPartitionId(0).
     setLeader(5).
     setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val oldFooPart1 = new UpdateMetadataPartitionState().
-    setTopicName(fooTopicName).
-    setPartitionIndex(1).
-    setControllerEpoch(oldRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
+  val oldFooPart1 = new PartitionRecord().
+    setTopicId(fooTopicId).
+    setPartitionId(1).
     setLeader(5).
     setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val newFooPart1 = new UpdateMetadataPartitionState().
-    setTopicName(fooTopicName).
-    setPartitionIndex(1).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
+  val newFooPart1 = new PartitionRecord().
+    setTopicId(fooTopicId).
+    setPartitionId(1).
     setLeader(5).
     setIsr(java.util.Arrays.asList(4, 5)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
   val barTopicName: String = "bar"
   val barTopicId: Uuid = Uuid.fromString("97FBD1g4QyyNNZNY94bkRA")
   val recreatedBarTopicId: Uuid = Uuid.fromString("lZokxuaPRty7c5P4dNdTYA")
-  val oldBarPart0 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(0).
-    setControllerEpoch(oldRequestControllerEpoch).
+  val oldBarPart0 = new PartitionRecord().
+    setTopicId(fooTopicId).
+    setPartitionId(0).
     setLeader(7).
     setIsr(java.util.Arrays.asList(7, 8)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(7, 8, 9)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val newBarPart0 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(0).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(7, 8, 9))
+  val newBarPart0 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(0).
     setLeader(7).
     setIsr(java.util.Arrays.asList(7, 8)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(7, 8, 9)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val deletedBarPart0 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(0).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(7, 8, 9))
+  val deletedBarPart0 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(0).
     setLeader(-2).
     setIsr(java.util.Arrays.asList(7, 8)).
-    setZkVersion(0).
-    setReplicas(java.util.Arrays.asList(7, 8, 9)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val oldBarPart1 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(1).
-    setControllerEpoch(oldRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(7, 8, 9))
+  val oldBarPart1 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(1).
     setLeader(5).
     setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val newBarPart1 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(1).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
+  val newBarPart1 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(1).
     setLeader(5).
     setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val deletedBarPart1 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(1).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
+  val deletedBarPart1 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(1).
     setLeader(-2).
     setIsr(java.util.Arrays.asList(4, 5, 6)).
-    setZkVersion(0).
-    setReplicas(java.util.Arrays.asList(4, 5, 6)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val oldBarPart2 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(2).
-    setControllerEpoch(oldRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(4, 5, 6))
+
+  val oldBarPart2 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(2).
     setLeader(9).
     setIsr(java.util.Arrays.asList(7, 8, 9)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(7, 8, 9)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val newBarPart2 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(2).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(7, 8, 9))
+
+  val newBarPart2 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(2).
     setLeader(8).
     setIsr(java.util.Arrays.asList(7, 8)).
-    setZkVersion(789).
-    setReplicas(java.util.Arrays.asList(7, 8, 9)).
-    setOfflineReplicas(java.util.Collections.emptyList())
-  val deletedBarPart2 = new UpdateMetadataPartitionState().
-    setTopicName(barTopicName).
-    setPartitionIndex(2).
-    setControllerEpoch(newRequestControllerEpoch).
+    setReplicas(java.util.Arrays.asList(7, 8, 9))
+
+  val deletedBarPart2 = new PartitionRecord().
+    setTopicId(barTopicId).
+    setPartitionId(2).
     setLeader(-2).
     setIsr(java.util.Arrays.asList(7, 8, 9)).
-    setZkVersion(0).
-    setReplicas(java.util.Arrays.asList(7, 8, 9)).
-    setOfflineReplicas(java.util.Collections.emptyList())
+    setReplicas(java.util.Arrays.asList(7, 8, 9))
 }


Reply via email to