This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 9425acb6b41 KAFKA-18578: Remove `UpdateMetadataRequest` from `MetadataCacheTest` (#18628)
9425acb6b41 is described below
commit 9425acb6b41dada0b991d495017cea895cc82dc8
Author: TaiJuWu <[email protected]>
AuthorDate: Mon Jan 20 04:18:43 2025 +0800
KAFKA-18578: Remove `UpdateMetadataRequest` from `MetadataCacheTest` (#18628)
Reviewers: Ismael Juma <[email protected]>
---
.../unit/kafka/server/MetadataCacheTest.scala | 649 ++++++++-------------
1 file changed, 238 insertions(+), 411 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 9f91790619e..669210f8fcc 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -16,19 +16,17 @@
*/
package kafka.server
-import kafka.server.metadata.{KRaftMetadataCache}
+import kafka.server.metadata.KRaftMetadataCache
import
org.apache.kafka.common.message.DescribeTopicPartitionsResponseData.DescribeTopicPartitionsResponsePartition
-import
org.apache.kafka.common.message.UpdateMetadataRequestData.{UpdateMetadataBroker,
UpdateMetadataEndpoint, UpdateMetadataPartitionState, UpdateMetadataTopicState}
import org.apache.kafka.common.metadata.RegisterBrokerRecord.{BrokerEndpoint,
BrokerEndpointCollection}
import org.apache.kafka.common.metadata._
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
+import org.apache.kafka.common.protocol.{ApiMessage, Errors}
import org.apache.kafka.common.record.RecordBatch
-import org.apache.kafka.common.requests.{AbstractControlRequest,
UpdateMetadataRequest}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{DirectoryId, Uuid}
-import org.apache.kafka.image.{ClusterImage, MetadataDelta, MetadataImage,
MetadataProvenance}
-import org.apache.kafka.metadata.{LeaderAndIsr, LeaderRecoveryState}
+import org.apache.kafka.image.{MetadataDelta, MetadataImage,
MetadataProvenance}
+import org.apache.kafka.metadata.LeaderRecoveryState
import org.apache.kafka.server.common.KRaftVersion
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.Test
@@ -54,7 +52,7 @@ object MetadataCacheTest {
val partialImage = new MetadataImage(
new MetadataProvenance(100L, 10, 1000L, true),
image.features(),
- ClusterImage.EMPTY,
+ image.cluster(),
image.topics(),
image.configs(),
image.clientQuotas(),
@@ -69,85 +67,6 @@ object MetadataCacheTest {
case _ => throw new RuntimeException("Unsupported cache type")
}
}
-
- def updateCache(cache: MetadataCache, request: UpdateMetadataRequest,
records: Seq[ApiMessage] = List()): Unit = {
- cache match {
- case c: KRaftMetadataCache => {
- // UpdateMetadataRequest always contains a full list of brokers, but
may contain
- // a partial list of partitions. Therefore, base our delta off a
partial image that
- // contains no brokers, but which contains the previous partitions.
- val image = c.currentImage()
- val partialImage = new MetadataImage(
- new MetadataProvenance(100L, 10, 1000L, true),
- image.features(),
- ClusterImage.EMPTY,
- image.topics(),
- image.configs(),
- image.clientQuotas(),
- image.producerIds(),
- image.acls(),
- image.scram(),
- image.delegationTokens())
- val delta = new MetadataDelta.Builder().setImage(partialImage).build()
-
- def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
- val endpoints = new BrokerEndpointCollection()
- broker.endpoints().forEach { e =>
- endpoints.add(new BrokerEndpoint().
- setName(e.listener()).
- setHost(e.host()).
- setPort(e.port()).
- setSecurityProtocol(e.securityProtocol()))
- }
- val prevBroker = Option(image.cluster().broker(broker.id()))
- // UpdateMetadataRequest doesn't contain all the broker registration
fields, so get
- // them from the previous registration if available.
- val (epoch, incarnationId, fenced) = prevBroker match {
- case None => (0L, Uuid.ZERO_UUID, false)
- case Some(b) => (b.epoch(), b.incarnationId(), b.fenced())
- }
- new RegisterBrokerRecord().
- setBrokerId(broker.id()).
- setBrokerEpoch(epoch).
- setIncarnationId(incarnationId).
- setEndPoints(endpoints).
- setRack(broker.rack()).
- setFenced(fenced)
- }
- request.liveBrokers().iterator().asScala.foreach { brokerInfo =>
- delta.replay(toRecord(brokerInfo))
- }
-
- def toRecords(topic: UpdateMetadataTopicState): Seq[ApiMessage] = {
- val results = new mutable.ArrayBuffer[ApiMessage]()
- results += new
TopicRecord().setName(topic.topicName()).setTopicId(topic.topicId())
- topic.partitionStates().forEach { partition =>
- if (partition.leader() == LeaderAndIsr.LEADER_DURING_DELETE) {
- results += new RemoveTopicRecord().setTopicId(topic.topicId())
- } else {
- results += new PartitionRecord().
- setPartitionId(partition.partitionIndex()).
- setTopicId(topic.topicId()).
- setReplicas(partition.replicas()).
- setIsr(partition.isr()).
- setRemovingReplicas(Collections.emptyList()).
- setAddingReplicas(Collections.emptyList()).
- setLeader(partition.leader()).
- setLeaderEpoch(partition.leaderEpoch()).
- setPartitionEpoch(partition.zkVersion())
- }
- }
- results
- }
- request.topicStates().forEach { topic =>
- toRecords(topic).foreach(delta.replay)
- }
- records.foreach(delta.replay)
- c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true)))
- }
- case _ => throw new RuntimeException("Unsupported cache type")
- }
- }
}
class MetadataCacheTest {
@@ -167,70 +86,59 @@ class MetadataCacheTest {
val topic0 = "topic-0"
val topic1 = "topic-1"
- val zkVersion = 3
- val controllerId = 2
- val controllerEpoch = 1
+ val topicIds = new util.HashMap[String, Uuid]()
+ topicIds.put(topic0, Uuid.randomUuid())
+ topicIds.put(topic1, Uuid.randomUuid())
- def endpoints(brokerId: Int): Seq[UpdateMetadataEndpoint] = {
+ def endpoints(brokerId: Int): BrokerEndpointCollection = {
val host = s"foo-$brokerId"
- Seq(
- new UpdateMetadataEndpoint()
+ new BrokerEndpointCollection(Seq(
+ new BrokerEndpoint()
.setHost(host)
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
-
.setListener(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value),
- new UpdateMetadataEndpoint()
+
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT).value),
+ new BrokerEndpoint()
.setHost(host)
.setPort(9093)
.setSecurityProtocol(SecurityProtocol.SSL.id)
-
.setListener(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value)
- )
+
.setName(ListenerName.forSecurityProtocol(SecurityProtocol.SSL).value)
+ ).iterator.asJava)
}
val brokers = (0 to 4).map { brokerId =>
- new UpdateMetadataBroker()
- .setId(brokerId)
- .setEndpoints(endpoints(brokerId).asJava)
+ new RegisterBrokerRecord()
+ .setBrokerId(brokerId)
+ .setEndPoints(endpoints(brokerId))
.setRack("rack1")
}
+ val topic0Record = new
TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0))
+ val topic1Record = new
TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
+
val partitionStates = Seq(
- new UpdateMetadataPartitionState()
- .setTopicName(topic0)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
+ new PartitionRecord()
+ .setTopicId(topicIds.get(topic0))
+ .setPartitionId(0)
.setLeader(0)
.setLeaderEpoch(0)
.setIsr(asList(0, 1, 3))
- .setZkVersion(zkVersion)
.setReplicas(asList(0, 1, 3)),
- new UpdateMetadataPartitionState()
- .setTopicName(topic0)
- .setPartitionIndex(1)
- .setControllerEpoch(controllerEpoch)
+ new PartitionRecord()
+ .setTopicId(topicIds.get(topic0))
+ .setPartitionId(1)
.setLeader(1)
.setLeaderEpoch(1)
.setIsr(asList(1, 0))
- .setZkVersion(zkVersion)
.setReplicas(asList(1, 2, 0, 4)),
- new UpdateMetadataPartitionState()
- .setTopicName(topic1)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
+ new PartitionRecord()
+ .setTopicId(topicIds.get(topic1))
+ .setPartitionId(0)
.setLeader(2)
.setLeaderEpoch(2)
.setIsr(asList(2, 1))
- .setZkVersion(zkVersion)
.setReplicas(asList(2, 1, 3)))
-
- val topicIds = new util.HashMap[String, Uuid]()
- topicIds.put(topic0, Uuid.randomUuid())
- topicIds.put(topic1, Uuid.randomUuid())
-
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- partitionStates.asJava, brokers.asJava, topicIds).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+ MetadataCacheTest.updateCache(cache, brokers ++ Seq(topic0Record,
topic1Record) ++ partitionStates)
for (securityProtocol <- Seq(SecurityProtocol.PLAINTEXT,
SecurityProtocol.SSL)) {
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
@@ -244,14 +152,14 @@ class MetadataCacheTest {
assertEquals(topic, topicMetadata.name)
assertEquals(topicIds.get(topic), topicMetadata.topicId())
- val topicPartitionStates = partitionStates.filter { ps => ps.topicName
== topic }
+ val topicPartitionStates = partitionStates.filter { ps => ps.topicId
== topicIds.get(topic) }
val partitionMetadatas =
topicMetadata.partitions.asScala.sortBy(_.partitionIndex)
assertEquals(topicPartitionStates.size, partitionMetadatas.size,
s"Unexpected partition count for topic $topic")
partitionMetadatas.zipWithIndex.foreach { case (partitionMetadata,
partitionId) =>
assertEquals(Errors.NONE.code, partitionMetadata.errorCode)
assertEquals(partitionId, partitionMetadata.partitionIndex)
- val partitionState = topicPartitionStates.find(_.partitionIndex ==
partitionId).getOrElse(
+ val partitionState = topicPartitionStates.find(_.partitionId ==
partitionId).getOrElse(
fail(s"Unable to find partition state for partition $partitionId"))
assertEquals(partitionState.leader, partitionMetadata.leaderId)
assertEquals(partitionState.leaderEpoch,
partitionMetadata.leaderEpoch)
@@ -271,18 +179,20 @@ class MetadataCacheTest {
def getTopicMetadataPartitionLeaderNotAvailable(cache: MetadataCache): Unit
= {
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
- val brokers = Seq(new UpdateMetadataBroker()
- .setId(0)
- .setEndpoints(Seq(new UpdateMetadataEndpoint()
+ val brokers = Seq(new RegisterBrokerRecord()
+ .setBrokerId(0)
+ .setFenced(false)
+ .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
.setHost("foo")
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
- .setListener(listenerName.value)).asJava))
- val metadataCacheBrokerId = 0
+ .setName(listenerName.value)
+ ).iterator.asJava)))
+
// leader is not available. expect LEADER_NOT_AVAILABLE for any metadata
version.
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache,
metadataCacheBrokerId, brokers, listenerName,
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers,
listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners =
false)
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache,
metadataCacheBrokerId, brokers, listenerName,
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers,
listenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners =
true)
}
@@ -293,66 +203,66 @@ class MetadataCacheTest {
// return LEADER_NOT_AVAILABLE or LISTENER_NOT_FOUND errors for old and
new versions respectively.
val plaintextListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)
val sslListenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.SSL)
- val broker0Endpoints = Seq(
- new UpdateMetadataEndpoint()
+ val broker0Endpoints = new BrokerEndpointCollection(Seq(
+ new BrokerEndpoint()
.setHost("host0")
.setPort(9092)
.setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
- .setListener(plaintextListenerName.value),
- new UpdateMetadataEndpoint()
+ .setName(plaintextListenerName.value),
+ new BrokerEndpoint()
.setHost("host0")
.setPort(9093)
.setSecurityProtocol(SecurityProtocol.SSL.id)
- .setListener(sslListenerName.value))
- val broker1Endpoints = Seq(new UpdateMetadataEndpoint()
- .setHost("host1")
- .setPort(9092)
- .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
- .setListener(plaintextListenerName.value))
+ .setName(sslListenerName.value)
+ ).iterator.asJava)
+
+ val broker1Endpoints = new BrokerEndpointCollection(Seq(
+ new BrokerEndpoint()
+ .setHost("host1")
+ .setPort(9092)
+ .setSecurityProtocol(SecurityProtocol.PLAINTEXT.id)
+ .setName(plaintextListenerName.value)
+ ).iterator.asJava)
+
val brokers = Seq(
- new UpdateMetadataBroker()
- .setId(0)
- .setEndpoints(broker0Endpoints.asJava),
- new UpdateMetadataBroker()
- .setId(1)
- .setEndpoints(broker1Endpoints.asJava))
- val metadataCacheBrokerId = 0
+ new RegisterBrokerRecord()
+ .setBrokerId(0)
+ .setFenced(false)
+ .setEndPoints(broker0Endpoints),
+ new RegisterBrokerRecord()
+ .setBrokerId(1)
+ .setFenced(false)
+ .setEndPoints(broker1Endpoints))
+
// leader available in cache but listener name not present. expect
LISTENER_NOT_FOUND error for new metadata version
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache,
metadataCacheBrokerId, brokers, sslListenerName,
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers,
sslListenerName,
leader = 1, Errors.LISTENER_NOT_FOUND, errorUnavailableListeners = true)
// leader available in cache but listener name not present. expect
LEADER_NOT_AVAILABLE error for old metadata version
- verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache,
metadataCacheBrokerId, brokers, sslListenerName,
+ verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache, brokers,
sslListenerName,
leader = 1, Errors.LEADER_NOT_AVAILABLE, errorUnavailableListeners =
false)
}
private def verifyTopicMetadataPartitionLeaderOrEndpointNotAvailable(cache:
MetadataCache,
-
metadataCacheBrokerId: Int,
-
brokers: Seq[UpdateMetadataBroker],
+
brokers: Seq[RegisterBrokerRecord],
listenerName: ListenerName,
leader:
Int,
expectedError: Errors,
errorUnavailableListeners: Boolean): Unit = {
val topic = "topic"
-
- val zkVersion = 3
- val controllerId = 2
- val controllerEpoch = 1
+ val topicId = Uuid.randomUuid()
+ val topicRecords = Seq(new
TopicRecord().setName(topic).setTopicId(topicId))
val leaderEpoch = 1
- val partitionStates = Seq(new UpdateMetadataPartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
- .setLeader(leader)
- .setLeaderEpoch(leaderEpoch)
- .setIsr(asList(0))
- .setZkVersion(zkVersion)
- .setReplicas(asList(0)))
-
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- partitionStates.asJava, brokers.asJava,
util.Collections.emptyMap()).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+ val partitionEpoch = 3
+ val partitionStates = Seq(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
+ .setPartitionEpoch(partitionEpoch)
+ .setLeader(leader)
+ .setLeaderEpoch(leaderEpoch)
+ .setIsr(asList(0))
+ .setReplicas(asList(0)))
+ MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++
partitionStates)
val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName,
errorUnavailableListeners = errorUnavailableListeners)
assertEquals(1, topicMetadatas.size)
@@ -374,20 +284,26 @@ class MetadataCacheTest {
@MethodSource(Array("cacheProvider"))
def getTopicMetadataReplicaNotAvailable(cache: MetadataCache): Unit = {
val topic = "topic"
+ val topicId = Uuid.randomUuid()
- val zkVersion = 3
- val controllerId = 2
- val controllerEpoch = 1
+ val partitionEpoch = 3
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
- val brokers = Seq(new UpdateMetadataBroker()
- .setId(0)
- .setEndpoints(Seq(new UpdateMetadataEndpoint()
+ val endPoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint()
.setHost("foo")
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
- .setListener(listenerName.value)).asJava))
+ .setName(listenerName.value)
+ ).iterator.asJava)
+ val brokers = Seq(new RegisterBrokerRecord()
+ .setBrokerId(0)
+ .setFenced(false)
+ .setEndPoints(endPoints))
+
+ val topicRecords = Seq(new TopicRecord()
+ .setName(topic)
+ .setTopicId(topicId))
// replica 1 is not available
val leader = 0
val leaderEpoch = 0
@@ -395,20 +311,15 @@ class MetadataCacheTest {
val isr = asList[Integer](0)
val partitionStates = Seq(
- new UpdateMetadataPartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
+ new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
- .setZkVersion(zkVersion)
+ .setPartitionEpoch(partitionEpoch)
.setReplicas(replicas))
-
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- partitionStates.asJava, brokers.asJava,
util.Collections.emptyMap()).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+ MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++
partitionStates)
// Validate errorUnavailableEndpoints = false
val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName,
errorUnavailableEndpoints = false)
@@ -447,20 +358,27 @@ class MetadataCacheTest {
@MethodSource(Array("cacheProvider"))
def getTopicMetadataIsrNotAvailable(cache: MetadataCache): Unit = {
val topic = "topic"
+ val topicId = Uuid.randomUuid()
- val zkVersion = 3
- val controllerId = 2
- val controllerEpoch = 1
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
- val brokers = Seq(new UpdateMetadataBroker()
- .setId(0)
- .setRack("rack1")
- .setEndpoints(Seq(new UpdateMetadataEndpoint()
+
+ val endpoints = new BrokerEndpointCollection(Seq(new BrokerEndpoint()
.setHost("foo")
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
- .setListener(listenerName.value)).asJava))
+ .setName(listenerName.value)
+ ).iterator.asJava)
+
+ val brokers = Seq(new RegisterBrokerRecord()
+ .setBrokerId(0)
+ .setRack("rack1")
+ .setFenced(false)
+ .setEndPoints(endpoints))
+
+ val topicRecords = Seq(new TopicRecord()
+ .setName(topic)
+ .setTopicId(topicId))
// replica 1 is not available
val leader = 0
@@ -468,20 +386,14 @@ class MetadataCacheTest {
val replicas = asList[Integer](0)
val isr = asList[Integer](0, 1)
- val partitionStates = Seq(new UpdateMetadataPartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
+ val partitionStates = Seq(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
- .setZkVersion(zkVersion)
.setReplicas(replicas))
-
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- partitionStates.asJava, brokers.asJava,
util.Collections.emptyMap()).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+ MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++
partitionStates)
// Validate errorUnavailableEndpoints = false
val topicMetadatas = cache.getTopicMetadata(Set(topic), listenerName,
errorUnavailableEndpoints = false)
@@ -520,33 +432,33 @@ class MetadataCacheTest {
@MethodSource(Array("cacheProvider"))
def getTopicMetadataWithNonSupportedSecurityProtocol(cache: MetadataCache):
Unit = {
val topic = "topic"
+ val topicId = Uuid.randomUuid()
val securityProtocol = SecurityProtocol.PLAINTEXT
- val brokers = Seq(new UpdateMetadataBroker()
- .setId(0)
+
+ val brokers = new RegisterBrokerRecord()
+ .setBrokerId(0)
.setRack("")
- .setEndpoints(Seq(new UpdateMetadataEndpoint()
+ .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
.setHost("foo")
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
-
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava))
- val controllerEpoch = 1
+ .setName(ListenerName.forSecurityProtocol(securityProtocol).value)
+ ).iterator.asJava))
+
+ val topicRecord = new TopicRecord().setName(topic).setTopicId(topicId)
+
val leader = 0
val leaderEpoch = 0
val replicas = asList[Integer](0)
val isr = asList[Integer](0, 1)
- val partitionStates = Seq(new UpdateMetadataPartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
+ val partitionStates = Seq(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
- .setZkVersion(3)
.setReplicas(replicas))
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version, 2,
controllerEpoch, brokerEpoch, partitionStates.asJava,
- brokers.asJava, util.Collections.emptyMap()).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+ MetadataCacheTest.updateCache(cache, Seq(brokers, topicRecord) ++
partitionStates)
val topicMetadata = cache.getTopicMetadata(Set(topic),
ListenerName.forSecurityProtocol(SecurityProtocol.SSL))
assertEquals(1, topicMetadata.size)
@@ -558,37 +470,37 @@ class MetadataCacheTest {
@MethodSource(Array("cacheProvider"))
def getAliveBrokersShouldNotBeMutatedByUpdateCache(cache: MetadataCache):
Unit = {
val topic = "topic"
+ val topicId = Uuid.randomUuid()
+ val topicRecords = Seq(new
TopicRecord().setName(topic).setTopicId(topicId))
def updateCache(brokerIds: Seq[Int]): Unit = {
val brokers = brokerIds.map { brokerId =>
val securityProtocol = SecurityProtocol.PLAINTEXT
- new UpdateMetadataBroker()
- .setId(brokerId)
+ new RegisterBrokerRecord()
+ .setBrokerId(brokerId)
.setRack("")
- .setEndpoints(Seq(new UpdateMetadataEndpoint()
+ .setFenced(false)
+ .setBrokerEpoch(brokerEpoch)
+ .setEndPoints(new BrokerEndpointCollection(Seq(new BrokerEndpoint()
.setHost("foo")
.setPort(9092)
.setSecurityProtocol(securityProtocol.id)
-
.setListener(ListenerName.forSecurityProtocol(securityProtocol).value)).asJava)
+ .setName(ListenerName.forSecurityProtocol(securityProtocol).value)
+ ).iterator.asJava))
}
- val controllerEpoch = 1
val leader = 0
val leaderEpoch = 0
val replicas = asList[Integer](0)
val isr = asList[Integer](0, 1)
- val partitionStates = Seq(new UpdateMetadataPartitionState()
- .setTopicName(topic)
- .setPartitionIndex(0)
- .setControllerEpoch(controllerEpoch)
+ val partitionStates = Seq(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(0)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
- .setZkVersion(3)
.setReplicas(replicas))
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
2, controllerEpoch, brokerEpoch, partitionStates.asJava,
- brokers.asJava, util.Collections.emptyMap()).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+
+ MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++
partitionStates)
}
val initialBrokerIds = (0 to 2)
@@ -704,8 +616,6 @@ class MetadataCacheTest {
def testGetTopicMetadataForDescribeTopicPartitionsResponse(): Unit = {
val metadataCache = MetadataCache.kRaftMetadataCache(0, () =>
KRaftVersion.KRAFT_VERSION_0)
- val controllerId = 2
- val controllerEpoch = 1
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
val topic0 = "test0"
@@ -761,23 +671,32 @@ class MetadataCacheTest {
.setPartitionEpoch(11)
.setLeaderRecoveryState(LeaderRecoveryState.RECOVERED.value()),
)
-
+ new BrokerEndpointCollection()
val brokers = Seq(
- new UpdateMetadataBroker().setId(0).setEndpoints(Seq(new
UpdateMetadataEndpoint().setHost("foo0").setPort(9092).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
- new UpdateMetadataBroker().setId(1).setEndpoints(Seq(new
UpdateMetadataEndpoint().setHost("foo1").setPort(9093).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
- new UpdateMetadataBroker().setId(2).setEndpoints(Seq(new
UpdateMetadataEndpoint().setHost("foo2").setPort(9094).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
- new UpdateMetadataBroker().setId(3).setEndpoints(Seq(new
UpdateMetadataEndpoint().setHost("foo3").setPort(9095).setSecurityProtocol(securityProtocol.id).setListener(listenerName.value)).asJava),
+ new
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(0)
+ .setEndPoints(new BrokerEndpointCollection(Seq(new
BrokerEndpoint().setHost("foo0").setPort(9092)
+ .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+ ).iterator.asJava)),
+ new
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(1)
+ .setEndPoints(new BrokerEndpointCollection(Seq(new
BrokerEndpoint().setHost("foo1").setPort(9093)
+ .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+ ).iterator.asJava)),
+ new
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(2)
+ .setEndPoints(new BrokerEndpointCollection(Seq(new
BrokerEndpoint().setHost("foo2").setPort(9094)
+ .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+ ).iterator.asJava)),
+ new
RegisterBrokerRecord().setBrokerEpoch(brokerEpoch).setFenced(false).setBrokerId(3)
+ .setEndPoints(new BrokerEndpointCollection(Seq(new
BrokerEndpoint().setHost("foo3").setPort(9095)
+ .setSecurityProtocol(securityProtocol.id).setName(listenerName.value)
+ ).iterator.asJava)),
)
- val version = ApiKeys.UPDATE_METADATA.latestVersion
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- List[UpdateMetadataPartitionState]().asJava, brokers.asJava,
topicIds).build()
var recordSeq = Seq[ApiMessage](
new TopicRecord().setName(topic0).setTopicId(topicIds.get(topic0)),
new TopicRecord().setName(topic1).setTopicId(topicIds.get(topic1))
)
recordSeq = recordSeq ++ partitionMap.values.toSeq
- MetadataCacheTest.updateCache(metadataCache, updateMetadataRequest,
recordSeq)
+ MetadataCacheTest.updateCache(metadataCache, brokers ++ recordSeq)
def checkTopicMetadata(topic: String, partitionIds: Set[Int], partitions:
mutable.Buffer[DescribeTopicPartitionsResponsePartition]): Unit = {
partitions.foreach(partition => {
@@ -867,42 +786,38 @@ class MetadataCacheTest {
@MethodSource(Array("cacheProvider"))
def testGetPartitionInfo(cache: MetadataCache): Unit = {
val topic = "topic"
+ val topicId = Uuid.randomUuid()
val partitionIndex = 0
- val controllerEpoch = 1
val leader = 0
val leaderEpoch = 0
val isr = asList[Integer](2, 3, 0)
- val zkVersion = 3
val replicas = asList[Integer](2, 3, 0, 1, 4)
- val offlineReplicas = asList[Integer](0)
- val partitionStates = Seq(new UpdateMetadataPartitionState()
- .setTopicName(topic)
- .setPartitionIndex(partitionIndex)
- .setControllerEpoch(controllerEpoch)
+ val topicRecords = Seq(new
TopicRecord().setName(topic).setTopicId(topicId))
+
+ val partitionStates = Seq(new PartitionRecord()
+ .setTopicId(topicId)
+ .setPartitionId(partitionIndex)
.setLeader(leader)
.setLeaderEpoch(leaderEpoch)
.setIsr(isr)
- .setZkVersion(zkVersion)
- .setReplicas(replicas)
- .setOfflineReplicas(offlineReplicas))
-
- val version = ApiKeys.UPDATE_METADATA.latestVersion
+ .setReplicas(replicas))
- val controllerId = 2
val securityProtocol = SecurityProtocol.PLAINTEXT
val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
- val brokers = Seq(new UpdateMetadataBroker()
- .setId(0)
+ val brokers = Seq(new RegisterBrokerRecord()
+ .setBrokerId(0)
+ .setBrokerEpoch(brokerEpoch)
.setRack("rack1")
- .setEndpoints(Seq(new UpdateMetadataEndpoint()
- .setHost("foo")
- .setPort(9092)
- .setSecurityProtocol(securityProtocol.id)
- .setListener(listenerName.value)).asJava))
- val updateMetadataRequest = new UpdateMetadataRequest.Builder(version,
controllerId, controllerEpoch, brokerEpoch,
- partitionStates.asJava, brokers.asJava, util.Collections.emptyMap(),
false, AbstractControlRequest.Type.UNKNOWN).build()
- MetadataCacheTest.updateCache(cache, updateMetadataRequest)
+ .setEndPoints(new BrokerEndpointCollection(
+ Seq(new BrokerEndpoint()
+ .setHost("foo")
+ .setPort(9092)
+ .setSecurityProtocol(securityProtocol.id)
+ .setName(listenerName.value)
+ ).iterator.asJava)))
+
+ MetadataCacheTest.updateCache(cache, brokers ++ topicRecords ++
partitionStates)
val partitionState = cache.getPartitionInfo(topic, partitionIndex).get
assertEquals(topic, partitionState.topicName())
@@ -911,60 +826,9 @@ class MetadataCacheTest {
assertEquals(leader, partitionState.leader())
assertEquals(leaderEpoch, partitionState.leaderEpoch())
assertEquals(isr, partitionState.isr())
- assertEquals(zkVersion, partitionState.zkVersion())
assertEquals(replicas, partitionState.replicas())
}
- def setupInitialAndFullMetadata(): (
- Map[String, Uuid], mutable.AnyRefMap[String,
mutable.LongMap[UpdateMetadataPartitionState]],
- Map[String, Uuid], Seq[UpdateMetadataPartitionState]
- ) = {
- def addTopic(
- name: String,
- partitions: Int,
- topicStates: mutable.AnyRefMap[String,
mutable.LongMap[UpdateMetadataPartitionState]]
- ): Unit = {
- val partitionMap = mutable.LongMap.empty[UpdateMetadataPartitionState]
- for (i <- 0 until partitions) {
- partitionMap.put(i, new UpdateMetadataPartitionState()
- .setTopicName(name)
- .setPartitionIndex(i)
- .setControllerEpoch(2)
- .setLeader(0)
- .setLeaderEpoch(10)
- .setIsr(asList(0, 1))
- .setZkVersion(10)
- .setReplicas(asList(0, 1, 2)))
- }
- topicStates.put(name, partitionMap)
- }
-
- val initialTopicStates = mutable.AnyRefMap.empty[String,
mutable.LongMap[UpdateMetadataPartitionState]]
- addTopic("test-topic-1", 3, initialTopicStates)
- addTopic("test-topic-2", 3, initialTopicStates)
-
- val initialTopicIds = Map(
- "test-topic-1" -> Uuid.fromString("IQ2F1tpCRoSbjfq4zBJwpg"),
- "test-topic-2" -> Uuid.fromString("4N8_J-q7SdWHPFkos275pQ")
- )
-
- val newTopicIds = Map(
- "different-topic" -> Uuid.fromString("DraFMNOJQOC5maTb1vtZ8Q")
- )
-
- val newPartitionStates = Seq(new UpdateMetadataPartitionState()
- .setTopicName("different-topic")
- .setPartitionIndex(0)
- .setControllerEpoch(42)
- .setLeader(0)
- .setLeaderEpoch(10)
- .setIsr(asList[Integer](0, 1, 2))
- .setZkVersion(1)
- .setReplicas(asList[Integer](0, 1, 2)))
-
- (initialTopicIds, initialTopicStates, newTopicIds, newPartitionStates)
- }
-
@Test
def testGetOfflineReplicasConsidersDirAssignment(): Unit = {
case class Broker(id: Int, dirs: util.List[Uuid])
@@ -1012,125 +876,88 @@ class MetadataCacheTest {
val fooTopicName: String = "foo"
val fooTopicId: Uuid = Uuid.fromString("HDceyWK0Ry-j3XLR8DvvGA")
- val oldFooPart0 = new UpdateMetadataPartitionState().
- setTopicName(fooTopicName).
- setPartitionIndex(0).
- setControllerEpoch(oldRequestControllerEpoch).
+ val oldFooPart0 = new PartitionRecord().
+ setTopicId(fooTopicId).
+ setPartitionId(0).
setLeader(4).
setIsr(java.util.Arrays.asList(4, 5, 6)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val newFooPart0 = new UpdateMetadataPartitionState().
- setTopicName(fooTopicName).
- setPartitionIndex(0).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
+ val newFooPart0 = new PartitionRecord().
+ setTopicId(fooTopicId).
+ setPartitionId(0).
setLeader(5).
setIsr(java.util.Arrays.asList(4, 5, 6)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val oldFooPart1 = new UpdateMetadataPartitionState().
- setTopicName(fooTopicName).
- setPartitionIndex(1).
- setControllerEpoch(oldRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
+ val oldFooPart1 = new PartitionRecord().
+ setTopicId(fooTopicId).
+ setPartitionId(1).
setLeader(5).
setIsr(java.util.Arrays.asList(4, 5, 6)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val newFooPart1 = new UpdateMetadataPartitionState().
- setTopicName(fooTopicName).
- setPartitionIndex(1).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
+ val newFooPart1 = new PartitionRecord().
+ setTopicId(fooTopicId).
+ setPartitionId(1).
setLeader(5).
setIsr(java.util.Arrays.asList(4, 5)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
-
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
val barTopicName: String = "bar"
val barTopicId: Uuid = Uuid.fromString("97FBD1g4QyyNNZNY94bkRA")
val recreatedBarTopicId: Uuid = Uuid.fromString("lZokxuaPRty7c5P4dNdTYA")
- val oldBarPart0 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(0).
- setControllerEpoch(oldRequestControllerEpoch).
+ val oldBarPart0 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(0).
setLeader(7).
setIsr(java.util.Arrays.asList(7, 8)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(7, 8, 9)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val newBarPart0 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(0).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(7, 8, 9))
+ val newBarPart0 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(0).
setLeader(7).
setIsr(java.util.Arrays.asList(7, 8)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(7, 8, 9)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val deletedBarPart0 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(0).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(7, 8, 9))
+ val deletedBarPart0 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(0).
setLeader(-2).
setIsr(java.util.Arrays.asList(7, 8)).
- setZkVersion(0).
- setReplicas(java.util.Arrays.asList(7, 8, 9)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val oldBarPart1 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(1).
- setControllerEpoch(oldRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(7, 8, 9))
+ val oldBarPart1 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(1).
setLeader(5).
setIsr(java.util.Arrays.asList(4, 5, 6)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val newBarPart1 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(1).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
+ val newBarPart1 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(1).
setLeader(5).
setIsr(java.util.Arrays.asList(4, 5, 6)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val deletedBarPart1 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(1).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
+ val deletedBarPart1 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(1).
setLeader(-2).
setIsr(java.util.Arrays.asList(4, 5, 6)).
- setZkVersion(0).
- setReplicas(java.util.Arrays.asList(4, 5, 6)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val oldBarPart2 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(2).
- setControllerEpoch(oldRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(4, 5, 6))
+
+ val oldBarPart2 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(2).
setLeader(9).
setIsr(java.util.Arrays.asList(7, 8, 9)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(7, 8, 9)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val newBarPart2 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(2).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(7, 8, 9))
+
+ val newBarPart2 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(2).
setLeader(8).
setIsr(java.util.Arrays.asList(7, 8)).
- setZkVersion(789).
- setReplicas(java.util.Arrays.asList(7, 8, 9)).
- setOfflineReplicas(java.util.Collections.emptyList())
- val deletedBarPart2 = new UpdateMetadataPartitionState().
- setTopicName(barTopicName).
- setPartitionIndex(2).
- setControllerEpoch(newRequestControllerEpoch).
+ setReplicas(java.util.Arrays.asList(7, 8, 9))
+
+ val deletedBarPart2 = new PartitionRecord().
+ setTopicId(barTopicId).
+ setPartitionId(2).
setLeader(-2).
setIsr(java.util.Arrays.asList(7, 8, 9)).
- setZkVersion(0).
- setReplicas(java.util.Arrays.asList(7, 8, 9)).
- setOfflineReplicas(java.util.Collections.emptyList())
+ setReplicas(java.util.Arrays.asList(7, 8, 9))
}