chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r586208095



##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID 
$id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")
+
+      case Some(name) =>
+        newReverseIdMap.remove(name)
+
+        val prevPartitionMap = newNameMap.remove(name)

Review comment:
       Could you add comment to explain why prevPartitionMap is NOT null? 
`newNameMap` is updated by `PartitionRecord` that it is different to 
`newReverseIdMap` and `newIdMap`.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit= {
+    val currentTopicId = newReverseIdMap.get(partition.topicName)
+    val prevImageContainsTopic = if (currentTopicId != null) {
+      prevPartitions.topicIdToName(currentTopicId).isDefined
+    } else {
+      prevPartitions.allTopicNames().contains(partition.topicName)
+    }
+
+    if (prevImageContainsTopic) {
+      _localRemoved.add(partition)
+    }
+  }
+
   def remove(topicName: String, partitionId: Int): Unit = {
     val prevPartitionMap = newNameMap.get(topicName)
     if (prevPartitionMap != null) {
       if (changed.contains(prevPartitionMap)) {
         val prevPartition = prevPartitionMap.remove(partitionId)
         if (prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Does it need null check? The following code calls 
`Option(prevPartitionMap.get(partitionId))` so it seems to me the null check is 
necessary.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -172,23 +203,36 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
     } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {

Review comment:
       Is it possible that both previous partition (`prevPartition`) and new 
partition are replica for this broker?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -243,6 +288,12 @@ case class MetadataPartitions(private val nameMap: 
util.Map[String, util.Map[Int
     copy
   }
 
+  def copyReverseIdMap(): util.Map[String, Uuid] = {

Review comment:
       How about using `new util.HashMap[String, Uuid](reverseIdMap)`?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID 
$id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")

Review comment:
       It seems there are many similar check in this class. Could we have a 
helper method to deal with this case?

##########
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -222,9 +216,16 @@ class BrokerMetadataListener(brokerId: Int,
 
   def handleRemoveTopicRecord(imageBuilder: MetadataImageBuilder,
                               record: RemoveTopicRecord): Unit = {
-    val removedPartitions = imageBuilder.partitionsBuilder().
-      removeTopicById(record.topicId())
-    
groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+    imageBuilder.topicIdToName(record.topicId()) match {
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID 
${record.topicId}")
+
+      case Some(topicName) =>
+        info(s"Processing deletion of topic $topicName with id 
${record.topicId}")
+        val removedPartitions = 
imageBuilder.partitionsBuilder().removeTopicById(record.topicId())
+        
groupCoordinator.handleDeletedPartitions(removedPartitions.map(_.toTopicPartition).toSeq)
+        configRepository.remove(new ConfigResource(ConfigResource.Type.TOPIC, 
topicName))

Review comment:
       It seems to me removing config should be called even if topic id does 
not exist since we set config without checking existence of topic.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -124,8 +125,30 @@ class MetadataPartitionsBuilder(val brokerId: Int,
 
   def removeTopicById(id: Uuid): Iterable[MetadataPartition] = {
     Option(newIdMap.remove(id)) match {
-      case None => throw new RuntimeException(s"Unable to locate topic with ID 
$id")
-      case Some(name) => newNameMap.remove(name).values().asScala
+      case None =>
+        throw new RuntimeException(s"Unable to locate topic with ID $id")
+
+      case Some(name) =>
+        newReverseIdMap.remove(name)
+
+        val prevPartitionMap = newNameMap.remove(name)
+        changed.remove(prevPartitionMap)
+
+        val removedPartitions = prevPartitionMap.values
+        if (prevPartitions.topicIdToName(id).isDefined) {
+          removedPartitions.forEach { partition =>
+            if (partition.isReplicaFor(brokerId)) {

Review comment:
       Could we reuse `maybeAddToLocalRemoved` to handle `localRemoved` set?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -204,9 +248,10 @@ class MetadataPartitionsBuilder(val brokerId: Int,
   }
 
   def build(): MetadataPartitions = {
-    val result = MetadataPartitions(newNameMap, newIdMap)
+    val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap)
     newNameMap = Collections.unmodifiableMap(newNameMap)

Review comment:
       just curious, why we don't wrap values of `newNameMap` to immutable map? 
It seems `MetadataPartitionsBuilder#set(MetadataPartition)`is able to cause 
change after making metadata image.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to