[ https://issues.apache.org/jira/browse/KAFKA-6841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16501111#comment-16501111 ]

ASF GitHub Bot commented on KAFKA-6841:
---------------------------------------

piyushvijay closed pull request #5079: KAFKA-6841: Add support for Prefixed ACLs
URL: https://github.com/apache/kafka/pull/5079

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index e4f9a4e8f59..87d9ee50c1e 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -146,7 +146,7 @@
     </subpackage>
 
     <subpackage name="utils">
-      <allow pkg="org.apache.kafka.common.metrics" />
+      <allow pkg="org.apache.kafka.common" />
     </subpackage>
   </subpackage>
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba48c38cb28..0e1861dbd9b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -51,7 +51,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler|ResourceUtils).java"/>
 
    <suppress checks="JavaNCSS"
              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
index 64f16cd7460..5841b5aeb9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/acl/AclBindingFilter.java
@@ -19,7 +19,6 @@
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.resource.ResourceFilter;
-import org.apache.kafka.common.resource.ResourceType;
 
 import java.util.Objects;
 
@@ -36,9 +35,7 @@
     /**
      * A filter which matches any ACL binding.
      */
-    public static final AclBindingFilter ANY = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
-        new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.ANY));
+    public static final AclBindingFilter ANY = new AclBindingFilter(ResourceFilter.ANY, AccessControlEntryFilter.ANY);
 
     /**
      * Create an instance of this filter with the provided parameters.
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
index 7f43caf8696..8bc546efcec 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/CommonFields.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.protocol;
 
 import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.requests.ResourceNameType;
 
 public class CommonFields {
     public static final Field.Int32 THROTTLE_TIME_MS = new Field.Int32("throttle_time_ms",
@@ -45,6 +46,7 @@
     public static final Field.Int8 RESOURCE_TYPE = new Field.Int8("resource_type", "The resource type");
     public static final Field.Str RESOURCE_NAME = new Field.Str("resource_name", "The resource name");
     public static final Field.NullableStr RESOURCE_NAME_FILTER = new Field.NullableStr("resource_name", "The resource name filter");
+    public static final Field.Int8 RESOURCE_NAME_TYPE = new Field.Int8("resource_name_type", "The resource name type", ResourceNameType.LITERAL.code());
     public static final Field.Str PRINCIPAL = new Field.Str("principal", "The ACL principal");
     public static final Field.NullableStr PRINCIPAL_FILTER = new Field.NullableStr("principal", "The ACL principal filter");
     public static final Field.Str HOST = new Field.Str("host", "The ACL host");
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index ec217f5bd0b..5c170017454 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -50,6 +50,9 @@ public Field(String name, Type type) {
         public Int8(String name, String docString) {
             super(name, Type.INT8, docString, false, null);
         }
+        public Int8(String name, String docString, byte defaultValue) {
+            super(name, Type.INT8, docString, true, defaultValue);
+        }
     }
 
     public static class Int32 extends Field {
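
For illustration only (not part of the patch), this is how the new overload is used by CommonFields earlier in this diff; the assumption here is that the protocol Struct falls back to a field's declared default when no value is set, which is what lets resource_name_type default to LITERAL for callers that never mention it:

    // Defaulted INT8 field, mirroring CommonFields.RESOURCE_NAME_TYPE above:
    Field.Int8 resourceNameType =
            new Field.Int8("resource_name_type", "The resource name type",
                           ResourceNameType.LITERAL.code());
    // Assumption: a Struct that never sets this field is written/read with the
    // default value, i.e. ResourceNameType.LITERAL.code().
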
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
index d281b3ba4af..8d6e9054353 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsRequest.java
@@ -36,6 +36,7 @@
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class CreateAclsRequest extends AbstractRequest {
@@ -50,8 +51,18 @@
                     OPERATION,
                     PERMISSION_TYPE))));
 
+    private static final Schema CREATE_ACLS_REQUEST_V1 = new Schema(
+            new Field(CREATIONS_KEY_NAME, new ArrayOf(new Schema(
+                    RESOURCE_TYPE,
+                    RESOURCE_NAME,
+                    RESOURCE_NAME_TYPE,
+                    PRINCIPAL,
+                    HOST,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_ACLS_REQUEST_V0};
+        return new Schema[]{CREATE_ACLS_REQUEST_V0, CREATE_ACLS_REQUEST_V1};
     }
 
     public static class AclCreation {
@@ -139,6 +150,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 List<CreateAclsResponse.AclCreationResponse> responses = new ArrayList<>();
                 for (int i = 0; i < aclCreations.size(); i++)
                     responses.add(new CreateAclsResponse.AclCreationResponse(ApiError.fromThrowable(throwable)));
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
index 6c3579893ce..ac981f8e67e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateAclsResponse.java
@@ -42,8 +42,11 @@
                     ERROR_CODE,
                     ERROR_MESSAGE))));
 
+    // v1 is the same as v0; only the corresponding request gained the resource_name_type field
+    private static final Schema CREATE_ACLS_RESPONSE_V1 = CREATE_ACLS_RESPONSE_V0;
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{CREATE_ACLS_RESPONSE_V0};
+        return new Schema[]{CREATE_ACLS_RESPONSE_V0, CREATE_ACLS_RESPONSE_V1};
     }
 
     public static class AclCreationResponse {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
index 2d50ea65cda..1125c6acf5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsRequest.java
@@ -37,6 +37,7 @@
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DeleteAclsRequest extends AbstractRequest {
@@ -51,8 +52,18 @@
                     OPERATION,
                     PERMISSION_TYPE))));
 
+    private static final Schema DELETE_ACLS_REQUEST_V1 = new Schema(
+            new Field(FILTERS, new ArrayOf(new Schema(
+                    RESOURCE_TYPE,
+                    RESOURCE_NAME_FILTER,
+                    RESOURCE_NAME_TYPE,
+                    PRINCIPAL_FILTER,
+                    HOST_FILTER,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_ACLS_REQUEST_V0};
+        return new Schema[]{DELETE_ACLS_REQUEST_V0, DELETE_ACLS_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<DeleteAclsRequest> {
@@ -115,10 +126,11 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 List<DeleteAclsResponse.AclFilterResponse> responses = new ArrayList<>();
                 for (int i = 0; i < filters.size(); i++) {
                     responses.add(new DeleteAclsResponse.AclFilterResponse(
-                        ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
+                            ApiError.fromThrowable(throwable), Collections.<DeleteAclsResponse.AclDeletionResult>emptySet()));
                 }
                 return new DeleteAclsResponse(throttleTimeMs, responses);
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
index 7ae25da2c62..ae16abaf0c5 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DeleteAclsResponse.java
@@ -43,6 +43,7 @@
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
@@ -51,7 +52,7 @@
     private final static String FILTER_RESPONSES_KEY_NAME = "filter_responses";
     private final static String MATCHING_ACLS_KEY_NAME = "matching_acls";
 
-    private static final Schema MATCHING_ACL = new Schema(
+    private static final Schema MATCHING_ACL_V0 = new Schema(
             ERROR_CODE,
             ERROR_MESSAGE,
             RESOURCE_TYPE,
@@ -61,16 +62,35 @@
             OPERATION,
             PERMISSION_TYPE);
 
+    private static final Schema MATCHING_ACL_V1 = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            RESOURCE_TYPE,
+            RESOURCE_NAME,
+            RESOURCE_NAME_TYPE,
+            PRINCIPAL,
+            HOST,
+            OPERATION,
+            PERMISSION_TYPE);
+
     private static final Schema DELETE_ACLS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
             new Field(FILTER_RESPONSES_KEY_NAME,
                     new ArrayOf(new Schema(
                             ERROR_CODE,
                             ERROR_MESSAGE,
-                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL), "The matching ACLs")))));
+                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V0), "The matching ACLs")))));
+
+    private static final Schema DELETE_ACLS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(FILTER_RESPONSES_KEY_NAME,
+                    new ArrayOf(new Schema(
+                            ERROR_CODE,
+                            ERROR_MESSAGE,
+                            new Field(MATCHING_ACLS_KEY_NAME, new ArrayOf(MATCHING_ACL_V1), "The matching ACLs")))));
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{DELETE_ACLS_RESPONSE_V0};
+        return new Schema[]{DELETE_ACLS_RESPONSE_V0, DELETE_ACLS_RESPONSE_V1};
     }
 
     public static class AclDeletionResult {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
index 1bacac7c7f5..855153b7ca1 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsRequest.java
@@ -32,6 +32,7 @@
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 public class DescribeAclsRequest extends AbstractRequest {
@@ -43,8 +44,17 @@
             OPERATION,
             PERMISSION_TYPE);
 
+    private static final Schema DESCRIBE_ACLS_REQUEST_V1 = new Schema(
+            RESOURCE_TYPE,
+            RESOURCE_NAME_FILTER,
+            RESOURCE_NAME_TYPE,
+            PRINCIPAL_FILTER,
+            HOST_FILTER,
+            OPERATION,
+            PERMISSION_TYPE);
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_ACLS_REQUEST_V0};
+        return new Schema[]{DESCRIBE_ACLS_REQUEST_V0, DESCRIBE_ACLS_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder<DescribeAclsRequest> {
@@ -93,6 +103,7 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable throwable
         short versionId = version();
         switch (versionId) {
             case 0:
+            case 1:
                 return new DescribeAclsResponse(throttleTimeMs, ApiError.fromThrowable(throwable),
                         Collections.<AclBinding>emptySet());
             default:
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
index a21230bfc39..c520f90c13d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeAclsResponse.java
@@ -41,6 +41,7 @@
 import static org.apache.kafka.common.protocol.CommonFields.PERMISSION_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
 
@@ -48,7 +49,7 @@
     private final static String RESOURCES_KEY_NAME = "resources";
     private final static String ACLS_KEY_NAME = "acls";
 
-    private static final Schema DESCRIBE_ACLS_RESOURCE = new Schema(
+    private static final Schema DESCRIBE_ACLS_RESOURCE_V0 = new Schema(
             RESOURCE_TYPE,
             RESOURCE_NAME,
             new Field(ACLS_KEY_NAME, new ArrayOf(new Schema(
@@ -57,14 +58,30 @@
                     OPERATION,
                     PERMISSION_TYPE))));
 
+    private static final Schema DESCRIBE_ACLS_RESOURCE_V1 = new Schema(
+            RESOURCE_TYPE,
+            RESOURCE_NAME,
+            RESOURCE_NAME_TYPE,
+            new Field(ACLS_KEY_NAME, new ArrayOf(new Schema(
+                    PRINCIPAL,
+                    HOST,
+                    OPERATION,
+                    PERMISSION_TYPE))));
+
     private static final Schema DESCRIBE_ACLS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
             ERROR_CODE,
             ERROR_MESSAGE,
-            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE), "The resources and their associated ACLs."));
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE_V0), "The resources and their associated ACLs."));
+
+    private static final Schema DESCRIBE_ACLS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_ACLS_RESOURCE_V1), "The resources and their associated ACLs."));
 
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0};
+        return new Schema[]{DESCRIBE_ACLS_RESPONSE_V0, DESCRIBE_ACLS_RESPONSE_V1};
     }
 
     private final int throttleTimeMs;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
index a1c27b725b8..a1e4453cdb7 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RequestUtils.java
@@ -24,6 +24,7 @@
 import org.apache.kafka.common.resource.Resource;
 import org.apache.kafka.common.resource.ResourceFilter;
 import org.apache.kafka.common.resource.ResourceType;
+import org.apache.kafka.common.resource.ResourceNameType;
 
 import static org.apache.kafka.common.protocol.CommonFields.HOST;
 import static org.apache.kafka.common.protocol.CommonFields.HOST_FILTER;
@@ -33,6 +34,7 @@
 import static org.apache.kafka.common.protocol.CommonFields.PRINCIPAL_FILTER;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_FILTER;
+import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_NAME_TYPE;
 import static org.apache.kafka.common.protocol.CommonFields.RESOURCE_TYPE;
 
 final class RequestUtils {
@@ -42,23 +44,27 @@ private RequestUtils() {}
     static Resource resourceFromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME);
-        return new Resource(ResourceType.fromCode(resourceType), name);
+        byte resourceNameType = struct.get(RESOURCE_NAME_TYPE);
+        return new Resource(ResourceType.fromCode(resourceType), name, ResourceNameType.fromCode(resourceNameType));
     }
 
     static void resourceSetStructFields(Resource resource, Struct struct) {
         struct.set(RESOURCE_TYPE, resource.resourceType().code());
         struct.set(RESOURCE_NAME, resource.name());
+        struct.set(RESOURCE_NAME_TYPE, resource.resourceNameType().code());
     }
 
     static ResourceFilter resourceFilterFromStructFields(Struct struct) {
         byte resourceType = struct.get(RESOURCE_TYPE);
         String name = struct.get(RESOURCE_NAME_FILTER);
-        return new ResourceFilter(ResourceType.fromCode(resourceType), name);
+        byte resourceNameType = struct.get(RESOURCE_NAME_TYPE);
+        return new ResourceFilter(ResourceType.fromCode(resourceType), name, ResourceNameType.fromCode(resourceNameType));
     }
 
     static void resourceFilterSetStructFields(ResourceFilter resourceFilter, Struct struct) {
         struct.set(RESOURCE_TYPE, resourceFilter.resourceType().code());
         struct.set(RESOURCE_NAME_FILTER, resourceFilter.name());
+        struct.set(RESOURCE_NAME_TYPE, resourceFilter.resourceNameType().code());
     }
 
     static AccessControlEntry aceFromStructFields(Struct struct) {
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
index bd814661ae3..b7cbae65a90 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
@@ -20,10 +20,26 @@
 public final class Resource {
     private final ResourceType type;
     private final String name;
+    private final ResourceNameType resourceNameType;
 
-    public Resource(ResourceType type, String name) {
+    /**
+     * @param type resource type
+     * @param name resource name
+     * @param resourceNameType resource name type
+     */
+    public Resource(ResourceType type, String name, ResourceNameType resourceNameType) {
         this.type = type;
         this.name = name;
+        this.resourceNameType = resourceNameType;
+    }
+
+    /**
+     * Resource name type defaults to ResourceNameType.LITERAL.
+     * @param type resource type
+     * @param name resource name
+     */
+    public Resource(ResourceType type, String name) {
+        this(type, name, ResourceNameType.LITERAL);
     }
 
     public ResourceType type() {
@@ -34,6 +50,10 @@ public String name() {
         return name;
     }
 
+    public ResourceNameType resourceNameType() {
+        return resourceNameType;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -43,18 +63,19 @@ public boolean equals(Object o) {
 
         Resource resource = (Resource) o;
 
-        return type == resource.type && name.equals(resource.name);
+        return type == resource.type && name.equals(resource.name) && resourceNameType.equals(resource.resourceNameType);
     }
 
     @Override
     public int hashCode() {
         int result = type.hashCode();
         result = 31 * result + name.hashCode();
+        result = 31 * result + resourceNameType.hashCode();
         return result;
     }
 
     @Override
     public String toString() {
-        return "Resource(type=" + type + ", name='" + name + "')";
+        return "Resource(type=" + type + ", name='" + name + "', resourceNameType=" + resourceNameType + ")";
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResourceNameType.java b/clients/src/main/java/org/apache/kafka/common/requests/ResourceNameType.java
new file mode 100644
index 00000000000..142a3f31b22
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResourceNameType.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+@InterfaceStability.Evolving
+public enum ResourceNameType {
+    /**
+     * Represents any ResourceNameType which this client cannot understand,
+     * perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any ResourceNameType.
+     */
+    ANY((byte) 1),
+
+    /**
+     * A literal resource name.
+     */
+    LITERAL((byte) 2),
+
+    /**
+     * A wildcard-suffixed (prefixed) resource name.
+     */
+    WILDCARD_SUFFIXED((byte) 3);
+
+    private final static Map<Byte, ResourceNameType> CODE_TO_VALUE;
+
+    static {
+        final Map<Byte, ResourceNameType> codeToValues = new HashMap<>();
+        for (ResourceNameType resourceType : ResourceNameType.values()) {
+            codeToValues.put(resourceType.code, resourceType);
+        }
+        CODE_TO_VALUE = Collections.unmodifiableMap(codeToValues);
+    }
+
+    private final byte code;
+
+    ResourceNameType(byte code) {
+        this.code = code;
+    }
+
+    /**
+     * @return the code of this ResourceNameType.
+     */
+    public byte code() {
+        return code;
+    }
+
+    /**
+     * Return the ResourceNameType with the provided code or `ResourceNameType.UNKNOWN` if one cannot be found.
+     */
+    public static ResourceNameType fromCode(byte code) {
+        ResourceNameType resourceNameType = CODE_TO_VALUE.get(code);
+        if (resourceNameType == null) {
+            return UNKNOWN;
+        }
+        return resourceNameType;
+    }
+
+}
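
A brief usage sketch (illustrative only) of the code/fromCode mapping defined in this new enum:

    byte code = ResourceNameType.WILDCARD_SUFFIXED.code();           // 3
    ResourceNameType parsed = ResourceNameType.fromCode(code);        // WILDCARD_SUFFIXED
    ResourceNameType unknown = ResourceNameType.fromCode((byte) 42);  // UNKNOWN, never an exception
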
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
index f41f41a04b6..ebebd9408ca 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/Resource.java
@@ -30,6 +30,7 @@
 public class Resource {
     private final ResourceType resourceType;
     private final String name;
+    private final ResourceNameType resourceNameType;
 
     /**
      * The name of the CLUSTER resource.
@@ -46,12 +47,26 @@
      *
      * @param resourceType non-null resource type
      * @param name non-null resource name
+     * @param resourceNameType non-null resource name type
      */
-    public Resource(ResourceType resourceType, String name) {
+    public Resource(ResourceType resourceType, String name, ResourceNameType resourceNameType) {
         Objects.requireNonNull(resourceType);
         this.resourceType = resourceType;
         Objects.requireNonNull(name);
         this.name = name;
+        Objects.requireNonNull(resourceNameType);
+        this.resourceNameType = resourceNameType;
+    }
+
+    /**
+     * Create an instance of this class with the provided parameters.
+     * The resource name type defaults to ResourceNameType.LITERAL.
+     *
+     * @param resourceType non-null resource type
+     * @param name non-null resource name
+     */
+    public Resource(ResourceType resourceType, String name) {
+        this(resourceType, name, ResourceNameType.LITERAL);
     }
 
     /**
@@ -61,6 +76,13 @@ public ResourceType resourceType() {
         return resourceType;
     }
 
+    /**
+     * Return the resource name type.
+     */
+    public ResourceNameType resourceNameType() {
+        return resourceNameType;
+    }
+
     /**
      * Return the resource name.
      */
@@ -72,19 +94,19 @@ public String name() {
      * Create a filter which matches only this Resource.
      */
     public ResourceFilter toFilter() {
-        return new ResourceFilter(resourceType, name);
+        return new ResourceFilter(resourceType, name, resourceNameType);
     }
 
     @Override
     public String toString() {
-        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
+        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", resourceNameType=" + resourceNameType + ")";
     }
 
     /**
      * Return true if this Resource has any UNKNOWN components.
      */
     public boolean isUnknown() {
-        return resourceType.isUnknown();
+        return resourceType.isUnknown() || resourceNameType.isUnknown();
     }
 
     @Override
@@ -92,11 +114,13 @@ public boolean equals(Object o) {
         if (!(o instanceof Resource))
             return false;
         Resource other = (Resource) o;
-        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
+        return resourceType.equals(other.resourceType)
+                && Objects.equals(name, other.name)
+                && resourceNameType.equals(other.resourceNameType);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(resourceType, name);
+        return Objects.hash(resourceType, name, resourceNameType);
     }
 }
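
For illustration (not part of the patch; the topic name is invented), the expected behaviour of the two constructors above:

    Resource literal = new Resource(ResourceType.TOPIC, "payments");   // name type defaults to LITERAL
    Resource prefixed = new Resource(ResourceType.TOPIC, "payments",
                                     ResourceNameType.WILDCARD_SUFFIXED);
    ResourceFilter filter = prefixed.toFilter();  // the filter carries WILDCARD_SUFFIXED as well
    // literal.equals(prefixed) is false, since the name type now participates in equals/hashCode.
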
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
index 0a4611f9874..c7e29fa65dd 100644
--- a/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceFilter.java
@@ -21,6 +21,8 @@
 
 import java.util.Objects;
 
+import static org.apache.kafka.common.resource.ResourceUtils.matchResource;
+
 /**
  * A filter which matches Resource objects.
  *
@@ -30,22 +32,37 @@
 public class ResourceFilter {
     private final ResourceType resourceType;
     private final String name;
+    private final ResourceNameType resourceNameType;
 
     /**
      * Matches any resource.
      */
-    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null);
+    public static final ResourceFilter ANY = new ResourceFilter(ResourceType.ANY, null, ResourceNameType.ANY);
 
     /**
      * Create an instance of this class with the provided parameters.
+     * Resource name type defaults to ResourceNameType.LITERAL
      *
      * @param resourceType non-null resource type
      * @param name resource name or null
      */
     public ResourceFilter(ResourceType resourceType, String name) {
+        this(resourceType, name, ResourceNameType.LITERAL);
+    }
+
+    /**
+     * Create an instance of this class with the provided parameters.
+     *
+     * @param resourceType non-null resource type
+     * @param name resource name or null
+     * @param resourceNameType non-null resource name type
+     */
+    public ResourceFilter(ResourceType resourceType, String name, ResourceNameType resourceNameType) {
         Objects.requireNonNull(resourceType);
         this.resourceType = resourceType;
         this.name = name;
+        Objects.requireNonNull(resourceNameType);
+        this.resourceNameType = resourceNameType;
     }
 
     /**
@@ -62,16 +79,23 @@ public String name() {
         return name;
     }
 
+    /**
+     * Return the resource name type.
+     */
+    public ResourceNameType resourceNameType() {
+        return resourceNameType;
+    }
+
     @Override
     public String toString() {
-        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ")";
+        return "(resourceType=" + resourceType + ", name=" + ((name == null) ? "<any>" : name) + ", resourceNameType=" + resourceNameType + ")";
     }
 
     /**
      * Return true if this ResourceFilter has any UNKNOWN components.
      */
     public boolean isUnknown() {
-        return resourceType.isUnknown();
+        return resourceType.isUnknown() || resourceNameType.isUnknown();
     }
 
     @Override
@@ -79,23 +103,21 @@ public boolean equals(Object o) {
         if (!(o instanceof ResourceFilter))
             return false;
         ResourceFilter other = (ResourceFilter) o;
-        return resourceType.equals(other.resourceType) && Objects.equals(name, other.name);
+        return resourceType.equals(other.resourceType)
+                && Objects.equals(name, other.name)
+                && Objects.equals(resourceNameType, other.resourceNameType);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(resourceType, name);
+        return Objects.hash(resourceType, name, resourceNameType);
     }
 
     /**
      * Return true if this filter matches the given Resource.
      */
     public boolean matches(Resource other) {
-        if ((name != null) && (!name.equals(other.name())))
-            return false;
-        if ((resourceType != ResourceType.ANY) && (!resourceType.equals(other.resourceType())))
-            return false;
-        return true;
+        return matchResource(other, this);
     }
 
     /**
@@ -115,6 +137,10 @@ public String findIndefiniteField() {
             return "Resource type is UNKNOWN.";
         if (name == null)
             return "Resource name is NULL.";
+        if (resourceNameType == ResourceNameType.ANY)
+            return "Resource name type is ANY.";
+        if (resourceNameType == ResourceNameType.UNKNOWN)
+            return "Resource name type is UNKNOWN.";
         return null;
     }
 }
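
An illustrative sketch (resource names invented) of what delegating matches() to ResourceUtils.matchResource enables: a literal filter now matches ACLs stored under a wildcard-suffixed name.

    // ACL resource as stored, using the new prefixed name type:
    Resource stored = new Resource(ResourceType.TOPIC, "payments-",
                                   ResourceNameType.WILDCARD_SUFFIXED);
    // Filter for one concrete (literal) topic name:
    ResourceFilter filter = new ResourceFilter(ResourceType.TOPIC, "payments-emea");
    filter.matches(stored);             // true: "payments-emea" falls under "payments-*"
    ResourceFilter.ANY.matches(stored); // true: ANY matches every resource name type
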
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
new file mode 100644
index 00000000000..623fcf77368
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceNameType.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Resource name type.
+ */
+@InterfaceStability.Evolving
+public enum ResourceNameType {
+    /**
+     * Represents any ResourceNameType which this client cannot understand,
+     * perhaps because this client is too old.
+     */
+    UNKNOWN((byte) 0),
+
+    /**
+     * In a filter, matches any ResourceNameType.
+     */
+    ANY((byte) 1),
+
+    /**
+     * A literal resource name.
+     */
+    LITERAL((byte) 2),
+
+    /**
+     * A wildcard-suffixed (prefixed) resource name.
+     */
+    WILDCARD_SUFFIXED((byte) 3);
+
+    private final static Map<Byte, ResourceNameType> CODE_TO_VALUE;
+
+    static {
+        final Map<Byte, ResourceNameType> codeToValues = new HashMap<>();
+        for (ResourceNameType resourceType : ResourceNameType.values()) {
+            codeToValues.put(resourceType.code, resourceType);
+        }
+        CODE_TO_VALUE = Collections.unmodifiableMap(codeToValues);
+    }
+
+    private final byte code;
+
+    ResourceNameType(byte code) {
+        this.code = code;
+    }
+
+    /**
+     * Return the code of this resource.
+     */
+    public byte code() {
+        return code;
+    }
+
+    /**
+     * Return the ResourceNameType with the provided code or `ResourceNameType.UNKNOWN` if one cannot be found.
+     */
+    public static ResourceNameType fromCode(byte code) {
+        return CODE_TO_VALUE.getOrDefault(code, UNKNOWN);
+    }
+
+    /**
+     * Return whether this resource name type is UNKNOWN.
+     */
+    public boolean isUnknown() {
+        return this == UNKNOWN;
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/resource/ResourceUtils.java b/clients/src/main/java/org/apache/kafka/common/resource/ResourceUtils.java
new file mode 100644
index 00000000000..c665a021325
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/resource/ResourceUtils.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+public final class ResourceUtils {
+
+    private ResourceUtils() {}
+
+    public static final String WILDCARD_MARKER = "*";
+
+    public static boolean matchResource(Resource stored, Resource input) {
+        return matchResource(stored, input.toFilter());
+    }
+
+    public static boolean matchResource(Resource stored, ResourceFilter input) { // TODO matching criteria should be different for delete call?
+        if (!input.resourceType().equals(ResourceType.ANY) && !input.resourceType().equals(stored.resourceType())) {
+            return false;
+        }
+        switch (stored.resourceNameType()) {
+            case LITERAL:
+                switch (input.resourceNameType()) {
+                    case ANY:
+                        return input.name() == null
+                                || input.name().equals(stored.name())
+                                || stored.name().equals(WILDCARD_MARKER);
+                    case LITERAL:
+                        return input.name() == null
+                                || stored.name().equals(input.name())
+                                || stored.name().equals(WILDCARD_MARKER);
+                    case WILDCARD_SUFFIXED:
+                        return false;
+                    default:
+                        return false;
+                }
+            case WILDCARD_SUFFIXED:
+                switch (input.resourceNameType()) {
+                    case ANY:
+                        return input.name() == null
+                                || matchWildcardSuffixedString(stored.name() + WILDCARD_MARKER, input.name())
+                                || stored.name().equals(input.name());
+                    case LITERAL:
+                        return input.name() == null
+                                || matchWildcardSuffixedString(stored.name() + WILDCARD_MARKER, input.name());
+                    case WILDCARD_SUFFIXED:
+                        return stored.name().equals(input.name());
+                    default:
+                        return false;
+                }
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Returns true if two strings match, both of which might end with a WILDCARD_MARKER (which matches everything).
+     * Examples:
+     *   matchWildcardSuffixedString("rob", "rob") => true
+     *   matchWildcardSuffixedString("*", "rob") => true
+     *   matchWildcardSuffixedString("ro*", "rob") => true
+     *   matchWildcardSuffixedString("rob", "bob") => false
+     *   matchWildcardSuffixedString("ro*", "bob") => false
+     *
+     *   matchWildcardSuffixedString("rob", "*") => true
+     *   matchWildcardSuffixedString("rob", "ro*") => true
+     *   matchWildcardSuffixedString("bob", "ro*") => false
+     *
+     *   matchWildcardSuffixedString("ro*", "ro*") => true
+     *   matchWildcardSuffixedString("rob*", "ro*") => false
+     *   matchWildcardSuffixedString("ro*", "rob*") => true
+     *
+     * @param wildcardSuffixedPattern Value stored in ZK in either resource name or Acl.
+     * @param resourceName Value present in the request.
+     * @return true if there is a match (including wildcard-suffix matching).
+     */
+    static boolean matchWildcardSuffixedString(String wildcardSuffixedPattern, String resourceName) { // TODO review this method again after design changes
+
+        if (wildcardSuffixedPattern.equals(resourceName) || wildcardSuffixedPattern.equals(WILDCARD_MARKER) || resourceName.equals(WILDCARD_MARKER)) {
+            // if strings are equal or either of acl or resourceName is a wildcard
+            return true;
+        }
+
+        if (wildcardSuffixedPattern.endsWith(WILDCARD_MARKER)) {
+
+            String aclPrefix = wildcardSuffixedPattern.substring(0, wildcardSuffixedPattern.length() - WILDCARD_MARKER.length());
+
+            if (resourceName.endsWith(WILDCARD_MARKER)) {
+                // when both the acl and the resourceName end with a wildcard, the non-wildcard prefix of resourceName should start with the non-wildcard prefix of the acl
+                String inputPrefix = resourceName.substring(0, resourceName.length() - WILDCARD_MARKER.length());
+                return inputPrefix.startsWith(aclPrefix);
+            }
+
+            // when acl ends with wildcard but resourceName doesn't, then resourceName should start with non-wildcard prefix of acl
+            return resourceName.startsWith(aclPrefix);
+
+        } else {
+
+            if (resourceName.endsWith(WILDCARD_MARKER)) {
+                // when resourceName ends with wildcard but acl doesn't, then acl should start with non-wildcard prefix of resourceName
+                String inputPrefix = resourceName.substring(0, resourceName.length() - WILDCARD_MARKER.length());
+                return wildcardSuffixedPattern.startsWith(inputPrefix);
+            }
+
+            // when neither acl nor resourceName ends with wildcard, they have to match exactly.
+            return wildcardSuffixedPattern.equals(resourceName);
+
+        }
+    }
+}
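
A few illustrative calls (resource names invented) showing the matching rules encoded above, including the pre-existing literal "*" wildcard:

    // A literal "*" ACL keeps its old semantics and matches any literal name:
    ResourceUtils.matchResource(new Resource(ResourceType.TOPIC, "*"),
                                new Resource(ResourceType.TOPIC, "orders"));        // true
    // A wildcard-suffixed ACL matches by prefix...
    ResourceUtils.matchResource(new Resource(ResourceType.TOPIC, "payments-", ResourceNameType.WILDCARD_SUFFIXED),
                                new Resource(ResourceType.TOPIC, "payments-emea")); // true
    // ...and rejects names outside that prefix:
    ResourceUtils.matchResource(new Resource(ResourceType.TOPIC, "payments-", ResourceNameType.WILDCARD_SUFFIXED),
                                new Resource(ResourceType.TOPIC, "orders"));        // false
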
diff --git a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
index 0ebcdfedb4f..fac18e87057 100644
--- a/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/acl/AclBindingTest.java
@@ -21,8 +21,8 @@
 import org.apache.kafka.common.resource.ResourceType;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 public class AclBindingTest {
@@ -43,11 +43,11 @@
         new AccessControlEntry("User:ANONYMOUS", "127.0.0.1", AclOperation.UNKNOWN, AclPermissionType.DENY));
 
     private static final AclBindingFilter ANY_ANONYMOUS = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
+        ResourceFilter.ANY,
         new AccessControlEntryFilter("User:ANONYMOUS", null, AclOperation.ANY, AclPermissionType.ANY));
 
     private static final AclBindingFilter ANY_DENY = new AclBindingFilter(
-        new ResourceFilter(ResourceType.ANY, null),
+        ResourceFilter.ANY,
         new AccessControlEntryFilter(null, null, AclOperation.ANY, AclPermissionType.DENY));
 
     private static final AclBindingFilter ANY_MYTOPIC = new AclBindingFilter(
@@ -103,9 +103,9 @@ public void testUnknowns() throws Exception {
 
     @Test
     public void testMatchesAtMostOne() throws Exception {
-        assertEquals(null, ACL1.toFilter().findIndefiniteField());
-        assertEquals(null, ACL2.toFilter().findIndefiniteField());
-        assertEquals(null, ACL3.toFilter().findIndefiniteField());
+        assertNull(ACL1.toFilter().findIndefiniteField());
+        assertNull(ACL2.toFilter().findIndefiniteField());
+        assertNull(ACL3.toFilter().findIndefiniteField());
         assertFalse(ANY_ANONYMOUS.matchesAtMostOne());
         assertFalse(ANY_DENY.matchesAtMostOne());
         assertFalse(ANY_MYTOPIC.matchesAtMostOne());
diff --git a/clients/src/test/java/org/apache/kafka/common/resource/ResourceUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/resource/ResourceUtilsTest.java
new file mode 100644
index 00000000000..14429a39ce9
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/resource/ResourceUtilsTest.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.resource;
+
+import org.junit.Test;
+
+import static org.apache.kafka.common.resource.ResourceUtils.WILDCARD_MARKER;
+import static org.apache.kafka.common.resource.ResourceUtils.matchResource;
+import static org.apache.kafka.common.resource.ResourceUtils.matchWildcardSuffixedString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceUtilsTest {
+
+    @Test
+    public void testmatchWildcardSuffixedString() {
+        // everything should match wildcard string
+        assertTrue(matchWildcardSuffixedString(WILDCARD_MARKER, 
WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString(WILDCARD_MARKER, "f"));
+        assertTrue(matchWildcardSuffixedString(WILDCARD_MARKER, "foo"));
+        assertTrue(matchWildcardSuffixedString(WILDCARD_MARKER, "fo" + 
WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString(WILDCARD_MARKER, "f" + 
WILDCARD_MARKER));
+
+        assertTrue(matchWildcardSuffixedString("f", WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString("f", "f"));
+        assertTrue(matchWildcardSuffixedString("f", "f" + WILDCARD_MARKER));
+        assertFalse(matchWildcardSuffixedString("f", "foo"));
+        assertFalse(matchWildcardSuffixedString("f", "fo" + WILDCARD_MARKER));
+
+        assertTrue(matchWildcardSuffixedString("foo", WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString("foo", "foo"));
+        assertTrue(matchWildcardSuffixedString("foo", "fo" + WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString("foo", "f" + WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString("foo", "foo" + 
WILDCARD_MARKER));
+        assertFalse(matchWildcardSuffixedString("foo", "f"));
+        assertFalse(matchWildcardSuffixedString("foo", "foot" + 
WILDCARD_MARKER));
+
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, 
WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "fo"));
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "foo"));
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, 
"foot"));
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "fo" + 
WILDCARD_MARKER));
+
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "foo" + 
WILDCARD_MARKER));
+        assertTrue(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "foot" 
+ WILDCARD_MARKER));
+        assertFalse(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "f"));
+        assertFalse(matchWildcardSuffixedString("fo" + WILDCARD_MARKER, "f" + 
WILDCARD_MARKER));
+    }
+
+    @Test
+    public void testMatchResource() {
+        // same resource should match
+        assertTrue(
+                matchResource(
+                        new Resource(ResourceType.TOPIC, 
"ResourceType.TOPICA"),
+                        new Resource(ResourceType.TOPIC, "ResourceType.TOPICA")
+                )
+        );
+
+        // different resource shouldn't match
+        assertFalse(
+                matchResource(
+                        new Resource(ResourceType.TOPIC, 
"ResourceType.TOPICA"),
+                        new Resource(ResourceType.TOPIC, "ResourceType.TOPICB")
+                )
+        );
+        assertFalse(
+                matchResource(
+                        new Resource(ResourceType.TOPIC, 
"ResourceType.TOPICA"),
+                        new Resource(ResourceType.GROUP, "ResourceType.TOPICA")
+                )
+        );
+
+        // wildcard resource should match
+        assertTrue(
+                matchResource(
+                        new Resource(ResourceType.TOPIC, WILDCARD_MARKER),
+                        new Resource(ResourceType.TOPIC, "ResourceType.TOPICA")
+                )
+        );
+
+        // wildcard-suffix resource should match
+        assertTrue(
+                matchResource(
+                        new Resource(ResourceType.TOPIC, "ResourceType.TOPIC", 
ResourceNameType.WILDCARD_SUFFIXED),
+                        new Resource(ResourceType.TOPIC, "ResourceType.TOPICA")
+                )
+        );
+        assertFalse(
+                matchResource(
+                        new Resource(ResourceType.TOPIC, "ResourceType.TOPIC", 
ResourceNameType.WILDCARD_SUFFIXED),
+                        new Resource(ResourceType.TOPIC, "topiA")
+                )
+        );
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
index 273c13abbfe..b40fc11660e 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/SecurityUtilsTest.java
@@ -34,7 +34,7 @@ public void testPrincipalNameCanContainSeparator() {
     @Test
     public void testParseKafkaPrincipalWithNonUserPrincipalType() {
         String name = "foo";
-        String principalType = "Group";
+        String principalType = "GROUP";
         KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(principalType + ":" + name);
         assertEquals(principalType, principal.getPrincipalType());
         assertEquals(name, principal.getName());
diff --git a/core/src/main/scala/kafka/security/SecurityUtils.scala b/core/src/main/scala/kafka/security/SecurityUtils.scala
index 573a16b8c58..24af79cb5dd 100644
--- a/core/src/main/scala/kafka/security/SecurityUtils.scala
+++ b/core/src/main/scala/kafka/security/SecurityUtils.scala
@@ -17,7 +17,7 @@
 
 package kafka.security
 
-import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceType}
+import kafka.security.auth.{Acl, Operation, PermissionType, Resource, ResourceNameType, ResourceType}
 import org.apache.kafka.common.acl.{AccessControlEntry, AclBinding, AclBindingFilter}
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.ApiError
@@ -32,10 +32,11 @@ object SecurityUtils {
   def convertToResourceAndAcl(filter: AclBindingFilter): Either[ApiError, (Resource, Acl)] = {
     (for {
       resourceType <- Try(ResourceType.fromJava(filter.resourceFilter.resourceType))
+      resourceNameType <- Try(ResourceNameType.fromJava(filter.resourceFilter.resourceNameType))
       principal <- Try(KafkaPrincipal.fromString(filter.entryFilter.principal))
       operation <- Try(Operation.fromJava(filter.entryFilter.operation))
       permissionType <- Try(PermissionType.fromJava(filter.entryFilter.permissionType))
-      resource = Resource(resourceType, filter.resourceFilter.name)
+      resource = Resource(resourceType, filter.resourceFilter.name, resourceNameType)
       acl = Acl(principal, permissionType, filter.entryFilter.host, operation)
     } yield (resource, acl)) match {
       case Failure(throwable) => Left(new ApiError(Errors.INVALID_REQUEST, throwable.getMessage))
@@ -44,7 +45,7 @@ object SecurityUtils {
   }
 
   def convertToAclBinding(resource: Resource, acl: Acl): AclBinding = {
-    val adminResource = new AdminResource(resource.resourceType.toJava, resource.name)
+    val adminResource = new AdminResource(resource.resourceType.toJava, resource.name, resource.resourceNameType.toJava)
     val entry = new AccessControlEntry(acl.principal.toString, acl.host.toString,
       acl.operation.toJava, acl.permissionType.toJava)
     new AclBinding(adminResource, entry)
diff --git a/core/src/main/scala/kafka/security/auth/Acl.scala b/core/src/main/scala/kafka/security/auth/Acl.scala
index 67f3d9592f2..29376f7821f 100644
--- a/core/src/main/scala/kafka/security/auth/Acl.scala
+++ b/core/src/main/scala/kafka/security/auth/Acl.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 object Acl {
   val WildCardPrincipal: KafkaPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "*")
   val WildCardHost: String = "*"
+  val WildCardString: String = "*"
   val AllowAllAcl = new Acl(WildCardPrincipal, Allow, WildCardHost, All)
   val PrincipalKey = "principal"
   val PermissionTypeKey = "permissionType"
diff --git a/core/src/main/scala/kafka/security/auth/Authorizer.scala b/core/src/main/scala/kafka/security/auth/Authorizer.scala
index 6f4ca0eb522..d0672cdf01b 100644
--- a/core/src/main/scala/kafka/security/auth/Authorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/Authorizer.scala
@@ -37,7 +37,7 @@ trait Authorizer extends Configurable {
   /**
    * @param session The session being authenticated.
    * @param operation Type of operation client is trying to perform on resource.
-   * @param resource Resource the client is trying to access.
+   * @param resource Resource the client is trying to access. The input resource's name type is always literal.
    * @return true if the operation should be permitted, false otherwise
    */
   def authorize(session: Session, operation: Operation, resource: Resource): Boolean
@@ -60,21 +60,21 @@ trait Authorizer extends Configurable {
 
   /**
    * remove a resource along with all of its acls from acl store.
-   * @param resource
+   * @param resource the resource from which these acls should be removed.
    * @return
    */
   def removeAcls(resource: Resource): Boolean
 
   /**
    * get set of acls for this resource
-   * @param resource
+   * @param resource the resource to which the acls belong.
    * @return empty set if no acls are found, otherwise the acls for the resource.
    */
   def getAcls(resource: Resource): Set[Acl]
 
   /**
    * get the acls for this principal.
-   * @param principal
+   * @param principal principal name.
    * @return empty Map if no acls exist for this principal, otherwise a map of resource -> acls for the principal.
    */
   def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]
@@ -90,4 +90,3 @@ trait Authorizer extends Configurable {
   def close(): Unit
 
 }
-
diff --git a/core/src/main/scala/kafka/security/auth/Resource.scala b/core/src/main/scala/kafka/security/auth/Resource.scala
index 311f5b5083a..e7ac9c0b549 100644
--- a/core/src/main/scala/kafka/security/auth/Resource.scala
+++ b/core/src/main/scala/kafka/security/auth/Resource.scala
@@ -29,6 +29,9 @@ object Resource {
       case _ => throw new IllegalArgumentException("expected a string in format ResourceType:ResourceName but got " + str)
     }
   }
+
+    def apply(resourceType: ResourceType, name: String) = new Resource(resourceType, name, Literal)
+
 }
 
 /**
@@ -36,10 +39,16 @@ object Resource {
  * @param resourceType type of resource.
  * @param name name of the resource, for topic this will be topic name , for group it will be group name. For cluster type
  *             it will be a constant string kafka-cluster.
+ * @param resourceNameType type of resource name: literal, wildcard-suffixed, etc.
  */
-case class Resource(resourceType: ResourceType, name: String) {
+case class Resource(resourceType: ResourceType, name: String, 
resourceNameType: ResourceNameType) {
+
+  def this(resourceType: ResourceType, name: String) {
+    this(resourceType, name, Literal)
+  }
 
   override def toString: String = {
+    // can't be changed because it will break backward compatibility with acl 
change notification
     resourceType.name + Resource.Separator + name
   }
 }
diff --git a/core/src/main/scala/kafka/security/auth/ResourceNameType.scala 
b/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
new file mode 100644
index 00000000000..3765d8deeff
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/ResourceNameType.scala
@@ -0,0 +1,47 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  * <p/>
+  * http://www.apache.org/licenses/LICENSE-2.0
+  * <p/>
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.security.auth
+
+import kafka.common.{BaseEnum, KafkaException}
+import org.apache.kafka.common.resource.{ResourceNameType => JResourceNameType}
+
+sealed trait ResourceNameType extends BaseEnum {
+  def toJava: JResourceNameType
+}
+
+case object Literal extends ResourceNameType {
+  val name = "Literal"
+  val toJava = JResourceNameType.LITERAL
+}
+
+case object WildcardSuffixed extends ResourceNameType {
+  val name = "WildcardSuffixed"
+  val toJava = JResourceNameType.WILDCARD_SUFFIXED
+}
+
+object ResourceNameType {
+
+  def fromString(resourceNameType: String): ResourceNameType = {
+    val rType = values.find(rType => 
rType.name.equalsIgnoreCase(resourceNameType))
+    rType.getOrElse(throw new KafkaException(resourceNameType + " not a valid 
resourceNameType name. The valid names are " + values.mkString(",")))
+  }
+
+  def values: Seq[ResourceNameType] = List(Literal, WildcardSuffixed)
+
+  def fromJava(operation: JResourceNameType): ResourceNameType = 
fromString(operation.toString.replaceAll("_", ""))
+}
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index c439f5e15af..4426da32fbf 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -24,10 +24,12 @@ import com.typesafe.scalalogging.Logger
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.network.RequestChannel.Session
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.storage.AclStore
 import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils.{inReadLock, inWriteLock}
 import kafka.utils._
-import kafka.zk.{AclChangeNotificationSequenceZNode, 
AclChangeNotificationZNode, KafkaZkClient}
+import kafka.zk.KafkaZkClient
+import org.apache.kafka.common.resource.ResourceUtils
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 
@@ -54,8 +56,9 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   private val authorizerLogger = Logger("kafka.authorizer.logger")
   private var superUsers = Set.empty[KafkaPrincipal]
   private var shouldAllowEveryoneIfNoAclIsFound = false
-  private var zkClient: KafkaZkClient = null
-  private var aclChangeListener: ZkNodeChangeNotificationListener = null
+  private var zkClient: KafkaZkClient = _
+  private var aclChangeListener: ZkNodeChangeNotificationListener = _
+  private var wildcardSuffixedAclChangeListener: 
ZkNodeChangeNotificationListener = _
 
   private val aclCache = new scala.collection.mutable.HashMap[Resource, 
VersionedAcls]
   private val lock = new ReentrantReadWriteLock()
@@ -97,14 +100,16 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
     loadCache()
 
-    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, 
AclChangeNotificationZNode.path, 
AclChangeNotificationSequenceZNode.SequenceNumberPrefix, 
AclChangedNotificationHandler)
+    aclChangeListener = new ZkNodeChangeNotificationListener(zkClient, 
AclStore.literalAclStore.aclChangesZNode.path, 
AclStore.literalAclStore.aclChangeNotificationSequenceZNode.SequenceNumberPrefix,
 AclChangedNotificationHandler)
     aclChangeListener.init()
+    wildcardSuffixedAclChangeListener = new 
ZkNodeChangeNotificationListener(zkClient, 
AclStore.wildcardSuffixedAclStore.aclChangesZNode.path, 
AclStore.wildcardSuffixedAclStore.aclChangeNotificationSequenceZNode.SequenceNumberPrefix,
 AclChangedNotificationHandler)
+    wildcardSuffixedAclChangeListener.init()
   }
 
   override def authorize(session: Session, operation: Operation, resource: 
Resource): Boolean = {
     val principal = session.principal
     val host = session.clientAddress.getHostAddress
-    val acls = getAcls(resource) ++ getAcls(new 
Resource(resource.resourceType, Resource.WildCardResource))
+    val acls = getMatchingAcls(resource)
 
     // Check if there is any Deny acl match that would disallow this operation.
     val denyMatch = aclMatch(operation, resource, principal, host, Deny, acls)
@@ -143,18 +148,28 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
     } else false
   }
 
-  private def aclMatch(operations: Operation, resource: Resource, principal: 
KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): 
Boolean = {
+  private def aclMatch(operation: Operation, resource: Resource, principal: 
KafkaPrincipal, host: String, permissionType: PermissionType, acls: Set[Acl]): 
Boolean = {
     acls.find { acl =>
       acl.permissionType == permissionType &&
-        (acl.principal == principal || acl.principal == Acl.WildCardPrincipal) 
&&
-        (operations == acl.operation || acl.operation == All) &&
+        matchPrincipal(acl.principal, principal) &&
+        (operation == acl.operation || acl.operation == All) &&
         (acl.host == host || acl.host == Acl.WildCardHost)
     }.exists { acl =>
-      authorizerLogger.debug(s"operation = $operations on resource = $resource 
from host = $host is $permissionType based on acl = $acl")
+      authorizerLogger.debug(s"operation = $operation on resource = $resource 
from host = $host is $permissionType based on acl = $acl")
       true
     }
   }
 
+  /**
+    * @param valueInAcl KafkaPrincipal value present in Acl.
+    * @param input KafkaPrincipal present in the request.
+    * @return true if there is a match (including wildcard-suffix matching).
+    */
+  def matchPrincipal(valueInAcl: KafkaPrincipal, input: KafkaPrincipal): 
Boolean = {
+    (valueInAcl.getPrincipalType == input.getPrincipalType || 
valueInAcl.getPrincipalType == Acl.WildCardString) &&
+      (valueInAcl.getName.equals(input.getName) || 
valueInAcl.getName.equals(Acl.WildCardString))
+  }
+
   override def addAcls(acls: Set[Acl], resource: Resource) {
     if (acls != null && acls.nonEmpty) {
       inWriteLock(lock) {
@@ -191,13 +206,22 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
   override def getAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]] = {
     inReadLock(lock) {
       aclCache.mapValues { versionedAcls =>
-        versionedAcls.acls.filter(_.principal == principal)
+        versionedAcls.acls.filter(acl => matchPrincipal(acl.principal, 
principal))
       }.filter { case (_, acls) =>
         acls.nonEmpty
       }.toMap
     }
   }
 
+  private def getMatchingAcls(resource: Resource): Set[Acl] = {
+    inReadLock(lock) {
+      aclCache.filterKeys(stored => ResourceUtils.matchResource(
+        new 
org.apache.kafka.common.resource.Resource(stored.resourceType.toJava, 
stored.name, stored.resourceNameType.toJava),
+        new 
org.apache.kafka.common.resource.Resource(resource.resourceType.toJava, 
resource.name, resource.resourceNameType.toJava)
+      )).flatMap(_._2.acls).toSet
+    }
+  }
+
   override def getAcls(): Map[Resource, Set[Acl]] = {
     inReadLock(lock) {
       aclCache.mapValues(_.acls).toMap
@@ -206,20 +230,23 @@ class SimpleAclAuthorizer extends Authorizer with Logging 
{
 
   def close() {
     if (aclChangeListener != null) aclChangeListener.close()
+    if (wildcardSuffixedAclChangeListener != null) 
wildcardSuffixedAclChangeListener.close()
     if (zkClient != null) zkClient.close()
   }
 
-  private def loadCache()  {
+  private def loadCache() {
     inWriteLock(lock) {
-      val resourceTypes = zkClient.getResourceTypes()
-      for (rType <- resourceTypes) {
-        val resourceType = ResourceType.fromString(rType)
-        val resourceNames = zkClient.getResourceNames(resourceType.name)
-        for (resourceName <- resourceNames) {
-          val versionedAcls = getAclsFromZk(Resource(resourceType, 
resourceName))
-          updateCache(new Resource(resourceType, resourceName), versionedAcls)
+      AclStore.AclStores.foreach(aclStore => {
+        val resourceTypes = zkClient.getResourceTypes(aclStore)
+        for (rType <- resourceTypes) {
+          val resourceType = ResourceType.fromString(rType)
+          val resourceNames = zkClient.getResourceNames(aclStore, resourceType)
+          for (resourceName <- resourceNames) {
+            val versionedAcls = getAclsFromZk(new Resource(resourceType, 
resourceName, aclStore.resourceNameType))
+            updateCache(new Resource(resourceType, resourceName, 
aclStore.resourceNameType), versionedAcls)
+          }
         }
-      }
+      })
     }
   }
 
@@ -305,7 +332,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
   }
 
   private def updateAclChangedFlag(resource: Resource) {
-    zkClient.createAclChangeNotification(resource.toString)
+    zkClient.createAclChangeNotification(AclStore.fromResource(resource), 
resource.toString)
   }
 
   private def backoffTime = {
diff --git a/core/src/main/scala/kafka/security/auth/storage/AclStore.scala 
b/core/src/main/scala/kafka/security/auth/storage/AclStore.scala
new file mode 100644
index 00000000000..c77b04b72e5
--- /dev/null
+++ b/core/src/main/scala/kafka/security/auth/storage/AclStore.scala
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.security.auth.storage
+
+import java.nio.charset.StandardCharsets.UTF_8
+
+import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth._
+import kafka.utils.{Json, ZkUtils}
+import org.apache.zookeeper.data.Stat
+
+import scala.collection.JavaConverters._
+import scala.collection.Seq
+
+/**
+  * Acl Store.
+  */
+class AclStore(aclZNod: AclZNode, aclChangesZNod: AclChangesZNode, 
resourceNameTyp: ResourceNameType) {
+  def aclZNode: AclZNode = aclZNod
+  def aclChangesZNode: AclChangesZNode = aclChangesZNod
+  def resourceNameType: ResourceNameType = resourceNameTyp
+  def resourceTypeZNode = ResourceTypeZNode(aclZNode)
+  def resourceZNode = ResourceZNode(aclZNode)
+  def aclChangeNotificationSequenceZNode = 
AclChangeNotificationSequenceZNode(aclChangesZNode)
+}
+
+case class AclZNode(nodePath: String) {
+  def path: String = nodePath
+}
+
+case class AclChangesZNode(nodePath: String) {
+  def path: String = nodePath
+}
+
+case class ResourceTypeZNode(aclZNode: AclZNode) {
+  def path(resourceType: ResourceType) = 
s"${aclZNode.path}/${resourceType.name}"
+}
+
+case class ResourceZNode(aclZNode: AclZNode) {
+  def path(resource: Resource) = 
s"${aclZNode.path}/${resource.resourceType.name}/${resource.name}"
+  def encode(acls: Set[Acl]): Array[Byte] = 
Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
+  def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = 
VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
+}
+
+case class AclChangeNotificationSequenceZNode(aclChangesZNode: 
AclChangesZNode) {
+  def SequenceNumberPrefix = "acl_changes_"
+  def createPath = s"${aclChangesZNode.path}/$SequenceNumberPrefix"
+  def deletePath(sequenceNode: String) = 
s"${aclChangesZNode.path}/$sequenceNode"
+  def encode(resourceName: String): Array[Byte] = resourceName.getBytes(UTF_8)
+  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
+}
+
+object AclStore {
+  val literalAclStore: AclStore = new AclStore(
+    AclZNode(
+      /**
+        * The root acl storage node. Under this node there will be one child 
node per resource type (Topic, Cluster, Group).
+        * under each resourceType there will be a unique child for each 
resource instance and the data for that child will contain
+        * list of its acls as a json object. Following gives an example:
+        *
+        * <pre>
+        * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
+        * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
+        * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
+        * </pre>
+        */
+      ZkUtils.KafkaAclPath),
+    AclChangesZNode(
+      /**
+        * Notification node which gets updated with the resource name when acl 
on a resource is changed.
+        */
+      ZkUtils.KafkaAclChangesPath),
+    Literal
+  )
+
+  val wildcardSuffixedAclStore: AclStore = new AclStore(
+    AclZNode(ZkUtils.KafkaWildcardSuffixedAclPath),
+    AclChangesZNode(ZkUtils.KafkaWildcardSuffixedAclChangesPath),
+    WildcardSuffixed
+  )
+
+  val AclStores = Seq(literalAclStore, wildcardSuffixedAclStore)
+
+  def fromResource(resource: Resource): AclStore = {
+    
AclStores.find(_.resourceNameType.equals(resource.resourceNameType)).getOrElse(
+      throw new IllegalArgumentException("Unsupported resource name type: " + 
resource.resourceNameType)
+    )
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index f86026d9f95..971b62f7510 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1867,7 +1867,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         val filter = describeAclsRequest.filter()
         val returnedAcls = auth.getAcls.toSeq.flatMap { case (resource, acls) 
=>
           acls.flatMap { acl =>
-            val fixture = new AclBinding(new 
AdminResource(resource.resourceType.toJava, resource.name),
+            val fixture = new AclBinding(new 
AdminResource(resource.resourceType.toJava, resource.name, 
resource.resourceNameType.toJava),
                 new AccessControlEntry(acl.principal.toString, 
acl.host.toString, acl.operation.toJava, acl.permissionType.toJava))
             if (filter.matches(fixture)) Some(fixture)
             else None
@@ -1941,7 +1941,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           val filtersWithIndex = filters.zipWithIndex
           for ((resource, acls) <- aclMap; acl <- acls) {
             val binding = new AclBinding(
-              new AdminResource(resource.resourceType.toJava, resource.name),
+              new AdminResource(resource.resourceType.toJava, resource.name, 
resource.resourceNameType.toJava),
               new AccessControlEntry(acl.principal.toString, 
acl.host.toString, acl.operation.toJava,
                 acl.permissionType.toJava))
 
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala 
b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 0c162434541..b8e83dc8524 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -55,6 +55,8 @@ object ZkUtils {
   val LogDirEventNotificationPath = "/log_dir_event_notification"
   val KafkaAclPath = "/kafka-acl"
   val KafkaAclChangesPath = "/kafka-acl-changes"
+  val KafkaWildcardSuffixedAclPath = "/kafka-wildcard-acl"
+  val KafkaWildcardSuffixedAclChangesPath = "/kafka-wildcard-acl-changes"
 
   val ConsumersPath = "/consumers"
   val ClusterIdPath = s"$ClusterPath/id"
diff --git a/core/src/main/scala/kafka/zk/KafkaZkClient.scala 
b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
index 0cf158ecdad..d1d7805a255 100644
--- a/core/src/main/scala/kafka/zk/KafkaZkClient.scala
+++ b/core/src/main/scala/kafka/zk/KafkaZkClient.scala
@@ -26,6 +26,7 @@ import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
+import kafka.security.auth.storage.AclStore
 import kafka.security.auth.{Acl, Resource, ResourceType}
 import kafka.server.ConfigType
 import kafka.utils.Logging
@@ -943,9 +944,11 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * Creates the required zk nodes for Acl storage
    */
   def createAclPaths(): Unit = {
-    createRecursive(AclZNode.path, throwIfPathExists = false)
-    createRecursive(AclChangeNotificationZNode.path, throwIfPathExists = false)
-    ResourceType.values.foreach(resource => 
createRecursive(ResourceTypeZNode.path(resource.name), throwIfPathExists = 
false))
+    AclStore.AclStores.foreach(aclStore => {
+      createRecursive(aclStore.aclZNode.path, throwIfPathExists = false)
+      createRecursive(aclStore.aclChangesZNode.path, throwIfPathExists = false)
+      ResourceType.values.foreach(resourceType => 
createRecursive(aclStore.resourceTypeZNode.path(resourceType), 
throwIfPathExists = false))
+    })
   }
 
   /**
@@ -954,10 +957,11 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return  VersionedAcls
    */
   def getVersionedAclsForResource(resource: Resource): VersionedAcls = {
-    val getDataRequest = GetDataRequest(ResourceZNode.path(resource))
+    val resourceZNode = AclStore.fromResource(resource).resourceZNode
+    val getDataRequest = GetDataRequest(resourceZNode.path(resource))
     val getDataResponse = retryRequestUntilConnected(getDataRequest)
     getDataResponse.resultCode match {
-      case Code.OK => ResourceZNode.decode(getDataResponse.data, 
getDataResponse.stat)
+      case Code.OK => resourceZNode.decode(getDataResponse.data, 
getDataResponse.stat)
       case Code.NONODE => VersionedAcls(Set(), -1)
       case _ => throw getDataResponse.resultException.get
     }
@@ -972,18 +976,19 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return true if the update was successful and the new version
    */
   def conditionalSetOrCreateAclsForResource(resource: Resource, aclsSet: 
Set[Acl], expectedVersion: Int): (Boolean, Int) = {
+    val resourceZNode = AclStore.fromResource(resource).resourceZNode
     def set(aclData: Array[Byte],  expectedVersion: Int): SetDataResponse = {
-      val setDataRequest = SetDataRequest(ResourceZNode.path(resource), 
aclData, expectedVersion)
+      val setDataRequest = SetDataRequest(resourceZNode.path(resource), 
aclData, expectedVersion)
       retryRequestUntilConnected(setDataRequest)
     }
 
     def create(aclData: Array[Byte]): CreateResponse = {
-      val path = ResourceZNode.path(resource)
+      val path = resourceZNode.path(resource)
       val createRequest = CreateRequest(path, aclData, acls(path), 
CreateMode.PERSISTENT)
       retryRequestUntilConnected(createRequest)
     }
 
-    val aclData = ResourceZNode.encode(aclsSet)
+    val aclData = resourceZNode.encode(aclsSet)
 
     val setDataResponse = set(aclData, expectedVersion)
     setDataResponse.resultCode match {
@@ -1003,11 +1008,13 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
 
   /**
    * Creates Acl change notification message
+   * @param aclStore acl store
    * @param resourceName resource name
    */
-  def createAclChangeNotification(resourceName: String): Unit = {
-    val path = AclChangeNotificationSequenceZNode.createPath
-    val createRequest = CreateRequest(path, 
AclChangeNotificationSequenceZNode.encode(resourceName), acls(path), 
CreateMode.PERSISTENT_SEQUENTIAL)
+  def createAclChangeNotification(aclStore: AclStore, resourceName: String): 
Unit = {
+    val aclChangeNotificationSequenceZNode = 
aclStore.aclChangeNotificationSequenceZNode
+    val path = aclChangeNotificationSequenceZNode.createPath
+    val createRequest = CreateRequest(path, 
aclChangeNotificationSequenceZNode.encode(resourceName), acls(path), 
CreateMode.PERSISTENT_SEQUENTIAL)
     val createResponse = retryRequestUntilConnected(createRequest)
     createResponse.maybeThrow
   }
@@ -1030,21 +1037,23 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @throws KeeperException if there is an error while deleting Acl change 
notifications
    */
   def deleteAclChangeNotifications(): Unit = {
-    val getChildrenResponse = 
retryRequestUntilConnected(GetChildrenRequest(AclChangeNotificationZNode.path))
-    if (getChildrenResponse.resultCode == Code.OK) {
-      deleteAclChangeNotifications(getChildrenResponse.children)
-    } else if (getChildrenResponse.resultCode != Code.NONODE) {
-      getChildrenResponse.maybeThrow
-    }
+    AclStore.AclStores.foreach(aclStore => {
+      val getChildrenResponse = 
retryRequestUntilConnected(GetChildrenRequest(aclStore.aclChangesZNode.path))
+      if (getChildrenResponse.resultCode == Code.OK) {
+        deleteAclChangeNotifications(getChildrenResponse.children, aclStore)
+      } else if (getChildrenResponse.resultCode != Code.NONODE) {
+        getChildrenResponse.maybeThrow
+      }
+    })
   }
 
   /**
    * Deletes the Acl change notifications associated with the given sequence 
nodes
    * @param sequenceNodes
    */
-  private def deleteAclChangeNotifications(sequenceNodes: Seq[String]): Unit = 
{
+  private def deleteAclChangeNotifications(sequenceNodes: Seq[String], 
aclStore: AclStore): Unit = {
     val deleteRequests = sequenceNodes.map { sequenceNode =>
-      
DeleteRequest(AclChangeNotificationSequenceZNode.deletePath(sequenceNode), 
ZkVersion.NoVersion)
+      
DeleteRequest(aclStore.aclChangeNotificationSequenceZNode.deletePath(sequenceNode),
 ZkVersion.NoVersion)
     }
 
     val deleteResponses = retryRequestsUntilConnected(deleteRequests)
@@ -1059,17 +1068,18 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * Gets the resource types
    * @return list of resource type names
    */
-  def getResourceTypes(): Seq[String] = {
-    getChildren(AclZNode.path)
+  def getResourceTypes(aclStore: AclStore): Seq[String] = {
+    getChildren(aclStore.aclZNode.path)
   }
 
   /**
    * Gets the resource names for a give resource type
-   * @param resourceType
+   * @param aclStore Acl store.
+   * @param resourceType Resource type.
    * @return list of resource names
    */
-  def getResourceNames(resourceType: String): Seq[String] = {
-    getChildren(ResourceTypeZNode.path(resourceType))
+  def getResourceNames(aclStore: AclStore, resourceType: ResourceType): 
Seq[String] = {
+    getChildren(aclStore.resourceTypeZNode.path(resourceType))
   }
 
   /**
@@ -1078,7 +1088,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return delete status
    */
   def deleteResource(resource: Resource): Boolean = {
-    deleteRecursive(ResourceZNode.path(resource))
+    
deleteRecursive(AclStore.fromResource(resource).resourceZNode.path(resource))
   }
 
   /**
@@ -1087,7 +1097,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return existence status
    */
   def resourceExists(resource: Resource): Boolean = {
-    pathExists(ResourceZNode.path(resource))
+    pathExists(AclStore.fromResource(resource).resourceZNode.path(resource))
   }
 
   /**
@@ -1097,7 +1107,7 @@ class KafkaZkClient private (zooKeeperClient: 
ZooKeeperClient, isSecure: Boolean
    * @return return true if it succeeds, false otherwise (the current version 
is not the expected version)
    */
   def conditionalDelete(resource: Resource, expectedVersion: Int): Boolean = {
-    val deleteRequest = DeleteRequest(ResourceZNode.path(resource), 
expectedVersion)
+    val deleteRequest = 
DeleteRequest(AclStore.fromResource(resource).resourceZNode.path(resource), 
expectedVersion)
     val deleteResponse = retryRequestUntilConnected(deleteRequest)
     deleteResponse.resultCode match {
       case Code.OK | Code.NONODE => true
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala 
b/core/src/main/scala/kafka/zk/ZkData.scala
index 64aed564fd4..2502aff43a8 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -25,8 +25,6 @@ import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.KafkaException
 import kafka.controller.{IsrChangeNotificationHandler, 
LeaderIsrAndControllerEpoch}
-import kafka.security.auth.SimpleAclAuthorizer.VersionedAcls
-import kafka.security.auth.{Acl, Resource}
 import kafka.server.{ConfigType, DelegationTokenManager}
 import kafka.utils.Json
 import org.apache.kafka.common.TopicPartition
@@ -444,45 +442,6 @@ object StateChangeHandlers {
   def zkNodeChangeListenerHandler(seqNodeRoot: String) = 
s"change-notification-$seqNodeRoot"
 }
 
-/**
- * The root acl storage node. Under this node there will be one child node per 
resource type (Topic, Cluster, Group).
- * under each resourceType there will be a unique child for each resource 
instance and the data for that child will contain
- * list of its acls as a json object. Following gives an example:
- *
- * <pre>
- * /kafka-acl/Topic/topic-1 => {"version": 1, "acls": [ { "host":"host1", 
"permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * /kafka-acl/Cluster/kafka-cluster => {"version": 1, "acls": [ { 
"host":"host1", "permissionType": "Allow","operation": "Read","principal": 
"User:alice"}]}
- * /kafka-acl/Group/group-1 => {"version": 1, "acls": [ { "host":"host1", 
"permissionType": "Allow","operation": "Read","principal": "User:alice"}]}
- * </pre>
- */
-object AclZNode {
-  def path = "/kafka-acl"
-}
-
-object ResourceTypeZNode {
-  def path(resourceType: String) = s"${AclZNode.path}/$resourceType"
-}
-
-object ResourceZNode {
-  def path(resource: Resource) = 
s"${AclZNode.path}/${resource.resourceType}/${resource.name}"
-  def encode(acls: Set[Acl]): Array[Byte] = {
-    Json.encodeAsBytes(Acl.toJsonCompatibleMap(acls).asJava)
-  }
-  def decode(bytes: Array[Byte], stat: Stat): VersionedAcls = 
VersionedAcls(Acl.fromBytes(bytes), stat.getVersion)
-}
-
-object AclChangeNotificationZNode {
-  def path = "/kafka-acl-changes"
-}
-
-object AclChangeNotificationSequenceZNode {
-  val SequenceNumberPrefix = "acl_changes_"
-  def createPath = s"${AclChangeNotificationZNode.path}/$SequenceNumberPrefix"
-  def deletePath(sequenceNode: String) = 
s"${AclChangeNotificationZNode.path}/${sequenceNode}"
-  def encode(resourceName : String): Array[Byte] = resourceName.getBytes(UTF_8)
-  def decode(bytes: Array[Byte]): String = new String(bytes, UTF_8)
-}
-
 object ClusterZNode {
   def path = "/cluster"
 }
@@ -535,6 +494,14 @@ object DelegationTokenInfoZNode {
   def decode(bytes: Array[Byte]): Option[TokenInformation] = 
DelegationTokenManager.fromBytes(bytes)
 }
 
+// Acl path constants duplicated from ZkUtils so they can be listed among ZkData's persistent top-level paths below.
+object AclZNodesInfo {
+  val KafkaAclPath = "/kafka-acl"
+  val KafkaAclChangesPath = "/kafka-acl-changes"
+  val KafkaWildcardSuffixedAclPath = "/kafka-wildcard-acl"
+  val KafkaWildcardSuffixedAclChangesPath = "/kafka-wildcard-acl-changes"
+}
+
 object ZkData {
 
   // Important: it is necessary to add any new top level Zookeeper path to the 
Seq
@@ -545,8 +512,10 @@ object ZkData {
     ControllerZNode.path,
     ControllerEpochZNode.path,
     IsrChangeNotificationZNode.path,
-    AclZNode.path,
-    AclChangeNotificationZNode.path,
+    AclZNodesInfo.KafkaAclPath,
+    AclZNodesInfo.KafkaAclChangesPath,
+    AclZNodesInfo.KafkaWildcardSuffixedAclPath,
+    AclZNodesInfo.KafkaWildcardSuffixedAclChangesPath,
     ProducerIdBlockZNode.path,
     LogDirEventNotificationZNode.path,
     DelegationTokenAuthZNode.path)
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 099af5247d3..23085007784 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -98,6 +98,8 @@ class SaslSslAdminClientIntegrationTest extends 
AdminClientIntegrationTest with
     new AccessControlEntry("User:ANONYMOUS", "*", AclOperation.WRITE, 
AclPermissionType.ALLOW))
   val groupAcl = new AclBinding(new Resource(ResourceType.GROUP, "*"),
     new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
+  val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*"),
+    new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
 
   @Test
   override def testAclOperations(): Unit = {
@@ -145,7 +147,7 @@ class SaslSslAdminClientIntegrationTest extends 
AdminClientIntegrationTest with
     assertEquals(Set(filterA, filterB, filterC), 
results2.values.keySet.asScala)
     assertEquals(Set(groupAcl), 
results2.values.get(filterA).get.values.asScala.map(_.binding).toSet)
     assertEquals(Set(transactionalIdAcl), 
results2.values.get(filterC).get.values.asScala.map(_.binding).toSet)
-    assertEquals(Set(acl2), 
results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
+    assertEquals(Set(acl2, userAcl), 
results2.values.get(filterB).get.values.asScala.map(_.binding).toSet)
 
     waitForDescribeAcls(client, filterB, Set())
     waitForDescribeAcls(client, filterC, Set())
@@ -224,8 +226,6 @@ class SaslSslAdminClientIntegrationTest extends 
AdminClientIntegrationTest with
 
   private def testAclGet(expectAuth: Boolean): Unit = {
     TestUtils.waitUntilTrue(() => {
-      val userAcl = new AclBinding(new Resource(ResourceType.TOPIC, "*"),
-        new AccessControlEntry("User:*", "*", AclOperation.ALL, 
AclPermissionType.ALLOW))
       val results = client.describeAcls(userAcl.toFilter)
       if (expectAuth) {
         Try(results.values.get) match {
diff --git 
a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
 
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
index e46bd9b726f..e351f3f7b76 100644
--- 
a/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/common/ZkNodeChangeNotificationListenerTest.scala
@@ -18,8 +18,9 @@ package kafka.common
 
 import java.nio.charset.StandardCharsets
 
+import kafka.security.auth.storage.AclStore
 import kafka.utils.TestUtils
-import kafka.zk.{AclChangeNotificationSequenceZNode, 
AclChangeNotificationZNode, ZooKeeperTestHarness}
+import kafka.zk.ZooKeeperTestHarness
 import org.junit.Test
 
 class ZkNodeChangeNotificationListenerTest extends ZooKeeperTestHarness {
@@ -40,11 +41,11 @@ class ZkNodeChangeNotificationListenerTest extends 
ZooKeeperTestHarness {
     val notificationMessage2 = "message2"
     val changeExpirationMs = 1000
 
-    val notificationListener = new ZkNodeChangeNotificationListener(zkClient,  
AclChangeNotificationZNode.path,
-      AclChangeNotificationSequenceZNode.SequenceNumberPrefix, 
notificationHandler, changeExpirationMs)
+    val notificationListener = new ZkNodeChangeNotificationListener(zkClient,  
AclStore.literalAclStore.aclChangesZNode.path,
+      
AclStore.literalAclStore.aclChangeNotificationSequenceZNode.SequenceNumberPrefix,
 notificationHandler, changeExpirationMs)
     notificationListener.init()
 
-    zkClient.createAclChangeNotification(notificationMessage1)
+    zkClient.createAclChangeNotification(AclStore.literalAclStore, 
notificationMessage1)
     TestUtils.waitUntilTrue(() => invocationCount == 1 && notification == 
notificationMessage1,
       "Failed to send/process notification message in the timeout period.")
 
@@ -56,11 +57,11 @@ class ZkNodeChangeNotificationListenerTest extends 
ZooKeeperTestHarness {
      * can fail as the second node can be deleted depending on how threads get 
scheduled.
      */
 
-    zkClient.createAclChangeNotification(notificationMessage2)
+    zkClient.createAclChangeNotification(AclStore.literalAclStore, 
notificationMessage2)
     TestUtils.waitUntilTrue(() => invocationCount == 2 && notification == 
notificationMessage2,
       "Failed to send/process notification message in the timeout period.")
 
-    (3 to 10).foreach(i => zkClient.createAclChangeNotification("message" + i))
+    (3 to 10).foreach(i => 
zkClient.createAclChangeNotification(AclStore.literalAclStore, "message" + i))
 
     TestUtils.waitUntilTrue(() => invocationCount == 10 ,
       s"Expected 10 invocations of processNotifications, but there were 
$invocationCount")
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 1e18f1d7bce..7679fe2447b 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -38,6 +38,7 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
   var resource: Resource = null
   val superUsers = "User:superuser1; User:superuser2"
   val username = "alice"
+  val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)
   var config: KafkaConfig = null
 
   @Before
@@ -419,6 +420,122 @@ class SimpleAclAuthorizerTest extends 
ZooKeeperTestHarness {
     TestUtils.waitAndVerifyAcls(Set.empty[Acl], simpleAclAuthorizer2, resource)
   }
 
+  @Test
+  def testAuthorizeTrueOnWildCardAcl(): Unit = {
+    val session = Session(principal, InetAddress.getByName("192.168.3.1"))
+
+    // verify authorize fails with no acls present
+    assertFalse(simpleAclAuthorizer.authorize(session, Read, resource))
+
+    // add wildcard acl and verify authorize succeeds
+    val acl = new Acl(principal, Allow, WildCardHost, Read)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl), new Resource(Topic, 
Acl.WildCardString))
+    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+
+    // remove wildcard acl and verify authorize fails again
+    simpleAclAuthorizer.removeAcls(Set[Acl](acl), new Resource(Topic, 
Acl.WildCardString))
+    assertFalse(simpleAclAuthorizer.authorize(session, Read, resource))
+
+    // add wildcard-suffixed acl and verify authorize succeeds
+    simpleAclAuthorizer.addAcls(Set[Acl](acl), new Resource(Topic, 
resource.name.charAt(0).toString, WildcardSuffixed))
+    assertTrue(simpleAclAuthorizer.authorize(session, Read, resource))
+  }
+
+  @Test
+  def testMatchPrincipal(): Unit = {
+    // same username should match
+    assertTrue(
+      simpleAclAuthorizer.matchPrincipal(
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob"),
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
+      )
+    )
+
+    // different username shouldn't match
+    assertFalse(
+      simpleAclAuthorizer.matchPrincipal(
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob"),
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "bob")
+      )
+    )
+
+    // any username should match wildcard principal
+    assertTrue(
+      simpleAclAuthorizer.matchPrincipal(
+        Acl.WildCardPrincipal,
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
+      )
+    )
+
+    // wildcard principal type should match
+    assertTrue(
+      simpleAclAuthorizer.matchPrincipal(
+        new KafkaPrincipal(Acl.WildCardString, "rob"),
+        new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "rob")
+      )
+    )
+
+    // different principal type shouldn't match
+    assertFalse(
+      simpleAclAuthorizer.matchPrincipal(
+        new KafkaPrincipal("userType1", "rob"),
+        new KafkaPrincipal("userType2", "rob")
+      )
+    )
+  }
+
+  @Test
+  def testGetAcls(): Unit = {
+    assertEquals(0, simpleAclAuthorizer.getAcls(resource).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
Acl.WildCardString)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name.charAt(0) + Acl.WildCardString)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name + "t")).size)
+
+    val acl1 = new Acl(principal, Allow, WildCardHost, Read)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
+    assertEquals(1, simpleAclAuthorizer.getAcls(resource).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
Acl.WildCardString)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name.charAt(0).toString, WildcardSuffixed)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name + "t")).size)
+
+    // add same acl on wildcard resource
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Topic, 
Acl.WildCardString))
+    assertEquals(1, simpleAclAuthorizer.getAcls(resource).size)
+    assertEquals(1, simpleAclAuthorizer.getAcls(new Resource(Topic, 
Acl.WildCardString)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name.charAt(0).toString, WildcardSuffixed)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name + "t")).size)
+
+    // add different acl on resource
+    val acl2 = new Acl(principal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl2), resource)
+    assertEquals(2, simpleAclAuthorizer.getAcls(resource).size)
+    assertEquals(1, simpleAclAuthorizer.getAcls(new Resource(Topic, 
Acl.WildCardString)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name.charAt(0) + Acl.WildCardString)).size)
+    assertEquals(0, simpleAclAuthorizer.getAcls(new Resource(Topic, 
resource.name + "t")).size)
+
+  }
+
+  @Test
+  def testGetAclsPrincipal(): Unit = {
+    assertEquals(0, simpleAclAuthorizer.getAcls(principal).size)
+
+    val acl1 = new Acl(principal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), resource)
+    assertEquals(1, simpleAclAuthorizer.getAcls(principal).size)
+
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Topic, 
Acl.WildCardString))
+    assertEquals(2, simpleAclAuthorizer.getAcls(principal).size)
+
+    val acl2 = new Acl(Acl.WildCardPrincipal, Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, "groupA"))
+    assertEquals(3, simpleAclAuthorizer.getAcls(principal).size)
+
+    // add wildcard-suffixed principal acl on wildcard group name
+    val acl3 = new Acl(new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
principal.getName.charAt(0) + Acl.WildCardString), Allow, WildCardHost, Write)
+    simpleAclAuthorizer.addAcls(Set[Acl](acl1), new Resource(Group, 
Acl.WildCardString))
+    assertEquals(4, simpleAclAuthorizer.getAcls(principal).size)
+  }
+
   private def changeAclAndVerify(originalAcls: Set[Acl], addedAcls: Set[Acl], 
removedAcls: Set[Acl], resource: Resource = resource): Set[Acl] = {
     var acls = originalAcls
 
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/storage/AclStoreTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/storage/AclStoreTest.scala
new file mode 100644
index 00000000000..f22aed68df5
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/security/auth/storage/AclStoreTest.scala
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.security.auth.storage
+
+import kafka.security.auth.storage.AclStore
+import kafka.security.auth.{Literal, Resource, Topic, WildcardSuffixed}
+import kafka.utils.ZkUtils
+import org.junit.Assert.assertEquals
+import org.junit.Test
+
+class AclStoreTest {
+
+  @Test
+  def testAclStore(): Unit = {
+
+    assertEquals(ZkUtils.KafkaAclPath, AclStore.literalAclStore.aclZNode.path)
+    assertEquals(WildcardSuffixed, 
AclStore.wildcardSuffixedAclStore.resourceNameType)
+    assertEquals("/kafka-wildcard-acl/Topic/topicName", 
AclStore.wildcardSuffixedAclStore.resourceZNode.path(new Resource(Topic, 
"topicName")))
+
+  }
+
+  @Test
+  def testFromResource(): Unit = {
+    val literalResource = new Resource(Topic, "name", Literal)
+    assertEquals(AclStore.literalAclStore, 
AclStore.fromResource(literalResource))
+    val wildcardResource = new Resource(Topic, "name", WildcardSuffixed)
+    assertEquals(AclStore.wildcardSuffixedAclStore, 
AclStore.fromResource(wildcardResource))
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ec6c756d453..1e4b80218f8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -24,8 +24,8 @@ import java.nio.charset.{Charset, StandardCharsets}
 import java.security.cert.X509Certificate
 import java.util.{Collections, Properties}
 import java.util.concurrent.{Callable, Executors, TimeUnit}
-import javax.net.ssl.X509TrustManager
 
+import javax.net.ssl.X509TrustManager
 import kafka.api._
 import kafka.cluster.{Broker, EndPoint}
 import kafka.consumer.{ConsumerConfig, ConsumerTimeoutException, KafkaStream}
diff --git a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala 
b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
index 1aeca2203b4..de74112e42b 100644
--- a/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/KafkaZkClientTest.scala
@@ -16,35 +16,36 @@
 */
 package kafka.zk
 
-import java.util.{Collections, Properties, UUID}
 import java.nio.charset.StandardCharsets.UTF_8
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.{Collections, Properties, UUID}
 
 import kafka.api.{ApiVersion, LeaderAndIsr}
 import kafka.cluster.{Broker, EndPoint}
+import kafka.controller.LeaderIsrAndControllerEpoch
 import kafka.log.LogConfig
 import kafka.security.auth._
+import kafka.security.auth.storage.AclStore
 import kafka.server.ConfigType
 import kafka.utils.CoreUtils
+import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
+import kafka.zookeeper._
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.security.token.delegation.TokenInformation
 import org.apache.kafka.common.utils.{SecurityUtils, Time}
 import org.apache.zookeeper.KeeperException.{Code, NoNodeException, 
NodeExistsException}
+import org.apache.zookeeper.data.Stat
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Seq, mutable}
 import scala.util.Random
 
-import kafka.controller.LeaderIsrAndControllerEpoch
-import kafka.zk.KafkaZkClient.UpdateLeaderAndIsrResult
-import kafka.zookeeper._
-import org.apache.kafka.common.security.JaasUtils
-import org.apache.zookeeper.data.Stat
-
 class KafkaZkClientTest extends ZooKeeperTestHarness {
 
   private val group = "my-group"
@@ -427,16 +428,16 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
   @Test
   def testAclManagementMethods() {
 
-    assertFalse(zkClient.pathExists(AclZNode.path))
-    assertFalse(zkClient.pathExists(AclChangeNotificationZNode.path))
-    ResourceType.values.foreach(resource => 
assertFalse(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+    assertFalse(zkClient.pathExists(AclStore.literalAclStore.aclZNode.path))
+    
assertFalse(zkClient.pathExists(AclStore.literalAclStore.aclChangesZNode.path))
+    ResourceType.values.foreach(resource => 
assertFalse(zkClient.pathExists(AclStore.literalAclStore.resourceTypeZNode.path(resource))))
 
     // create acl paths
     zkClient.createAclPaths
 
-    assertTrue(zkClient.pathExists(AclZNode.path))
-    assertTrue(zkClient.pathExists(AclChangeNotificationZNode.path))
-    ResourceType.values.foreach(resource => 
assertTrue(zkClient.pathExists(ResourceTypeZNode.path(resource.name))))
+    assertTrue(zkClient.pathExists(AclStore.literalAclStore.aclZNode.path))
+    
assertTrue(zkClient.pathExists(AclStore.literalAclStore.aclChangesZNode.path))
+    ResourceType.values.foreach(resource => 
assertTrue(zkClient.pathExists(AclStore.literalAclStore.resourceTypeZNode.path(resource))))
 
     val resource1 = new Resource(Topic, UUID.randomUUID().toString)
     val resource2 = new Resource(Topic, UUID.randomUUID().toString)
@@ -469,10 +470,10 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertEquals(1, versionedAcls.zkVersion)
 
     //get resource Types
-    assertTrue(ResourceType.values.map( rt => rt.name).toSet == 
zkClient.getResourceTypes().toSet)
+    assertTrue(ResourceType.values.map( rt => rt.name).toSet == 
zkClient.getResourceTypes(AclStore.literalAclStore).toSet)
 
     //get resource name
-    val resourceNames = zkClient.getResourceNames(Topic.name)
+    val resourceNames = zkClient.getResourceNames(AclStore.literalAclStore, 
Topic)
     assertEquals(2, resourceNames.size)
     assertTrue(Set(resource1.name,resource2.name) == resourceNames.toSet)
 
@@ -486,13 +487,13 @@ class KafkaZkClientTest extends ZooKeeperTestHarness {
     assertTrue(zkClient.conditionalDelete(resource2, 0))
 
 
-    zkClient.createAclChangeNotification("resource1")
-    zkClient.createAclChangeNotification("resource2")
+    zkClient.createAclChangeNotification(AclStore.literalAclStore, "resource1")
+    zkClient.createAclChangeNotification(AclStore.literalAclStore, "resource2")
 
-    assertEquals(2, zkClient.getChildren(AclChangeNotificationZNode.path).size)
+    assertEquals(2, 
zkClient.getChildren(AclStore.literalAclStore.aclChangesZNode.path).size)
 
     zkClient.deleteAclChangeNotifications()
-    assertTrue(zkClient.getChildren(AclChangeNotificationZNode.path).isEmpty)
+    
assertTrue(zkClient.getChildren(AclStore.literalAclStore.aclChangesZNode.path).isEmpty)
   }
 
   @Test


 

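The storage layout introduced in AclStore.scala above keeps literal ACLs under the
existing /kafka-acl ZooKeeper root and adds a parallel /kafka-wildcard-acl root
(with /kafka-wildcard-acl-changes for notifications) for wildcard-suffixed ACLs.
The following is a minimal Scala sketch of how the new AclStore would resolve
paths, assuming the API exactly as added in this diff; the topic prefix
"payments-" is illustrative:

    import kafka.security.auth.storage.AclStore
    import kafka.security.auth.{Resource, Topic, WildcardSuffixed}

    // Wildcard-suffixed resources map to their own ZK subtree, parallel to /kafka-acl.
    val prefixed = new Resource(Topic, "payments-", WildcardSuffixed)
    val store = AclStore.fromResource(prefixed)   // resolves to AclStore.wildcardSuffixedAclStore

    // "/kafka-wildcard-acl/Topic/payments-"  (cf. AclStoreTest above)
    println(store.resourceZNode.path(prefixed))

    // Change notifications for this store land under "/kafka-wildcard-acl-changes".
    println(store.aclChangesZNode.path)
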
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for Prefixed ACLs
> -----------------------------
>
>                 Key: KAFKA-6841
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6841
>             Project: Kafka
>          Issue Type: New Feature
>          Components: admin, security
>            Reporter: Piyush Vijay
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Kafka supports authorizing access to resources such as topics and consumer 
> groups by way of ACLs. The currently supported semantics for resource and 
> principal names in an ACL definition are either the full resource/principal 
> name or the special wildcard '*', which matches everything.
> Kafka should support a way of defining bulk ACLs instead of specifying 
> individual ACLs.
> The details for the feature are available here - 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-290%3A+Support+for+wildcard+suffixed+ACLs]
>  
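
To make the proposed semantics concrete, the following is a minimal sketch of
adding and checking a wildcard-suffixed (prefixed) ACL, modelled on
testAuthorizeTrueOnWildCardAcl in the diff above. It assumes an
already-configured SimpleAclAuthorizer bound to the name "authorizer"; the
principal, host and "payments-" prefix are illustrative, and this mirrors the
API as proposed in this PR rather than a finalized interface.

    import java.net.InetAddress

    import kafka.network.RequestChannel.Session
    import kafka.security.auth._
    import org.apache.kafka.common.security.auth.KafkaPrincipal

    // Assumes "authorizer" is a configured SimpleAclAuthorizer instance.
    val alice = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "alice")
    val session = Session(alice, InetAddress.getByName("192.168.3.1"))

    // A single ACL on a wildcard-suffixed resource grants Read on every topic
    // whose name starts with the illustrative prefix "payments-".
    val acl = new Acl(alice, Allow, Acl.WildCardHost, Read)
    authorizer.addAcls(Set[Acl](acl), new Resource(Topic, "payments-", WildcardSuffixed))

    // Requests always carry a literal resource; it is matched against both literal
    // and wildcard-suffixed entries in the cache, so this is expected to return true.
    val allowed = authorizer.authorize(session, Read, new Resource(Topic, "payments-eu", Literal))

With this in place a single ACL entry covers every topic sharing the prefix,
instead of one literal ACL per topic, which is the bulk-definition behaviour the
KIP asks for.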



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
