mumrah commented on code in PR #13767: URL: https://github.com/apache/kafka/pull/13767#discussion_r1207482435
########## core/src/main/scala/kafka/zk/ZkMigrationClient.scala: ########## @@ -145,44 +144,47 @@ class ZkMigrationClient( topicClient.iterateTopics( util.EnumSet.allOf(classOf[TopicVisitorInterest]), new TopicVisitor() { - override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { - if (!topicBatch.isEmpty) { - recordConsumer.accept(topicBatch) - topicBatch = new util.ArrayList[ApiMessageAndVersion]() - } + override def visitTopic(topicName: String, topicId: Uuid, assignments: util.Map[Integer, util.List[Integer]]): Unit = { + if (!topicBatch.isEmpty) { + recordConsumer.accept(topicBatch) + topicBatch = new util.ArrayList[ApiMessageAndVersion]() + } - topicBatch.add(new ApiMessageAndVersion(new TopicRecord() - .setName(topicName) - .setTopicId(topicId), 0.toShort)) - } + topicBatch.add(new ApiMessageAndVersion(new TopicRecord() + .setName(topicName) + .setTopicId(topicId), 0.toShort)) - override def visitPartition(topicIdPartition: TopicIdPartition, partitionRegistration: PartitionRegistration): Unit = { - val record = new PartitionRecord() - .setTopicId(topicIdPartition.topicId()) - .setPartitionId(topicIdPartition.partition()) - .setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava) - .setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava) - .setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava) - .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava) - .setLeader(partitionRegistration.leader) - .setLeaderEpoch(partitionRegistration.leaderEpoch) - .setPartitionEpoch(partitionRegistration.partitionEpoch) - .setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value()) - partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_)) - partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_)) - topicBatch.add(new ApiMessageAndVersion(record, 0.toShort)) - } + // This breaks the abstraction a bit, but the topic configs belong in the topic batch Review Comment: I was considering the fact that we don't atomically apply the migration records during the migration. I think it's possible for the controller or broker to publish the migration metadata before it's all committed. In this case, I think it's probably safer to include the config records with the topic batch. This won't be an issue once we implement KIP-868. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org