chia7712 commented on a change in pull request #10252:
URL: https://github.com/apache/kafka/pull/10252#discussion_r587205708



##########
File path: 
core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))

Review comment:
       How about `assertTrue(localTopicPartitions.subsetOf(topicPartitions))` 
(same to other test cases)?

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, 
partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       Is it possible that `newReverseIdMap` has no related id? For example, 
`PartitionRecord` is processed before `TopicRecord` or `TopicRecord` was 
discarded (due to error)?

##########
File path: 
core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))
+    assertEquals(Set.empty, metadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicRemoval(): Unit = {
+    val brokerId = 0
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val createBuilder = new MetadataPartitionsBuilder(brokerId, 
emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, createBuilder)
+    val localTopicPartitions = localChanged(createBuilder)
+    val createMetadata = createBuilder.build()
+
+    assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, 
createMetadata)
+    deleteBuilder.removeTopicById(topicId)
+    assertEquals(None, deleteBuilder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions))
+    assertEquals(localTopicPartitions, localRemoved(deleteBuilder))
+    assertEquals(Set.empty, localChanged(deleteBuilder))
+
+    val deleteMetadata = deleteBuilder.build()
+    assertEquals(Set.empty, deleteMetadata.allTopicNames())
+    assertEquals(None, deleteMetadata.topicIdToName(topicId))

Review comment:
       ditto. Could you check that `reverseIdMap` should has no id related to 
"topic"?

##########
File path: 
core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))
+    assertEquals(Set.empty, metadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicRemoval(): Unit = {
+    val brokerId = 0
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val createBuilder = new MetadataPartitionsBuilder(brokerId, 
emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, createBuilder)
+    val localTopicPartitions = localChanged(createBuilder)
+    val createMetadata = createBuilder.build()
+
+    assertTrue(localTopicPartitions.subsetOf(topicPartitions))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    val deleteBuilder = new MetadataPartitionsBuilder(brokerId = 0, 
createMetadata)
+    deleteBuilder.removeTopicById(topicId)
+    assertEquals(None, deleteBuilder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(deleteBuilder, topicPartitions))
+    assertEquals(localTopicPartitions, localRemoved(deleteBuilder))
+    assertEquals(Set.empty, localChanged(deleteBuilder))
+
+    val deleteMetadata = deleteBuilder.build()
+    assertEquals(Set.empty, deleteMetadata.allTopicNames())
+    assertEquals(None, deleteMetadata.topicIdToName(topicId))
+    assertEquals(Set.empty, deleteMetadata.topicPartitions(topic).toSet)
+  }
+
+  @Test
+  def testTopicDeleteAndRecreate(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val initialBuilder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val initialTopicId = createTopic(topic, numPartitions, initialBuilder)
+    val initialLocalTopicPartitions = 
initialBuilder.localChanged().map(_.toTopicPartition).toSet
+    val initialMetadata = initialBuilder.build()
+
+    val recreateBuilder = new MetadataPartitionsBuilder(brokerId = 0, 
initialMetadata)
+    recreateBuilder.removeTopicById(initialTopicId)
+    assertEquals(initialLocalTopicPartitions, localRemoved(recreateBuilder))
+
+    val recreatedNumPartitions = 10
+    val recreatedTopicId = createTopic(topic, recreatedNumPartitions, 
recreateBuilder)
+    val recreatedTopicPartitions = (0 until recreatedNumPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val recreatedLocalTopicPartitions = localChanged(recreateBuilder)
+
+    assertTrue(recreatedLocalTopicPartitions.nonEmpty)
+    assertNotEquals(recreatedLocalTopicPartitions, recreatedTopicPartitions)
+    
assertTrue(recreatedLocalTopicPartitions.subsetOf(recreatedTopicPartitions))
+    
assertFalse(recreatedLocalTopicPartitions.subsetOf(initialLocalTopicPartitions))
+    assertEquals(Some(topic), recreateBuilder.topicIdToName(recreatedTopicId))

Review comment:
       Could you check whether the id related to "topic" is changed to 
"recreatedTopicId"?

##########
File path: 
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
##########
@@ -142,28 +145,19 @@ class BrokerMetadataListener(brokerId: Int,
       case e: Exception => throw new RuntimeException("Unknown metadata record 
type " +
       s"${record.apiKey()} in batch ending at offset ${lastOffset}.")
     }
-    recordType match {
-      case REGISTER_BROKER_RECORD => handleRegisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[RegisterBrokerRecord])
-      case UNREGISTER_BROKER_RECORD => 
handleUnregisterBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnregisterBrokerRecord])
-      case TOPIC_RECORD => handleTopicRecord(imageBuilder,
-        record.asInstanceOf[TopicRecord])
-      case PARTITION_RECORD => handlePartitionRecord(imageBuilder,
-        record.asInstanceOf[PartitionRecord])
-      case CONFIG_RECORD => 
handleConfigRecord(record.asInstanceOf[ConfigRecord])
-      case PARTITION_CHANGE_RECORD => handlePartitionChangeRecord(imageBuilder,
-        record.asInstanceOf[PartitionChangeRecord])
-      case FENCE_BROKER_RECORD => handleFenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[FenceBrokerRecord])
-      case UNFENCE_BROKER_RECORD => handleUnfenceBrokerRecord(imageBuilder,
-        record.asInstanceOf[UnfenceBrokerRecord])
-      case REMOVE_TOPIC_RECORD => handleRemoveTopicRecord(imageBuilder,
-        record.asInstanceOf[RemoveTopicRecord])
-      case QUOTA_RECORD => handleQuotaRecord(imageBuilder,
-        record.asInstanceOf[QuotaRecord])
-      // TODO: handle FEATURE_LEVEL_RECORD
-      case _ => throw new RuntimeException(s"Unsupported record type 
${recordType}")
+
+    record match {
+      case rec: RegisterBrokerRecord => 
handleRegisterBrokerRecord(imageBuilder, rec)
+      case rec: UnregisterBrokerRecord => 
handleUnregisterBrokerRecord(imageBuilder, rec)
+      case rec: FenceBrokerRecord => handleFenceBrokerRecord(imageBuilder, rec)
+      case rec: UnfenceBrokerRecord => handleUnfenceBrokerRecord(imageBuilder, 
rec)
+      case rec: TopicRecord => handleTopicRecord(imageBuilder, rec)
+      case rec: PartitionRecord => handlePartitionRecord(imageBuilder, rec)

Review comment:
       Do we "assume" `PartitionRecord` is processed after `TopicRecord`? 
[BrokerMetadataListener#handlePartitionRecord](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala#L188)
 check existence of topic id first. It can throw exception if `PartitionRecord` 
is processed "before" `TopicRecord` (previous image has no related data as it 
is new topic).
   
   On the other hand, 
[RaftMetadataCache#updateMetadata](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/metadata/RaftMetadataCache.scala#L364)
 sets partition info before updating topic info.

##########
File path: core/src/main/scala/kafka/server/metadata/MetadataPartitions.scala
##########
@@ -171,25 +200,38 @@ class MetadataPartitionsBuilder(val brokerId: Int,
     val prevPartition = newPartitionMap.put(partition.partitionIndex, 
partition)
     if (partition.isReplicaFor(brokerId)) {
       _localChanged.add(partition)
-    } else if (prevPartition != null && prevPartition.isReplicaFor(brokerId)) {
-      _localRemoved.add(prevPartition)
+    } else if (prevPartition != null) {
+      maybeAddToLocalRemoved(prevPartition)
     }
     newNameMap.put(partition.topicName, newPartitionMap)
   }
 
+  private def maybeAddToLocalRemoved(partition: MetadataPartition): Unit = {
+    if (partition.isReplicaFor(brokerId)) {
+      val currentTopicId = newReverseIdMap.get(partition.topicName)

Review comment:
       For another, could it be replaced by 
`prevPartitions.contains(partition.topicName)`? It seems all we want to check 
is the existence of topic name in previous image.

##########
File path: 
core/src/test/scala/kafka/server/metadata/MetadataPartitionsTest.scala
##########
@@ -185,6 +184,134 @@ class MetadataPartitionsTest {
     assertEquals(2, updatedLeader.leaderId)
   }
 
+  @Test
+  def testTopicCreateAndDelete(): Unit = {
+    val topic = "foo"
+    val numPartitions = 3
+    val topicPartitions = (0 until numPartitions).map(new 
TopicPartition(topic, _)).toSet
+    val builder = new MetadataPartitionsBuilder(0, emptyPartitions)
+    val topicId = createTopic(topic, numPartitions, builder)
+    val localTopicPartitions = localChanged(builder)
+
+    assertTrue(localTopicPartitions.forall(topicPartitions.contains))
+    assertTrue(localTopicPartitions.nonEmpty)
+    assertNotEquals(topicPartitions, localTopicPartitions)
+
+    builder.removeTopicById(topicId)
+    assertEquals(None, builder.topicIdToName(topicId))
+    assertEquals(Set.empty, filterPartitions(builder, topicPartitions))
+    assertEquals(Set.empty, localRemoved(builder))
+    assertEquals(Set.empty, localChanged(builder))
+
+    val metadata = builder.build()
+    assertEquals(Set.empty, metadata.allTopicNames())
+    assertEquals(None, metadata.topicIdToName(topicId))

Review comment:
       Could you check `reverseIdMap` also? It should return `none`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to