[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-26 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r583886777



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,

Review comment:
   There seems to be enough complexity in the handling here that it might 
be worth pulling this logic into a separate class. Not required for this PR, 
but it would be nice to come up with a nicer pattern so that we don't end up 
with a giant class like `KafkaApis`.

##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,6 +161,147 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data(),
+  request.context.apiVersion(),
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator())).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+
request.topicNames().iterator().asScala.foreach(maybeAppendToTopicNamesToResolve)
+
+request.topics().iterator().asScala.foreach {
+  topic => if (topic.name() == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId())
+}
+  } else {
+if (topic.topicId().equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name())
+} else {
+  appendResponse(topic.name(), topic.topicId(), new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic id

[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-01 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r585086425



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -154,17 +162,179 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+val responses = deleteTopics(request.body[DeleteTopicsRequest].data,
+  request.context.apiVersion,
+  authHelper.authorize(request.context, DELETE, CLUSTER, CLUSTER_NAME),
+  names => authHelper.filterByAuthorized(request.context, DESCRIBE, TOPIC, 
names)(n => n),
+  names => authHelper.filterByAuthorized(request.context, DELETE, TOPIC, 
names)(n => n))
+requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+  val responseData = new DeleteTopicsResponseData().
+setResponses(new DeletableTopicResultCollection(responses.iterator)).
+setThrottleTimeMs(throttleTimeMs)
+  new DeleteTopicsResponse(responseData)
+})
+  }
+
+  def deleteTopics(request: DeleteTopicsRequestData,
+   apiVersion: Int,
+   hasClusterAuth: Boolean,
+   getDescribableTopics: Iterable[String] => Set[String],
+   getDeletableTopics: Iterable[String] => Set[String]): 
util.List[DeletableTopicResult] = {
+if (!config.deleteTopicEnable) {
+  if (apiVersion < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val responses = new util.ArrayList[DeletableTopicResult]
+val duplicatedTopicNames = new util.HashSet[String]
+val topicNamesToResolve = new util.HashSet[String]
+val topicIdsToResolve = new util.HashSet[Uuid]
+val duplicatedTopicIds = new util.HashSet[Uuid]
+
+def appendResponse(name: String, id: Uuid, error: ApiError): Unit = {
+  responses.add(new DeletableTopicResult().
+setName(name).
+setTopicId(id).
+setErrorCode(error.error.code).
+setErrorMessage(error.message))
+}
+
+def maybeAppendToTopicNamesToResolve(name: String): Unit = {
+  if (duplicatedTopicNames.contains(name) || 
!topicNamesToResolve.add(name)) {
+appendResponse(name, ZERO_UUID, new ApiError(INVALID_REQUEST, 
"Duplicate topic name."))
+topicNamesToResolve.remove(name)
+duplicatedTopicNames.add(name)
+  }
+}
+
+def maybeAppendToIdsToResolve(id: Uuid): Unit = {
+  if (duplicatedTopicIds.contains(id) || !topicIdsToResolve.add(id)) {
+appendResponse(null, id, new ApiError(INVALID_REQUEST, "Duplicate 
topic ID."))
+topicIdsToResolve.remove(id)
+duplicatedTopicIds.add(id)
+  }
+}
+
+request.topicNames.forEach(maybeAppendToTopicNamesToResolve)
+
+request.topics.forEach {
+  topic => if (topic.name == null) {
+if (topic.topicId.equals(ZERO_UUID)) {
+  appendResponse(null, ZERO_UUID, new ApiError(INVALID_REQUEST,
+"Neither topic name nor id were specified."))
+} else {
+  maybeAppendToIdsToResolve(topic.topicId)
+}
+  } else {
+if (topic.topicId.equals(ZERO_UUID)) {
+  maybeAppendToTopicNamesToResolve(topic.name)
+} else {
+  appendResponse(topic.name, topic.topicId, new 
ApiError(INVALID_REQUEST,
+"You may not specify both topic name and topic id."))
+}
+  }
+}
+
+val idToName = new util.HashMap[Uuid, String]
+val unknownTopicNameErrors = new util.HashMap[String, ApiError]
+def maybeAppendToIdToName(id: Uuid, name: String): Unit = {
+  if (duplicatedTopicIds.contains(id) || idToName.put(id, name) != null) {
+  appendResponse(name, id, new ApiError(INVALID_REQUEST,
+  "The same topic was specified by name and by id."))
+  idToName.remove(id)
+  duplicatedTopicIds.add(id)
+  }
+}
+controller.findTopicIds(topicNamesToResolve).get().forEach { (name, 
idOrError) =>
+  if (idOrError.isError)
+unknownTopicNameErrors.put(name, idOrError.error)
+  else
+maybeAppendToIdToName(idOrError.result, name)
+}
+
+/**
+ * There are 6 error cases to handle here if we don't have permission to 
delete:
+ *
+ * 1. ID provided, topic missing => UNKNOWN_TOPIC_ID
+ * 2. ID provided, topic present, describeable => 
TOPIC_AUTHORIZATION_FAILED
+ * 3. ID provided, topic present, undescribeable => UNKNOWN_TOPIC_ID
+ * 4. name provided, topic missing, undescribable => 
UNKNOWN_TOPIC_OR_PARTITION

Review comment:
   These cases seem wrong. It should be the following:
   ```
* 4. name provided, topic missing, undescribable => 
TOPIC_AUTHORIZATION_FAILED
* 5. name provided, topic missing, describable => 
UNKNOWN_TOPIC_OR_

[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-03 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586632247



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map 
topicErrors,
 return configChanges;
 }
 
+Map> findTopicIds(long offset, 
Collection names) {
+Map> results = new HashMap<>();
+for (String name : names) {
+if (name == null) {
+results.put(null, new ResultOrError<>(INVALID_REQUEST, 
"Invalid null topic name."));
+} else {
+Uuid id = topicsByName.get(name, offset);
+if (id == null) {
+results.put(name, new ResultOrError<>(
+new ApiError(UNKNOWN_TOPIC_OR_PARTITION)));
+} else {
+results.put(name, new ResultOrError<>(id));
+}
+}
+}
+return results;
+}
+
+Map> findTopicNames(long offset, 
Collection ids) {
+Map> results = new HashMap<>();
+for (Uuid id : ids) {
+TopicControlInfo topic = topics.get(id, offset);
+if (topic == null) {
+results.put(id, new ResultOrError<>(new 
ApiError(UNKNOWN_TOPIC_ID)));
+} else {
+results.put(id, new ResultOrError<>(topic.name));
+}
+}
+return results;
+}
+
+ControllerResult> deleteTopics(Collection ids) {
+Map results = new HashMap<>();
+List records = new ArrayList<>();
+for (Uuid id : ids) {
+try {
+deleteTopic(id, records);
+results.put(id, ApiError.NONE);
+} catch (ApiException e) {
+results.put(id, ApiError.fromThrowable(e));
+} catch (Exception e) {
+log.error("Unexpected deleteTopics error for {}", id, e);
+results.put(id, ApiError.fromThrowable(e));
+}
+}
+return new ControllerResult<>(records, results);
+}
+
+void deleteTopic(Uuid id, List records) {
+TopicControlInfo topic = topics.get(id);
+if (topic == null) {
+throw new UnknownTopicIdException(UNKNOWN_TOPIC_ID.message());
+}
+configurationControl.deleteTopicConfigs(topic.name);

Review comment:
   Why do we need to do this both here and in `replay`?

##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -541,6 +575,64 @@ static void validateNewTopicNames(Map 
topicErrors,
 return configChanges;
 }
 
+Map> findTopicIds(long offset, 
Collection names) {
+Map> results = new HashMap<>();

Review comment:
   nit: I guess we could initialize the size (a couple similar cases below 
as well)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-03 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r586979443



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+// Remove this topic from the topics map and the topicsByName map.
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);
+
+// Delete the configurations associated with this topic.
+configurationControl.deleteTopicConfigs(topic.name);
+
+// Remove the entries for this topic in brokersToIsrs.
+for (PartitionControlInfo partition : topic.parts.values()) {
+for (int i = 0; i < partition.isr.length; i++) {
+brokersToIsrs.removeTopicEntryForBroker(topic.id, 
partition.isr[i]);
+}
+}
+brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
   The test case `BrokersToIsrsTest.testNoLeader` suggests that it is a 
possible case. It looks like the path through 
`ReplicationControlManager.handleNodeDeactivated` could result in a 
`PartitionChangeRecord` which has leaderId set to -1. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-03 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587002273



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
@@ -349,6 +359,29 @@ public void replay(PartitionChangeRecord record) {
 log.debug("Applied ISR change record: {}", record.toString());
 }
 
+public void replay(RemoveTopicRecord record) {
+// Remove this topic from the topics map and the topicsByName map.
+TopicControlInfo topic = topics.remove(record.topicId());
+if (topic == null) {
+throw new RuntimeException("Can't find topic with ID " + 
record.topicId() +
+" to remove.");
+}
+topicsByName.remove(topic.name);
+
+// Delete the configurations associated with this topic.
+configurationControl.deleteTopicConfigs(topic.name);
+
+// Remove the entries for this topic in brokersToIsrs.
+for (PartitionControlInfo partition : topic.parts.values()) {
+for (int i = 0; i < partition.isr.length; i++) {
+brokersToIsrs.removeTopicEntryForBroker(topic.id, 
partition.isr[i]);
+}
+}
+brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);

Review comment:
   I stepped through `testNoLeader` and it seems that -1 can indeed be a 
key in `isrMembers`. The `noLeaderIterator` makes the expectation explicit. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-03-04 Thread GitBox


hachikuji commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r587670430



##
File path: core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
##
@@ -142,6 +148,199 @@ class ControllerApisTest {
   brokerRegistrationResponse.errorCounts().asScala)
   }
 
+  @Test
+  def testDeleteTopicsByName(): Unit = {
+val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+val controller = new MockController.Builder().newInitialTopic("foo", 
fooId).build()
+val controllerApis = createControllerApis(None, controller)
+val request = new DeleteTopicsRequestData().setTopicNames(
+  util.Arrays.asList("foo", "bar", "quux", "quux"))
+val expectedResponse = Set(new DeletableTopicResult().setName("quux").
+setErrorCode(Errors.INVALID_REQUEST.code()).
+setErrorMessage("Duplicate topic name."),
+  new DeletableTopicResult().setName("bar").
+setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()).
+setErrorMessage("This server does not host this topic-partition."),
+  new DeletableTopicResult().setName("foo").setTopicId(fooId))
+assertEquals(expectedResponse, controllerApis.deleteTopics(request,
+  ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+  true,
+  _ => Set.empty,
+  _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testDeleteTopicsById(): Unit = {
+val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+val quuxId = Uuid.fromString("ObXkLhL_S5W62FAE67U3MQ")
+val controller = new MockController.Builder().newInitialTopic("foo", 
fooId).build()
+val controllerApis = createControllerApis(None, controller)
+val request = new DeleteTopicsRequestData()
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(fooId))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(quuxId))
+val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(quuxId).
+setErrorCode(Errors.INVALID_REQUEST.code()).
+setErrorMessage("Duplicate topic id."),
+  new DeletableTopicResult().setName(null).setTopicId(barId).
+setErrorCode(Errors.UNKNOWN_TOPIC_ID.code()).
+setErrorMessage("This server does not host this topic ID."),
+  new DeletableTopicResult().setName("foo").setTopicId(fooId))
+assertEquals(response, controllerApis.deleteTopics(request,
+  ApiKeys.DELETE_TOPICS.latestVersion().toInt,
+  true,
+  _ => Set.empty,
+  _ => Set.empty).asScala.toSet)
+  }
+
+  @Test
+  def testInvalidDeleteTopicsRequest(): Unit = {
+val fooId = Uuid.fromString("vZKYST0pSA2HO5x_6hoO2Q")
+val barId = Uuid.fromString("VlFu5c51ToiNx64wtwkhQw")
+val bazId = Uuid.fromString("YOS4oQ3UT9eSAZahN1ysSA")
+val controller = new MockController.Builder().
+  newInitialTopic("foo", fooId).
+  newInitialTopic("bar", barId).build()
+val controllerApis = createControllerApis(None, controller)
+val request = new DeleteTopicsRequestData()
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(ZERO_UUID))
+request.topics().add(new 
DeleteTopicState().setName("foo").setTopicId(fooId))
+request.topics().add(new 
DeleteTopicState().setName("bar").setTopicId(ZERO_UUID))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(barId))
+request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+request.topics().add(new 
DeleteTopicState().setName("quux").setTopicId(ZERO_UUID))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+request.topics().add(new 
DeleteTopicState().setName(null).setTopicId(bazId))
+val response = Set(new 
DeletableTopicResult().setName(null).setTopicId(ZERO_UUID).
+setErrorCode(Errors.INVALID_REQUEST.code()).
+setErrorMessage("Neither topic name nor id were specified."),
+  new DeletableTopicResult().setName("foo").setTopicId(fooId).
+setErrorCode(Errors.INVALID_REQUEST.code()).
+setErrorMessage("You may not specify both topic name and topic id."),
+  new DeletableTopicResult().setName("bar").setTopicId(barId).
+setErrorCode(Errors.INVALID_REQUEST.code()).
+setErrorMessage("The provided topic name maps to an ID that was 
already supplied."),
+  new DeletableTopicResult().setName("quux").setTopicId(ZERO_UUID).
+setErrorCode(Errors.INVALID_REQUEST.code()).
+setErrorMessage("Duplicate topic name."),
+  new DeletableTopicResult().setName(null).setTopicId(ba