mumrah commented on code in PR #13767:
URL: https://github.com/apache/kafka/pull/13767#discussion_r1207482435


##########
core/src/main/scala/kafka/zk/ZkMigrationClient.scala:
##########
@@ -145,44 +144,47 @@ class ZkMigrationClient(
     topicClient.iterateTopics(
       util.EnumSet.allOf(classOf[TopicVisitorInterest]),
       new TopicVisitor() {
-      override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {
-        if (!topicBatch.isEmpty) {
-          recordConsumer.accept(topicBatch)
-          topicBatch = new util.ArrayList[ApiMessageAndVersion]()
-        }
+        override def visitTopic(topicName: String, topicId: Uuid, assignments: 
util.Map[Integer, util.List[Integer]]): Unit = {
+          if (!topicBatch.isEmpty) {
+            recordConsumer.accept(topicBatch)
+            topicBatch = new util.ArrayList[ApiMessageAndVersion]()
+          }
 
-        topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
-          .setName(topicName)
-          .setTopicId(topicId), 0.toShort))
-      }
+          topicBatch.add(new ApiMessageAndVersion(new TopicRecord()
+            .setName(topicName)
+            .setTopicId(topicId), 0.toShort))
 
-      override def visitPartition(topicIdPartition: TopicIdPartition, 
partitionRegistration: PartitionRegistration): Unit = {
-        val record = new PartitionRecord()
-          .setTopicId(topicIdPartition.topicId())
-          .setPartitionId(topicIdPartition.partition())
-          
.setReplicas(partitionRegistration.replicas.map(Integer.valueOf).toList.asJava)
-          
.setAddingReplicas(partitionRegistration.addingReplicas.map(Integer.valueOf).toList.asJava)
-          
.setRemovingReplicas(partitionRegistration.removingReplicas.map(Integer.valueOf).toList.asJava)
-          .setIsr(partitionRegistration.isr.map(Integer.valueOf).toList.asJava)
-          .setLeader(partitionRegistration.leader)
-          .setLeaderEpoch(partitionRegistration.leaderEpoch)
-          .setPartitionEpoch(partitionRegistration.partitionEpoch)
-          
.setLeaderRecoveryState(partitionRegistration.leaderRecoveryState.value())
-        partitionRegistration.replicas.foreach(brokerIdConsumer.accept(_))
-        
partitionRegistration.addingReplicas.foreach(brokerIdConsumer.accept(_))
-        topicBatch.add(new ApiMessageAndVersion(record, 0.toShort))
-      }
+          // This breaks the abstraction a bit, but the topic configs belong 
in the topic batch

Review Comment:
   I was considering the fact that we don't atomically apply the migration 
records during the migration. I think it's possible for the controller or 
broker to publish the migration metadata before it's all committed. In this 
case, I think it's probably safer to include the config records with the topic 
batch.
   
   This won't be an issue once we implement KIP-868.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to