chia7712 commented on a change in pull request #10252: URL: https://github.com/apache/kafka/pull/10252#discussion_r587205708
########## File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala ########## @@ -185,6 +184,134 @@ class MetadataPartitionsTest { assertEquals(2, updatedLeader.leaderId) } + @Test + def testTopicCreateAndDelete(): Unit = { + val topic = "foo" + val numPartitions = 3 + val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet + val builder = new MetadataPartitionsBuilder(0, emptyPartitions) + val topicId = createTopic(topic, numPartitions, builder) + val localTopicPartitions = localChanged(builder) + + assertTrue(localTopicPartitions.forall(topicPartitions.contains)) Review comment: How about `assertTrue(localTopicPartitions.subsetOf(topicPartitions))` (and the same for the other test cases)? ########## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ########## @@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int, val prevPartition = newPartitionMap.put(partition.partitionIndex, partition) if (partition.isReplicaFor(brokerId)) { _localChanged.add(partition) - } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) { - _localRemoved.add(prevPartition) + } else if (prevPartition != null) { + maybeAddToLocalRemoved(prevPartition) } newNameMap.put(partition.topicName, newPartitionMap) } + private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = { + if (partition.isReplicaFor(brokerId)) { + val currentTopicId = newReverseIdMap.get(partition.topicName) Review comment: Is it possible that `newReverseIdMap` has no related id? For example, if `PartitionRecord` is processed before `TopicRecord`, or if `TopicRecord` was discarded (due to an error)?
########## File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala ########## @@ -185,6 +184,134 @@ class MetadataPartitionsTest { assertEquals(2, updatedLeader.leaderId) } + @Test + def testTopicCreateAndDelete(): Unit = { + val topic = "foo" + val numPartitions = 3 + val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet + val builder = new MetadataPartitionsBuilder(0, emptyPartitions) + val topicId = createTopic(topic, numPartitions, builder) + val localTopicPartitions = localChanged(builder) + + assertTrue(localTopicPartitions.forall(topicPartitions.contains)) + assertTrue(localTopicPartitions.nonEmpty) + assertNotEquals(topicPartitions, localTopicPartitions) + + builder.removeTopicById(topicId) + assertEquals(None, builder.topicIdToName(topicId)) + assertEquals(Set.empty, filterPartitions(builder, topicPartitions)) + assertEquals(Set.empty, localRemoved(builder)) + assertEquals(Set.empty, localChanged(builder)) + + val metadata = builder.build() + assertEquals(Set.empty, metadata.allTopicNames()) + assertEquals(None, metadata.topicIdToName(topicId)) + assertEquals(Set.empty, metadata.topicPartitions(topic).toSet) + } + + @Test + def testTopicRemoval(): Unit = { + val brokerId = 0 + val topic = "foo" + val numPartitions = 3 + val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet + val createBuilder = new MetadataPartitionsBuilder(brokerId, emptyPartitions) + val topicId = createTopic(topic, numPartitions, createBuilder) + val localTopicPartitions = localChanged(createBuilder) + val createMetadata = createBuilder.build() + + assertTrue(localTopicPartitions.subsetOf(topicPartitions)) + assertTrue(localTopicPartitions.nonEmpty) + assertNotEquals(topicPartitions, localTopicPartitions) + + val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, createMetadata) + deleteBuilder.removeTopicById(topicId) + assertEquals(None, deleteBuilder.topicIdToName(topicId)) + 
assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions)) + assertEquals(localTopicPartitions, localRemoved(deleteBuilder)) + assertEquals(Set.empty, localChanged(deleteBuilder)) + + val deleteMetadata = deleteBuilder.build() + assertEquals(Set.empty, deleteMetadata.allTopicNames()) + assertEquals(None, deleteMetadata.topicIdToName(topicId)) Review comment: Ditto. Could you check that `reverseIdMap` has no id related to "topic"? ########## File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala ########## @@ -185,6 +184,134 @@ class MetadataPartitionsTest { assertEquals(2, updatedLeader.leaderId) } + @Test + def testTopicCreateAndDelete(): Unit = { + val topic = "foo" + val numPartitions = 3 + val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet + val builder = new MetadataPartitionsBuilder(0, emptyPartitions) + val topicId = createTopic(topic, numPartitions, builder) + val localTopicPartitions = localChanged(builder) + + assertTrue(localTopicPartitions.forall(topicPartitions.contains)) + assertTrue(localTopicPartitions.nonEmpty) + assertNotEquals(topicPartitions, localTopicPartitions) + + builder.removeTopicById(topicId) + assertEquals(None, builder.topicIdToName(topicId)) + assertEquals(Set.empty, filterPartitions(builder, topicPartitions)) + assertEquals(Set.empty, localRemoved(builder)) + assertEquals(Set.empty, localChanged(builder)) + + val metadata = builder.build() + assertEquals(Set.empty, metadata.allTopicNames()) + assertEquals(None, metadata.topicIdToName(topicId)) + assertEquals(Set.empty, metadata.topicPartitions(topic).toSet) + } + + @Test + def testTopicRemoval(): Unit = { + val brokerId = 0 + val topic = "foo" + val numPartitions = 3 + val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet + val createBuilder = new MetadataPartitionsBuilder(brokerId, emptyPartitions) + val topicId = createTopic(topic, numPartitions, createBuilder) + val
localTopicPartitions = localChanged(createBuilder) + val createMetadata = createBuilder.build() + + assertTrue(localTopicPartitions.subsetOf(topicPartitions)) + assertTrue(localTopicPartitions.nonEmpty) + assertNotEquals(topicPartitions, localTopicPartitions) + + val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, createMetadata) + deleteBuilder.removeTopicById(topicId) + assertEquals(None, deleteBuilder.topicIdToName(topicId)) + assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions)) + assertEquals(localTopicPartitions, localRemoved(deleteBuilder)) + assertEquals(Set.empty, localChanged(deleteBuilder)) + + val deleteMetadata = deleteBuilder.build() + assertEquals(Set.empty, deleteMetadata.allTopicNames()) + assertEquals(None, deleteMetadata.topicIdToName(topicId)) + assertEquals(Set.empty, deleteMetadata.topicPartitions(topic).toSet) + } + + @Test + def testTopicDeleteAndRecreate(): Unit = { + val topic = "foo" + val numPartitions = 3 + val initialBuilder = new MetadataPartitionsBuilder(0, emptyPartitions) + val initialTopicId = createTopic(topic, numPartitions, initialBuilder) + val initialLocalTopicPartitions = initialBuilder.localChanged().map(_.toTopicPartition).toSet + val initialMetadata = initialBuilder.build() + + val recreateBuilder = new MetadataPartitionsBuilder(brokerId = 0, initialMetadata) + recreateBuilder.removeTopicById(initialTopicId) + assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder)) + + val recreatedNumPartitions = 10 + val recreatedTopicId = createTopic(topic, recreatedNumPartitions, recreateBuilder) + val recreatedTopicPartitions = (0 until recreatedNumPartitions).map(new TopicPartition(topic, _)).toSet + val recreatedLocalTopicPartitions = localChanged(recreateBuilder) + + assertTrue(recreatedLocalTopicPartitions.nonEmpty) + assertNotEquals(recreatedLocalTopicPartitions, recreatedTopicPartitions) + assertTrue(recreatedLocalTopicPartitions.subsetOf(recreatedTopicPartitions)) + 
assertFalse(recreatedLocalTopicPartitions.subsetOf(initialLocalTopicPartitions)) + assertEquals(Some(topic), recreateBuilder.topicIdToName(recreatedTopicId)) Review comment: Could you check whether the id related to "topic" is changed to "recreatedTopicId"? ########## File path: core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala ########## @@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int, case e: Exception => throw new RuntimeException("Unknown metadata record type " + s"${record.apiKey()} in batch ending at offset ${lastOffset}.") } - recordType match { - case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder, - record.asInstanceOf[RegisterBrokerRecord]) - case UNREGISTER_BROKER_RECORD => handleUnregisterBrokerRecord(imageBuilder, - record.asInstanceOf[UnregisterBrokerRecord]) - case TOPIC_RECORD => handleTopicRecord(imageBuilder, - record.asInstanceOf[TopicRecord]) - case PARTITION_RECORD => handlePartitionRecord(imageBuilder, - record.asInstanceOf[PartitionRecord]) - case CONFIG_RECORD => handleConfigRecord(record.asInstanceOf[ConfigRecord]) - case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder, - record.asInstanceOf[PartitionChangeRecord]) - case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder, - record.asInstanceOf[FenceBrokerRecord]) - case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder, - record.asInstanceOf[UnfenceBrokerRecord]) - case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder, - record.asInstanceOf[RemoveTopicRecord]) - case QUOTA_RECORD => handleQuotaRecord(imageBuilder, - record.asInstanceOf[QuotaRecord]) - // TODO: handle FEATURE_LEVEL_RECORD - case _ => throw new RuntimeException(s"Unsupported record type ${recordType}") + + record match { + case rec: RegisterBrokerRecord => handleRegisterBrokerRecord(imageBuilder, rec) + case rec: UnregisterBrokerRecord => handleUnregisterBrokerRecord(imageBuilder, rec) + case rec: FenceBrokerRecord 
=> handleFenceBrokerRecord(imageBuilder, rec) + case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, rec) + case rec: TopicRecord => handleTopicRecord(imageBuilder, rec) + case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec) Review comment: Do we "assume" `PartitionRecord` is processed after `TopicRecord`? [BrokerMetadataListener#handlePartitionRecord](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala#L188) checks the existence of the topic id first. It can throw an exception if `PartitionRecord` is processed "before" `TopicRecord` (the previous image has no related data as it is a new topic). On the other hand, [RaftMetadataCache#updateMetadata](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala#L364) sets partition info before updating topic info. ########## File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala ########## @@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int, val prevPartition = newPartitionMap.put(partition.partitionIndex, partition) if (partition.isReplicaFor(brokerId)) { _localChanged.add(partition) - } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) { - _localRemoved.add(prevPartition) + } else if (prevPartition != null) { + maybeAddToLocalRemoved(prevPartition) } newNameMap.put(partition.topicName, newPartitionMap) } + private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = { + if (partition.isReplicaFor(brokerId)) { + val currentTopicId = newReverseIdMap.get(partition.topicName) Review comment: Also, could it be replaced by `prevPartitions.contains(partition.topicName)`? It seems all we want to check is the existence of the topic name in the previous image.
########## File path: core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala ########## @@ -185,6 +184,134 @@ class MetadataPartitionsTest { assertEquals(2, updatedLeader.leaderId) } + @Test + def testTopicCreateAndDelete(): Unit = { + val topic = "foo" + val numPartitions = 3 + val topicPartitions = (0 until numPartitions).map(new TopicPartition(topic, _)).toSet + val builder = new MetadataPartitionsBuilder(0, emptyPartitions) + val topicId = createTopic(topic, numPartitions, builder) + val localTopicPartitions = localChanged(builder) + + assertTrue(localTopicPartitions.forall(topicPartitions.contains)) + assertTrue(localTopicPartitions.nonEmpty) + assertNotEquals(topicPartitions, localTopicPartitions) + + builder.removeTopicById(topicId) + assertEquals(None, builder.topicIdToName(topicId)) + assertEquals(Set.empty, filterPartitions(builder, topicPartitions)) + assertEquals(Set.empty, localRemoved(builder)) + assertEquals(Set.empty, localChanged(builder)) + + val metadata = builder.build() + assertEquals(Set.empty, metadata.allTopicNames()) + assertEquals(None, metadata.topicIdToName(topicId)) Review comment: Could you check `reverseIdMap` also? It should return `None`. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org