chia7712 commented on a change in pull request #10252: URL: https://github.com/apache/kafka/pull/10252#discussion_r588882359
########## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ########## @@ -171,25 +202,38 @@ class MetadataPartitionsBuilder(val brokerId: Int, val prevPartition = newPartitionMap.put(partition.partitionIndex, partition) if (partition.isReplicaFor(brokerId)) { _localChanged.add(partition) Review comment: Is it redundant to add this partition to `_localChanged` if the broker is already the replica of that partition? ########## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ########## @@ -198,15 +242,18 @@ class MetadataPartitionsBuilder(val brokerId: Int, } changed.add(newPartitionMap) newNameMap.put(topicName, newPartitionMap) + prevPartition } } + removedPartition.foreach(maybeAddToLocalRemoved) } } def build(): MetadataPartitions = { - val result = MetadataPartitions(newNameMap, newIdMap) + val result = new MetadataPartitions(newNameMap, newIdMap, newReverseIdMap) Review comment: Is it worth checking consistency of those collections? It can produce quick failure if we are going to build invalid image. ########## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ########## @@ -171,25 +202,38 @@ class MetadataPartitionsBuilder(val brokerId: Int, val prevPartition = newPartitionMap.put(partition.partitionIndex, partition) if (partition.isReplicaFor(brokerId)) { _localChanged.add(partition) - } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) { - _localRemoved.add(prevPartition) + } else if (prevPartition != null) { + maybeAddToLocalRemoved(prevPartition) } newNameMap.put(partition.topicName, newPartitionMap) } + private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = { + if (partition.isReplicaFor(brokerId)) { + val currentTopicId = newReverseIdMap.get(partition.topicName) + val prevImageHasTopic = if (currentTopicId != null) { + prevImageHasTopicId(currentTopicId) + } else { + prevPartitions.allTopicNames().contains(partition.topicName) + } + + if (prevImageHasTopic) { + _localRemoved.add(partition) + } + } + } + + private def prevImageHasTopicId(topicId: Uuid): Boolean = { + prevPartitions.topicIdToName(topicId).isDefined + } + def remove(topicName: String, partitionId: Int): Unit = { val prevPartitionMap = newNameMap.get(topicName) if (prevPartitionMap != null) { - if (changed.contains(prevPartitionMap)) { - val prevPartition = prevPartitionMap.remove(partitionId) - if (prevPartition.isReplicaFor(brokerId)) { - _localRemoved.add(prevPartition) - } + val removedPartition = if (changed.contains(prevPartitionMap)) { + Option(prevPartitionMap.remove(partitionId)) } else { - Option(prevPartitionMap.get(partitionId)).foreach { prevPartition => - if (prevPartition.isReplicaFor(brokerId)) { - _localRemoved.add(prevPartition) - } + Option(prevPartitionMap.get(partitionId)).map { prevPartition => val newPartitionMap = new util.HashMap[Int, MetadataPartition](prevPartitionMap.size() - 1) prevPartitionMap.forEach { (prevPartitionId, prevPartition) => if (!prevPartitionId.equals(partitionId)) { Review comment: As we are in scala code, could it be replaced by `prevPartitionId != partitionId`? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org