rajinisivaram commented on a change in pull request #9684:
URL: https://github.com/apache/kafka/pull/9684#discussion_r564876908



##########
File path: clients/src/main/resources/common/message/DeleteTopicsResponse.json
##########
@@ -27,15 +27,20 @@
   //
   // Version 5 adds ErrorMessage in the response and may return a 
THROTTLING_QUOTA_EXCEEDED error
   // in the response if the topics deletion is throttled (KIP-599).
-  "validVersions": "0-5",
+  //
+  // Version 6 adds topic ID to responses. An UNSUPPORTED_VERSION error code 
will be returned when attempting to
+  // delete using topic IDs when IBP > 2.8. UNKNOWN_TOPIC_ID error code will 
be returned when IBP is at least 2.8, but

Review comment:
       IBP < 2.8 for UNSUPPORTED_VERSION?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1625,6 +1628,32 @@ public DeleteTopicsResult deleteTopics(final 
Collection<String> topicNames,
         return new DeleteTopicsResult(new HashMap<>(topicFutures));
     }
 
+    @Override
+    public DeleteTopicsWithIdsResult deleteTopicsWithIds(final 
Collection<Uuid> topicIds,
+                                           final DeleteTopicsOptions options) {

Review comment:
       nit: indentation

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/CreateTopicsResult.java
##########
@@ -68,6 +69,19 @@ protected CreateTopicsResult(Map<String, 
KafkaFuture<TopicMetadataAndConfig>> fu
         return futures.get(topic).thenApply(TopicMetadataAndConfig::config);
     }
 
+    /**
+     * Returns a future that provides topic ID for the topic when the request 
completes.
+     * <p>
+     * If broker version doesn't support replication factor in the response, 
throw
+     * {@link org.apache.kafka.common.errors.UnsupportedVersionException}.
+     * If broker returned an error for topic configs, throw appropriate 
exception. For example,
+     * {@link org.apache.kafka.common.errors.TopicAuthorizationException} is 
thrown if user does not
+     * have permission to describe topic configs.
+     */
+    public KafkaFuture<Uuid> topicId(String topic) {
+        return futures.get(topic).thenApply(TopicMetadataAndConfig::topicId);
+    }
+    
     /**
      * Returns a future that provides number of partitions in the topic when 
the request completes.

Review comment:
       In the create result, we wouldn't see UnsupportedVersionException for 
topicId() just because topic ids are not enabled, would we?

##########
File path: 
core/src/test/scala/unit/kafka/server/MetadataRequestIBPTest/TopicIdWithOldInterBrokerProtocolTest.scala
##########
@@ -59,8 +61,56 @@ class TopicIdWithOldInterBrokerProtocolTest extends 
BaseRequestTest {
     }
   }
 
+  @Test
+  def testDeleteTopicsWithOldIBP(): Unit = {
+    val timeout = 10000
+    createTopic("topic-3", 5, 2)
+    createTopic("topic-4", 1, 2)
+    val request = new DeleteTopicsRequest.Builder(
+      new DeleteTopicsRequestData()
+        .setTopicNames(Arrays.asList("topic-3", "topic-4"))
+        .setTimeoutMs(timeout)).build()
+    val resp = sendDeleteTopicsRequest(request)
+    val error = resp.errorCounts.asScala.find(_._1 != Errors.NONE)
+    assertTrue(error.isEmpty, s"There should be no errors, found 
${resp.data.responses.asScala}")
+    request.data.topicNames.forEach { topic =>
+      validateTopicIsDeleted(topic)
+    }
+    resp.data.responses.forEach { response =>
+      assertEquals(Uuid.ZERO_UUID, response.topicId())
+    }
+  }
+
+  @Test
+  def testDeleteTopicsWithOldIBPUsingIDs(): Unit = {
+    val timeout = 10000
+    createTopic("topic-7", 3, 2)
+    createTopic("topic-6", 1, 2)
+    val ids = Map("topic-7" -> Uuid.randomUuid(), "topic-6" -> 
Uuid.randomUuid())
+    val request = new DeleteTopicsRequest.Builder(
+      new DeleteTopicsRequestData()
+        .setTopics(Arrays.asList(new 
DeleteTopicState().setTopicId(ids("topic-7")),
+          new DeleteTopicState().setTopicId(ids("topic-6"))
+        )
+        ).setTimeoutMs(timeout)).build()

Review comment:
       This could move to the previous line, so the identation doesn't look odd

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
     val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
     val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(Errors.NOT_CONTROLLER.code))
       }
       sendResponseCallback(results)
     } else if (!config.deleteTopicEnable) {
       val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(error.code))
       }
       sendResponseCallback(results)
     } else {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
+        val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()

Review comment:
       We can use `topic.topicId == Uuid.ZERO_UUID` in Scala instead of 
`equals`. (a couple of places below too)

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1843,6 +1843,8 @@ class KafkaApis(val requestChannel: RequestChannel,
               .setNumPartitions(-1)
               .setReplicationFactor(-1)
               .setTopicConfigErrorCode(Errors.NONE.code)
+          } else {
+            
result.setTopicId(controller.controllerContext.topicIds.getOrElse(result.name(),
 Uuid.ZERO_UUID))

Review comment:
       Can't we populate topic id in AdminManager similar to other metadata in 
the result?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
     val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
     val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(Errors.NOT_CONTROLLER.code))
       }
       sendResponseCallback(results)
     } else if (!config.deleteTopicEnable) {
       val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(error.code))
       }
       sendResponseCallback(results)
     } else {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
+        val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+          else 
controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)

Review comment:
       What happens if both topic id and topic name are provided in the request?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java
##########
@@ -36,8 +42,25 @@ public Builder(DeleteTopicsRequestData data) {
 
         @Override
         public DeleteTopicsRequest build(short version) {
+            if (version >= 6 && data.topicNames().size() != 0) {

Review comment:
       nit: isEmpty() instead of .size()?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -1696,6 +1725,79 @@ void handleFailure(Throwable throwable) {
             }
         };
     }
+   
+    private Call getDeleteTopicsWithIdsCall(final DeleteTopicsOptions options,
+                                     final Map<Uuid, KafkaFutureImpl<Void>> 
futures,

Review comment:
       nit: indentation

##########
File path: core/src/test/scala/unit/kafka/server/BaseRequestTest.scala
##########
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream}
 import java.net.Socket
 import java.nio.ByteBuffer
 import java.util.Properties
+

Review comment:
       We could revert changes to this file since there aren't any real changes?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -919,6 +973,36 @@ public void 
testDeleteTopicsRetryThrottlingExceptionWhenEnabled() throws Excepti
             assertNull(result.values().get("topic1").get());
             assertNull(result.values().get("topic2").get());
             TestUtils.assertFutureThrows(result.values().get("topic3"), 
TopicExistsException.class);
+            
+            // With topic IDs
+            Uuid topicId1 = Uuid.randomUuid();
+            Uuid topicId2 = Uuid.randomUuid();
+            Uuid topicId3 = Uuid.randomUuid();
+            
+            env.kafkaClient().prepareResponse(
+                    expectDeleteTopicsRequestWithTopicIds(topicId1, topicId2, 
topicId3),
+                    prepareDeleteTopicsResponse(1000,
+                            deletableTopicResultWithId(topicId1, Errors.NONE),
+                            deletableTopicResultWithId(topicId2, 
Errors.THROTTLING_QUOTA_EXCEEDED),
+                            deletableTopicResultWithId(topicId3, 
Errors.TOPIC_ALREADY_EXISTS)));

Review comment:
       Doesn't really matter for this test, but perhaps we could use 
`UNKNOWN_TOPIC_ID` (multiple tests below)?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -1930,29 +1932,43 @@ class KafkaApis(val requestChannel: RequestChannel,
     val results = new 
DeletableTopicResultCollection(deleteTopicRequest.data.topicNames.size)
     val toDelete = mutable.Set[String]()
     if (!controller.isActive) {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(Errors.NOT_CONTROLLER.code))
       }
       sendResponseCallback(results)
     } else if (!config.deleteTopicEnable) {
       val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST 
else Errors.TOPIC_DELETION_DISABLED
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
         results.add(new DeletableTopicResult()
-          .setName(topic)
+          .setName(topic.name())
+          .setTopicId(topic.topicId())
           .setErrorCode(error.code))
       }
       sendResponseCallback(results)
     } else {
-      deleteTopicRequest.data.topicNames.forEach { topic =>
+      deleteTopicRequest.topics().forEach { topic =>
+        val name = if (topic.topicId().equals(Uuid.ZERO_UUID)) topic.name()
+          else 
controller.controllerContext.topicNames.getOrElse(topic.topicId(), null)
         results.add(new DeletableTopicResult()
-          .setName(topic))
+          .setName(name)
+          .setTopicId(topic.topicId()))
       }
       val authorizedTopics = authHelper.filterByAuthorized(request.context, 
DELETE, TOPIC,
         results.asScala)(_.name)
       results.forEach { topic =>
-         if (!authorizedTopics.contains(topic.name))
+         val foundTopicId = !topic.topicId().equals(Uuid.ZERO_UUID) && 
topic.name() != null
+         val topicIdSpecified = !topic.topicId().equals(Uuid.ZERO_UUID)

Review comment:
       We can move this one line above and use `topicIdSpecified`  in the line 
above




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to