rajinisivaram commented on a change in pull request #9622:
URL: https://github.com/apache/kafka/pull/9622#discussion_r544150030
##########
File path: core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
##########
@@ -223,6 +224,31 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals("V1 Response should have 2 (all) topics", 2, metadataResponseV1.topicMetadata.size())
   }
 
+  @Test
+  def testTopicIdsInResponse(): Unit = {
+    val replicaAssignment = Map(0 -> Seq(1, 2, 0), 1 -> Seq(2, 0, 1))
+    val topic1 = "topic1"
+    val topic2 = "topic2"
+    createTopic(topic1, replicaAssignment)
+    createTopic(topic2, replicaAssignment)
+
+    // if version < 9, return ZERO_UUID in MetadataResponse
+    val resp1 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 0, 9).build(), Some(controllerSocketServer))
+    assertEquals(2, resp1.topicMetadata.size)
+    resp1.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertEquals(Uuid.ZERO_UUID, topicMetadata.topicId())
+    }
+
+    // from version 10, UUID will be included in MetadataResponse
+    val resp2 = sendMetadataRequest(new MetadataRequest.Builder(Seq(topic1, topic2).asJava, true, 10, 10).build(), Some(notControllerSocketServer))
+    assertEquals(2, resp2.topicMetadata.size)
+    resp2.topicMetadata.forEach { topicMetadata =>
+      assertEquals(Errors.NONE, topicMetadata.error)
+      assertNotEquals(Uuid.ZERO_UUID, topicMetadata.topicId())

Review comment:
We probably also want to assert that the topic id is not null here (even though we currently never return null).
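
To illustrate why the extra null assertion in the comment above matters: a not-equals check against the zero UUID is also satisfied by a null value, so on its own it would not catch a regression that returns null. The following is a minimal, self-contained sketch, not code from the PR. It assumes `java.util.UUID` as a stand-in for Kafka's `Uuid`, and the class and method names (`TopicIdAssertionSketch`, `differsFromZero`, `isValidTopicId`) are hypothetical.

```java
import java.util.Objects;
import java.util.UUID;

public class TopicIdAssertionSketch {
    // Assumption: stand-in for Kafka's Uuid.ZERO_UUID, using java.util.UUID
    // only so that this sketch compiles without Kafka on the classpath.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Mirrors what a not-equals assertion verifies: the value merely has to
    // differ from the sentinel. A null value differs from it as well.
    static boolean differsFromZero(UUID topicId) {
        return !Objects.equals(ZERO_UUID, topicId);
    }

    // Stricter check: the value must be present and must not be the sentinel.
    static boolean isValidTopicId(UUID topicId) {
        return topicId != null && !ZERO_UUID.equals(topicId);
    }

    public static void main(String[] args) {
        System.out.println(differsFromZero(null));            // true: null slips through
        System.out.println(isValidTopicId(null));             // false: null is rejected
        System.out.println(isValidTopicId(UUID.randomUUID())); // true: a real id passes
    }
}
```

In the test itself this would presumably amount to adding a not-null assertion next to the existing `assertNotEquals` line.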
##########
File path: core/src/main/scala/kafka/server/MetadataCache.scala
##########
@@ -314,9 +315,16 @@ class MetadataCache(brokerId: Int) extends Logging {
         error(s"Listeners are not identical across brokers: $aliveNodes")
       }
 
+      val newTopicIds = updateMetadataRequest.topicStates().asScala
+        .map(topicState => (topicState.topicName(), topicState.topicId()))
+        .filter(_._2 != Uuid.ZERO_UUID).toMap
+      val topicIds = mutable.Map.empty[String, Uuid]
+      topicIds.addAll(metadataSnapshot.topicIds)
+      topicIds.addAll(newTopicIds)

Review comment:
When a topic is deleted, brokers process UpdateMetadataRequest and remove deleted topics from their cache. We track deletion state in ZooKeeper and, as you mentioned, you can get this information by going directly to ZK in kafka-topics.sh. But we don't retain that information in every broker. I would remove the topic id in the code segment just below this, when the topic is removed from the MetadataCache, since we clearly cannot have a map that keeps growing in brokers. Is there a reason why we would want to retain the topic id in every broker even after the topic has been deleted? We can't get this information through the existing metadata request from brokers anyway. I guess in the future we can add additional metadata to track deleted topic ids if we want to, but for now it seems better to delete topic ids from MetadataCache when we delete the topic from the cache. What do you think?

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
##########
@@ -320,6 +333,7 @@ public String toString() {
             return "TopicMetadata{" +
                 "error=" + error +
                 ", topic='" + topic + '\'' +
+                ", topicId='" + topicId.toString() + '\'' +

Review comment:
nit: the explicit toString() call is unnecessary; string concatenation already converts topicId to a string.
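
As a rough illustration of the suggestion in the MetadataCache.scala comment above (drop a topic's id when the topic leaves the cache, so the map cannot grow without bound), here is a minimal sketch. It is written in Java rather than the Scala of MetadataCache, uses `java.util.UUID` as a stand-in for Kafka's `Uuid`, and every name in it (`TopicIdCacheSketch`, `updateTopicIds`, the `deletedTopics` parameter) is hypothetical rather than taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

public class TopicIdCacheSketch {
    // Assumption: stand-in for Kafka's Uuid.ZERO_UUID.
    static final UUID ZERO_UUID = new UUID(0L, 0L);

    // Builds the next topic-id map from the current snapshot, the ids carried
    // by an update, and the set of topics that the update deletes.
    static Map<String, UUID> updateTopicIds(Map<String, UUID> current,
                                            Map<String, UUID> incoming,
                                            Set<String> deletedTopics) {
        Map<String, UUID> updated = new HashMap<>(current);
        // Merge new ids, skipping the zero sentinel as the diff's filter does.
        incoming.forEach((topic, id) -> {
            if (!ZERO_UUID.equals(id)) {
                updated.put(topic, id);
            }
        });
        // Prune ids of deleted topics so the map tracks only live topics.
        updated.keySet().removeAll(deletedTopics);
        return updated;
    }

    public static void main(String[] args) {
        Map<String, UUID> current = Map.of("topic1", UUID.randomUUID(),
                                           "topic2", UUID.randomUUID());
        Map<String, UUID> incoming = Map.of("topic3", UUID.randomUUID(),
                                            "topic4", ZERO_UUID);
        Map<String, UUID> next = updateTopicIds(current, incoming, Set.of("topic1"));
        // topic1 was deleted and topic4 carried no real id, so neither is kept.
        System.out.println(next.keySet());
    }
}
```

Returning a fresh map instead of mutating the current one follows the snapshot-style update visible in the diff, where a new `topicIds` map is built from `metadataSnapshot.topicIds`.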