mumrah commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207482435
##
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##
@@ -145,44 +144,47 @@ class ZkMigrationClient(
topicClient.iterateTopics(
util.EnumSet.allOf(classOf[TopicVisitorInterest]),
new TopicVisitor() {
- override def visitTopic(topicName: String, topicId: Uuid, assignments:
util.Map[Integer, util.List[Integer]]): Unit = {
-if (!topicBatch.isEmpty) {
- recordConsumer.accept(topicBatch)
- topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-}
+override def visitTopic(topicName: String, topicId: Uuid, assignments:
util.Map[Integer, util.List[Integer]]): Unit = {
+ if (!topicBatch.isEmpty) {
+recordConsumer.accept(topicBatch)
+topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+ }
-topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
- .setName(topicName)
- .setTopicId(topicId), 0.toShort))
- }
+ topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+.setName(topicName)
+.setTopicId(topicId), 0.toShort))
- override def visitPartition(topicIdPartition: TopicIdPartition,
partitionRegistration: PartitionRegistration): Unit = {
-val record = new PartitionRecord()
- .setTopicId(topicIdPartition.topicId())
- .setPartitionId(topicIdPartition.partition())
-
.setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-
.setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-
.setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
- .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
- .setLeader(partitionRegistration.leader)
- .setLeaderEpoch(partitionRegistration.leaderEpoch)
- .setPartitionEpoch(partitionRegistration.partitionEpoch)
-
.setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-
partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
- }
+ // This breaks the abstraction a bit, but the topic configs belong
in the topic batch
Review Comment:
I was considering the fact that we don't apply the migration records
atomically during the migration. I think it's possible for the controller or
broker to publish the migration metadata before all of it has been committed.
Given that, I think it's probably safer to include the config records with the
topic batch.
This won't be an issue once we implement KIP-868 (metadata transactions).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org