This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 981815c KAFKA-8034: Use automatic RPC generation in DeleteTopics 981815c is described below commit 981815c8d14daf1042e06f8fa9cb355187719b1d Author: Mickael Maison <mimai...@users.noreply.github.com> AuthorDate: Fri Mar 29 21:32:36 2019 +0000 KAFKA-8034: Use automatic RPC generation in DeleteTopics Reviewers: Colin P. McCabe <cmcc...@apache.org> --- .../kafka/clients/admin/KafkaAdminClient.java | 16 ++-- .../org/apache/kafka/common/protocol/ApiKeys.java | 6 +- .../kafka/common/requests/AbstractResponse.java | 2 +- .../kafka/common/requests/DeleteTopicsRequest.java | 86 +++++++------------- .../common/requests/DeleteTopicsResponse.java | 93 +++++----------------- .../common/message/DeleteTopicsResponse.json | 6 +- .../kafka/clients/admin/KafkaAdminClientTest.java | 16 +++- .../kafka/common/requests/RequestResponseTest.java | 20 +++-- core/src/main/scala/kafka/server/KafkaApis.scala | 75 ++++++++++------- .../kafka/api/AuthorizerIntegrationTest.scala | 18 +++-- .../kafka/server/DeleteTopicsRequestTest.scala | 55 ++++++++----- ...leteTopicsRequestWithDeletionDisabledTest.scala | 17 ++-- .../scala/unit/kafka/server/RequestQuotaTest.scala | 10 ++- 13 files changed, 206 insertions(+), 214 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 317cd7c..336597f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -63,6 +63,8 @@ import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicSet; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroup; import org.apache.kafka.common.message.DescribeGroupsResponseData.DescribedGroupMember; @@ -1365,14 +1367,16 @@ public class KafkaAdminClient extends AdminClient { @Override AbstractRequest.Builder createRequest(int timeoutMs) { - return new DeleteTopicsRequest.Builder(new HashSet<>(validTopicNames), timeoutMs); + return new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData() + .setTopicNames(validTopicNames) + .setTimeoutMs(timeoutMs)); } @Override void handleResponse(AbstractResponse abstractResponse) { DeleteTopicsResponse response = (DeleteTopicsResponse) abstractResponse; // Check for controller change - for (Errors error : response.errors().values()) { + for (Errors error : response.errorCounts().keySet()) { if (error == Errors.NOT_CONTROLLER) { metadataManager.clearController(); metadataManager.requestUpdate(); @@ -1380,12 +1384,12 @@ public class KafkaAdminClient extends AdminClient { } } // Handle server responses for particular topics. - for (Map.Entry<String, Errors> entry : response.errors().entrySet()) { - KafkaFutureImpl<Void> future = topicFutures.get(entry.getKey()); + for (DeletableTopicResult result : response.data().responses()) { + KafkaFutureImpl<Void> future = topicFutures.get(result.name()); if (future == null) { - log.warn("Server response mentioned unknown topic {}", entry.getKey()); + log.warn("Server response mentioned unknown topic {}", result.name()); } else { - ApiException exception = entry.getValue().exception(); + ApiException exception = Errors.forCode(result.errorCode()).exception(); if (exception != null) { future.completeExceptionally(exception); } else { diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index 3f8d80d..ed9787f 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.protocol; import org.apache.kafka.common.message.CreateTopicsRequestData; import org.apache.kafka.common.message.CreateTopicsResponseData; +import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsResponseData; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; @@ -61,8 +63,6 @@ import org.apache.kafka.common.requests.DeleteGroupsRequest; import org.apache.kafka.common.requests.DeleteGroupsResponse; import org.apache.kafka.common.requests.DeleteRecordsRequest; import org.apache.kafka.common.requests.DeleteRecordsResponse; -import org.apache.kafka.common.requests.DeleteTopicsRequest; -import org.apache.kafka.common.requests.DeleteTopicsResponse; import org.apache.kafka.common.requests.DescribeAclsRequest; import org.apache.kafka.common.requests.DescribeAclsResponse; import org.apache.kafka.common.requests.DescribeConfigsRequest; @@ -153,7 +153,7 @@ public enum ApiKeys { } }, CREATE_TOPICS(19, "CreateTopics", CreateTopicsRequestData.SCHEMAS, CreateTopicsResponseData.SCHEMAS), - DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequest.schemaVersions(), DeleteTopicsResponse.schemaVersions()), + DELETE_TOPICS(20, "DeleteTopics", DeleteTopicsRequestData.SCHEMAS, DeleteTopicsResponseData.SCHEMAS), DELETE_RECORDS(21, "DeleteRecords", DeleteRecordsRequest.schemaVersions(), DeleteRecordsResponse.schemaVersions()), INIT_PRODUCER_ID(22, "InitProducerId", InitProducerIdRequest.schemaVersions(), InitProducerIdResponse.schemaVersions()), diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java index eadd302..f74e1ae 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractResponse.java @@ -111,7 +111,7 @@ public abstract class AbstractResponse extends AbstractRequestResponse { case CREATE_TOPICS: return new CreateTopicsResponse(struct, version); case DELETE_TOPICS: - return new DeleteTopicsResponse(struct); + return new DeleteTopicsResponse(struct, version); case DELETE_RECORDS: return new DeleteRecordsResponse(struct); case INIT_PRODUCER_ID: diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java index facb55e..978d1c0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsRequest.java @@ -16,19 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsResponseData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.Utils; import java.nio.ByteBuffer; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; import static org.apache.kafka.common.protocol.types.Type.INT32; import static org.apache.kafka.common.protocol.types.Type.STRING; @@ -62,85 +59,62 @@ public class DeleteTopicsRequest extends AbstractRequest { DELETE_TOPICS_REQUEST_V2, DELETE_TOPICS_REQUEST_V3}; } - private final Set<String> topics; - private final Integer timeout; + private DeleteTopicsRequestData data; + private final short version; public static class Builder extends AbstractRequest.Builder<DeleteTopicsRequest> { - private final Set<String> topics; - private final Integer timeout; + private DeleteTopicsRequestData data; - public Builder(Set<String> topics, Integer timeout) { + public Builder(DeleteTopicsRequestData data) { super(ApiKeys.DELETE_TOPICS); - this.topics = topics; - this.timeout = timeout; + this.data = data; } @Override public DeleteTopicsRequest build(short version) { - return new DeleteTopicsRequest(topics, timeout, version); + return new DeleteTopicsRequest(data, version); } @Override public String toString() { - StringBuilder bld = new StringBuilder(); - bld.append("(type=DeleteTopicsRequest"). - append(", topics=(").append(Utils.join(topics, ", ")).append(")"). - append(", timeout=").append(timeout). - append(")"); - return bld.toString(); + return data.toString(); } } - private DeleteTopicsRequest(Set<String> topics, Integer timeout, short version) { + private DeleteTopicsRequest(DeleteTopicsRequestData data, short version) { super(ApiKeys.DELETE_TOPICS, version); - this.topics = topics; - this.timeout = timeout; + this.data = data; + this.version = version; } public DeleteTopicsRequest(Struct struct, short version) { super(ApiKeys.DELETE_TOPICS, version); - Object[] topicsArray = struct.getArray(TOPICS_KEY_NAME); - Set<String> topics = new HashSet<>(topicsArray.length); - for (Object topic : topicsArray) - topics.add((String) topic); - - this.topics = topics; - this.timeout = struct.getInt(TIMEOUT_KEY_NAME); + this.data = new DeleteTopicsRequestData(struct, version); + this.version = version; } @Override protected Struct toStruct() { - Struct struct = new Struct(ApiKeys.DELETE_TOPICS.requestSchema(version())); - struct.set(TOPICS_KEY_NAME, topics.toArray()); - struct.set(TIMEOUT_KEY_NAME, timeout); - return struct; + return data.toStruct(version); + } + + public DeleteTopicsRequestData data() { + return data; } @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<String, Errors> topicErrors = new HashMap<>(); - for (String topic : topics) - topicErrors.put(topic, Errors.forException(e)); - - switch (version()) { - case 0: - return new DeleteTopicsResponse(topicErrors); - case 1: - case 2: - case 3: - return new DeleteTopicsResponse(throttleTimeMs, topicErrors); - default: - throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", - version(), this.getClass().getSimpleName(), ApiKeys.DELETE_TOPICS.latestVersion())); + DeleteTopicsResponseData response = new DeleteTopicsResponseData(); + if (version >= 1) { + response.setThrottleTimeMs(throttleTimeMs); } - } - - public Set<String> topics() { - return topics; - } - - public Integer timeout() { - return this.timeout; + ApiError apiError = ApiError.fromThrowable(e); + for (String topic : data.topicNames()) { + response.responses().add(new DeletableTopicResult() + .setName(topic) + .setErrorCode(apiError.error().code())); + } + return new DeleteTopicsResponse(response); } public static DeleteTopicsRequest parse(ByteBuffer buffer, short version) { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java index 650caa8..aa8e552 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteTopicsResponse.java @@ -16,54 +16,18 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.message.DeleteTopicsResponseData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; public class DeleteTopicsResponse extends AbstractResponse { - private static final String TOPIC_ERROR_CODES_KEY_NAME = "topic_error_codes"; - - private static final Schema TOPIC_ERROR_CODE = new Schema( - TOPIC_NAME, - ERROR_CODE); - - private static final Schema DELETE_TOPICS_RESPONSE_V0 = new Schema( - new Field(TOPIC_ERROR_CODES_KEY_NAME, - new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); - - private static final Schema DELETE_TOPICS_RESPONSE_V1 = new Schema( - THROTTLE_TIME_MS, - new Field(TOPIC_ERROR_CODES_KEY_NAME, new ArrayOf(TOPIC_ERROR_CODE), "An array of per topic error codes.")); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema DELETE_TOPICS_RESPONSE_V2 = DELETE_TOPICS_RESPONSE_V1; - - /** - * v3 request is the same that as v2. The response is different based on the request version. - * In v3 version a TopicDeletionDisabledException is returned - */ - private static final Schema DELETE_TOPICS_RESPONSE_V3 = DELETE_TOPICS_RESPONSE_V2; - - public static Schema[] schemaVersions() { - return new Schema[]{DELETE_TOPICS_RESPONSE_V0, DELETE_TOPICS_RESPONSE_V1, - DELETE_TOPICS_RESPONSE_V2, DELETE_TOPICS_RESPONSE_V3}; - } - /** * Possible error codes: @@ -75,63 +39,42 @@ public class DeleteTopicsResponse extends AbstractResponse { * INVALID_REQUEST(42) * TOPIC_DELETION_DISABLED(73) */ - private final Map<String, Errors> errors; - private final int throttleTimeMs; + private DeleteTopicsResponseData data; - public DeleteTopicsResponse(Map<String, Errors> errors) { - this(DEFAULT_THROTTLE_TIME, errors); + public DeleteTopicsResponse(DeleteTopicsResponseData data) { + this.data = data; } - public DeleteTopicsResponse(int throttleTimeMs, Map<String, Errors> errors) { - this.throttleTimeMs = throttleTimeMs; - this.errors = errors; - } - - public DeleteTopicsResponse(Struct struct) { - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - Object[] topicErrorCodesStructs = struct.getArray(TOPIC_ERROR_CODES_KEY_NAME); - Map<String, Errors> errors = new HashMap<>(); - for (Object topicErrorCodeStructObj : topicErrorCodesStructs) { - Struct topicErrorCodeStruct = (Struct) topicErrorCodeStructObj; - String topic = topicErrorCodeStruct.get(TOPIC_NAME); - Errors error = Errors.forCode(topicErrorCodeStruct.get(ERROR_CODE)); - errors.put(topic, error); - } - - this.errors = errors; + public DeleteTopicsResponse(Struct struct, short version) { + this.data = new DeleteTopicsResponseData(struct, version); } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.DELETE_TOPICS.responseSchema(version)); - struct.setIfExists(THROTTLE_TIME_MS, throttleTimeMs); - List<Struct> topicErrorCodeStructs = new ArrayList<>(errors.size()); - for (Map.Entry<String, Errors> topicError : errors.entrySet()) { - Struct topicErrorCodeStruct = struct.instance(TOPIC_ERROR_CODES_KEY_NAME); - topicErrorCodeStruct.set(TOPIC_NAME, topicError.getKey()); - topicErrorCodeStruct.set(ERROR_CODE, topicError.getValue().code()); - topicErrorCodeStructs.add(topicErrorCodeStruct); - } - struct.set(TOPIC_ERROR_CODES_KEY_NAME, topicErrorCodeStructs.toArray()); - return struct; + return data.toStruct(version); } @Override public int throttleTimeMs() { - return throttleTimeMs; + return data.throttleTimeMs(); } - public Map<String, Errors> errors() { - return errors; + public DeleteTopicsResponseData data() { + return data; } @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(errors); + HashMap<Errors, Integer> counts = new HashMap<>(); + for (DeletableTopicResult result : data.responses()) { + Errors error = Errors.forCode(result.errorCode()); + counts.put(error, counts.getOrDefault(error, 0) + 1); + } + return counts; } public static DeleteTopicsResponse parse(ByteBuffer buffer, short version) { - return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.responseSchema(version).read(buffer)); + return new DeleteTopicsResponse(ApiKeys.DELETE_TOPICS.parseResponse(version, buffer), version); } @Override diff --git a/clients/src/main/resources/common/message/DeleteTopicsResponse.json b/clients/src/main/resources/common/message/DeleteTopicsResponse.json index cf0837b..4cea44b 100644 --- a/clients/src/main/resources/common/message/DeleteTopicsResponse.json +++ b/clients/src/main/resources/common/message/DeleteTopicsResponse.json @@ -22,11 +22,11 @@ // Starting in version 3, a TOPIC_DELETION_DISABLED error code may be returned. "validVersions": "0-3", "fields": [ - { "name": "throttleTimeMs", "type": "int32", "versions": "1+", + { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, { "name": "Responses", "type": "[]DeletableTopicResult", "versions": "0+", - "about": "The results for each topic.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", + "about": "The results for each topic we tried to delete.", "fields": [ + { "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "about": "The topic name" }, { "name": "ErrorCode", "type": "int16", "versions": "0+", "about": "The deletion error, or 0 if the deletion succeeded." } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index af6f49b..220fc50 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -53,6 +53,8 @@ import org.apache.kafka.common.errors.UnknownServerException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.DeleteTopicsResponseData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersResponseData.PartitionResult; @@ -223,6 +225,14 @@ public class KafkaAdminClientTest { return new CreateTopicsResponse(data); } + private static DeleteTopicsResponse prepareDeleteTopicsResponse(String topicName, Errors error) { + DeleteTopicsResponseData data = new DeleteTopicsResponseData(); + data.responses().add(new DeletableTopicResult() + .setName(topicName) + .setErrorCode(error.code())); + return new DeleteTopicsResponse(data); + } + /** * Test that the client properly times out when we don't receive any metadata. */ @@ -394,19 +404,19 @@ public class KafkaAdminClientTest { env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, - new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.NONE))); + prepareDeleteTopicsResponse("myTopic", Errors.NONE)); KafkaFuture<Void> future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"), new DeleteTopicsOptions()).all(); future.get(); env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, - new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.TOPIC_DELETION_DISABLED))); + prepareDeleteTopicsResponse("myTopic", Errors.TOPIC_DELETION_DISABLED)); future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, TopicDeletionDisabledException.class); env.kafkaClient().prepareResponse(body -> body instanceof DeleteTopicsRequest, - new DeleteTopicsResponse(Collections.singletonMap("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION))); + prepareDeleteTopicsResponse("myTopic", Errors.UNKNOWN_TOPIC_OR_PARTITION)); future = env.adminClient().deleteTopics(Collections.singletonList("myTopic"), new DeleteTopicsOptions()).all(); TestUtils.assertFutureError(future, UnknownTopicOrPartitionException.class); diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 5690743..5b92b1b 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -38,6 +38,9 @@ import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic; import org.apache.kafka.common.message.CreateTopicsRequestData.CreateableTopicConfig; import org.apache.kafka.common.message.CreateTopicsResponseData; import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult; +import org.apache.kafka.common.message.DeleteTopicsRequestData; +import org.apache.kafka.common.message.DeleteTopicsResponseData; +import org.apache.kafka.common.message.DeleteTopicsResponseData.DeletableTopicResult; import org.apache.kafka.common.message.DescribeGroupsRequestData; import org.apache.kafka.common.message.DescribeGroupsResponseData; import org.apache.kafka.common.message.ElectPreferredLeadersRequestData; @@ -1124,14 +1127,21 @@ public class RequestResponseTest { } private DeleteTopicsRequest createDeleteTopicsRequest() { - return new DeleteTopicsRequest.Builder(Utils.mkSet("my_t1", "my_t2"), 10000).build(); + return new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("my_t1", "my_t2")) + .setTimeoutMs(1000)).build(); } private DeleteTopicsResponse createDeleteTopicsResponse() { - Map<String, Errors> errors = new HashMap<>(); - errors.put("t1", Errors.INVALID_TOPIC_EXCEPTION); - errors.put("t2", Errors.TOPIC_AUTHORIZATION_FAILED); - return new DeleteTopicsResponse(errors); + DeleteTopicsResponseData data = new DeleteTopicsResponseData(); + data.responses().add(new DeletableTopicResult() + .setName("t1") + .setErrorCode(Errors.INVALID_TOPIC_EXCEPTION.code())); + data.responses().add(new DeletableTopicResult() + .setName("t2") + .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code())); + return new DeleteTopicsResponse(data); } private InitProducerIdRequest createInitPidRequest() { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 68d0823..fc06162 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -47,6 +47,8 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic import org.apache.kafka.common.message.CreateTopicsResponseData import org.apache.kafka.common.message.CreateTopicsResponseData.{CreatableTopicResult, CreatableTopicResultSet} +import org.apache.kafka.common.message.DeleteTopicsResponseData +import org.apache.kafka.common.message.DeleteTopicsResponseData.{DeletableTopicResult, DeletableTopicResultSet} import org.apache.kafka.common.message.DescribeGroupsResponseData import org.apache.kafka.common.message.ElectPreferredLeadersResponseData import org.apache.kafka.common.message.JoinGroupResponseData @@ -1565,51 +1567,66 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleDeleteTopicsRequest(request: RequestChannel.Request) { - val deleteTopicRequest = request.body[DeleteTopicsRequest] - - val unauthorizedTopicErrors = mutable.Map[String, Errors]() - val nonExistingTopicErrors = mutable.Map[String, Errors]() - val authorizedForDeleteTopics = mutable.Set[String]() - - for (topic <- deleteTopicRequest.topics.asScala) { - if (!authorize(request.session, Delete, Resource(Topic, topic, LITERAL))) - unauthorizedTopicErrors += topic -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topic)) - nonExistingTopicErrors += topic -> Errors.UNKNOWN_TOPIC_OR_PARTITION - else - authorizedForDeleteTopics.add(topic) - } - - def sendResponseCallback(authorizedTopicErrors: Map[String, Errors]): Unit = { + def sendResponseCallback(results: DeletableTopicResultSet): Unit = { def createResponse(requestThrottleMs: Int): AbstractResponse = { - val completeResults = unauthorizedTopicErrors ++ nonExistingTopicErrors ++ authorizedTopicErrors - val responseBody = new DeleteTopicsResponse(requestThrottleMs, completeResults.asJava) + val responseData = new DeleteTopicsResponseData() + .setThrottleTimeMs(requestThrottleMs) + .setResponses(results) + val responseBody = new DeleteTopicsResponse(responseData) trace(s"Sending delete topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") responseBody } sendResponseMaybeThrottle(request, createResponse) } + val deleteTopicRequest = request.body[DeleteTopicsRequest] + val results = new DeletableTopicResultSet(deleteTopicRequest.data.topicNames.size) + val toDelete = mutable.Set[String]() if (!controller.isActive) { - val results = deleteTopicRequest.topics.asScala.map { topic => - (topic, Errors.NOT_CONTROLLER) - }.toMap + deleteTopicRequest.data.topicNames.asScala.foreach { case topic => + results.add(new DeletableTopicResult() + .setName(topic) + .setErrorCode(Errors.NOT_CONTROLLER.code)) + } sendResponseCallback(results) } else if (!config.deleteTopicEnable) { val error = if (request.context.apiVersion < 3) Errors.INVALID_REQUEST else Errors.TOPIC_DELETION_DISABLED - val results = deleteTopicRequest.topics.asScala.map { topic => - (topic, error) - }.toMap + deleteTopicRequest.data.topicNames.asScala.foreach { case topic => + results.add(new DeletableTopicResult() + .setName(topic) + .setErrorCode(error.code)) + } sendResponseCallback(results) } else { + deleteTopicRequest.data.topicNames.asScala.foreach { case topic => + results.add(new DeletableTopicResult() + .setName(topic)) + } + results.asScala.foreach(topic => { + if (!authorize(request.session, Delete, Resource(Topic, topic.name, LITERAL))) + topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code) + else if (!metadataCache.contains(topic.name)) + topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code) + else + toDelete += topic.name + }) // If no authorized topics return immediately - if (authorizedForDeleteTopics.isEmpty) - sendResponseCallback(Map()) + if (toDelete.isEmpty) + sendResponseCallback(results) else { + def handleDeleteTopicsResults(errors: Map[String, Errors]): Unit = { + errors.foreach { + case (topicName, error) => + results.find(topicName) + .setErrorCode(error.code) + } + sendResponseCallback(results) + } + adminManager.deleteTopics( - deleteTopicRequest.timeout.toInt, - authorizedForDeleteTopics, - sendResponseCallback + deleteTopicRequest.data.timeoutMs.toInt, + toDelete, + handleDeleteTopicsResults ) } } diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 30cc161..11ee87c 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -33,7 +33,7 @@ import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME -import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData} +import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, JoinGroupRequestData, LeaveGroupRequestData} import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors} @@ -172,8 +172,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.LEADER_AND_ISR -> ((resp: requests.LeaderAndIsrResponse) => resp.responses.asScala.find(_._1 == tp).get._2), ApiKeys.STOP_REPLICA -> ((resp: requests.StopReplicaResponse) => resp.responses.asScala.find(_._1 == tp).get._2), ApiKeys.CONTROLLED_SHUTDOWN -> ((resp: requests.ControlledShutdownResponse) => resp.error), - ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data().topics().find(createTopic).errorCode())), - ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => resp.errors.asScala.find(_._1 == deleteTopic).get._2), + ApiKeys.CREATE_TOPICS -> ((resp: CreateTopicsResponse) => Errors.forCode(resp.data.topics.find(createTopic).errorCode())), + ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) => Errors.forCode(resp.data.responses.find(deleteTopic).errorCode())), ApiKeys.DELETE_RECORDS -> ((resp: requests.DeleteRecordsResponse) => resp.responses.get(deleteRecordsPartition).error), ApiKeys.OFFSET_FOR_LEADER_EPOCH -> ((resp: OffsetsForLeaderEpochResponse) => resp.responses.get(tp).error), ApiKeys.DESCRIBE_CONFIGS -> ((resp: DescribeConfigsResponse) => @@ -371,7 +371,11 @@ class AuthorizerIntegrationTest extends BaseRequestTest { setName(createTopic).setNumPartitions(1). setReplicationFactor(1.toShort)).iterator))).build() - private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build() + private def deleteTopicsRequest = + new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Collections.singletonList(deleteTopic)) + .setTimeoutMs(5000)).build() private def deleteRecordsRequest = new DeleteRecordsRequest.Builder(5000, Collections.singletonMap(deleteRecordsPartition, 0L)).build() @@ -1184,7 +1188,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val response = connectAndSend(deleteTopicsRequest, ApiKeys.DELETE_TOPICS) val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) - assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(deleteTopic).errorCode) } @Test @@ -1194,7 +1198,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) - assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, deleteResponse.errors.asScala.head._2) + assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code, deleteResponse.data.responses.find(deleteTopic).errorCode) } @Test @@ -1204,7 +1208,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val version = ApiKeys.DELETE_TOPICS.latestVersion val deleteResponse = DeleteTopicsResponse.parse(response, version) - assertEquals(Errors.NONE, deleteResponse.errors.asScala.head._2) + assertEquals(Errors.NONE.code, deleteResponse.data.responses.find(deleteTopic).errorCode) } @Test diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala index 4388e64..2df7528 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestTest.scala @@ -19,12 +19,15 @@ package kafka.server import kafka.network.SocketServer import kafka.utils._ +import org.apache.kafka.common.message.DeleteTopicsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse, MetadataRequest, MetadataResponse} import org.junit.Assert._ import org.junit.Test import scala.collection.JavaConverters._ +import java.util.Collections +import java.util.Arrays class DeleteTopicsRequestTest extends BaseRequestTest { @@ -33,20 +36,24 @@ class DeleteTopicsRequestTest extends BaseRequestTest { val timeout = 10000 // Single topic createTopic("topic-1", 1, 1) - validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-1").asJava, timeout).build()) + validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("topic-1")) + .setTimeoutMs(timeout)).build()) // Multi topic createTopic("topic-3", 5, 2) createTopic("topic-4", 1, 2) - validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("topic-3", "topic-4").asJava, timeout).build()) + validateValidDeleteTopicRequests(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("topic-3", "topic-4")) + .setTimeoutMs(timeout)).build()) } private def validateValidDeleteTopicRequests(request: DeleteTopicsRequest): Unit = { val response = sendDeleteTopicsRequest(request) - - val error = response.errors.values.asScala.find(_ != Errors.NONE) - assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty) - - request.topics.asScala.foreach { topic => + val error = response.errorCounts.asScala.find(_._1 != Errors.NONE) + assertTrue(s"There should be no errors, found ${response.data.responses.asScala}", error.isEmpty) + request.data.topicNames.asScala.foreach { topic => validateTopicIsDeleted(topic) } } @@ -57,14 +64,18 @@ class DeleteTopicsRequestTest extends BaseRequestTest { val timeoutTopic = "invalid-timeout" // Basic - validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set("invalid-topic").asJava, timeout).build(), + validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("invalid-topic")) + .setTimeoutMs(timeout)).build(), Map("invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION)) // Partial createTopic("partial-topic-1", 1, 1) - validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set( - "partial-topic-1", - "partial-invalid-topic").asJava, timeout).build(), + validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList("partial-topic-1", "partial-invalid-topic")) + .setTimeoutMs(timeout)).build(), Map( "partial-topic-1" -> Errors.NONE, "partial-invalid-topic" -> Errors.UNKNOWN_TOPIC_OR_PARTITION @@ -74,7 +85,10 @@ class DeleteTopicsRequestTest extends BaseRequestTest { // Timeout createTopic(timeoutTopic, 5, 2) // Must be a 0ms timeout to avoid transient test failures. Even a timeout of 1ms has succeeded in the past. - validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder(Set(timeoutTopic).asJava, 0).build(), + validateErrorDeleteTopicRequests(new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Arrays.asList(timeoutTopic)) + .setTimeoutMs(0)).build(), Map(timeoutTopic -> Errors.REQUEST_TIMED_OUT)) // The topic should still get deleted eventually TestUtils.waitUntilTrue(() => !servers.head.metadataCache.contains(timeoutTopic), s"Topic $timeoutTopic is never deleted") @@ -83,11 +97,13 @@ class DeleteTopicsRequestTest extends BaseRequestTest { private def validateErrorDeleteTopicRequests(request: DeleteTopicsRequest, expectedResponse: Map[String, Errors]): Unit = { val response = sendDeleteTopicsRequest(request) - val errors = response.errors.asScala - assertEquals("The response size should match", expectedResponse.size, response.errors.size) + val errors = response.data.responses + + val errorCount = response.errorCounts().asScala.foldLeft(0)(_+_._2) + assertEquals("The response size should match", expectedResponse.size, errorCount) expectedResponse.foreach { case (topic, expectedError) => - assertEquals("The response error should match", expectedResponse(topic), errors(topic)) + assertEquals("The response error should match", expectedResponse(topic).code, errors.find(topic).errorCode) // If no error validate the topic was deleted if (expectedError == Errors.NONE) { validateTopicIsDeleted(topic) @@ -97,11 +113,14 @@ class DeleteTopicsRequestTest extends BaseRequestTest { @Test def testNotController() { - val request = new DeleteTopicsRequest.Builder(Set("not-controller").asJava, 1000).build() + val request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Collections.singletonList("not-controller")) + .setTimeoutMs(1000)).build() val response = sendDeleteTopicsRequest(request, notControllerSocketServer) - val error = response.errors.asScala.head._2 - assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER, error) + val error = response.data.responses().find("not-controller").errorCode() + assertEquals("Expected controller error when routed incorrectly", Errors.NOT_CONTROLLER.code, error) } private def validateTopicIsDeleted(topic: String): Unit = { diff --git a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala index 20e30c0..7240d77 100644 --- a/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala +++ b/core/src/test/scala/unit/kafka/server/DeleteTopicsRequestWithDeletionDisabledTest.scala @@ -19,12 +19,13 @@ package kafka.server import kafka.network.SocketServer import kafka.utils._ +import org.apache.kafka.common.message.DeleteTopicsRequestData import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{DeleteTopicsRequest, DeleteTopicsResponse} import org.junit.Assert._ import org.junit.Test -import scala.collection.JavaConverters._ +import java.util.Collections class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest { @@ -42,13 +43,19 @@ class DeleteTopicsRequestWithDeletionDisabledTest extends BaseRequestTest { @Test def testDeleteRecordsRequest() { val topic = "topic-1" - val request = new DeleteTopicsRequest.Builder(Set(topic).asJava, 1000).build() + val request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Collections.singletonList(topic)) + .setTimeoutMs(1000)).build() val response = sendDeleteTopicsRequest(request) - assertEquals(Errors.TOPIC_DELETION_DISABLED, response.errors.get(topic)) + assertEquals(Errors.TOPIC_DELETION_DISABLED.code, response.data.responses.find(topic).errorCode) - val v2request = new DeleteTopicsRequest.Builder(Set(topic).asJava, 1000).build(2) + val v2request = new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Collections.singletonList(topic)) + .setTimeoutMs(1000)).build(2) val v2response = sendDeleteTopicsRequest(v2request) - assertEquals(Errors.INVALID_REQUEST, v2response.errors.get(topic)) + assertEquals(Errors.INVALID_REQUEST.code, v2response.data.responses.find(topic).errorCode) } private def sendDeleteTopicsRequest(request: DeleteTopicsRequest, socketServer: SocketServer = controllerSocketServer): DeleteTopicsResponse = { diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 22967e2..28f7e07 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -24,7 +24,7 @@ import kafka.security.auth._ import kafka.utils.TestUtils import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} import org.apache.kafka.common.config.ConfigResource -import org.apache.kafka.common.message.{CreateTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData} +import org.apache.kafka.common.message.{CreateTopicsRequestData, DeleteTopicsRequestData, DescribeGroupsRequestData, ElectPreferredLeadersRequestData, LeaveGroupRequestData, JoinGroupRequestData} import org.apache.kafka.common.resource.{PatternType, ResourcePattern, ResourcePatternFilter, ResourceType => AdminResourceType} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.message.CreateTopicsRequestData.{CreatableTopic, CreatableTopicSet} @@ -307,7 +307,10 @@ class RequestQuotaTest extends BaseRequestTest { } case ApiKeys.DELETE_TOPICS => - new DeleteTopicsRequest.Builder(Set("topic-2").asJava, 5000) + new DeleteTopicsRequest.Builder( + new DeleteTopicsRequestData() + .setTopicNames(Collections.singletonList("topic-2")) + .setTimeoutMs(5000)) case ApiKeys.DELETE_RECORDS => new DeleteRecordsRequest.Builder(5000, Map(tp -> (0L: java.lang.Long)).asJava) @@ -469,7 +472,8 @@ class RequestQuotaTest extends BaseRequestTest { case ApiKeys.API_VERSIONS => new ApiVersionsResponse(response).throttleTimeMs case ApiKeys.CREATE_TOPICS => new CreateTopicsResponse(response, ApiKeys.CREATE_TOPICS.latestVersion()).throttleTimeMs - case ApiKeys.DELETE_TOPICS => new DeleteTopicsResponse(response).throttleTimeMs + case ApiKeys.DELETE_TOPICS => + new DeleteTopicsResponse(response, ApiKeys.DELETE_TOPICS.latestVersion()).throttleTimeMs case ApiKeys.DELETE_RECORDS => new DeleteRecordsResponse(response).throttleTimeMs case ApiKeys.INIT_PRODUCER_ID => new InitProducerIdResponse(response).throttleTimeMs case ApiKeys.ADD_PARTITIONS_TO_TXN => new AddPartitionsToTxnResponse(response).throttleTimeMs