Repository: kafka Updated Branches: refs/heads/trunk 51fc50ed0 -> d68f9e2fe
HOTFIX: Improve error handling for ACL requests - Use ResourceType.toJava instead of ResourceType.fromString. The latter doesn't work for TransactionalId (or any type with two camel-case words). - Replace Throwable with ApiError in response classes. - Return InvalidRequest instead of Unknown error if ANY or UNKNOWN are provided during ACL creation. - Rename `unknown()` to `isUnknown()` in a few places that were missed previously. - Add tests. Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #3364 from ijuma/acls-fromString-fixes Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d68f9e2f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d68f9e2f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d68f9e2f Branch: refs/heads/trunk Commit: d68f9e2fe6cf2f76a81105ace5061eb7abb85995 Parents: 51fc50e Author: Ismael Juma <[email protected]> Authored: Sat Jun 17 14:02:01 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Sat Jun 17 14:02:01 2017 +0100 ---------------------------------------------------------------------- .../kafka/clients/admin/KafkaAdminClient.java | 18 ++-- .../org/apache/kafka/common/acl/AclBinding.java | 2 +- .../kafka/common/acl/AclBindingFilter.java | 4 +- .../apache/kafka/common/requests/ApiError.java | 10 ++ .../common/requests/CreateAclsRequest.java | 5 +- .../common/requests/CreateAclsResponse.java | 33 ++---- .../common/requests/DeleteAclsRequest.java | 2 +- .../common/requests/DeleteAclsResponse.java | 95 ++++++------------ .../common/requests/DescribeAclsRequest.java | 3 +- .../common/requests/DescribeAclsResponse.java | 50 ++++------ .../kafka/common/resource/ResourceFilter.java | 2 +- .../clients/admin/KafkaAdminClientTest.java | 100 ++++++------------- .../apache/kafka/common/acl/AclBindingTest.java | 14 +-- .../common/requests/RequestResponseTest.java | 31 +++--- .../scala/kafka/security/SecurityUtils.scala | 17 ++-- .../main/scala/kafka/server/AdminManager.scala | 8 +- .../kafka/server/DelayedCreateTopics.scala | 4 +- .../src/main/scala/kafka/server/KafkaApis.scala | 36 +++---- .../kafka/api/AuthorizerIntegrationTest.scala | 96 ++++++++++-------- .../api/SaslSslAdminClientIntegrationTest.scala | 32 ++++-- .../AbstractCreateTopicsRequestTest.scala | 4 +- .../processor/internals/StreamsKafkaClient.java | 2 +- 22 files changed, 251 insertions(+), 317 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index e92b1d3..881f8d2 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -1277,8 +1277,8 @@ public class KafkaAdminClient extends AdminClient { @Override void handleResponse(AbstractResponse abstractResponse) { DescribeAclsResponse response = (DescribeAclsResponse) abstractResponse; - if (response.throwable() != null) { - future.completeExceptionally(response.throwable()); + if (response.error().isFailure()) { + future.completeExceptionally(response.error().exception()); } else { future.complete(response.acls()); } @@ -1330,8 +1330,8 @@ public class KafkaAdminClient extends AdminClient { "The broker reported no creation result for the given ACL.")); } else { AclCreationResponse creation = iter.next(); - if (creation.throwable() != null) { - future.completeExceptionally(creation.throwable()); + if (creation.error().isFailure()) { + future.completeExceptionally(creation.error().exception()); } else { future.complete(null); } @@ -1378,12 +1378,12 @@ public class KafkaAdminClient extends AdminClient { "The broker reported no deletion result for the given filter.")); } else { AclFilterResponse deletion = iter.next(); - if (deletion.throwable() != null) { - future.completeExceptionally(deletion.throwable()); + if (deletion.error().isFailure()) { + future.completeExceptionally(deletion.error().exception()); } else { List<FilterResult> filterResults = new ArrayList<>(); for (AclDeletionResult deletionResult : deletion.deletions()) { - filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.exception())); + filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.error().exception())); } future.complete(new FilterResults(filterResults)); } @@ -1433,7 +1433,7 @@ public class KafkaAdminClient extends AdminClient { ConfigResource configResource = entry.getKey(); KafkaFutureImpl<Config> future = entry.getValue(); DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource)); - if (!config.error().is(Errors.NONE)) { + if (config.error().isFailure()) { future.completeExceptionally(config.error().exception()); continue; } @@ -1469,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient { DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse; DescribeConfigsResponse.Config config = response.configs().get(resource); - if (!config.error().is(Errors.NONE)) + if (config.error().isFailure()) brokerFuture.completeExceptionally(config.error().exception()); else { List<ConfigEntry> configEntries = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java index ea58434..d264ef1 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java @@ -48,7 +48,7 @@ public class AclBinding { /** * Return true if this binding has any UNKNOWN components. */ - public boolean unknown() { + public boolean isUnknown() { return resource.isUnknown() || entry.isUnknown(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java index 807b730..64f16cd 100644 --- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java @@ -56,8 +56,8 @@ public class AclBindingFilter { /** * Return true if this filter has any UNKNOWN components. */ - public boolean unknown() { - return resourceFilter.unknown() || entryFilter.isUnknown(); + public boolean isUnknown() { + return resourceFilter.isUnknown() || entryFilter.isUnknown(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java index 26034eb..d712123 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.protocol.types.Struct; */ public class ApiError { + public static final ApiError NONE = new ApiError(Errors.NONE, null); + private static final String CODE_KEY_NAME = "error_code"; private static final String MESSAGE_KEY_NAME = "error_message"; @@ -67,6 +69,14 @@ public class ApiError { return this.error == error; } + public boolean isFailure() { + return !isSuccess(); + } + + public boolean isSuccess() { + return is(Errors.NONE); + } + public Errors error() { return error; } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java index 757b5af..3598d4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java @@ -117,9 +117,8 @@ public class CreateAclsRequest extends AbstractRequest { switch (versionId) { case 0: List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>(); - for (int i = 0; i < aclCreations.size(); i++) { - responses.add(new CreateAclsResponse.AclCreationResponse(throwable)); - } + for (int i = 0; i < aclCreations.size(); i++) + responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable))); return new CreateAclsResponse(throttleTimeMs, responses); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java index c84b97c..1fc75da 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -26,23 +25,21 @@ import java.util.List; public class CreateAclsResponse extends AbstractResponse { private final static String CREATION_RESPONSES = "creation_responses"; - private final static String ERROR_CODE = "error_code"; - private final static String ERROR_MESSAGE = "error_message"; public static class AclCreationResponse { - private final Throwable throwable; + private final ApiError error; - public AclCreationResponse(Throwable throwable) { - this.throwable = throwable; + public AclCreationResponse(ApiError error) { + this.error = error; } - public Throwable throwable() { - return throwable; + public ApiError error() { + return error; } @Override public String toString() { - return "(" + throwable + ")"; + return "(" + error + ")"; } } @@ -60,14 +57,8 @@ public class CreateAclsResponse extends AbstractResponse { this.aclCreationResponses = new ArrayList<>(); for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) { Struct responseStruct = (Struct) responseStructObj; - short errorCode = responseStruct.getShort(ERROR_CODE); - String errorMessage = responseStruct.getString(ERROR_MESSAGE); - if (errorCode != 0) { - this.aclCreationResponses.add(new AclCreationResponse( - Errors.forCode(errorCode).exception(errorMessage))); - } else { - this.aclCreationResponses.add(new AclCreationResponse(null)); - } + ApiError error = new ApiError(responseStruct); + this.aclCreationResponses.add(new AclCreationResponse(error)); } } @@ -78,13 +69,7 @@ public class CreateAclsResponse extends AbstractResponse { List<Struct> responseStructs = new ArrayList<>(); for (AclCreationResponse response : aclCreationResponses) { Struct responseStruct = struct.instance(CREATION_RESPONSES); - if (response.throwable() == null) { - responseStruct.set(ERROR_CODE, (short) 0); - } else { - Errors errors = Errors.forException(response.throwable()); - responseStruct.set(ERROR_CODE, errors.code()); - responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage()); - } + response.error.write(responseStruct); responseStructs.add(responseStruct); } struct.set(CREATION_RESPONSES, responseStructs.toArray()); http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java index 246b5e5..c05bec6 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java @@ -96,7 +96,7 @@ public class DeleteAclsRequest extends AbstractRequest { List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>(); for (int i = 0; i < filters.size(); i++) { responses.add(new DeleteAclsResponse.AclFilterResponse( - throwable, Collections.<DeleteAclsResponse.AclDeletionResult>emptySet())); + ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet())); } return new DeleteAclsResponse(throttleTimeMs, responses); default: http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java index 94cd6aa..973aa8e 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java @@ -18,9 +18,7 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; -import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.Resource; import org.apache.kafka.common.utils.Utils; @@ -30,31 +28,28 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; public class DeleteAclsResponse extends AbstractResponse { public static final Logger log = LoggerFactory.getLogger(DeleteAclsResponse.class); private final static String FILTER_RESPONSES = "filter_responses"; - private final static String ERROR_CODE = "error_code"; - private final static String ERROR_MESSAGE = "error_message"; private final static String MATCHING_ACLS = "matching_acls"; public static class AclDeletionResult { - private final ApiException exception; + private final ApiError error; private final AclBinding acl; - public AclDeletionResult(ApiException exception, AclBinding acl) { - this.exception = exception; + public AclDeletionResult(ApiError error, AclBinding acl) { + this.error = error; this.acl = acl; } public AclDeletionResult(AclBinding acl) { - this(null, acl); + this(ApiError.NONE, acl); } - public ApiException exception() { - return exception; + public ApiError error() { + return error; } public AclBinding acl() { @@ -63,25 +58,25 @@ public class DeleteAclsResponse extends AbstractResponse { @Override public String toString() { - return "(apiException=" + exception + ", acl=" + acl + ")"; + return "(error=" + error + ", acl=" + acl + ")"; } } public static class AclFilterResponse { - private final Throwable throwable; + private final ApiError error; private final Collection<AclDeletionResult> deletions; - public AclFilterResponse(Throwable throwable, Collection<AclDeletionResult> deletions) { - this.throwable = throwable; + public AclFilterResponse(ApiError error, Collection<AclDeletionResult> deletions) { + this.error = error; this.deletions = deletions; } public AclFilterResponse(Collection<AclDeletionResult> deletions) { - this(null, deletions); + this(ApiError.NONE, deletions); } - public Throwable throwable() { - return throwable; + public ApiError error() { + return error; } public Collection<AclDeletionResult> deletions() { @@ -90,7 +85,7 @@ public class DeleteAclsResponse extends AbstractResponse { @Override public String toString() { - return "(throwable=" + throwable + ", deletions=" + Utils.join(deletions, ",") + ")"; + return "(error=" + error + ", deletions=" + Utils.join(deletions, ",") + ")"; } } @@ -108,29 +103,16 @@ public class DeleteAclsResponse extends AbstractResponse { this.responses = new ArrayList<>(); for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) { Struct responseStruct = (Struct) responseStructObj; - short responseErrorCode = responseStruct.getShort(ERROR_CODE); - String responseErrorMessage = responseStruct.getString(ERROR_MESSAGE); - if (responseErrorCode != 0) { - this.responses.add(new AclFilterResponse( - Errors.forCode(responseErrorCode).exception(responseErrorMessage), - Collections.<AclDeletionResult>emptySet())); - } else { - List<AclDeletionResult> deletions = new ArrayList<>(); - for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) { - Struct matchingAclStruct = (Struct) matchingAclStructObj; - short matchErrorCode = matchingAclStruct.getShort(ERROR_CODE); - ApiException exception = null; - if (matchErrorCode != 0) { - Errors errors = Errors.forCode(matchErrorCode); - String matchErrorMessage = matchingAclStruct.getString(ERROR_MESSAGE); - exception = errors.exception(matchErrorMessage); - } - AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct); - Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct); - deletions.add(new AclDeletionResult(exception, new AclBinding(resource, entry))); - } - this.responses.add(new AclFilterResponse(null, deletions)); + ApiError error = new ApiError(responseStruct); + List<AclDeletionResult> deletions = new ArrayList<>(); + for (Object matchingAclStructObj : responseStruct.getArray(MATCHING_ACLS)) { + Struct matchingAclStruct = (Struct) matchingAclStructObj; + ApiError matchError = new ApiError(matchingAclStruct); + AccessControlEntry entry = RequestUtils.aceFromStructFields(matchingAclStruct); + Resource resource = RequestUtils.resourceFromStructFields(matchingAclStruct); + deletions.add(new AclDeletionResult(matchError, new AclBinding(resource, entry))); } + this.responses.add(new AclFilterResponse(error, deletions)); } } @@ -141,29 +123,16 @@ public class DeleteAclsResponse extends AbstractResponse { List<Struct> responseStructs = new ArrayList<>(); for (AclFilterResponse response : responses) { Struct responseStruct = struct.instance(FILTER_RESPONSES); - if (response.throwable() != null) { - Errors error = Errors.forException(response.throwable()); - responseStruct.set(ERROR_CODE, error.code()); - responseStruct.set(ERROR_MESSAGE, response.throwable().getMessage()); - responseStruct.set(MATCHING_ACLS, new Struct[0]); - } else { - responseStruct.set(ERROR_CODE, (short) 0); - List<Struct> deletionStructs = new ArrayList<>(); - for (AclDeletionResult deletion : response.deletions()) { - Struct deletionStruct = responseStruct.instance(MATCHING_ACLS); - if (deletion.exception() != null) { - Errors error = Errors.forException(deletion.exception); - deletionStruct.set(ERROR_CODE, error.code()); - deletionStruct.set(ERROR_MESSAGE, deletion.exception.getMessage()); - } else { - deletionStruct.set(ERROR_CODE, (short) 0); - } - RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct); - RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct); - deletionStructs.add(deletionStruct); - } - responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0])); + response.error.write(responseStruct); + List<Struct> deletionStructs = new ArrayList<>(); + for (AclDeletionResult deletion : response.deletions()) { + Struct deletionStruct = responseStruct.instance(MATCHING_ACLS); + deletion.error.write(deletionStruct); + RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct); + RequestUtils.aceSetStructFields(deletion.acl().entry(), deletionStruct); + deletionStructs.add(deletionStruct); } + responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new Struct[0])); responseStructs.add(responseStruct); } struct.set(FILTER_RESPONSES, responseStructs.toArray()); http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java index 6573b6e..58ce539 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java @@ -73,7 +73,8 @@ public class DescribeAclsRequest extends AbstractRequest { short versionId = version(); switch (versionId) { case 0: - return new DescribeAclsResponse(throttleTimeMs, throwable, Collections.<AclBinding>emptySet()); + return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable), + Collections.<AclBinding>emptySet()); default: throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", versionId, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_ACLS.latestVersion())); http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java index cf21aa6..993a45f 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java @@ -20,51 +20,41 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.resource.Resource; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; public class DescribeAclsResponse extends AbstractResponse { - private final static String ERROR_CODE = "error_code"; - private final static String ERROR_MESSAGE = "error_message"; private final static String RESOURCES = "resources"; private final static String ACLS = "acls"; private final int throttleTimeMs; - private final Throwable throwable; + private final ApiError error; private final Collection<AclBinding> acls; - public DescribeAclsResponse(int throttleTimeMs, Throwable throwable, Collection<AclBinding> acls) { + public DescribeAclsResponse(int throttleTimeMs, ApiError error, Collection<AclBinding> acls) { this.throttleTimeMs = throttleTimeMs; - this.throwable = throwable; + this.error = error; this.acls = acls; } public DescribeAclsResponse(Struct struct) { this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); - Errors error = Errors.forCode(struct.getShort(ERROR_CODE)); - if (error != Errors.NONE) { - this.throwable = error.exception(struct.getString(ERROR_MESSAGE)); - this.acls = Collections.emptySet(); - } else { - this.throwable = null; - this.acls = new ArrayList<>(); - for (Object resourceStructObj : struct.getArray(RESOURCES)) { - Struct resourceStruct = (Struct) resourceStructObj; - Resource resource = RequestUtils.resourceFromStructFields(resourceStruct); - for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) { - Struct aclDataStruct = (Struct) aclDataStructObj; - AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct); - this.acls.add(new AclBinding(resource, entry)); - } + this.error = new ApiError(struct); + this.acls = new ArrayList<>(); + for (Object resourceStructObj : struct.getArray(RESOURCES)) { + Struct resourceStruct = (Struct) resourceStructObj; + Resource resource = RequestUtils.resourceFromStructFields(resourceStruct); + for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) { + Struct aclDataStruct = (Struct) aclDataStructObj; + AccessControlEntry entry = RequestUtils.aceFromStructFields(aclDataStruct); + this.acls.add(new AclBinding(resource, entry)); } } } @@ -73,15 +63,8 @@ public class DescribeAclsResponse extends AbstractResponse { protected Struct toStruct(short version) { Struct struct = new Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version)); struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); - if (throwable != null) { - Errors errors = Errors.forException(throwable); - struct.set(ERROR_CODE, errors.code()); - struct.set(ERROR_MESSAGE, throwable.getMessage()); - struct.set(RESOURCES, new Struct[0]); - return struct; - } - struct.set(ERROR_CODE, (short) 0); - struct.set(ERROR_MESSAGE, null); + error.write(struct); + Map<Resource, List<AccessControlEntry>> resourceToData = new HashMap<>(); for (AclBinding acl : acls) { List<AccessControlEntry> entry = resourceToData.get(acl.resource()); @@ -91,6 +74,7 @@ public class DescribeAclsResponse extends AbstractResponse { } entry.add(acl.entry()); } + List<Struct> resourceStructs = new ArrayList<>(); for (Map.Entry<Resource, List<AccessControlEntry>> tuple : resourceToData.entrySet()) { Resource resource = tuple.getKey(); @@ -113,8 +97,8 @@ public class DescribeAclsResponse extends AbstractResponse { return throttleTimeMs; } - public Throwable throwable() { - return throwable; + public ApiError error() { + return error; } public Collection<AclBinding> acls() { http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java index 5032660..0a4611f 100644 --- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java +++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java @@ -70,7 +70,7 @@ public class ResourceFilter { /** * Return true if this ResourceFilter has any UNKNOWN components. */ - public boolean unknown() { + public boolean isUnknown() { return resourceType.isUnknown(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index cd6ed6b..8300e0f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -48,8 +48,6 @@ import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -60,6 +58,7 @@ import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -178,7 +177,7 @@ public class KafkaAdminClientTest { env.kafkaClient().setNode(new Node(0, "localhost", 8121)); env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( - Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))), + Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))), new CreateTopicsOptions().timeoutMs(1000)).all(); assertFutureError(future, TimeoutException.class); } @@ -192,7 +191,7 @@ public class KafkaAdminClientTest { env.kafkaClient().setNode(env.cluster().controller()); env.kafkaClient().prepareResponse(new CreateTopicsResponse(Collections.singletonMap("myTopic", new ApiError(Errors.NONE, "")))); KafkaFuture<Void> future = env.adminClient().createTopics( - Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})))), + Collections.singleton(new NewTopic("myTopic", Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))), new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } @@ -215,21 +214,18 @@ public class KafkaAdminClientTest { env.kafkaClient().setNode(env.cluster().controller()); // Test a call where we get back ACL1 and ACL2. - env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null, - new ArrayList<AclBinding>() {{ - add(ACL1); - add(ACL2); - }})); + env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE, + asList(ACL1, ACL2))); assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), ACL1, ACL2); // Test a call where we get back no results. - env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null, + env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, ApiError.NONE, Collections.<AclBinding>emptySet())); assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty()); // Test a call where we get back an error. env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, - new SecurityDisabledException("Security is disabled"), Collections.<AclBinding>emptySet())); + new ApiError(Errors.SECURITY_DISABLED, "Security is disabled"), Collections.<AclBinding>emptySet())); assertFutureError(env.adminClient().describeAcls(FILTER2).values(), SecurityDisabledException.class); } } @@ -243,30 +239,19 @@ public class KafkaAdminClientTest { // Test a call where we successfully create two ACLs. env.kafkaClient().prepareResponse(new CreateAclsResponse(0, - new ArrayList<AclCreationResponse>() {{ - add(new AclCreationResponse(null)); - add(new AclCreationResponse(null)); - }})); - CreateAclsResult results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{ - add(ACL1); - add(ACL2); - }}); + asList(new AclCreationResponse(ApiError.NONE), new AclCreationResponse(ApiError.NONE)))); + CreateAclsResult results = env.adminClient().createAcls(asList(ACL1, ACL2)); assertCollectionIs(results.values().keySet(), ACL1, ACL2); - for (KafkaFuture<Void> future : results.values().values()) { + for (KafkaFuture<Void> future : results.values().values()) future.get(); - } results.all().get(); // Test a call where we fail to create one ACL. - env.kafkaClient().prepareResponse(new CreateAclsResponse(0, - new ArrayList<AclCreationResponse>() {{ - add(new AclCreationResponse(new SecurityDisabledException("Security is disabled"))); - add(new AclCreationResponse(null)); - }})); - results = env.adminClient().createAcls(new ArrayList<AclBinding>() {{ - add(ACL1); - add(ACL2); - }}); + env.kafkaClient().prepareResponse(new CreateAclsResponse(0, asList( + new AclCreationResponse(new ApiError(Errors.SECURITY_DISABLED, "Security is disabled")), + new AclCreationResponse(ApiError.NONE)) + )); + results = env.adminClient().createAcls(asList(ACL1, ACL2)); assertCollectionIs(results.values().keySet(), ACL1, ACL2); assertFutureError(results.values().get(ACL1), SecurityDisabledException.class); results.values().get(ACL2).get(); @@ -282,19 +267,11 @@ public class KafkaAdminClientTest { env.kafkaClient().setNode(env.cluster().controller()); // Test a call where one filter has an error. - env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{ - add(new AclFilterResponse(null, - new ArrayList<AclDeletionResult>() {{ - add(new AclDeletionResult(null, ACL1)); - add(new AclDeletionResult(null, ACL2)); - }})); - add(new AclFilterResponse(new SecurityDisabledException("No security"), - Collections.<AclDeletionResult>emptySet())); - }})); - DeleteAclsResult results = env.adminClient().deleteAcls(new ArrayList<AclBindingFilter>() {{ - add(FILTER1); - add(FILTER2); - }}); + env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList( + new AclFilterResponse(asList(new AclDeletionResult(ACL1), new AclDeletionResult(ACL2))), + new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"), + Collections.<AclDeletionResult>emptySet())))); + DeleteAclsResult results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2)); Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = results.values(); FilterResults filter1Results = filterResults.get(FILTER1).get(); assertEquals(null, filter1Results.values().get(0).exception()); @@ -305,38 +282,19 @@ public class KafkaAdminClientTest { assertFutureError(results.all(), SecurityDisabledException.class); // Test a call where one deletion result has an error. - env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{ - add(new AclFilterResponse(null, - new ArrayList<AclDeletionResult>() {{ - add(new AclDeletionResult(null, ACL1)); - add(new AclDeletionResult(new SecurityDisabledException("No security"), ACL2)); - }})); - add(new AclFilterResponse(null, Collections.<AclDeletionResult>emptySet())); - }})); - results = env.adminClient().deleteAcls( - new ArrayList<AclBindingFilter>() {{ - add(FILTER1); - add(FILTER2); - }}); + env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList( + new AclFilterResponse(asList(new AclDeletionResult(ACL1), + new AclDeletionResult(new ApiError(Errors.SECURITY_DISABLED, "No security"), ACL2))), + new AclFilterResponse(Collections.<AclDeletionResult>emptySet())))); + results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2)); assertTrue(results.values().get(FILTER2).get().values().isEmpty()); assertFutureError(results.all(), SecurityDisabledException.class); // Test a call where there are no errors. - env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new ArrayList<AclFilterResponse>() {{ - add(new AclFilterResponse(null, - new ArrayList<AclDeletionResult>() {{ - add(new AclDeletionResult(null, ACL1)); - }})); - add(new AclFilterResponse(null, - new ArrayList<AclDeletionResult>() {{ - add(new AclDeletionResult(null, ACL2)); - }})); - }})); - results = env.adminClient().deleteAcls( - new ArrayList<AclBindingFilter>() {{ - add(FILTER1); - add(FILTER2); - }}); + env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList( + new AclFilterResponse(asList(new AclDeletionResult(ACL1))), + new AclFilterResponse(asList(new AclDeletionResult(ACL2)))))); + results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2)); Collection<AclBinding> deleted = results.all().get(); assertCollectionIs(deleted, ACL1, ACL2); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java index e0a0598..0ebcdfe 100644 --- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java +++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java @@ -92,13 +92,13 @@ public class AclBindingTest { @Test public void testUnknowns() throws Exception { - assertFalse(ACL1.unknown()); - assertFalse(ACL2.unknown()); - assertFalse(ACL3.unknown()); - assertFalse(ANY_ANONYMOUS.unknown()); - assertFalse(ANY_DENY.unknown()); - assertFalse(ANY_MYTOPIC.unknown()); - assertTrue(UNKNOWN_ACL.unknown()); + assertFalse(ACL1.isUnknown()); + assertFalse(ACL2.isUnknown()); + assertFalse(ACL3.isUnknown()); + assertFalse(ANY_ANONYMOUS.isUnknown()); + assertFalse(ANY_DENY.isUnknown()); + assertFalse(ANY_MYTOPIC.isUnknown()); + assertTrue(UNKNOWN_ACL.isUnknown()); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 327f228..467afb3 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -24,7 +24,6 @@ import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.InvalidRequestException; import org.apache.kafka.common.errors.NotCoordinatorException; import org.apache.kafka.common.errors.NotEnoughReplicasException; import org.apache.kafka.common.errors.SecurityDisabledException; @@ -226,7 +225,7 @@ public class RequestResponseTest { checkResponse(createTxnOffsetCommitResponse(), 0); checkRequest(createListAclsRequest()); checkErrorResponse(createListAclsRequest(), new SecurityDisabledException("Security is not enabled.")); - checkResponse(createListAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion()); + checkResponse(createDescribeAclsResponse(), ApiKeys.DESCRIBE_ACLS.latestVersion()); checkRequest(createCreateAclsRequest()); checkErrorResponse(createCreateAclsRequest(), new SecurityDisabledException("Security is not enabled.")); checkResponse(createCreateAclsResponse(), ApiKeys.CREATE_ACLS.latestVersion()); @@ -1001,8 +1000,8 @@ public class RequestResponseTest { new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY))).build(); } - private DescribeAclsResponse createListAclsResponse() { - return new DescribeAclsResponse(0, null, Collections.singleton(new AclBinding( + private DescribeAclsResponse createDescribeAclsResponse() { + return new DescribeAclsResponse(0, ApiError.NONE, Collections.singleton(new AclBinding( new Resource(ResourceType.TOPIC, "mytopic"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)))); } @@ -1019,8 +1018,8 @@ public class RequestResponseTest { } private CreateAclsResponse createCreateAclsResponse() { - return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(null), - new AclCreationResponse(new InvalidRequestException("Foo bar")))); + return new CreateAclsResponse(0, Arrays.asList(new AclCreationResponse(ApiError.NONE), + new AclCreationResponse(new ApiError(Errors.INVALID_REQUEST, "Foo bar")))); } private DeleteAclsRequest createDeleteAclsRequest() { @@ -1036,16 +1035,14 @@ public class RequestResponseTest { private DeleteAclsResponse createDeleteAclsResponse() { List<AclFilterResponse> responses = new ArrayList<>(); - responses.add(new AclFilterResponse(null, - new HashSet<AclDeletionResult>() {{ - add(new AclDeletionResult(null, new AclBinding( + responses.add(new AclFilterResponse(Utils.mkSet( + new AclDeletionResult(new AclBinding( new Resource(ResourceType.TOPIC, "mytopic3"), - new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW)))); - add(new AclDeletionResult(null, new AclBinding( + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.ALLOW))), + new AclDeletionResult(new AclBinding( new Resource(ResourceType.TOPIC, "mytopic4"), - new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)))); - }})); - responses.add(new AclFilterResponse(new SecurityDisabledException("No security"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.DESCRIBE, AclPermissionType.DENY)))))); + responses.add(new AclFilterResponse(new ApiError(Errors.SECURITY_DISABLED, "No security"), Collections.<AclDeletionResult>emptySet())); return new DeleteAclsResponse(0, responses); } @@ -1071,9 +1068,9 @@ public class RequestResponseTest { new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true) ); configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config( - new ApiError(Errors.NONE, null), configEntries)); + ApiError.NONE, configEntries)); configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new DescribeConfigsResponse.Config( - new ApiError(Errors.NONE, null), Collections.<DescribeConfigsResponse.ConfigEntry>emptyList())); + ApiError.NONE, Collections.<DescribeConfigsResponse.ConfigEntry>emptyList())); return new DescribeConfigsResponse(200, configs); } @@ -1091,7 +1088,7 @@ public class RequestResponseTest { private AlterConfigsResponse createAlterConfigsResponse() { Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new HashMap<>(); - errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new ApiError(Errors.NONE, null)); + errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), ApiError.NONE); errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid")); return new AlterConfigsResponse(20, errors); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/security/SecurityUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala index bbfc42c..573a16b 100644 --- a/core/src/main/scala/kafka/security/SecurityUtils.scala +++ b/core/src/main/scala/kafka/security/SecurityUtils.scala @@ -19,27 +19,32 @@ package kafka.security import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType} import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter} -import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.ApiError +import org.apache.kafka.common.resource.{Resource => AdminResource} import org.apache.kafka.common.security.auth.KafkaPrincipal -import scala.util.Try +import scala.util.{Failure, Success, Try} object SecurityUtils { - def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] = { - for { + def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = { + (for { resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType)) principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal)) operation <- Try(Operation.fromJava(filter.entryFilter.operation)) permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType)) resource = Resource(resourceType, filter.resourceFilter.name) acl = Acl(principal, permissionType, filter.entryFilter.host, operation) - } yield (resource, acl) + } yield (resource, acl)) match { + case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage)) + case Success(s) => Right(s) + } } def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = { - val adminResource = new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name) + val adminResource = new AdminResource(resource.resourceType.toJava, resource.name) val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava) new AclBinding(adminResource, entry) http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/server/AdminManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 33c6b77..84972f3 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -119,7 +119,7 @@ class AdminManager(val config: KafkaConfig, else AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false) } - CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, null)) + CreateTopicMetadata(topic, assignments, ApiError.NONE) } catch { // Log client errors at a lower level than unexpected exceptions case e@ (_: PolicyViolationException | _: ApiException) => @@ -135,7 +135,7 @@ class AdminManager(val config: KafkaConfig, if (timeout <= 0 || validateOnly || !metadata.exists(_.error.is(Errors.NONE))) { val results = metadata.map { createTopicMetadata => // ignore topics that already have errors - if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) { + if (createTopicMetadata.error.isSuccess() && !validateOnly) { (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) } else { (createTopicMetadata.topic, createTopicMetadata.error) @@ -212,7 +212,7 @@ class AdminManager(val config: KafkaConfig, new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly) } - new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), configEntries.asJava) + new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava) } try { @@ -280,7 +280,7 @@ class AdminManager(val config: KafkaConfig, else AdminUtils.changeTopicConfig(zkUtils, topic, properties) } - resource -> new ApiError(Errors.NONE, null) + resource -> ApiError.NONE case resourceType => throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType") } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/server/DelayedCreateTopics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala index abf6bc0..83cdd67 100644 --- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala +++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala @@ -48,7 +48,7 @@ class DelayedCreateTopics(delayMs: Long, override def tryComplete() : Boolean = { trace(s"Trying to complete operation for $createMetadata") - val leaderlessPartitionCount = createMetadata.filter(_.error.is(Errors.NONE)) + val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess) .foldLeft(0) { case (topicCounter, metadata) => topicCounter + missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) } @@ -69,7 +69,7 @@ class DelayedCreateTopics(delayMs: Long, trace(s"Completing operation for $createMetadata") val results = createMetadata.map { metadata => // ignore topics that already have errors - if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0) + if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0) (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) else (metadata.topic, metadata.error) http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 337c740..a4fd30c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -51,7 +51,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} import org.apache.kafka.common.requests.SaslHandshakeResponse -import org.apache.kafka.common.resource.{Resource => AdminResource, ResourceType => AdminResourceType} +import org.apache.kafka.common.resource.{Resource => AdminResource} import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding} import scala.collection._ @@ -1763,22 +1763,19 @@ class KafkaApis(val requestChannel: RequestChannel, case None => sendResponseMaybeThrottle(request, requestThrottleMs => new DescribeAclsResponse(requestThrottleMs, - new SecurityDisabledException("No Authorizer is configured on the broker."), - Collections.emptySet())) + new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is configured on the broker"), Collections.emptySet())) case Some(auth) => val filter = describeAclsRequest.filter() - val returnedAcls = new util.ArrayList[AclBinding] - val aclMap = auth.getAcls() - aclMap.foreach { case (resource, acls) => - acls.foreach { acl => - val fixture = new AclBinding(new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), + val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) => + acls.flatMap { acl => + val fixture = new AclBinding(new AdminResource(resource.resourceType.toJava, resource.name), new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava)) - if (filter.matches(fixture)) - returnedAcls.add(fixture) + if (filter.matches(fixture)) Some(fixture) + else None } } sendResponseMaybeThrottle(request, requestThrottleMs => - new DescribeAclsResponse(requestThrottleMs, null, returnedAcls)) + new DescribeAclsResponse(requestThrottleMs, ApiError.NONE, returnedAcls.asJava)) } } @@ -1793,8 +1790,8 @@ class KafkaApis(val requestChannel: RequestChannel, case Some(auth) => val aclCreationResults = createAclsRequest.aclCreations.asScala.map { aclCreation => SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) match { - case Failure(throwable) => new AclCreationResponse(throwable) - case Success((resource, acl)) => try { + case Left(apiError) => new AclCreationResponse(apiError) + case Right((resource, acl)) => try { if (resource.resourceType.equals(Cluster) && !resource.name.equals(Resource.ClusterResourceName)) throw new InvalidRequestException("The only valid name for the CLUSTER resource is " + @@ -1805,11 +1802,11 @@ class KafkaApis(val requestChannel: RequestChannel, logger.debug(s"Added acl $acl to $resource") - new AclCreationResponse(null) + new AclCreationResponse(ApiError.NONE) } catch { case throwable: Throwable => logger.debug(s"Failed to add acl $acl to $resource", throwable) - new AclCreationResponse(throwable) + new AclCreationResponse(ApiError.fromThrowable(throwable)) } } } @@ -1835,8 +1832,8 @@ class KafkaApis(val requestChannel: RequestChannel, // Delete based on a list of ACL fixtures. for ((filter, i) <- filters.zipWithIndex) { SecurityUtils.convertToResourceAndAcl(filter) match { - case Failure(throwable) => filterResponseMap.put(i, new AclFilterResponse(throwable, Seq.empty.asJava)) - case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture)) + case Left(apiError) => filterResponseMap.put(i, new AclFilterResponse(apiError, Seq.empty.asJava)) + case Right(binding) => toDelete.put(i, ArrayBuffer(binding)) } } } else { @@ -1845,7 +1842,7 @@ class KafkaApis(val requestChannel: RequestChannel, val filtersWithIndex = filters.zipWithIndex for ((resource, acls) <- aclMap; acl <- acls) { val binding = new AclBinding( - new AdminResource(AdminResourceType.fromString(resource.resourceType.toString), resource.name), + new AdminResource(resource.resourceType.toJava, resource.name), new AccessControlEntry(acl.principal.toString, acl.host.toString, acl.operation.toJava, acl.permissionType.toJava)) @@ -1863,8 +1860,7 @@ class KafkaApis(val requestChannel: RequestChannel, else None } catch { case throwable: Throwable => - Some(new AclDeletionResult(new UnknownServerException(s"Failed to delete ACL $acl: $throwable"), - aclBinding)) + Some(new AclDeletionResult(ApiError.fromThrowable(throwable), aclBinding)) } }.asJava http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index bd04c7b..09ff9be 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -14,7 +14,7 @@ package kafka.api import java.nio.ByteBuffer import java.util -import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit} +import java.util.concurrent.ExecutionException import java.util.regex.Pattern import java.util.{ArrayList, Collections, Properties} @@ -22,14 +22,24 @@ import kafka.common.TopicAndPartition import kafka.security.auth._ import kafka.server.{BaseRequestTest, KafkaConfig} import kafka.utils.TestUtils +import kafka.admin.AdminUtils +import kafka.log.LogConfig +import kafka.network.SocketServer +import org.apache.kafka.clients.consumer.OffsetAndMetadata import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener import org.apache.kafka.clients.consumer._ import org.apache.kafka.clients.producer._ import org.apache.kafka.common.errors._ import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME +import org.apache.kafka.common.KafkaException import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol} import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} -import CreateTopicsRequest.TopicDetails +import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} +import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation +import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails +import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType} import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.{Node, TopicPartition, requests} import org.junit.Assert._ @@ -38,13 +48,6 @@ import org.junit.{After, Assert, Before, Test} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.Buffer -import org.apache.kafka.common.KafkaException -import kafka.admin.AdminUtils -import kafka.log.LogConfig -import kafka.network.SocketServer -import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.network.ListenerName -import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord} class AuthorizerIntegrationTest extends BaseRequestTest { @@ -72,6 +75,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction))) val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create))) + val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter))) + val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe))) val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite))) val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read))) val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write))) @@ -125,7 +130,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ADD_PARTITIONS_TO_TXN -> classOf[AddPartitionsToTxnResponse], ApiKeys.ADD_OFFSETS_TO_TXN -> classOf[AddOffsetsToTxnResponse], ApiKeys.END_TXN -> classOf[EndTxnResponse], - ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse] + ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse], + ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse], + ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse], + ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse] ) val requestKeyToError = Map[ApiKeys, Nothing => Errors]( @@ -156,7 +164,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)), ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error), ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error), - ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)) + ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)), + ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error), + ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error), + ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error) ) val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]]( @@ -185,7 +196,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl), ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl), ApiKeys.END_TXN -> transactionIdWriteAcl, - ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl) + ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl), + ApiKeys.CREATE_ACLS -> clusterAlterAcl, + ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl, + ApiKeys.DELETE_ACLS -> clusterAlterAcl ) @Before @@ -284,46 +298,47 @@ class AuthorizerIntegrationTest extends BaseRequestTest { build() } - private def createHeartbeatRequest = { - new HeartbeatRequest.Builder(group, 1, "").build() - } + private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build() - private def createLeaveGroupRequest = { - new LeaveGroupRequest.Builder(group, "").build() - } + private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build() - private def createLeaderAndIsrRequest = { + private def leaderAndIsrRequest = { new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue, Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava, Set(new Node(brokerId, "localhost", 0)).asJava).build() } - private def createStopReplicaRequest = { - new requests.StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build() - } + private def stopReplicaRequest = new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build() - private def createControlledShutdownRequest = { - new requests.ControlledShutdownRequest.Builder(brokerId).build() - } + private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId).build() - private def createTopicsRequest = { + private def createTopicsRequest = new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build() - } - private def deleteTopicsRequest = { - new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build() - } + private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build() - private def createDescribeConfigsRequest = + private def describeConfigsRequest = new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build() - private def createAlterConfigsRequest = + private def alterConfigsRequest = new AlterConfigsRequest.Builder( Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic), new AlterConfigsRequest.Config(Collections.singleton( new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000") ))), true).build() + private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build() + + private def createAclsRequest = new CreateAclsRequest.Builder( + Collections.singletonList(new AclCreation(new AclBinding( + new AdminResource(AdminResourceType.TOPIC, "mytopic"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build() + + private def deleteAclsRequest = new DeleteAclsRequest.Builder( + Collections.singletonList(new AclBindingFilter( + new ResourceFilter(AdminResourceType.TOPIC, null), + new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build() + @Test def testAuthorizationWithTopicExisting() { @@ -338,16 +353,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest { ApiKeys.JOIN_GROUP -> createJoinGroupRequest, ApiKeys.SYNC_GROUP -> createSyncGroupRequest, ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest, - ApiKeys.HEARTBEAT -> createHeartbeatRequest, - ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest, - ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest, - ApiKeys.STOP_REPLICA -> createStopReplicaRequest, - ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest, + ApiKeys.HEARTBEAT -> heartbeatRequest, + ApiKeys.LEAVE_GROUP -> leaveGroupRequest, + ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest, + ApiKeys.STOP_REPLICA -> stopReplicaRequest, + ApiKeys.CONTROLLED_SHUTDOWN_KEY -> controlledShutdownRequest, ApiKeys.CREATE_TOPICS -> createTopicsRequest, ApiKeys.DELETE_TOPICS -> deleteTopicsRequest, ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest, - ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest, - ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest + ApiKeys.DESCRIBE_CONFIGS -> describeConfigsRequest, + ApiKeys.ALTER_CONFIGS -> alterConfigsRequest, + ApiKeys.CREATE_ACLS -> createAclsRequest, + ApiKeys.DELETE_ACLS -> deleteAclsRequest, + ApiKeys.DESCRIBE_ACLS -> describeAclsRequest ) for ((key, request) <- requestKeyToRequest) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index b4e09b3..03afc9e 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -31,13 +31,13 @@ import scala.util.{Failure, Success, Try} class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup { this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true") - this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName()) + this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName) override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks")) override def configureSecurityBeforeServersStart() { - val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName()) + val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName) try { authorizer.configure(this.configs.head.originals()) authorizer.addAcls(Set(new AuthAcl(AuthAcl.WildCardPrincipal, Allow, @@ -92,6 +92,8 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"), new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW)) + val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id"), + new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW)) @Test override def testAclOperations(): Unit = { @@ -116,28 +118,33 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with TestUtils.waitUntilTrue(() => { val results = client.describeAcls(filter).values.get() acls == results.asScala.toSet - }, "timed out waiting for ACLs") + }, s"timed out waiting for ACLs $acls") } @Test def testAclOperations2(): Unit = { client = AdminClient.create(createConfig()) - val results = client.createAcls(List(acl2, acl2).asJava) - assertEquals(Set(acl2, acl2), results.values.keySet().asScala) + val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava) + assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala) results.all.get() waitForDescribeAcls(client, acl2.toFilter, Set(acl2)) + waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl)) val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY) val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY) + val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null), AccessControlEntryFilter.ANY) waitForDescribeAcls(client, filterA, Set()) + waitForDescribeAcls(client, filterC, Set(transactionalIdAcl)) - val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions()) - assertEquals(Set(filterA, filterB), results2.values.keySet().asScala) + val results2 = client.deleteAcls(List(filterA, filterB, filterC).asJava, new DeleteAclsOptions()) + assertEquals(Set(filterA, filterB, filterC), results2.values.keySet.asScala) assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet) + assertEquals(Set(transactionalIdAcl), results2.values.get(filterC).get.values.asScala.map(_.binding).toSet) assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet) waitForDescribeAcls(client, filterB, Set()) + waitForDescribeAcls(client, filterC, Set()) } @Test @@ -161,7 +168,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def testAclCreateGetDelete(expectAuth: Boolean): Unit = { TestUtils.waitUntilTrue(() => { - val result = client.createAcls(List(fooAcl).asJava, new CreateAclsOptions) + val result = client.createAcls(List(fooAcl, transactionalIdAcl).asJava, new CreateAclsOptions) if (expectAuth) { Try(result.all.get) match { case Failure(e) => @@ -180,9 +187,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with }, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail")) if (expectAuth) { waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl)) + waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl)) } TestUtils.waitUntilTrue(() => { - val result = client.deleteAcls(List(fooAcl.toFilter).asJava, new DeleteAclsOptions) + val result = client.deleteAcls(List(fooAcl.toFilter, transactionalIdAcl.toFilter).asJava, new DeleteAclsOptions) if (expectAuth) { Try(result.all.get) match { case Failure(e) => @@ -196,13 +204,17 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with verifyCauseIsClusterAuth(e) true case Success(_) => + assertEquals(Set(fooAcl, transactionalIdAcl), result.values.keySet) assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet) + assertEquals(Set(transactionalIdAcl), + result.values.get(transactionalIdAcl.toFilter).get.values.asScala.map(_.binding).toSet) true } } }, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail")) if (expectAuth) { - waitForDescribeAcls(client, fooAcl.toFilter, Set()) + waitForDescribeAcls(client, fooAcl.toFilter, Set.empty) + waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set.empty) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala index 0ef3405..d89a9df 100644 --- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala @@ -36,7 +36,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = { val response = sendCreateTopicRequest(request) - val error = response.errors.values.asScala.find(!_.is(Errors.NONE)) + val error = response.errors.values.asScala.find(_.isFailure) assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty) request.topics.asScala.foreach { case (topic, details) => @@ -118,7 +118,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest { assertEquals(expected.messageWithFallback, actual.messageWithFallback) } // If no error validate topic exists - if (expectedError.is(Errors.NONE) && !request.validateOnly) { + if (expectedError.isSuccess && !request.validateOnly) { validateTopicExists(topic) } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java index 2d47876..ebb3344 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java @@ -193,7 +193,7 @@ public class StreamsKafkaClient { for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) { ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name()); - if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) { + if (error.isFailure() && !error.is(Errors.TOPIC_ALREADY_EXISTS)) { throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback()); } }
