mumrah commented on code in PR #15293:
URL: https://github.com/apache/kafka/pull/15293#discussion_r1473671017


##########
core/src/main/scala/kafka/server/metadata/ZkMetadataCache.scala:
##########
@@ -65,48 +65,84 @@ case class MetadataSnapshot(partitionStates: 
mutable.AnyRefMap[String, mutable.L
 }
 
 object ZkMetadataCache {
-  /**
-   * Create topic deletions (leader=-2) for topics that are missing in a FULL 
UpdateMetadataRequest coming from a
-   * KRaft controller during a ZK migration. This will modify the 
UpdateMetadataRequest object passed into this method.
-   */
-  def maybeInjectDeletedPartitionsFromFullMetadataRequest(
+  def transformKRaftControllerFullMetadataRequest(
     currentMetadata: MetadataSnapshot,
     requestControllerEpoch: Int,
     requestTopicStates: util.List[UpdateMetadataTopicState],
-  ): Seq[Uuid] = {
-    val prevTopicIds = currentMetadata.topicIds.values.toSet
-    val requestTopics = requestTopicStates.asScala.map { topicState =>
-      topicState.topicName() -> topicState.topicId()
-    }.toMap
-
-    val deleteTopics = prevTopicIds -- requestTopics.values.toSet
-    if (deleteTopics.isEmpty) {
-      return Seq.empty
+  ): (util.List[UpdateMetadataTopicState], util.List[String]) = {
+    val topicIdToNewState = new util.HashMap[Uuid, UpdateMetadataTopicState]()
+    requestTopicStates.forEach(state => topicIdToNewState.put(state.topicId(), 
state))
+    val logMessages = new util.ArrayList[String]
+    val newRequestTopicStates = new util.ArrayList[UpdateMetadataTopicState]()
+    currentMetadata.topicNames.forKeyValue((id, name) => {
+      Option(topicIdToNewState.get(id)) match {
+        case None =>
+          currentMetadata.partitionStates.get(name) match {
+            case None => logMessages.add(s"Error: topic ${name} appeared in 
currentMetadata.topicNames, " +
+              "but not in currentMetadata.partitionStates.")
+            case Some(oldPartitionStates) =>
+              logMessages.add(s"Removing topic ${name} with ID ${id} from the 
metadata cache since " +
+                "the full UMR did not include it.")
+              newRequestTopicStates.add(createDeletionEntries(name,
+                id,
+                oldPartitionStates.values,
+                requestControllerEpoch))
+          }
+        case Some(newTopicState) =>
+          val indexToState = new util.HashMap[Integer, 
UpdateMetadataPartitionState]
+          newTopicState.partitionStates().forEach(part => 
indexToState.put(part.partitionIndex, part))
+          currentMetadata.partitionStates.get(name) match {
+            case None => logMessages.add(s"Error: topic ${name} appeared in 
currentMetadata.topicNames, " +
+              "but not in currentMetadata.partitionStates.")
+            case Some(oldPartitionStates) =>
+              oldPartitionStates.foreach(state => 
indexToState.remove(state._1.toInt))
+              if (!indexToState.isEmpty) {
+                logMessages.add(s"Removing ${indexToState.size()} partition(s) 
from topic ${name} with " +
+                  s"ID ${id} from the metadata cache since the full UMR did 
not include them.")
+                newRequestTopicStates.add(createDeletionEntries(name,
+                  id,
+                  indexToState.values().asScala,
+                  requestControllerEpoch))
+              }
+          }
+      }
+    })
+    if (newRequestTopicStates.isEmpty) {
+      // If the output is the same as the input, optimize by just returning 
the input.
+      (requestTopicStates, logMessages)
+    } else {
+      // If the output has some new entries, they should all appear at the 
beginning. This will
+      // ensure that the old stuff is cleared out before the new stuff is 
added. We will need a
+      // new list for this, of course.
+      newRequestTopicStates.addAll(requestTopicStates)
+      (newRequestTopicStates, logMessages)
     }
+  }
 
-    deleteTopics.foreach { deletedTopicId =>
-      val topicName = currentMetadata.topicNames(deletedTopicId)

Review Comment:
   I believe this was the source of the NoSuchElementException reported in the 
JIRA, is that right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to