rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544150030



##########
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##########
@@ -223,6 +224,31 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
   }
 
+  @Test
+  def testTopicIdsInResponse(): Unit = {
+    val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    createTopic(topic1, replicaAssignment)
+    createTopic(topic2, replicaAssignment)
+
+    // for versions < 10, MetadataResponse returns ZERO_UUID as the topic id
+    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+    assertEquals(2, resp1.topicMetadata.size)
+    resp1.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
+    }
+
+    // from version 10, UUID will be included in MetadataResponse
+    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+    assertEquals(2, resp2.topicMetadata.size)
+    resp2.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())

Review comment:
       we probably also want to assert that the topic id is not null here (even 
though we currently never return null).
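
A minimal, self-contained sketch of the reviewer's point, using `java.util.UUID` as a stand-in for Kafka's `org.apache.kafka.common.Uuid` (class and constant names here are illustrative, not the real test code). A null id and the `ZERO_UUID` sentinel are distinct failure modes, so the test would assert both:

```java
import java.util.UUID;

public class TopicIdAssertionSketch {
    // Stand-in for Kafka's Uuid.ZERO_UUID sentinel (hypothetical; the real
    // constant lives in org.apache.kafka.common.Uuid).
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    public static void main(String[] args) {
        UUID topicId = UUID.randomUUID(); // what a v10+ response should carry

        // Two distinct checks: "id was never set" (null) vs. "id is the
        // explicit zero sentinel" (ZERO_UUID). Asserting only the latter
        // would silently pass if the broker ever returned null.
        if (topicId == null)
            throw new AssertionError("topic id must not be null");
        if (ZERO_UUID.equals(topicId))
            throw new AssertionError("topic id must not be ZERO_UUID");
        System.out.println("both assertions passed");
    }
}
```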

##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
           error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
       When a topic is deleted, brokers process the UpdateMetadataRequest and remove the deleted topics from their cache. We track deletion state in ZooKeeper and, as you mentioned, kafka-topics.sh can get this information by going directly to ZK, but we don't retain it in every broker. I would remove the topic id in the code segment just below this when the topic is removed from the MetadataCache, since we clearly cannot have a map that keeps growing in brokers. Is there a reason why we would want to retain the topic id in every broker even after the topic has been deleted? We can't get this information through the existing metadata request from brokers anyway. In the future we could add additional metadata to track deleted topic ids if we wanted to, but for now it seems better to delete topic ids from the MetadataCache when we delete the topic from the cache. What do you think?
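
The suggestion above can be sketched with plain collections (this is illustrative only — `applyUpdate` and its parameters are hypothetical names, not the real `MetadataCache` API): merge in the ids carried by the update request, then drop the ids of topics removed from the cache, so the map stays bounded by the set of live topics.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class MetadataCacheIdSketch {
    // Hypothetical model of the reviewer's suggestion: new ids from the
    // UpdateMetadataRequest are merged in, and ids of deleted topics are
    // removed in the same pass, preventing unbounded growth.
    static Map<String, UUID> applyUpdate(Map<String, UUID> current,
                                         Map<String, UUID> newIds,
                                         Set<String> deletedTopics) {
        Map<String, UUID> merged = new HashMap<>(current);
        merged.putAll(newIds);                    // add/overwrite ids from the request
        merged.keySet().removeAll(deletedTopics); // drop ids for deleted topics
        return merged;
    }

    public static void main(String[] args) {
        Map<String, UUID> cache = new HashMap<>();
        cache.put("topicA", UUID.randomUUID());

        Map<String, UUID> updated = applyUpdate(
                cache,
                Map.of("topicB", UUID.randomUUID()), // topicB created
                Set.of("topicA"));                   // topicA deleted
        System.out.println(updated.containsKey("topicA")); // false
        System.out.println(updated.containsKey("topicB")); // true
    }
}
```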

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##########
@@ -320,6 +333,7 @@ public String toString() {
             return "TopicMetadata{" +
                 "error=" + error +
                 ", topic='" + topic + '\'' +
+                ", topicId='" + topicId.toString() + '\'' +

Review comment:
       nit: toString() unnecessary
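
For context on the nit, Java string concatenation already invokes `toString()` on non-String operands, so the explicit call adds nothing (a tiny self-contained illustration; `java.util.UUID` stands in for Kafka's `Uuid` here):

```java
import java.util.UUID;

public class ToStringNitSketch {
    public static void main(String[] args) {
        UUID topicId = new UUID(0L, 42L);
        // Concatenation implicitly calls toString() on topicId, so both
        // expressions produce the same string.
        String explicit = "topicId='" + topicId.toString() + "'";
        String implicit = "topicId='" + topicId + "'";
        System.out.println(explicit.equals(implicit)); // true
    }
}
```

A side note on the design choice: the implicit form is also slightly safer, since concatenating a null reference yields the string "null" while an explicit `toString()` call would throw a NullPointerException.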




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

