Repository: kafka
Updated Branches:
  refs/heads/trunk 51fc50ed0 -> d68f9e2fe


HOTFIX: Improve error handling for ACL requests

- Use ResourceType.toJava instead of ResourceType.fromString. The latter
doesn't work for TransactionalId (or any type with two camel-case
words).
- Replace Throwable with ApiError in response classes.
- Return InvalidRequest instead of Unknown error if ANY or UNKNOWN
are provided during ACL creation.
- Rename `unknown()` to `isUnknown()` in a few places that
were missed previously.
- Add tests.

Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3364 from ijuma/acls-fromString-fixes


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d68f9e2f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d68f9e2f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d68f9e2f

Branch: refs/heads/trunk
Commit: d68f9e2fe6cf2f76a81105ace5061eb7abb85995
Parents: 51fc50e
Author: Ismael Juma <[email protected]>
Authored: Sat Jun 17 14:02:01 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Sat Jun 17 14:02:01 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/admin/KafkaAdminClient.java   |  18 ++--
 .../org/apache/kafka/common/acl/AclBinding.java |   2 +-
 .../kafka/common/acl/AclBindingFilter.java      |   4 +-
 .../apache/kafka/common/requests/ApiError.java  |  10 ++
 .../common/requests/CreateAclsRequest.java      |   5 +-
 .../common/requests/CreateAclsResponse.java     |  33 ++----
 .../common/requests/DeleteAclsRequest.java      |   2 +-
 .../common/requests/DeleteAclsResponse.java     |  95 ++++++------------
 .../common/requests/DescribeAclsRequest.java    |   3 +-
 .../common/requests/DescribeAclsResponse.java   |  50 ++++------
 .../kafka/common/resource/ResourceFilter.java   |   2 +-
 .../clients/admin/KafkaAdminClientTest.java     | 100 ++++++-------------
 .../apache/kafka/common/acl/AclBindingTest.java |  14 +--
 .../common/requests/RequestResponseTest.java    |  31 +++---
 .../scala/kafka/security/SecurityUtils.scala    |  17 ++--
 .../main/scala/kafka/server/AdminManager.scala  |   8 +-
 .../kafka/server/DelayedCreateTopics.scala      |   4 +-
 .../src/main/scala/kafka/server/KafkaApis.scala |  36 +++----
 .../kafka/api/AuthorizerIntegrationTest.scala   |  96 ++++++++++--------
 .../api/SaslSslAdminClientIntegrationTest.scala |  32 ++++--
 .../AbstractCreateTopicsRequestTest.scala       |   4 +-
 .../processor/internals/StreamsKafkaClient.java |   2 +-
 22 files changed, 251 insertions(+), 317 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index e92b1d3..881f8d2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1277,8 +1277,8 @@ public class KafkaAdminClient extends AdminClient {
             @Override
             void handleResponse(AbstractResponse abstractResponse) {
                 DescribeAclsResponse response = (DescribeAclsResponse) abstractResponse;
-                if (response.throwable() != null) {
-                    future.completeExceptionally(response.throwable());
+                if (response.error().isFailure()) {
+                    future.completeExceptionally(response.error().exception());
                 } else {
                     future.complete(response.acls());
                 }
@@ -1330,8 +1330,8 @@ public class KafkaAdminClient extends AdminClient {
                             "The broker reported no creation result for the given ACL."));
                     } else {
                         AclCreationResponse creation = iter.next();
-                        if (creation.throwable() != null) {
-                            future.completeExceptionally(creation.throwable());
+                        if (creation.error().isFailure()) {
+                            future.completeExceptionally(creation.error().exception());
                         } else {
                             future.complete(null);
                         }
@@ -1378,12 +1378,12 @@ public class KafkaAdminClient extends AdminClient {
                             "The broker reported no deletion result for the given filter."));
                     } else {
                         AclFilterResponse deletion = iter.next();
-                        if (deletion.throwable() != null) {
-                            future.completeExceptionally(deletion.throwable());
+                        if (deletion.error().isFailure()) {
+                            future.completeExceptionally(deletion.error().exception());
                         } else {
                             List<FilterResult> filterResults = new ArrayList<>();
                             for (AclDeletionResult deletionResult : deletion.deletions()) {
-                                filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.exception()));
+                                filterResults.add(new FilterResult(deletionResult.acl(), deletionResult.error().exception()));
                             }
                             future.complete(new FilterResults(filterResults));
                         }
@@ -1433,7 +1433,7 @@ public class KafkaAdminClient extends AdminClient {
                     ConfigResource configResource = entry.getKey();
                     KafkaFutureImpl<Config> future = entry.getValue();
                     DescribeConfigsResponse.Config config = response.config(configResourceToResource(configResource));
-                    if (!config.error().is(Errors.NONE)) {
+                    if (config.error().isFailure()) {
                         future.completeExceptionally(config.error().exception());
                         continue;
                     }
@@ -1469,7 +1469,7 @@ public class KafkaAdminClient extends AdminClient {
                     DescribeConfigsResponse response = (DescribeConfigsResponse) abstractResponse;
                     DescribeConfigsResponse.Config config = response.configs().get(resource);
 
-                    if (!config.error().is(Errors.NONE))
+                    if (config.error().isFailure())
                         brokerFuture.completeExceptionally(config.error().exception());
                     else {
                         List<ConfigEntry> configEntries = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
index ea58434..d264ef1 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBinding.java
@@ -48,7 +48,7 @@ public class AclBinding {
     /**
      * Return true if this binding has any UNKNOWN components.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return resource.isUnknown() || entry.isUnknown();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index 807b730..64f16cd 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -56,8 +56,8 @@ public class AclBindingFilter {
     /**
      * Return true if this filter has any UNKNOWN components.
      */
-    public boolean unknown() {
-        return resourceFilter.unknown() || entryFilter.isUnknown();
+    public boolean isUnknown() {
+        return resourceFilter.isUnknown() || entryFilter.isUnknown();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
index 26034eb..d712123 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -29,6 +29,8 @@ import org.apache.kafka.common.protocol.types.Struct;
  */
 public class ApiError {
 
+    public static final ApiError NONE = new ApiError(Errors.NONE, null);
+
     private static final String CODE_KEY_NAME = "error_code";
     private static final String MESSAGE_KEY_NAME = "error_message";
 
@@ -67,6 +69,14 @@ public class ApiError {
         return this.error == error;
     }
 
+    public boolean isFailure() {
+        return !isSuccess();
+    }
+
+    public boolean isSuccess() {
+        return is(Errors.NONE);
+    }
+
     public Errors error() {
         return error;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index 757b5af..3598d4f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -117,9 +117,8 @@ public class CreateAclsRequest extends AbstractRequest {
         switch (versionId) {
             case 0:
                 List<CreateAclsResponse.AclCreationResponse> responses = new 
ArrayList<>();
-                for (int i = 0; i < aclCreations.size(); i++) {
-                    responses.add(new 
CreateAclsResponse.AclCreationResponse(throwable));
-                }
+                for (int i = 0; i < aclCreations.size(); i++)
+                    responses.add(new 
CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
                 return new CreateAclsResponse(throttleTimeMs, responses);
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index c84b97c..1fc75da 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -26,23 +25,21 @@ import java.util.List;
 
 public class CreateAclsResponse extends AbstractResponse {
     private final static String CREATION_RESPONSES = "creation_responses";
-    private final static String ERROR_CODE = "error_code";
-    private final static String ERROR_MESSAGE = "error_message";
 
     public static class AclCreationResponse {
-        private final Throwable throwable;
+        private final ApiError error;
 
-        public AclCreationResponse(Throwable throwable) {
-            this.throwable = throwable;
+        public AclCreationResponse(ApiError error) {
+            this.error = error;
         }
 
-        public Throwable throwable() {
-            return throwable;
+        public ApiError error() {
+            return error;
         }
 
         @Override
         public String toString() {
-            return "(" + throwable + ")";
+            return "(" + error + ")";
         }
     }
 
@@ -60,14 +57,8 @@ public class CreateAclsResponse extends AbstractResponse {
         this.aclCreationResponses = new ArrayList<>();
         for (Object responseStructObj : struct.getArray(CREATION_RESPONSES)) {
             Struct responseStruct = (Struct) responseStructObj;
-            short errorCode = responseStruct.getShort(ERROR_CODE);
-            String errorMessage = responseStruct.getString(ERROR_MESSAGE);
-            if (errorCode != 0) {
-                this.aclCreationResponses.add(new AclCreationResponse(
-                        Errors.forCode(errorCode).exception(errorMessage)));
-            } else {
-                this.aclCreationResponses.add(new AclCreationResponse(null));
-            }
+            ApiError error = new ApiError(responseStruct);
+            this.aclCreationResponses.add(new AclCreationResponse(error));
         }
     }
 
@@ -78,13 +69,7 @@ public class CreateAclsResponse extends AbstractResponse {
         List<Struct> responseStructs = new ArrayList<>();
         for (AclCreationResponse response : aclCreationResponses) {
             Struct responseStruct = struct.instance(CREATION_RESPONSES);
-            if (response.throwable() == null) {
-                responseStruct.set(ERROR_CODE, (short) 0);
-            } else {
-                Errors errors = Errors.forException(response.throwable());
-                responseStruct.set(ERROR_CODE, errors.code());
-                responseStruct.set(ERROR_MESSAGE, 
response.throwable().getMessage());
-            }
+            response.error.write(responseStruct);
             responseStructs.add(responseStruct);
         }
         struct.set(CREATION_RESPONSES, responseStructs.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 246b5e5..c05bec6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -96,7 +96,7 @@ public class DeleteAclsRequest extends AbstractRequest {
                 List<DeleteAclsResponse.AclFilterResponse> responses = new 
ArrayList<>();
                 for (int i = 0; i < filters.size(); i++) {
                     responses.add(new DeleteAclsResponse.AclFilterResponse(
-                        throwable, 
Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+                        ApiError.fromThrowable(throwable), 
Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
                 }
                 return new DeleteAclsResponse(throttleTimeMs, responses);
             default:

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 94cd6aa..973aa8e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -18,9 +18,7 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
-import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.utils.Utils;
@@ -30,31 +28,28 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 public class DeleteAclsResponse extends AbstractResponse {
     public static final Logger log = 
LoggerFactory.getLogger(DeleteAclsResponse.class);
     private final static String FILTER_RESPONSES = "filter_responses";
-    private final static String ERROR_CODE = "error_code";
-    private final static String ERROR_MESSAGE = "error_message";
     private final static String MATCHING_ACLS = "matching_acls";
 
     public static class AclDeletionResult {
-        private final ApiException exception;
+        private final ApiError error;
         private final AclBinding acl;
 
-        public AclDeletionResult(ApiException exception, AclBinding acl) {
-            this.exception = exception;
+        public AclDeletionResult(ApiError error, AclBinding acl) {
+            this.error = error;
             this.acl = acl;
         }
 
         public AclDeletionResult(AclBinding acl) {
-            this(null, acl);
+            this(ApiError.NONE, acl);
         }
 
-        public ApiException exception() {
-            return exception;
+        public ApiError error() {
+            return error;
         }
 
         public AclBinding acl() {
@@ -63,25 +58,25 @@ public class DeleteAclsResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(apiException=" + exception + ", acl=" + acl + ")";
+            return "(error=" + error + ", acl=" + acl + ")";
         }
     }
 
     public static class AclFilterResponse {
-        private final Throwable throwable;
+        private final ApiError error;
         private final Collection<AclDeletionResult> deletions;
 
-        public AclFilterResponse(Throwable throwable, 
Collection<AclDeletionResult> deletions) {
-            this.throwable = throwable;
+        public AclFilterResponse(ApiError error, Collection<AclDeletionResult> 
deletions) {
+            this.error = error;
             this.deletions = deletions;
         }
 
         public AclFilterResponse(Collection<AclDeletionResult> deletions) {
-            this(null, deletions);
+            this(ApiError.NONE, deletions);
         }
 
-        public Throwable throwable() {
-            return throwable;
+        public ApiError error() {
+            return error;
         }
 
         public Collection<AclDeletionResult> deletions() {
@@ -90,7 +85,7 @@ public class DeleteAclsResponse extends AbstractResponse {
 
         @Override
         public String toString() {
-            return "(throwable=" + throwable + ", deletions=" + 
Utils.join(deletions, ",") + ")";
+            return "(error=" + error + ", deletions=" + Utils.join(deletions, 
",") + ")";
         }
     }
 
@@ -108,29 +103,16 @@ public class DeleteAclsResponse extends AbstractResponse {
         this.responses = new ArrayList<>();
         for (Object responseStructObj : struct.getArray(FILTER_RESPONSES)) {
             Struct responseStruct = (Struct) responseStructObj;
-            short responseErrorCode = responseStruct.getShort(ERROR_CODE);
-            String responseErrorMessage = 
responseStruct.getString(ERROR_MESSAGE);
-            if (responseErrorCode != 0) {
-                this.responses.add(new AclFilterResponse(
-                    
Errors.forCode(responseErrorCode).exception(responseErrorMessage),
-                    Collections.<AclDeletionResult>emptySet()));
-            } else {
-                List<AclDeletionResult> deletions = new ArrayList<>();
-                for (Object matchingAclStructObj : 
responseStruct.getArray(MATCHING_ACLS)) {
-                    Struct matchingAclStruct = (Struct) matchingAclStructObj;
-                    short matchErrorCode = 
matchingAclStruct.getShort(ERROR_CODE);
-                    ApiException exception = null;
-                    if (matchErrorCode != 0) {
-                        Errors errors = Errors.forCode(matchErrorCode);
-                        String matchErrorMessage = 
matchingAclStruct.getString(ERROR_MESSAGE);
-                        exception = errors.exception(matchErrorMessage);
-                    }
-                    AccessControlEntry entry = 
RequestUtils.aceFromStructFields(matchingAclStruct);
-                    Resource resource = 
RequestUtils.resourceFromStructFields(matchingAclStruct);
-                    deletions.add(new AclDeletionResult(exception, new 
AclBinding(resource, entry)));
-                }
-                this.responses.add(new AclFilterResponse(null, deletions));
+            ApiError error = new ApiError(responseStruct);
+            List<AclDeletionResult> deletions = new ArrayList<>();
+            for (Object matchingAclStructObj : 
responseStruct.getArray(MATCHING_ACLS)) {
+                Struct matchingAclStruct = (Struct) matchingAclStructObj;
+                ApiError matchError = new ApiError(matchingAclStruct);
+                AccessControlEntry entry = 
RequestUtils.aceFromStructFields(matchingAclStruct);
+                Resource resource = 
RequestUtils.resourceFromStructFields(matchingAclStruct);
+                deletions.add(new AclDeletionResult(matchError, new 
AclBinding(resource, entry)));
             }
+            this.responses.add(new AclFilterResponse(error, deletions));
         }
     }
 
@@ -141,29 +123,16 @@ public class DeleteAclsResponse extends AbstractResponse {
         List<Struct> responseStructs = new ArrayList<>();
         for (AclFilterResponse response : responses) {
             Struct responseStruct = struct.instance(FILTER_RESPONSES);
-            if (response.throwable() != null) {
-                Errors error = Errors.forException(response.throwable());
-                responseStruct.set(ERROR_CODE, error.code());
-                responseStruct.set(ERROR_MESSAGE, 
response.throwable().getMessage());
-                responseStruct.set(MATCHING_ACLS, new Struct[0]);
-            } else {
-                responseStruct.set(ERROR_CODE, (short) 0);
-                List<Struct> deletionStructs = new ArrayList<>();
-                for (AclDeletionResult deletion : response.deletions()) {
-                    Struct deletionStruct = 
responseStruct.instance(MATCHING_ACLS);
-                    if (deletion.exception() != null) {
-                        Errors error = Errors.forException(deletion.exception);
-                        deletionStruct.set(ERROR_CODE, error.code());
-                        deletionStruct.set(ERROR_MESSAGE, 
deletion.exception.getMessage());
-                    } else {
-                        deletionStruct.set(ERROR_CODE, (short) 0);
-                    }
-                    
RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
-                    RequestUtils.aceSetStructFields(deletion.acl().entry(), 
deletionStruct);
-                    deletionStructs.add(deletionStruct);
-                }
-                responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new 
Struct[0]));
+            response.error.write(responseStruct);
+            List<Struct> deletionStructs = new ArrayList<>();
+            for (AclDeletionResult deletion : response.deletions()) {
+                Struct deletionStruct = responseStruct.instance(MATCHING_ACLS);
+                deletion.error.write(deletionStruct);
+                
RequestUtils.resourceSetStructFields(deletion.acl().resource(), deletionStruct);
+                RequestUtils.aceSetStructFields(deletion.acl().entry(), 
deletionStruct);
+                deletionStructs.add(deletionStruct);
             }
+            responseStruct.set(MATCHING_ACLS, deletionStructs.toArray(new 
Struct[0]));
             responseStructs.add(responseStruct);
         }
         struct.set(FILTER_RESPONSES, responseStructs.toArray());

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 6573b6e..58ce539 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -73,7 +73,8 @@ public class DescribeAclsRequest extends AbstractRequest {
         short versionId = version();
         switch (versionId) {
             case 0:
-                return new DescribeAclsResponse(throttleTimeMs, throwable, 
Collections.<AclBinding>emptySet());
+                return new DescribeAclsResponse(throttleTimeMs, 
ApiError.fromThrowable(throwable),
+                        Collections.<AclBinding>emptySet());
             default:
                 throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
                         versionId, this.getClass().getSimpleName(), 
ApiKeys.DESCRIBE_ACLS.latestVersion()));

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index cf21aa6..993a45f 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -20,51 +20,41 @@ package org.apache.kafka.common.requests;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 import org.apache.kafka.common.resource.Resource;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 public class DescribeAclsResponse extends AbstractResponse {
-    private final static String ERROR_CODE = "error_code";
-    private final static String ERROR_MESSAGE = "error_message";
     private final static String RESOURCES = "resources";
     private final static String ACLS = "acls";
 
     private final int throttleTimeMs;
-    private final Throwable throwable;
+    private final ApiError error;
     private final Collection<AclBinding> acls;
 
-    public DescribeAclsResponse(int throttleTimeMs, Throwable throwable, 
Collection<AclBinding> acls) {
+    public DescribeAclsResponse(int throttleTimeMs, ApiError error, 
Collection<AclBinding> acls) {
         this.throttleTimeMs = throttleTimeMs;
-        this.throwable = throwable;
+        this.error = error;
         this.acls = acls;
     }
 
     public DescribeAclsResponse(Struct struct) {
         this.throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
-        Errors error = Errors.forCode(struct.getShort(ERROR_CODE));
-        if (error != Errors.NONE) {
-            this.throwable = error.exception(struct.getString(ERROR_MESSAGE));
-            this.acls = Collections.emptySet();
-        } else {
-            this.throwable = null;
-            this.acls = new ArrayList<>();
-            for (Object resourceStructObj : struct.getArray(RESOURCES)) {
-                Struct resourceStruct = (Struct) resourceStructObj;
-                Resource resource = 
RequestUtils.resourceFromStructFields(resourceStruct);
-                for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
-                    Struct aclDataStruct = (Struct) aclDataStructObj;
-                    AccessControlEntry entry = 
RequestUtils.aceFromStructFields(aclDataStruct);
-                    this.acls.add(new AclBinding(resource, entry));
-                }
+        this.error = new ApiError(struct);
+        this.acls = new ArrayList<>();
+        for (Object resourceStructObj : struct.getArray(RESOURCES)) {
+            Struct resourceStruct = (Struct) resourceStructObj;
+            Resource resource = 
RequestUtils.resourceFromStructFields(resourceStruct);
+            for (Object aclDataStructObj : resourceStruct.getArray(ACLS)) {
+                Struct aclDataStruct = (Struct) aclDataStructObj;
+                AccessControlEntry entry = 
RequestUtils.aceFromStructFields(aclDataStruct);
+                this.acls.add(new AclBinding(resource, entry));
             }
         }
     }
@@ -73,15 +63,8 @@ public class DescribeAclsResponse extends AbstractResponse {
     protected Struct toStruct(short version) {
         Struct struct = new 
Struct(ApiKeys.DESCRIBE_ACLS.responseSchema(version));
         struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
-        if (throwable != null) {
-            Errors errors = Errors.forException(throwable);
-            struct.set(ERROR_CODE, errors.code());
-            struct.set(ERROR_MESSAGE, throwable.getMessage());
-            struct.set(RESOURCES, new Struct[0]);
-            return struct;
-        }
-        struct.set(ERROR_CODE, (short) 0);
-        struct.set(ERROR_MESSAGE, null);
+        error.write(struct);
+
         Map<Resource, List<AccessControlEntry>> resourceToData = new 
HashMap<>();
         for (AclBinding acl : acls) {
             List<AccessControlEntry> entry = 
resourceToData.get(acl.resource());
@@ -91,6 +74,7 @@ public class DescribeAclsResponse extends AbstractResponse {
             }
             entry.add(acl.entry());
         }
+
         List<Struct> resourceStructs = new ArrayList<>();
         for (Map.Entry<Resource, List<AccessControlEntry>> tuple : 
resourceToData.entrySet()) {
             Resource resource = tuple.getKey();
@@ -113,8 +97,8 @@ public class DescribeAclsResponse extends AbstractResponse {
         return throttleTimeMs;
     }
 
-    public Throwable throwable() {
-        return throwable;
+    public ApiError error() {
+        return error;
     }
 
     public Collection<AclBinding> acls() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 5032660..0a4611f 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -70,7 +70,7 @@ public class ResourceFilter {
     /**
      * Return true if this ResourceFilter has any UNKNOWN components.
      */
-    public boolean unknown() {
+    public boolean isUnknown() {
         return resourceType.isUnknown();
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index cd6ed6b..8300e0f 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -48,8 +48,6 @@ import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -60,6 +58,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
+import static java.util.Arrays.asList;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -178,7 +177,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(new Node(0, "localhost", 8121));
             env.kafkaClient().prepareResponse(new 
CreateTopicsResponse(Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 
2})))),
+                    Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                     new CreateTopicsOptions().timeoutMs(1000)).all();
             assertFutureError(future, TimeoutException.class);
         }
@@ -192,7 +191,7 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
             env.kafkaClient().prepareResponse(new 
CreateTopicsResponse(Collections.singletonMap("myTopic", new 
ApiError(Errors.NONE, ""))));
             KafkaFuture<Void> future = env.adminClient().createTopics(
-                    Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 
2})))),
+                    Collections.singleton(new NewTopic("myTopic", 
Collections.singletonMap(Integer.valueOf(0), asList(new Integer[]{0, 1, 2})))),
                     new CreateTopicsOptions().timeoutMs(10000)).all();
             future.get();
         }
@@ -215,21 +214,18 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where we get back ACL1 and ACL2.
-            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
-                new ArrayList<AclBinding>() {{
-                        add(ACL1);
-                        add(ACL2);
-                    }}));
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, 
ApiError.NONE,
+                    asList(ACL1, ACL2)));
             
assertCollectionIs(env.adminClient().describeAcls(FILTER1).values().get(), 
ACL1, ACL2);
 
             // Test a call where we get back no results.
-            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, null,
+            env.kafkaClient().prepareResponse(new DescribeAclsResponse(0, 
ApiError.NONE,
                 Collections.<AclBinding>emptySet()));
             
assertTrue(env.adminClient().describeAcls(FILTER2).values().get().isEmpty());
 
             // Test a call where we get back an error.
             env.kafkaClient().prepareResponse(new DescribeAclsResponse(0,
-                new SecurityDisabledException("Security is disabled"), 
Collections.<AclBinding>emptySet()));
+                new ApiError(Errors.SECURITY_DISABLED, "Security is 
disabled"), Collections.<AclBinding>emptySet()));
             
assertFutureError(env.adminClient().describeAcls(FILTER2).values(), 
SecurityDisabledException.class);
         }
     }
@@ -243,30 +239,19 @@ public class KafkaAdminClientTest {
 
             // Test a call where we successfully create two ACLs.
             env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
-                new ArrayList<AclCreationResponse>() {{
-                        add(new AclCreationResponse(null));
-                        add(new AclCreationResponse(null));
-                    }}));
-            CreateAclsResult results = env.adminClient().createAcls(new 
ArrayList<AclBinding>() {{
-                        add(ACL1);
-                        add(ACL2);
-                    }});
+                asList(new AclCreationResponse(ApiError.NONE), new 
AclCreationResponse(ApiError.NONE))));
+            CreateAclsResult results = 
env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
-            for (KafkaFuture<Void> future : results.values().values()) {
+            for (KafkaFuture<Void> future : results.values().values())
                 future.get();
-            }
             results.all().get();
 
             // Test a call where we fail to create one ACL.
-            env.kafkaClient().prepareResponse(new CreateAclsResponse(0,
-                    new ArrayList<AclCreationResponse>() {{
-                        add(new AclCreationResponse(new 
SecurityDisabledException("Security is disabled")));
-                        add(new AclCreationResponse(null));
-                    }}));
-            results = env.adminClient().createAcls(new ArrayList<AclBinding>() 
{{
-                    add(ACL1);
-                    add(ACL2);
-                }});
+            env.kafkaClient().prepareResponse(new CreateAclsResponse(0, asList(
+                new AclCreationResponse(new ApiError(Errors.SECURITY_DISABLED, 
"Security is disabled")),
+                new AclCreationResponse(ApiError.NONE))
+            ));
+            results = env.adminClient().createAcls(asList(ACL1, ACL2));
             assertCollectionIs(results.values().keySet(), ACL1, ACL2);
             assertFutureError(results.values().get(ACL1), 
SecurityDisabledException.class);
             results.values().get(ACL2).get();
@@ -282,19 +267,11 @@ public class KafkaAdminClientTest {
             env.kafkaClient().setNode(env.cluster().controller());
 
             // Test a call where one filter has an error.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new 
ArrayList<AclFilterResponse>() {{
-                    add(new AclFilterResponse(null,
-                            new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL1));
-                                add(new AclDeletionResult(null, ACL2));
-                            }}));
-                    add(new AclFilterResponse(new 
SecurityDisabledException("No security"),
-                        Collections.<AclDeletionResult>emptySet()));
-                }}));
-            DeleteAclsResult results = env.adminClient().deleteAcls(new 
ArrayList<AclBindingFilter>() {{
-                        add(FILTER1);
-                        add(FILTER2);
-                    }});
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL1), 
new AclDeletionResult(ACL2))),
+                    new AclFilterResponse(new 
ApiError(Errors.SECURITY_DISABLED, "No security"),
+                            Collections.<AclDeletionResult>emptySet()))));
+            DeleteAclsResult results = 
env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             Map<AclBindingFilter, KafkaFuture<FilterResults>> filterResults = 
results.values();
             FilterResults filter1Results = filterResults.get(FILTER1).get();
             assertEquals(null, filter1Results.values().get(0).exception());
@@ -305,38 +282,19 @@ public class KafkaAdminClientTest {
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where one deletion result has an error.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new 
ArrayList<AclFilterResponse>() {{
-                    add(new AclFilterResponse(null,
-                        new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL1));
-                                add(new AclDeletionResult(new 
SecurityDisabledException("No security"), ACL2));
-                            }}));
-                    add(new AclFilterResponse(null, 
Collections.<AclDeletionResult>emptySet()));
-                }}));
-            results = env.adminClient().deleteAcls(
-                    new ArrayList<AclBindingFilter>() {{
-                            add(FILTER1);
-                            add(FILTER2);
-                        }});
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL1),
+                            new AclDeletionResult(new 
ApiError(Errors.SECURITY_DISABLED, "No security"), ACL2))),
+                    new 
AclFilterResponse(Collections.<AclDeletionResult>emptySet()))));
+            results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             assertTrue(results.values().get(FILTER2).get().values().isEmpty());
             assertFutureError(results.all(), SecurityDisabledException.class);
 
             // Test a call where there are no errors.
-            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, new 
ArrayList<AclFilterResponse>() {{
-                    add(new AclFilterResponse(null,
-                        new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL1));
-                            }}));
-                    add(new AclFilterResponse(null,
-                        new ArrayList<AclDeletionResult>() {{
-                                add(new AclDeletionResult(null, ACL2));
-                            }}));
-                }}));
-            results = env.adminClient().deleteAcls(
-                    new ArrayList<AclBindingFilter>() {{
-                        add(FILTER1);
-                        add(FILTER2);
-                    }});
+            env.kafkaClient().prepareResponse(new DeleteAclsResponse(0, asList(
+                    new AclFilterResponse(asList(new AclDeletionResult(ACL1))),
+                    new AclFilterResponse(asList(new 
AclDeletionResult(ACL2))))));
+            results = env.adminClient().deleteAcls(asList(FILTER1, FILTER2));
             Collection<AclBinding> deleted = results.all().get();
             assertCollectionIs(deleted, ACL1, ACL2);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java 
b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index e0a0598..0ebcdfe 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -92,13 +92,13 @@ public class AclBindingTest {
 
     @Test
     public void testUnknowns() throws Exception {
-        assertFalse(ACL1.unknown());
-        assertFalse(ACL2.unknown());
-        assertFalse(ACL3.unknown());
-        assertFalse(ANY_ANONYMOUS.unknown());
-        assertFalse(ANY_DENY.unknown());
-        assertFalse(ANY_MYTOPIC.unknown());
-        assertTrue(UNKNOWN_ACL.unknown());
+        assertFalse(ACL1.isUnknown());
+        assertFalse(ACL2.isUnknown());
+        assertFalse(ACL3.isUnknown());
+        assertFalse(ANY_ANONYMOUS.isUnknown());
+        assertFalse(ANY_DENY.isUnknown());
+        assertFalse(ANY_MYTOPIC.isUnknown());
+        assertTrue(UNKNOWN_ACL.isUnknown());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 327f228..467afb3 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -24,7 +24,6 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.NotCoordinatorException;
 import org.apache.kafka.common.errors.NotEnoughReplicasException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
@@ -226,7 +225,7 @@ public class RequestResponseTest {
         checkResponse(createTxnOffsetCommitResponse(), 0);
         checkRequest(createListAclsRequest());
         checkErrorResponse(createListAclsRequest(), new 
SecurityDisabledException("Security is not enabled."));
-        checkResponse(createListAclsResponse(), 
ApiKeys.DESCRIBE_ACLS.latestVersion());
+        checkResponse(createDescribeAclsResponse(), 
ApiKeys.DESCRIBE_ACLS.latestVersion());
         checkRequest(createCreateAclsRequest());
         checkErrorResponse(createCreateAclsRequest(), new 
SecurityDisabledException("Security is not enabled."));
         checkResponse(createCreateAclsResponse(), 
ApiKeys.CREATE_ACLS.latestVersion());
@@ -1001,8 +1000,8 @@ public class RequestResponseTest {
                 new AccessControlEntryFilter(null, null, AclOperation.ANY, 
AclPermissionType.ANY))).build();
     }
 
-    private DescribeAclsResponse createListAclsResponse() {
-        return new DescribeAclsResponse(0, null, Collections.singleton(new 
AclBinding(
+    private DescribeAclsResponse createDescribeAclsResponse() {
+        return new DescribeAclsResponse(0, ApiError.NONE, 
Collections.singleton(new AclBinding(
             new Resource(ResourceType.TOPIC, "mytopic"),
             new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, 
AclPermissionType.ALLOW))));
     }
@@ -1019,8 +1018,8 @@ public class RequestResponseTest {
     }
 
     private CreateAclsResponse createCreateAclsResponse() {
-        return new CreateAclsResponse(0, Arrays.asList(new 
AclCreationResponse(null),
-            new AclCreationResponse(new InvalidRequestException("Foo bar"))));
+        return new CreateAclsResponse(0, Arrays.asList(new 
AclCreationResponse(ApiError.NONE),
+            new AclCreationResponse(new ApiError(Errors.INVALID_REQUEST, "Foo 
bar"))));
     }
 
     private DeleteAclsRequest createDeleteAclsRequest() {
@@ -1036,16 +1035,14 @@ public class RequestResponseTest {
 
     private DeleteAclsResponse createDeleteAclsResponse() {
         List<AclFilterResponse> responses = new ArrayList<>();
-        responses.add(new AclFilterResponse(null,
-            new HashSet<AclDeletionResult>() {{
-                    add(new AclDeletionResult(null, new AclBinding(
+        responses.add(new AclFilterResponse(Utils.mkSet(
+                new AclDeletionResult(new AclBinding(
                         new Resource(ResourceType.TOPIC, "mytopic3"),
-                        new AccessControlEntry("User:ANONYMOUS", "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW))));
-                    add(new AclDeletionResult(null, new AclBinding(
+                        new AccessControlEntry("User:ANONYMOUS", "*", 
AclOperation.DESCRIBE, AclPermissionType.ALLOW))),
+                new AclDeletionResult(new AclBinding(
                         new Resource(ResourceType.TOPIC, "mytopic4"),
-                        new AccessControlEntry("User:ANONYMOUS", "*", 
AclOperation.DESCRIBE, AclPermissionType.DENY))));
-                }}));
-        responses.add(new AclFilterResponse(new SecurityDisabledException("No 
security"),
+                        new AccessControlEntry("User:ANONYMOUS", "*", 
AclOperation.DESCRIBE, AclPermissionType.DENY))))));
+        responses.add(new AclFilterResponse(new 
ApiError(Errors.SECURITY_DISABLED, "No security"),
             Collections.<AclDeletionResult>emptySet()));
         return new DeleteAclsResponse(0, responses);
     }
@@ -1071,9 +1068,9 @@ public class RequestResponseTest {
                 new DescribeConfigsResponse.ConfigEntry("another_name", 
"another value", true, false, true)
         );
         configs.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), new DescribeConfigsResponse.Config(
-                new ApiError(Errors.NONE, null), configEntries));
+                ApiError.NONE, configEntries));
         configs.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"), new DescribeConfigsResponse.Config(
-                new ApiError(Errors.NONE, null), 
Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
+                ApiError.NONE, 
Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
         return new DescribeConfigsResponse(200, configs);
     }
 
@@ -1091,7 +1088,7 @@ public class RequestResponseTest {
 
     private AlterConfigsResponse createAlterConfigsResponse() {
         Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new 
HashMap<>();
-        errors.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), new ApiError(Errors.NONE, null));
+        errors.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), ApiError.NONE);
         errors.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
         return new AlterConfigsResponse(20, errors);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/security/SecurityUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala 
b/core/src/main/scala/kafka/security/SecurityUtils.scala
index bbfc42c..573a16b 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -19,27 +19,32 @@ package kafka.security
 
 import kafka.security.auth.{Acl, Operation, PermissionType, Resource, 
ResourceType}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, 
AclBindingFilter}
-import org.apache.kafka.common.resource.{Resource => AdminResource, 
ResourceType => AdminResourceType}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.ApiError
+import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 
 
 object SecurityUtils {
 
-  def convertToResourceAndAcl(filter: AclBindingFilter): Try[(Resource, Acl)] 
= {
-    for {
+  def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, 
(Resource, Acl)] = {
+    (for {
       resourceType <- 
Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
       principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- 
Try(PermissionType.fromJava(filter.entryFilter.permissionType))
       resource = Resource(resourceType, filter.resourceFilter.name)
       acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
-    } yield (resource, acl)
+    } yield (resource, acl)) match {
+      case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, 
throwable.getMessage))
+      case Success(s) => Right(s)
+    }
   }
 
   def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val adminResource = new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name)
+    val adminResource = new AdminResource(resource.resourceType.toJava, 
resource.name)
     val entry = new AccessControlEntry(acl.principal.toString, 
acl.host.toString,
       acl.operation.toJava, acl.permissionType.toJava)
     new AclBinding(adminResource, entry)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index 33c6b77..84972f3 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -119,7 +119,7 @@ class AdminManager(val config: KafkaConfig,
             else
               
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
assignments, configs, update = false)
         }
-        CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, 
null))
+        CreateTopicMetadata(topic, assignments, ApiError.NONE)
       } catch {
         // Log client errors at a lower level than unexpected exceptions
         case e@ (_: PolicyViolationException | _: ApiException) =>
@@ -135,7 +135,7 @@ class AdminManager(val config: KafkaConfig,
     if (timeout <= 0 || validateOnly || 
!metadata.exists(_.error.is(Errors.NONE))) {
       val results = metadata.map { createTopicMetadata =>
         // ignore topics that already have errors
-        if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) {
+        if (createTopicMetadata.error.isSuccess() && !validateOnly) {
           (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, 
null))
         } else {
           (createTopicMetadata.topic, createTopicMetadata.error)
@@ -212,7 +212,7 @@ class AdminManager(val config: KafkaConfig,
           new DescribeConfigsResponse.ConfigEntry(name, valueAsString, 
isSensitive, isDefault(name), isReadOnly)
         }
 
-        new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), 
configEntries.asJava)
+        new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
       }
 
       try {
@@ -280,7 +280,7 @@ class AdminManager(val config: KafkaConfig,
                 else
                   AdminUtils.changeTopicConfig(zkUtils, topic, properties)
             }
-            resource -> new ApiError(Errors.NONE, null)
+            resource -> ApiError.NONE
           case resourceType =>
             throw new InvalidRequestException(s"AlterConfigs is only supported 
for topics, but resource type is $resourceType")
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala 
b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
index abf6bc0..83cdd67 100644
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -48,7 +48,7 @@ class DelayedCreateTopics(delayMs: Long,
   override def tryComplete() : Boolean = {
     trace(s"Trying to complete operation for $createMetadata")
 
-    val leaderlessPartitionCount = 
createMetadata.filter(_.error.is(Errors.NONE))
+    val leaderlessPartitionCount = createMetadata.filter(_.error.isSuccess)
       .foldLeft(0) { case (topicCounter, metadata) =>
         topicCounter + missingLeaderCount(metadata.topic, 
metadata.replicaAssignments.keySet)
       }
@@ -69,7 +69,7 @@ class DelayedCreateTopics(delayMs: Long,
     trace(s"Completing operation for $createMetadata")
     val results = createMetadata.map { metadata =>
       // ignore topics that already have errors
-      if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, 
metadata.replicaAssignments.keySet) > 0)
+      if (metadata.error.isSuccess && missingLeaderCount(metadata.topic, 
metadata.replicaAssignments.keySet) > 0)
         (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
       else
         (metadata.topic, metadata.error)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 337c740..a4fd30c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -51,7 +51,7 @@ import 
org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
 import org.apache.kafka.common.requests.SaslHandshakeResponse
-import org.apache.kafka.common.resource.{Resource => AdminResource, 
ResourceType => AdminResourceType}
+import org.apache.kafka.common.resource.{Resource => AdminResource}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding}
 
 import scala.collection._
@@ -1763,22 +1763,19 @@ class KafkaApis(val requestChannel: RequestChannel,
       case None =>
         sendResponseMaybeThrottle(request, requestThrottleMs =>
           new DescribeAclsResponse(requestThrottleMs,
-            new SecurityDisabledException("No Authorizer is configured on the 
broker."),
-            Collections.emptySet()))
+            new ApiError(Errors.SECURITY_DISABLED, "No Authorizer is 
configured on the broker"), Collections.emptySet()))
       case Some(auth) =>
         val filter = describeAclsRequest.filter()
-        val returnedAcls = new util.ArrayList[AclBinding]
-        val aclMap = auth.getAcls()
-        aclMap.foreach { case (resource, acls) =>
-          acls.foreach { acl =>
-            val fixture = new AclBinding(new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name),
+        val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) 
=>
+          acls.flatMap { acl =>
+            val fixture = new AclBinding(new 
AdminResource(resource.resourceType.toJava, resource.name),
                 new AccessControlEntry(acl.principal.toString, 
acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
-            if (filter.matches(fixture))
-              returnedAcls.add(fixture)
+            if (filter.matches(fixture)) Some(fixture)
+            else None
           }
         }
         sendResponseMaybeThrottle(request, requestThrottleMs =>
-          new DescribeAclsResponse(requestThrottleMs, null, returnedAcls))
+          new DescribeAclsResponse(requestThrottleMs, ApiError.NONE, 
returnedAcls.asJava))
     }
   }
 
@@ -1793,8 +1790,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       case Some(auth) =>
         val aclCreationResults = createAclsRequest.aclCreations.asScala.map { 
aclCreation =>
           SecurityUtils.convertToResourceAndAcl(aclCreation.acl.toFilter) 
match {
-            case Failure(throwable) => new AclCreationResponse(throwable)
-            case Success((resource, acl)) => try {
+            case Left(apiError) => new AclCreationResponse(apiError)
+            case Right((resource, acl)) => try {
                 if (resource.resourceType.equals(Cluster) &&
                     !resource.name.equals(Resource.ClusterResourceName))
                   throw new InvalidRequestException("The only valid name for 
the CLUSTER resource is " +
@@ -1805,11 +1802,11 @@ class KafkaApis(val requestChannel: RequestChannel,
 
                 logger.debug(s"Added acl $acl to $resource")
 
-                new AclCreationResponse(null)
+                new AclCreationResponse(ApiError.NONE)
               } catch {
                 case throwable: Throwable =>
                   logger.debug(s"Failed to add acl $acl to $resource", 
throwable)
-                  new AclCreationResponse(throwable)
+                  new AclCreationResponse(ApiError.fromThrowable(throwable))
               }
           }
         }
@@ -1835,8 +1832,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           // Delete based on a list of ACL fixtures.
           for ((filter, i) <- filters.zipWithIndex) {
             SecurityUtils.convertToResourceAndAcl(filter) match {
-              case Failure(throwable) => filterResponseMap.put(i, new 
AclFilterResponse(throwable, Seq.empty.asJava))
-              case Success(fixture) => toDelete.put(i, ArrayBuffer(fixture))
+              case Left(apiError) => filterResponseMap.put(i, new 
AclFilterResponse(apiError, Seq.empty.asJava))
+              case Right(binding) => toDelete.put(i, ArrayBuffer(binding))
             }
           }
         } else {
@@ -1845,7 +1842,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val filtersWithIndex = filters.zipWithIndex
           for ((resource, acls) <- aclMap; acl <- acls) {
             val binding = new AclBinding(
-              new 
AdminResource(AdminResourceType.fromString(resource.resourceType.toString), 
resource.name),
+              new AdminResource(resource.resourceType.toJava, resource.name),
               new AccessControlEntry(acl.principal.toString, 
acl.host.toString, acl.operation.toJava,
                 acl.permissionType.toJava))
 
@@ -1863,8 +1860,7 @@ class KafkaApis(val requestChannel: RequestChannel,
               else None
             } catch {
               case throwable: Throwable =>
-                Some(new AclDeletionResult(new UnknownServerException(s"Failed 
to delete ACL $acl: $throwable"),
-                  aclBinding))
+                Some(new AclDeletionResult(ApiError.fromThrowable(throwable), 
aclBinding))
             }
           }.asJava
           

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index bd04c7b..09ff9be 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -14,7 +14,7 @@ package kafka.api
 
 import java.nio.ByteBuffer
 import java.util
-import java.util.concurrent.{CountDownLatch, ExecutionException, TimeUnit}
+import java.util.concurrent.ExecutionException
 import java.util.regex.Pattern
 import java.util.{ArrayList, Collections, Properties}
 
@@ -22,14 +22,24 @@ import kafka.common.TopicAndPartition
 import kafka.security.auth._
 import kafka.server.{BaseRequestTest, KafkaConfig}
 import kafka.utils.TestUtils
+import kafka.admin.AdminUtils
+import kafka.log.LogConfig
+import kafka.network.SocketServer
+import org.apache.kafka.clients.consumer.OffsetAndMetadata
 import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic.GROUP_METADATA_TOPIC_NAME
+import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, SecurityProtocol}
 import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _}
-import CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.acl.{AccessControlEntry, AccessControlEntryFilter, AclBinding, AclBindingFilter, AclOperation, AclPermissionType}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
+import org.apache.kafka.common.requests.CreateAclsRequest.AclCreation
+import org.apache.kafka.common.requests.CreateTopicsRequest.TopicDetails
+import org.apache.kafka.common.resource.{ResourceFilter, Resource => AdminResource, ResourceType => AdminResourceType}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.{Node, TopicPartition, requests}
 import org.junit.Assert._
@@ -38,13 +48,6 @@ import org.junit.{After, Assert, Before, Test}
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.collection.mutable.Buffer
-import org.apache.kafka.common.KafkaException
-import kafka.admin.AdminUtils
-import kafka.log.LogConfig
-import kafka.network.SocketServer
-import org.apache.kafka.clients.consumer.OffsetAndMetadata
-import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, SimpleRecord}
 
 class AuthorizerIntegrationTest extends BaseRequestTest {
 
@@ -72,6 +75,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   val groupDescribeAcl = Map(groupResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val clusterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, ClusterAction)))
   val clusterCreateAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Create)))
+  val clusterAlterAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Alter)))
+  val clusterDescribeAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Describe)))
   val clusterIdempotentWriteAcl = Map(Resource.ClusterResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, IdempotentWrite)))
   val topicReadAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)))
   val topicWriteAcl = Map(topicResource -> Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)))
@@ -125,7 +130,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.ADD_PARTITIONS_TO_TXN -> classOf[AddPartitionsToTxnResponse],
       ApiKeys.ADD_OFFSETS_TO_TXN -> classOf[AddOffsetsToTxnResponse],
       ApiKeys.END_TXN -> classOf[EndTxnResponse],
-      ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse]
+      ApiKeys.TXN_OFFSET_COMMIT -> classOf[TxnOffsetCommitResponse],
+      ApiKeys.CREATE_ACLS -> classOf[CreateAclsResponse],
+      ApiKeys.DELETE_ACLS -> classOf[DeleteAclsResponse],
+      ApiKeys.DESCRIBE_ACLS -> classOf[DescribeAclsResponse]
   )
 
   val requestKeyToError = Map[ApiKeys, Nothing => Errors](
@@ -156,7 +164,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ADD_PARTITIONS_TO_TXN -> ((resp: AddPartitionsToTxnResponse) => resp.errors.get(tp)),
     ApiKeys.ADD_OFFSETS_TO_TXN -> ((resp: AddOffsetsToTxnResponse) => resp.error),
     ApiKeys.END_TXN -> ((resp: EndTxnResponse) => resp.error),
-    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp))
+    ApiKeys.TXN_OFFSET_COMMIT -> ((resp: TxnOffsetCommitResponse) => resp.errors.get(tp)),
+    ApiKeys.CREATE_ACLS -> ((resp: CreateAclsResponse) => resp.aclCreationResponses.asScala.head.error.error),
+    ApiKeys.DESCRIBE_ACLS -> ((resp: DescribeAclsResponse) => resp.error.error),
+    ApiKeys.DELETE_ACLS -> ((resp: DeleteAclsResponse) => resp.responses.asScala.head.error.error)
   )
 
   val requestKeysToAcls = Map[ApiKeys, Map[Resource, Set[Acl]]](
@@ -185,7 +196,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     ApiKeys.ADD_PARTITIONS_TO_TXN -> (topicWriteAcl ++ transactionIdWriteAcl),
     ApiKeys.ADD_OFFSETS_TO_TXN -> (groupReadAcl ++ transactionIdWriteAcl),
     ApiKeys.END_TXN -> transactionIdWriteAcl,
-    ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl)
+    ApiKeys.TXN_OFFSET_COMMIT -> (groupReadAcl ++ transactionIdWriteAcl),
+    ApiKeys.CREATE_ACLS -> clusterAlterAcl,
+    ApiKeys.DESCRIBE_ACLS -> clusterDescribeAcl,
+    ApiKeys.DELETE_ACLS -> clusterAlterAcl
   )
 
   @Before
@@ -284,46 +298,47 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       build()
   }
 
-  private def createHeartbeatRequest = {
-    new HeartbeatRequest.Builder(group, 1, "").build()
-  }
+  private def heartbeatRequest = new HeartbeatRequest.Builder(group, 1, "").build()
 
-  private def createLeaveGroupRequest = {
-    new LeaveGroupRequest.Builder(group, "").build()
-  }
+  private def leaveGroupRequest = new LeaveGroupRequest.Builder(group, "").build()
 
-  private def createLeaderAndIsrRequest = {
+  private def leaderAndIsrRequest = {
     new requests.LeaderAndIsrRequest.Builder(brokerId, Int.MaxValue,
       Map(tp -> new PartitionState(Int.MaxValue, brokerId, Int.MaxValue, List(brokerId).asJava, 2, Seq(brokerId).asJava)).asJava,
       Set(new Node(brokerId, "localhost", 0)).asJava).build()
   }
 
-  private def createStopReplicaRequest = {
-    new requests.StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
-  }
+  private def stopReplicaRequest = new StopReplicaRequest.Builder(brokerId, Int.MaxValue, true, Set(tp).asJava).build()
 
-  private def createControlledShutdownRequest = {
-    new requests.ControlledShutdownRequest.Builder(brokerId).build()
-  }
+  private def controlledShutdownRequest = new requests.ControlledShutdownRequest.Builder(brokerId).build()
 
-  private def createTopicsRequest = {
+  private def createTopicsRequest =
     new CreateTopicsRequest.Builder(Map(createTopic -> new TopicDetails(1, 1.toShort)).asJava, 0).build()
-  }
 
-  private def deleteTopicsRequest = {
-    new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
-  }
+  private def deleteTopicsRequest = new DeleteTopicsRequest.Builder(Set(deleteTopic).asJava, 5000).build()
 
-  private def createDescribeConfigsRequest =
+  private def describeConfigsRequest =
     new DescribeConfigsRequest.Builder(Collections.singleton(new RResource(RResourceType.TOPIC, tp.topic))).build()
 
-  private def createAlterConfigsRequest =
+  private def alterConfigsRequest =
     new AlterConfigsRequest.Builder(
       Collections.singletonMap(new RResource(RResourceType.TOPIC, tp.topic),
         new AlterConfigsRequest.Config(Collections.singleton(
           new AlterConfigsRequest.ConfigEntry(LogConfig.MaxMessageBytesProp, "1000000")
         ))), true).build()
 
+  private def describeAclsRequest = new DescribeAclsRequest.Builder(AclBindingFilter.ANY).build()
+
+  private def createAclsRequest = new CreateAclsRequest.Builder(
+    Collections.singletonList(new AclCreation(new AclBinding(
+      new AdminResource(AdminResourceType.TOPIC, "mytopic"),
+      new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.DENY))))).build()
+
+  private def deleteAclsRequest = new DeleteAclsRequest.Builder(
+    Collections.singletonList(new AclBindingFilter(
+      new ResourceFilter(AdminResourceType.TOPIC, null),
+      new AccessControlEntryFilter("User:ANONYMOUS", "*", AclOperation.ANY, AclPermissionType.DENY)))).build()
+
 
   @Test
   def testAuthorizationWithTopicExisting() {
@@ -338,16 +353,19 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
       ApiKeys.JOIN_GROUP -> createJoinGroupRequest,
       ApiKeys.SYNC_GROUP -> createSyncGroupRequest,
       ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
-      ApiKeys.HEARTBEAT -> createHeartbeatRequest,
-      ApiKeys.LEAVE_GROUP -> createLeaveGroupRequest,
-      ApiKeys.LEADER_AND_ISR -> createLeaderAndIsrRequest,
-      ApiKeys.STOP_REPLICA -> createStopReplicaRequest,
-      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> createControlledShutdownRequest,
+      ApiKeys.HEARTBEAT -> heartbeatRequest,
+      ApiKeys.LEAVE_GROUP -> leaveGroupRequest,
+      ApiKeys.LEADER_AND_ISR -> leaderAndIsrRequest,
+      ApiKeys.STOP_REPLICA -> stopReplicaRequest,
+      ApiKeys.CONTROLLED_SHUTDOWN_KEY -> controlledShutdownRequest,
       ApiKeys.CREATE_TOPICS -> createTopicsRequest,
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest,
       ApiKeys.OFFSET_FOR_LEADER_EPOCH -> offsetsForLeaderEpochRequest,
-      ApiKeys.DESCRIBE_CONFIGS -> createDescribeConfigsRequest,
-      ApiKeys.ALTER_CONFIGS -> createAlterConfigsRequest
+      ApiKeys.DESCRIBE_CONFIGS -> describeConfigsRequest,
+      ApiKeys.ALTER_CONFIGS -> alterConfigsRequest,
+      ApiKeys.CREATE_ACLS -> createAclsRequest,
+      ApiKeys.DELETE_ACLS -> deleteAclsRequest,
+      ApiKeys.DESCRIBE_ACLS -> describeAclsRequest
     )
 
     for ((key, request) <- requestKeyToRequest) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index b4e09b3..03afc9e 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -31,13 +31,13 @@ import scala.util.{Failure, Success, Try}
 
 class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with SaslSetup {
   this.serverConfig.setProperty(KafkaConfig.ZkEnableSecureAclsProp, "true")
-  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName())
+  this.serverConfig.setProperty(KafkaConfig.AuthorizerClassNameProp, classOf[SimpleAclAuthorizer].getName)
 
   override protected def securityProtocol = SecurityProtocol.SASL_SSL
   override protected lazy val trustStoreFile = Some(File.createTempFile("truststore", ".jks"))
 
   override def configureSecurityBeforeServersStart() {
-    val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName())
+    val authorizer = CoreUtils.createObject[Authorizer](classOf[SimpleAclAuthorizer].getName)
     try {
       authorizer.configure(this.configs.head.originals())
       authorizer.addAcls(Set(new AuthAcl(AuthAcl.WildCardPrincipal, Allow,
@@ -92,6 +92,8 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
   val fooAcl = new AclBinding(new Resource(ResourceType.TOPIC, "foobar"),
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.READ, AclPermissionType.ALLOW))
+  val transactionalIdAcl = new AclBinding(new Resource(ResourceType.TRANSACTIONAL_ID, "transactional_id"),
+    new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, AclPermissionType.ALLOW))
 
   @Test
   override def testAclOperations(): Unit = {
@@ -116,28 +118,33 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     TestUtils.waitUntilTrue(() => {
       val results = client.describeAcls(filter).values.get()
       acls == results.asScala.toSet
-    }, "timed out waiting for ACLs")
+    }, s"timed out waiting for ACLs $acls")
   }
 
   @Test
   def testAclOperations2(): Unit = {
     client = AdminClient.create(createConfig())
-    val results = client.createAcls(List(acl2, acl2).asJava)
-    assertEquals(Set(acl2, acl2), results.values.keySet().asScala)
+    val results = client.createAcls(List(acl2, acl2, transactionalIdAcl).asJava)
+    assertEquals(Set(acl2, acl2, transactionalIdAcl), results.values.keySet.asScala)
     results.all.get()
     waitForDescribeAcls(client, acl2.toFilter, Set(acl2))
+    waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
 
     val filterA = new AclBindingFilter(new ResourceFilter(ResourceType.GROUP, null), AccessControlEntryFilter.ANY)
     val filterB = new AclBindingFilter(new ResourceFilter(ResourceType.TOPIC, "mytopic2"), AccessControlEntryFilter.ANY)
+    val filterC = new AclBindingFilter(new ResourceFilter(ResourceType.TRANSACTIONAL_ID, null), AccessControlEntryFilter.ANY)
 
     waitForDescribeAcls(client, filterA, Set())
+    waitForDescribeAcls(client, filterC, Set(transactionalIdAcl))
 
-    val results2 = client.deleteAcls(List(filterA, filterB).asJava, new DeleteAclsOptions())
-    assertEquals(Set(filterA, filterB), results2.values.keySet().asScala)
+    val results2 = client.deleteAcls(List(filterA, filterB, filterC).asJava, new DeleteAclsOptions())
+    assertEquals(Set(filterA, filterB, filterC), results2.values.keySet.asScala)
     assertEquals(Set(), results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
+    assertEquals(Set(transactionalIdAcl), results2.values.get(filterC).get.values.asScala.map(_.binding).toSet)
     assertEquals(Set(acl2), results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
 
     waitForDescribeAcls(client, filterB, Set())
+    waitForDescribeAcls(client, filterC, Set())
   }
 
   @Test
@@ -161,7 +168,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def testAclCreateGetDelete(expectAuth: Boolean): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val result = client.createAcls(List(fooAcl).asJava, new CreateAclsOptions)
+      val result = client.createAcls(List(fooAcl, transactionalIdAcl).asJava, new CreateAclsOptions)
       if (expectAuth) {
         Try(result.all.get) match {
           case Failure(e) =>
@@ -180,9 +187,10 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
     }, "timed out waiting for createAcls to " + (if (expectAuth) "succeed" else "fail"))
     if (expectAuth) {
       waitForDescribeAcls(client, fooAcl.toFilter, Set(fooAcl))
+      waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set(transactionalIdAcl))
     }
     TestUtils.waitUntilTrue(() => {
-      val result = client.deleteAcls(List(fooAcl.toFilter).asJava, new DeleteAclsOptions)
+      val result = client.deleteAcls(List(fooAcl.toFilter, transactionalIdAcl.toFilter).asJava, new DeleteAclsOptions)
       if (expectAuth) {
         Try(result.all.get) match {
           case Failure(e) =>
@@ -196,13 +204,17 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
             verifyCauseIsClusterAuth(e)
             true
           case Success(_) =>
+            assertEquals(Set(fooAcl, transactionalIdAcl), result.values.keySet)
             assertEquals(Set(fooAcl), result.values.get(fooAcl.toFilter).get.values.asScala.map(_.binding).toSet)
+            assertEquals(Set(transactionalIdAcl),
+              result.values.get(transactionalIdAcl.toFilter).get.values.asScala.map(_.binding).toSet)
             true
         }
       }
     }, "timed out waiting for deleteAcls to " + (if (expectAuth) "succeed" else "fail"))
     if (expectAuth) {
-      waitForDescribeAcls(client, fooAcl.toFilter, Set())
+      waitForDescribeAcls(client, fooAcl.toFilter, Set.empty)
+      waitForDescribeAcls(client, transactionalIdAcl.toFilter, Set.empty)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
index 0ef3405..d89a9df 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractCreateTopicsRequestTest.scala
@@ -36,7 +36,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
   protected def validateValidCreateTopicsRequests(request: CreateTopicsRequest): Unit = {
     val response = sendCreateTopicRequest(request)
 
-    val error = response.errors.values.asScala.find(!_.is(Errors.NONE))
+    val error = response.errors.values.asScala.find(_.isFailure)
     assertTrue(s"There should be no errors, found ${response.errors.asScala}", error.isEmpty)
 
     request.topics.asScala.foreach { case (topic, details) =>
@@ -118,7 +118,7 @@ class AbstractCreateTopicsRequestTest extends BaseRequestTest {
         assertEquals(expected.messageWithFallback, actual.messageWithFallback)
       }
       // If no error validate topic exists
-      if (expectedError.is(Errors.NONE) && !request.validateOnly) {
+      if (expectedError.isSuccess && !request.validateOnly) {
         validateTopicExists(topic)
       }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d68f9e2f/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
index 2d47876..ebb3344 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -193,7 +193,7 @@ public class StreamsKafkaClient {
 
         for (InternalTopicConfig internalTopicConfig : topicsMap.keySet()) {
             ApiError error = createTopicsResponse.errors().get(internalTopicConfig.name());
-            if (!error.is(Errors.NONE) && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
+            if (error.isFailure() && !error.is(Errors.TOPIC_ALREADY_EXISTS)) {
                 throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + " due to " + error.messageWithFallback());
             }
         }

Reply via email to