http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
new file mode 100644
index 0000000..26034eb
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
+
+/**
+ * Encapsulates an error code (via the Errors enum) and an optional message. 
Generally, the optional message is only
+ * defined if it adds information over the default message associated with the 
error code.
+ *
+ * This is an internal class (like every class in the requests package).
+ */
+public class ApiError {
+
+    private static final String CODE_KEY_NAME = "error_code";
+    private static final String MESSAGE_KEY_NAME = "error_message";
+
+    private final Errors error;
+    private final String message;
+
+    public static ApiError fromThrowable(Throwable t) {
+        // Avoid populating the error message if it's a generic one
+        Errors error = Errors.forException(t);
+        String message = error.message().equals(t.getMessage()) ? null : 
t.getMessage();
+        return new ApiError(error, message);
+    }
+
+    public ApiError(Struct struct) {
+        error = Errors.forCode(struct.getShort(CODE_KEY_NAME));
+        // In some cases, the error message field was introduced in newer 
version
+        if (struct.hasField(MESSAGE_KEY_NAME))
+            message = struct.getString(MESSAGE_KEY_NAME);
+        else
+            message = null;
+    }
+
+    public ApiError(Errors error, String message) {
+        this.error = error;
+        this.message = message;
+    }
+
+    public void write(Struct struct) {
+        struct.set(CODE_KEY_NAME, error.code());
+        // In some cases, the error message field was introduced in a newer 
protocol API version
+        if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != 
Errors.NONE)
+            struct.set(MESSAGE_KEY_NAME, message);
+    }
+
+    public boolean is(Errors error) {
+        return this.error == error;
+    }
+
+    public Errors error() {
+        return error;
+    }
+
+    /**
+     * Return the optional error message or null. Consider using {@link 
#messageWithFallback()} instead.
+     */
+    public String message() {
+        return message;
+    }
+
+    /**
+     * If `message` is defined, return it. Otherwise fallback to the default 
error message associated with the error
+     * code.
+     */
+    public String messageWithFallback() {
+        if (message == null)
+            return error.message();
+        return message;
+    }
+
+    public ApiException exception() {
+        return error.exception(message);
+    }
+
+    @Override
+    public String toString() {
+        return "ApiError(error=" + error + ", message=" + message + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
index a0626cc..def4c85 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -42,9 +41,9 @@ public class CreateTopicsRequest extends AbstractRequest {
     private static final String REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME = 
"partition_id";
     private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = 
"replicas";
 
-    private static final String CONFIG_KEY_KEY_NAME = "config_key";
+    private static final String CONFIG_KEY_KEY_NAME = "config_name";
     private static final String CONFIG_VALUE_KEY_NAME = "config_value";
-    private static final String CONFIGS_KEY_NAME = "configs";
+    private static final String CONFIGS_KEY_NAME = "config_entries";
 
     public static final class TopicDetails {
         public final int numPartitions;
@@ -210,12 +209,9 @@ public class CreateTopicsRequest extends AbstractRequest {
 
     @Override
     public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>();
+        Map<String, ApiError> topicErrors = new HashMap<>();
         for (String topic : topics.keySet()) {
-            Errors error = Errors.forException(e);
-            // Avoid populating the error message if it's a generic one
-            String message = error.message().equals(e.getMessage()) ? null : 
e.getMessage();
-            topicErrors.put(topic, new CreateTopicsResponse.Error(error, 
message));
+            topicErrors.put(topic, ApiError.fromThrowable(e));
         }
 
         short versionId = version();

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
index 2c2b2dd..e46e7a1 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java
@@ -17,9 +17,7 @@
 package org.apache.kafka.common.requests;
 
 
-import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.protocol.types.Struct;
 
 import java.nio.ByteBuffer;
@@ -32,45 +30,6 @@ public class CreateTopicsResponse extends AbstractResponse {
     private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
     private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors";
     private static final String TOPIC_KEY_NAME = "topic";
-    private static final String ERROR_CODE_KEY_NAME = "error_code";
-    private static final String ERROR_MESSAGE_KEY_NAME = "error_message";
-
-    public static class Error {
-        private final Errors error;
-        private final String message; // introduced in V1
-
-        public Error(Errors error, String message) {
-            this.error = error;
-            this.message = message;
-        }
-
-        public boolean is(Errors error) {
-            return this.error == error;
-        }
-
-        public Errors error() {
-            return error;
-        }
-
-        public String message() {
-            return message;
-        }
-
-        public String messageWithFallback() {
-            if (message == null)
-                return error.message();
-            return message;
-        }
-
-        public ApiException exception() {
-            return error.exception(message);
-        }
-
-        @Override
-        public String toString() {
-            return "Error(error=" + error + ", message=" + message + ")";
-        }
-    }
 
     /**
      * Possible error codes:
@@ -87,29 +46,25 @@ public class CreateTopicsResponse extends AbstractResponse {
      * INVALID_REQUEST(42)
      */
 
-    private final Map<String, Error> errors;
+    private final Map<String, ApiError> errors;
     private final int throttleTimeMs;
 
-    public CreateTopicsResponse(Map<String, Error> errors) {
+    public CreateTopicsResponse(Map<String, ApiError> errors) {
         this(DEFAULT_THROTTLE_TIME, errors);
     }
 
-    public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) 
{
+    public CreateTopicsResponse(int throttleTimeMs, Map<String, ApiError> 
errors) {
         this.throttleTimeMs = throttleTimeMs;
         this.errors = errors;
     }
 
     public CreateTopicsResponse(Struct struct) {
         Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME);
-        Map<String, Error> errors = new HashMap<>();
+        Map<String, ApiError> errors = new HashMap<>();
         for (Object topicErrorStructObj : topicErrorStructs) {
-            Struct topicErrorCodeStruct = (Struct) topicErrorStructObj;
-            String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME);
-            Errors error = 
Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME));
-            String errorMessage = null;
-            if (topicErrorCodeStruct.hasField(ERROR_MESSAGE_KEY_NAME))
-                errorMessage = 
topicErrorCodeStruct.getString(ERROR_MESSAGE_KEY_NAME);
-            errors.put(topic, new Error(error, errorMessage));
+            Struct topicErrorStruct = (Struct) topicErrorStructObj;
+            String topic = topicErrorStruct.getString(TOPIC_KEY_NAME);
+            errors.put(topic, new ApiError(topicErrorStruct));
         }
 
         this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? 
struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME;
@@ -123,13 +78,10 @@ public class CreateTopicsResponse extends AbstractResponse 
{
             struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
 
         List<Struct> topicErrorsStructs = new ArrayList<>(errors.size());
-        for (Map.Entry<String, Error> topicError : errors.entrySet()) {
+        for (Map.Entry<String, ApiError> topicError : errors.entrySet()) {
             Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME);
             topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey());
-            Error error = topicError.getValue();
-            topicErrorsStruct.set(ERROR_CODE_KEY_NAME, error.error.code());
-            if (version >= 1)
-                topicErrorsStruct.set(ERROR_MESSAGE_KEY_NAME, error.message());
+            topicError.getValue().write(topicErrorsStruct);
             topicErrorsStructs.add(topicErrorsStruct);
         }
         struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray());
@@ -140,7 +92,7 @@ public class CreateTopicsResponse extends AbstractResponse {
         return throttleTimeMs;
     }
 
-    public Map<String, Error> errors() {
+    public Map<String, ApiError> errors() {
         return errors;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
new file mode 100644
index 0000000..64fae0e
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeConfigsRequest extends AbstractRequest {
+
+    private static final String RESOURCES_KEY_NAME = "resources";
+    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+    private static final String CONFIG_NAMES_KEY_NAME = "config_names";
+
+    public static class Builder extends AbstractRequest.Builder {
+        private final Map<Resource, Collection<String>> resourceToConfigNames;
+
+        public Builder(Map<Resource, Collection<String>> 
resourceToConfigNames) {
+            super(ApiKeys.DESCRIBE_CONFIGS);
+            this.resourceToConfigNames = resourceToConfigNames;
+        }
+
+        public Builder(Collection<Resource> resources) {
+            this(toResourceToConfigNames(resources));
+        }
+
+        private static Map<Resource, Collection<String>> 
toResourceToConfigNames(Collection<Resource> resources) {
+            Map<Resource, Collection<String>> result = new 
HashMap<>(resources.size());
+            for (Resource resource : resources)
+                result.put(resource, null);
+            return result;
+        }
+
+        @Override
+        public DescribeConfigsRequest build(short version) {
+            return new DescribeConfigsRequest(version, resourceToConfigNames);
+        }
+    }
+
+    private final Map<Resource, Collection<String>> resourceToConfigNames;
+
+    public DescribeConfigsRequest(short version, Map<Resource, 
Collection<String>> resourceToConfigNames) {
+        super(version);
+        this.resourceToConfigNames = resourceToConfigNames;
+
+    }
+
+    public DescribeConfigsRequest(Struct struct, short version) {
+        super(version);
+        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+        resourceToConfigNames = new HashMap<>(resourcesArray.length);
+        for (Object resourceObj : resourcesArray) {
+            Struct resourceStruct = (Struct) resourceObj;
+            ResourceType resourceType = 
ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = 
resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+
+            Object[] configNamesArray = 
resourceStruct.getArray(CONFIG_NAMES_KEY_NAME);
+            List<String> configNames = null;
+            if (configNamesArray != null) {
+                configNames = new ArrayList<>(configNamesArray.length);
+                for (Object configNameObj : configNamesArray)
+                    configNames.add((String) configNameObj);
+            }
+
+            resourceToConfigNames.put(new Resource(resourceType, 
resourceName), configNames);
+        }
+    }
+
+    public Collection<Resource> resources() {
+        return resourceToConfigNames.keySet();
+    }
+
+    /**
+     * Return null if all config names should be returned.
+     */
+    public Collection<String> configNames(Resource resource) {
+        return resourceToConfigNames.get(resource);
+    }
+
+    @Override
+    protected Struct toStruct() {
+        Struct struct = new 
Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
+        List<Struct> resourceStructs = new ArrayList<>(resources().size());
+        for (Map.Entry<Resource, Collection<String>> entry : 
resourceToConfigNames.entrySet()) {
+            Resource resource = entry.getKey();
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+            String[] configNames = entry.getValue() == null ? null : 
entry.getValue().toArray(new String[0]);
+            resourceStruct.set(CONFIG_NAMES_KEY_NAME, configNames);
+
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        return struct;
+    }
+
+    @Override
+    public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+        short version = version();
+        switch (version) {
+            case 0:
+                ApiError error = ApiError.fromThrowable(e);
+                Map<Resource, DescribeConfigsResponse.Config> errors = new 
HashMap<>(resources().size());
+                DescribeConfigsResponse.Config config = new 
DescribeConfigsResponse.Config(error,
+                        
Collections.<DescribeConfigsResponse.ConfigEntry>emptyList());
+                for (Resource resource : resources())
+                    errors.put(resource, config);
+                return new DescribeConfigsResponse(throttleTimeMs, errors);
+            default:
+                throw new IllegalArgumentException(String.format("Version %d 
is not valid. Valid versions for %s are 0 to %d",
+                        version, this.getClass().getSimpleName(), 
ApiKeys.DESCRIBE_CONFIGS.latestVersion()));
+        }
+    }
+
+    public static DescribeConfigsRequest parse(ByteBuffer buffer, short 
version) {
+        return new 
DescribeConfigsRequest(ApiKeys.DESCRIBE_CONFIGS.parseRequest(version, buffer), 
version);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
new file mode 100644
index 0000000..05bf88d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.types.Struct;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DescribeConfigsResponse extends AbstractResponse {
+
+    private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms";
+
+    private static final String RESOURCES_KEY_NAME = "resources";
+
+    private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
+    private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
+
+    private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries";
+
+    private static final String CONFIG_NAME = "config_name";
+    private static final String CONFIG_VALUE = "config_value";
+    private static final String IS_SENSITIVE = "is_sensitive";
+    private static final String IS_DEFAULT = "is_default";
+    private static final String READ_ONLY = "read_only";
+
+    public static class Config {
+        private final ApiError error;
+        private final Collection<ConfigEntry> entries;
+
+        public Config(ApiError error, Collection<ConfigEntry> entries) {
+            this.error = error;
+            this.entries = entries;
+        }
+
+        public ApiError error() {
+            return error;
+        }
+
+        public Collection<ConfigEntry> entries() {
+            return entries;
+        }
+    }
+
+    public static class ConfigEntry {
+        private final String name;
+        private final String value;
+        private final boolean isSensitive;
+        private final boolean isDefault;
+        private final boolean readOnly;
+
+        public ConfigEntry(String name, String value, boolean isSensitive, 
boolean isDefault, boolean readOnly) {
+            this.name = name;
+            this.value = value;
+            this.isSensitive = isSensitive;
+            this.isDefault = isDefault;
+            this.readOnly = readOnly;
+        }
+
+        public String name() {
+            return name;
+        }
+
+        public String value() {
+            return value;
+        }
+
+        public boolean isSensitive() {
+            return isSensitive;
+        }
+
+        public boolean isDefault() {
+            return isDefault;
+        }
+
+        public boolean isReadOnly() {
+            return readOnly;
+        }
+    }
+
+    private final int throttleTimeMs;
+    private final Map<Resource, Config> configs;
+
+    public DescribeConfigsResponse(int throttleTimeMs, Map<Resource, Config> 
configs) {
+        this.throttleTimeMs = throttleTimeMs;
+        this.configs = configs;
+    }
+
+    public DescribeConfigsResponse(Struct struct) {
+        throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME);
+        Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME);
+        configs = new HashMap<>(resourcesArray.length);
+        for (Object resourceObj : resourcesArray) {
+            Struct resourceStruct = (Struct) resourceObj;
+
+            ApiError error = new ApiError(resourceStruct);
+            ResourceType resourceType = 
ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME));
+            String resourceName = 
resourceStruct.getString(RESOURCE_NAME_KEY_NAME);
+            Resource resource = new Resource(resourceType, resourceName);
+
+            Object[] configEntriesArray = 
resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME);
+            List<ConfigEntry> configEntries = new 
ArrayList<>(configEntriesArray.length);
+            for (Object configEntriesObj: configEntriesArray) {
+                Struct configEntriesStruct = (Struct) configEntriesObj;
+                String configName = configEntriesStruct.getString(CONFIG_NAME);
+                String configValue = 
configEntriesStruct.getString(CONFIG_VALUE);
+                boolean isSensitive = 
configEntriesStruct.getBoolean(IS_SENSITIVE);
+                boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT);
+                boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY);
+                configEntries.add(new ConfigEntry(configName, configValue, 
isSensitive, isDefault, readOnly));
+            }
+            Config config = new Config(error, configEntries);
+            configs.put(resource, config);
+        }
+    }
+
+    public Map<Resource, Config> configs() {
+        return configs;
+    }
+
+    public Config config(Resource resource) {
+        return configs.get(resource);
+    }
+
+    public int throttleTimeMs() {
+        return throttleTimeMs;
+    }
+
+    @Override
+    protected Struct toStruct(short version) {
+        Struct struct = new 
Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version));
+        struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs);
+        List<Struct> resourceStructs = new ArrayList<>(configs.size());
+        for (Map.Entry<Resource, Config> entry : configs.entrySet()) {
+            Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME);
+
+            Resource resource = entry.getKey();
+            resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id());
+            resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name());
+
+            Config config = entry.getValue();
+            config.error.write(resourceStruct);
+
+            List<Struct> configEntryStructs = new 
ArrayList<>(config.entries.size());
+            for (ConfigEntry configEntry : config.entries) {
+                Struct configEntriesStruct = 
resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME);
+                configEntriesStruct.set(CONFIG_NAME, configEntry.name);
+                configEntriesStruct.set(CONFIG_VALUE, configEntry.value);
+                configEntriesStruct.set(IS_SENSITIVE, configEntry.isSensitive);
+                configEntriesStruct.set(IS_DEFAULT, configEntry.isDefault);
+                configEntriesStruct.set(READ_ONLY, configEntry.readOnly);
+                configEntryStructs.add(configEntriesStruct);
+            }
+            resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, 
configEntryStructs.toArray(new Struct[0]));
+            
+            resourceStructs.add(resourceStruct);
+        }
+        struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        return struct;
+    }
+
+    public static DescribeConfigsResponse parse(ByteBuffer buffer, short 
version) {
+        return new 
DescribeConfigsResponse(ApiKeys.DESCRIBE_CONFIGS.parseResponse(version, 
buffer));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java 
b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
new file mode 100644
index 0000000..6a360a5
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+public final class Resource {
+    private final ResourceType type;
+    private final String name;
+
+    public Resource(ResourceType type, String name) {
+        this.type = type;
+        this.name = name;
+    }
+
+    public ResourceType type() {
+        return type;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o)
+            return true;
+        if (o == null || getClass() != o.getClass())
+            return false;
+
+        Resource resource = (Resource) o;
+
+        return type == resource.type && name.equals(resource.name);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = type.hashCode();
+        result = 31 * result + name.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "Resource(type=" + type + ", name='" + name + "'}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java 
b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
new file mode 100644
index 0000000..2c11772
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.requests;
+
+public enum ResourceType {
+    UNKNOWN((byte) 0), ANY((byte) 1), TOPIC((byte) 2), GROUP((byte) 3), 
BROKER((byte) 4);
+
+    private static final ResourceType[] VALUES = values();
+
+    private final byte id;
+
+    ResourceType(byte id) {
+        this.id = id;
+    }
+
+    public byte id() {
+        return id;
+    }
+
+    public static ResourceType forId(byte id) {
+        if (id < 0)
+            throw new IllegalArgumentException("id should be positive, id: " + 
id);
+        if (id >= VALUES.length)
+            return UNKNOWN;
+        return VALUES[id];
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
index 4ff0dc6..e623d73 100644
--- 
a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
+++ 
b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java
@@ -106,7 +106,7 @@ public interface CreateTopicPolicy extends Configurable, 
AutoCloseable {
 
         @Override
         public String toString() {
-            return "RequestMetadata(topic=" + topic +
+            return "CreateTopicPolicy.RequestMetadata(topic=" + topic +
                     ", numPartitions=" + numPartitions +
                     ", replicationFactor=" + replicationFactor +
                     ", replicasAssignments=" + replicasAssignments +
@@ -116,12 +116,12 @@ public interface CreateTopicPolicy extends Configurable, 
AutoCloseable {
 
     /**
      * Validate the request parameters and throw a 
<code>PolicyViolationException</code> with a suitable error
-     * message if the create request parameters for the provided topic do not 
satisfy this policy.
+     * message if the create topics request parameters for the provided topic 
do not satisfy this policy.
      *
      * Clients will receive the POLICY_VIOLATION error code along with the 
exception's message. Note that validation
      * failure only affects the relevant topic, other topics in the request 
will still be processed.
      *
-     * @param requestMetadata the create request parameters for the provided 
topic.
+     * @param requestMetadata the create topics request parameters for the 
provided topic.
      * @throws PolicyViolationException if the request parameters do not 
satisfy this policy.
      */
     void validate(RequestMetadata requestMetadata) throws 
PolicyViolationException;

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
index 2d7c546..06ace63 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java
@@ -45,7 +45,9 @@ public class AclOperationTest {
         new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false),
         new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false),
         new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false),
-        new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, 
"cluster_action", false)
+        new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, 
"cluster_action", false),
+        new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, 
"describe_configs", false),
+        new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, 
"alter_configs", false)
     };
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 6d01b0a..e432c0a 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -27,9 +27,9 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.errors.SecurityDisabledException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.requests.CreateAclsResponse;
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse;
-import org.apache.kafka.common.requests.CreateTopicsResponse.Error;
 import org.apache.kafka.common.requests.CreateTopicsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse;
 import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult;
@@ -60,8 +60,7 @@ import static org.junit.Assert.fail;
 /**
  * A unit test for KafkaAdminClient.
  *
- * See for an integration test of the KafkaAdminClient.
- * Also see KafkaAdminClientIntegrationTest for a unit test of the admin 
client.
+ * See KafkaAdminClientIntegrationTest for an integration test of the 
KafkaAdminClient.
  */
 public class KafkaAdminClientTest {
     @Rule
@@ -160,8 +159,7 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testCloseAdminClient() throws Exception {
-        try (MockKafkaAdminClientContext ctx = new 
MockKafkaAdminClientContext(newStrMap())) {
-        }
+        new MockKafkaAdminClientContext(newStrMap()).close();
     }
 
     private static void assertFutureError(Future<?> future, Class<? extends 
Throwable> exceptionClass)
@@ -186,12 +184,12 @@ public class KafkaAdminClientTest {
             AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) {
             ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
             ctx.mockClient.setNode(new Node(0, "localhost", 8121));
-            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new 
HashMap<String, Error>() {{
-                    put("myTopic", new Error(Errors.NONE, ""));
+            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new 
HashMap<String, ApiError>() {{
+                    put("myTopic", new ApiError(Errors.NONE, ""));
                 }}));
             KafkaFuture<Void> future = ctx.client.
                 createTopics(Collections.singleton(new NewTopic("myTopic", new 
HashMap<Integer, List<Integer>>() {{
-                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 
1, 2}));
+                        put(0, Arrays.asList(new Integer[]{0, 1, 2}));
                     }})), new CreateTopicsOptions().timeoutMs(1000)).all();
             assertFutureError(future, TimeoutException.class);
         }
@@ -203,12 +201,12 @@ public class KafkaAdminClientTest {
             ctx.mockClient.setNodeApiVersions(NodeApiVersions.create());
             ctx.mockClient.prepareMetadataUpdate(ctx.cluster, 
Collections.<String>emptySet());
             ctx.mockClient.setNode(ctx.nodes.get(0));
-            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new 
HashMap<String, Error>() {{
-                    put("myTopic", new Error(Errors.NONE, ""));
+            ctx.mockClient.prepareResponse(new CreateTopicsResponse(new 
HashMap<String, ApiError>() {{
+                    put("myTopic", new ApiError(Errors.NONE, ""));
                 }}));
             KafkaFuture<Void> future = ctx.client.
                 createTopics(Collections.singleton(new NewTopic("myTopic", new 
HashMap<Integer, List<Integer>>() {{
-                        put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 
1, 2}));
+                        put(0, Arrays.asList(new Integer[]{0, 1, 2}));
                     }})), new CreateTopicsOptions().timeoutMs(10000)).all();
             future.get();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java 
b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
index 8f6f670..af72de2 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java
@@ -40,7 +40,8 @@ public class ResourceTypeTest {
         new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false),
         new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false),
         new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false),
-        new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false)
+        new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false),
+        new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false)
     };
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ede55a5..9142c90 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -60,6 +60,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.GatheringByteChannel;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -230,6 +231,13 @@ public class RequestResponseTest {
         checkRequest(createDeleteAclsRequest());
         checkErrorResponse(createDeleteAclsRequest(), new 
SecurityDisabledException("Security is not enabled."));
         checkResponse(createDeleteAclsResponse(), 
ApiKeys.DELETE_ACLS.latestVersion());
+        checkRequest(createAlterConfigsRequest());
+        checkErrorResponse(createAlterConfigsRequest(), new 
UnknownServerException());
+        checkResponse(createAlterConfigsResponse(), 0);
+        checkRequest(createDescribeConfigsRequest());
+        checkRequest(createDescribeConfigsRequestWithConfigEntries());
+        checkErrorResponse(createDescribeConfigsRequest(), new 
UnknownServerException());
+        checkResponse(createDescribeConfigsResponse(), 0);
     }
 
     @Test
@@ -887,9 +895,9 @@ public class RequestResponseTest {
     }
 
     private CreateTopicsResponse createCreateTopicResponse() {
-        Map<String, CreateTopicsResponse.Error> errors = new HashMap<>();
-        errors.put("t1", new 
CreateTopicsResponse.Error(Errors.INVALID_TOPIC_EXCEPTION, null));
-        errors.put("t2", new 
CreateTopicsResponse.Error(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is 
not available."));
+        Map<String, ApiError> errors = new HashMap<>();
+        errors.put("t1", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, null));
+        errors.put("t2", new ApiError(Errors.LEADER_NOT_AVAILABLE, "Leader 
with id 5 is not available."));
         return new CreateTopicsResponse(errors);
     }
 
@@ -1085,4 +1093,50 @@ public class RequestResponseTest {
             closed = true;
         }
     }
+
+    private DescribeConfigsRequest createDescribeConfigsRequest() {
+        return new DescribeConfigsRequest.Builder(asList(
+                new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"),
+                new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"))).build((short) 0);
+    }
+
+    private DescribeConfigsRequest 
createDescribeConfigsRequestWithConfigEntries() {
+        Map<org.apache.kafka.common.requests.Resource, Collection<String>> 
resources = new HashMap<>();
+        resources.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), asList("foo", "bar"));
+        resources.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"), null);
+        resources.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic a"), Collections.<String>emptyList());
+        return new DescribeConfigsRequest.Builder(resources).build((short) 0);
+    }
+
+    private DescribeConfigsResponse createDescribeConfigsResponse() {
+        Map<org.apache.kafka.common.requests.Resource, 
DescribeConfigsResponse.Config> configs = new HashMap<>();
+        List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
+                new DescribeConfigsResponse.ConfigEntry("config_name", 
"config_value", false, true, false),
+                new DescribeConfigsResponse.ConfigEntry("another_name", 
"another value", true, false, true)
+        );
+        configs.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), new DescribeConfigsResponse.Config(
+                new ApiError(Errors.NONE, null), configEntries));
+        configs.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"), new DescribeConfigsResponse.Config(
+                new ApiError(Errors.NONE, null), 
Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()));
+        return new DescribeConfigsResponse(200, configs);
+    }
+
+    private AlterConfigsRequest createAlterConfigsRequest() {
+        Map<org.apache.kafka.common.requests.Resource, 
AlterConfigsRequest.Config> configs = new HashMap<>();
+        List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
+                new AlterConfigsRequest.ConfigEntry("config_name", 
"config_value"),
+                new AlterConfigsRequest.ConfigEntry("another_name", "another 
value")
+        );
+        configs.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), new AlterConfigsRequest.Config(configEntries));
+        configs.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"),
+                new 
AlterConfigsRequest.Config(Collections.<AlterConfigsRequest.ConfigEntry>emptyList()));
+        return new AlterConfigsRequest((short) 0, configs, false);
+    }
+
+    private AlterConfigsResponse createAlterConfigsResponse() {
+        Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new 
HashMap<>();
+        errors.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER,
 "0"), new ApiError(Errors.NONE, null));
+        errors.put(new 
org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC,
 "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid"));
+        return new AlterConfigsResponse(20, errors);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/AclCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala 
b/core/src/main/scala/kafka/admin/AclCommand.scala
index 0bedee3..925c407 100644
--- a/core/src/main/scala/kafka/admin/AclCommand.scala
+++ b/core/src/main/scala/kafka/admin/AclCommand.scala
@@ -31,9 +31,10 @@ object AclCommand {
 
   val Newline = scala.util.Properties.lineSeparator
   val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] (
-    Topic -> Set(Read, Write, Describe, All, Delete),
+    Broker -> Set(DescribeConfigs),
+    Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, 
All),
     Group -> Set(Read, Describe, All),
-    Cluster -> Set(Create, ClusterAction, All)
+    Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, All)
   )
 
   def main(args: Array[String]) {
@@ -237,6 +238,9 @@ object AclCommand {
     if (opts.options.has(opts.groupOpt))
       opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources 
+= new Resource(Group, group.trim))
 
+    if (opts.options.has(opts.brokerOpt))
+      opts.options.valuesOf(opts.brokerOpt).asScala.foreach(broker => 
resources += new Resource(Broker, broker.toString))
+
     if (resources.isEmpty && dieIfNoResourceFound)
       CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at 
least one resource: --topic <topic> or --cluster or --group <group>")
 
@@ -285,6 +289,12 @@ object AclCommand {
       .describedAs("group")
       .ofType(classOf[String])
 
+    val brokerOpt = parser.accepts("broker", "broker to which the ACLs should 
be added or removed. " +
+      "A value of * indicates the ACLs should apply to all brokers.")
+      .withRequiredArg
+      .describedAs("broker")
+      .ofType(classOf[Int])
+
     val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.")
     val removeOpt = parser.accepts("remove", "Indicates you are trying to 
remove ACLs.")
     val listOpt = parser.accepts("list", "List ACLs for the specified 
resource, use --topic <topic> or --group <group> or --cluster to specify a 
resource.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala 
b/core/src/main/scala/kafka/admin/AdminUtils.scala
index 49d249b..bd8771b 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -46,6 +46,26 @@ trait AdminUtilities {
   def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: 
Properties)
   def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: 
String, configs: Properties)
   def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: 
Properties)
+
+  def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, 
configs: Properties): Unit = {
+
+    def parseBroker(broker: String): Int = {
+      try broker.toInt
+      catch {
+        case _: NumberFormatException =>
+          throw new IllegalArgumentException(s"Error parsing broker $broker. 
The broker's Entity Name must be a single integer value")
+      }
+    }
+
+    entityType match {
+      case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs)
+      case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, 
configs)
+      case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, 
entityName, configs)
+      case ConfigType.Broker => changeBrokerConfig(zkUtils, 
Seq(parseBroker(entityName)), configs)
+      case _ => throw new IllegalArgumentException(s"$entityType is not a 
known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, 
${ConfigType.Broker}")
+    }
+  }
+
   def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: 
String): Properties
 }
 
@@ -527,6 +547,14 @@ object AdminUtils extends Logging with AdminUtilities {
     changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs)
   }
 
+  def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: 
Properties): Unit = {
+    Topic.validate(topic)
+    if (!topicExists(zkUtils, topic))
+      throw new AdminOperationException("Topic \"%s\" does not 
exist.".format(topic))
+    // remove the topic overrides
+    LogConfig.validate(configs)
+  }
+
   /**
    * Update the config for an existing topic and create a change notification 
so the change will propagate to other brokers
    *
@@ -537,10 +565,7 @@ object AdminUtils extends Logging with AdminUtilities {
    *
    */
   def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) {
-    if (!topicExists(zkUtils, topic))
-      throw new AdminOperationException("Topic \"%s\" does not 
exist.".format(topic))
-    // remove the topic overrides
-    LogConfig.validate(configs)
+    validateTopicConfig(zkUtils, topic, configs)
     changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs)
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/ConfigCommand.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala 
b/core/src/main/scala/kafka/admin/ConfigCommand.scala
index 3985490..f74d31d 100644
--- a/core/src/main/scala/kafka/admin/ConfigCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala
@@ -98,13 +98,8 @@ object ConfigCommand extends Config {
     configs.putAll(configsToBeAdded)
     configsToBeDeleted.foreach(configs.remove(_))
 
-    entityType match {
-      case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, 
configs)
-      case ConfigType.Client => utils.changeClientIdConfig(zkUtils, 
entityName, configs)
-      case ConfigType.User => utils.changeUserOrUserClientIdConfig(zkUtils, 
entityName, configs)
-      case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, 
Seq(parseBroker(entityName)), configs)
-      case _ => throw new IllegalArgumentException(s"$entityType is not a 
known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, 
${ConfigType.Broker}")
-    }
+    utils.changeConfigs(zkUtils, entityType, entityName, configs)
+
     println(s"Completed Updating config for entity: $entity.")
   }
 
@@ -129,14 +124,6 @@ object ConfigCommand extends Config {
     }
   }
 
-  private def parseBroker(broker: String): Int = {
-    try broker.toInt
-    catch {
-      case _: NumberFormatException =>
-        throw new IllegalArgumentException(s"Error parsing broker $broker. The 
broker's Entity Name must be a single integer value")
-    }
-  }
-
   private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) {
     val configEntity = parseEntity(opts)
     val describeAllUsers = configEntity.root.entityType == ConfigType.User && 
!configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/log/LogConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala 
b/core/src/main/scala/kafka/log/LogConfig.scala
index 30bc26b..6a329d8 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -321,7 +321,7 @@ object LogConfig {
     val names = configNames
     for(name <- props.asScala.keys)
       if (!names.contains(name))
-        throw new InvalidConfigurationException(s"Unknown Log configuration 
$name.")
+        throw new InvalidConfigurationException(s"Unknown topic config name: 
$name")
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index 0b094df..d8cdf90 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -44,7 +44,7 @@ import scala.collection._
  */
 @threadsafe
 class LogManager(val logDirs: Array[File],
-                 val topicConfigs: Map[String, LogConfig],
+                 val topicConfigs: Map[String, LogConfig], // note that this 
doesn't get updated after creation
                  val defaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
                  ioThreads: Int,

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/Operation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala 
b/core/src/main/scala/kafka/security/auth/Operation.scala
index 7d292d2..f65d9f0 100644
--- a/core/src/main/scala/kafka/security/auth/Operation.scala
+++ b/core/src/main/scala/kafka/security/auth/Operation.scala
@@ -57,16 +57,25 @@ case object ClusterAction extends Operation {
   val name = "ClusterAction"
   val toJava = AclOperation.CLUSTER_ACTION
 }
+case object DescribeConfigs extends Operation {
+  val name = "DescribeConfigs"
+  val toJava = AclOperation.DESCRIBE_CONFIGS
+}
+case object AlterConfigs extends Operation {
+  val name = "AlterConfigs"
+  val toJava = AclOperation.ALTER_CONFIGS
+}
 case object All extends Operation {
   val name = "All"
   val toJava = AclOperation.ALL
 }
 
 object Operation {
-   def fromString(operation: String): Operation = {
-      val op = values.find(op => op.name.equalsIgnoreCase(operation))
-      op.getOrElse(throw new KafkaException(operation + " not a valid 
operation name. The valid names are " + values.mkString(",")))
-   }
+
+  def fromString(operation: String): Operation = {
+    val op = values.find(op => op.name.equalsIgnoreCase(operation))
+    op.getOrElse(throw new KafkaException(operation + " not a valid operation 
name. The valid names are " + values.mkString(",")))
+  }
 
   def fromJava(operation: AclOperation): Try[Operation] = {
     try {
@@ -76,5 +85,6 @@ object Operation {
     }
   }
 
-   def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, 
Describe, ClusterAction, All)
+  def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, 
Describe, ClusterAction, AlterConfigs,
+     DescribeConfigs, All)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/ResourceType.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala 
b/core/src/main/scala/kafka/security/auth/ResourceType.scala
index e58d8ec..ea7ce3c 100644
--- a/core/src/main/scala/kafka/security/auth/ResourceType.scala
+++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala
@@ -31,6 +31,11 @@ case object Cluster extends ResourceType {
   val error = Errors.CLUSTER_AUTHORIZATION_FAILED
 }
 
+case object Broker extends ResourceType {
+  val name = "Broker"
+  val error = Errors.BROKER_AUTHORIZATION_FAILED
+}
+
 case object Topic extends ResourceType {
   val name = "Topic"
   val error = Errors.TOPIC_AUTHORIZATION_FAILED
@@ -58,5 +63,5 @@ object ResourceType {
     rType.getOrElse(throw new KafkaException(resourceType + " not a valid 
resourceType name. The valid names are " + values.mkString(",")))
   }
 
-  def values: Seq[ResourceType] = List(Cluster, Topic, Group, 
ProducerTransactionalId, ProducerIdResource)
+  def values: Seq[ResourceType] = List(Cluster, Topic, Group, 
ProducerTransactionalId, ProducerIdResource, Broker)
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index eaacd6a..19fbdc4 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -252,7 +252,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   /**
     * Safely updates the resources ACLs by ensuring reads and writes respect 
the expected zookeeper version.
-    * Continues to retry until it succesfully updates zookeeper.
+    * Continues to retry until it successfully updates zookeeper.
     *
     * Returns a boolean indicating if the content of the ACLs was actually 
changed.
     *

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/AdminManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala 
b/core/src/main/scala/kafka/server/AdminManager.scala
index 2f60cbd..c147593 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -16,18 +16,20 @@
   */
 package kafka.server
 
-import java.util.Properties
+import java.util.{Collections, Properties}
 
 import kafka.admin.AdminUtils
 import kafka.common.TopicAlreadyMarkedForDeletionException
 import kafka.log.LogConfig
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils._
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException}
 import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, 
PolicyViolationException}
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest._
-import org.apache.kafka.common.requests.CreateTopicsResponse
+import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, 
DescribeConfigsResponse, Resource, ResourceType}
 import org.apache.kafka.server.policy.CreateTopicPolicy
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
 
@@ -63,7 +65,7 @@ class AdminManager(val config: KafkaConfig,
   def createTopics(timeout: Int,
                    validateOnly: Boolean,
                    createInfo: Map[String, TopicDetails],
-                   responseCallback: Map[String, CreateTopicsResponse.Error] 
=> Unit) {
+                   responseCallback: Map[String, ApiError] => Unit) {
 
     // 1. map over topics creating assignment and calling zookeeper
     val brokers = metadataCache.getAliveBrokers.map { b => 
kafka.admin.BrokerMetadata(b.id, b.rack) }
@@ -114,15 +116,15 @@ class AdminManager(val config: KafkaConfig,
             else
               
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, 
assignments, configs, update = false)
         }
-        CreateTopicMetadata(topic, assignments, new 
CreateTopicsResponse.Error(Errors.NONE, null))
+        CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, 
null))
       } catch {
         // Log client errors at a lower level than unexpected exceptions
         case e@ (_: PolicyViolationException | _: ApiException) =>
           info(s"Error processing create topic request for topic $topic with 
arguments $arguments", e)
-          CreateTopicMetadata(topic, Map(), new 
CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
+          CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
         case e: Throwable =>
           error(s"Error processing create topic request for topic $topic with 
arguments $arguments", e)
-          CreateTopicMetadata(topic, Map(), new 
CreateTopicsResponse.Error(Errors.forException(e), e.getMessage))
+          CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e))
       }
     }
 
@@ -131,7 +133,7 @@ class AdminManager(val config: KafkaConfig,
       val results = metadata.map { createTopicMetadata =>
         // ignore topics that already have errors
         if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) {
-          (createTopicMetadata.topic, new 
CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null))
+          (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, 
null))
         } else {
           (createTopicMetadata.topic, createTopicMetadata.error)
         }
@@ -189,6 +191,99 @@ class AdminManager(val config: KafkaConfig,
     }
   }
 
+  def describeConfigs(resourceToConfigNames: Map[Resource, 
Option[Set[String]]]): Map[Resource, DescribeConfigsResponse.Config] = {
+    resourceToConfigNames.map { case (resource, configNames) =>
+
+      def createResponseConfig(config: AbstractConfig, isReadOnly: Boolean, 
isDefault: String => Boolean): DescribeConfigsResponse.Config = {
+        val filteredConfigPairs = config.values.asScala.filter { case 
(configName, _) =>
+          /* Always returns true if configNames is None */
+          configNames.map(_.contains(configName)).getOrElse(true)
+        }.toIndexedSeq
+
+        val configEntries = filteredConfigPairs.map { case (name, value) =>
+          val configEntryType = config.typeOf(name)
+          val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
+          val valueAsString =
+            if (isSensitive) null
+            else ConfigDef.convertToString(value, configEntryType)
+          new DescribeConfigsResponse.ConfigEntry(name, valueAsString, 
isSensitive, isDefault(name), isReadOnly)
+        }
+
+        new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), 
configEntries.asJava)
+      }
+
+      try {
+        val resourceConfig = resource.`type` match {
+
+          case ResourceType.TOPIC =>
+            val topic = resource.name
+            Topic.validate(topic)
+            // Consider optimizing this by caching the configs or retrieving 
them from the `Log` when possible
+            val topicProps = AdminUtils.fetchEntityConfig(zkUtils, 
ConfigType.Topic, topic)
+            val logConfig = 
LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
+            createResponseConfig(logConfig, isReadOnly = false, name => 
!topicProps.containsKey(name))
+
+          case ResourceType.BROKER =>
+            val brokerId = try resource.name.toInt catch {
+              case _: NumberFormatException =>
+                throw new InvalidRequestException(s"Broker id must be an 
integer, but it is: ${resource.name}")
+            }
+            if (brokerId == config.brokerId)
+              createResponseConfig(config, isReadOnly = true, name => 
!config.originals.containsKey(name))
+            else
+              throw new InvalidRequestException(s"Unexpected broker id, 
expected ${config.brokerId}, but received $brokerId")
+
+          case resourceType => throw new InvalidRequestException(s"Unsupported 
resource type: $resourceType")
+        }
+        resource -> resourceConfig
+      } catch {
+        case e: Throwable =>
+          // Log client errors at a lower level than unexpected exceptions
+          val message = s"Error processing describe configs request for 
resource $resource"
+          if (e.isInstanceOf[ApiException])
+            info(message, e)
+          else
+            error(message, e)
+          resource -> new 
DescribeConfigsResponse.Config(ApiError.fromThrowable(e), 
Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+      }
+    }.toMap
+  }
+
+  def alterConfigs(configs: Map[Resource, AlterConfigsRequest.Config], 
validateOnly: Boolean): Map[Resource, ApiError] = {
+    configs.map { case (resource, config) =>
+      try {
+        resource.`type` match {
+          case ResourceType.TOPIC =>
+            val topic = resource.name
+            val properties = new Properties
+            config.entries.asScala.foreach { configEntry =>
+              properties.setProperty(configEntry.name(), configEntry.value())
+            }
+            if (validateOnly)
+              AdminUtils.validateTopicConfig(zkUtils, topic, properties)
+            else
+              AdminUtils.changeTopicConfig(zkUtils, topic, properties)
+            resource -> new ApiError(Errors.NONE, null)
+          case resourceType =>
+            throw new InvalidRequestException(s"AlterConfigs is only supported 
for topics, but resource type is $resourceType")
+        }
+      } catch {
+        case e: ConfigException =>
+          val message = s"Invalid config value for resource $resource: 
${e.getMessage}"
+          info(message)
+          resource -> ApiError.fromThrowable(new 
InvalidRequestException(message, e))
+        case e: Throwable =>
+          // Log client errors at a lower level than unexpected exceptions
+          val message = s"Error processing alter configs request for resource 
$resource"
+          if (e.isInstanceOf[ApiException])
+            info(message, e)
+          else
+            error(message, e)
+          resource -> ApiError.fromThrowable(e)
+      }
+    }.toMap
+  }
+
   def shutdown() {
     topicPurgatory.shutdown()
     CoreUtils.swallow(createTopicPolicy.foreach(_.close()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala 
b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
index 32f844c..abf6bc0 100644
--- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
+++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import kafka.api.LeaderAndIsr
 import org.apache.kafka.common.protocol.Errors
-import org.apache.kafka.common.requests.CreateTopicsResponse
+import org.apache.kafka.common.requests.{ApiError, CreateTopicsResponse}
 
 import scala.collection._
 
@@ -29,7 +29,7 @@ import scala.collection._
   * TODO: local state doesn't count, need to know state of all relevant brokers
   *
   */
-case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, 
Seq[Int]], error: CreateTopicsResponse.Error)
+case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, 
Seq[Int]], error: ApiError)
 
 /**
   * A delayed create topics operation that can be created by the admin manager 
and watched
@@ -38,7 +38,7 @@ case class CreateTopicMetadata(topic: String, 
replicaAssignments: Map[Int, Seq[I
 class DelayedCreateTopics(delayMs: Long,
                           createMetadata: Seq[CreateTopicMetadata],
                           adminManager: AdminManager,
-                          responseCallback: Map[String, 
CreateTopicsResponse.Error] => Unit)
+                          responseCallback: Map[String, ApiError] => Unit)
   extends DelayedOperation(delayMs) {
 
   /**
@@ -70,7 +70,7 @@ class DelayedCreateTopics(delayMs: Long,
     val results = createMetadata.map { metadata =>
       // ignore topics that already have errors
       if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, 
metadata.replicaAssignments.keySet) > 0)
-        (metadata.topic, new 
CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null))
+        (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null))
       else
         (metadata.topic, metadata.error)
     }.toMap

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index bf7d4c1..02a1103 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -43,10 +43,10 @@ import 
org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol}
-import org.apache.kafka.common.record._
+import org.apache.kafka.common.record.{ControlRecordType, 
EndTransactionMarker, MemoryRecords, RecordBatch}
 import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse
 import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, 
AclFilterResponse}
-import org.apache.kafka.common.requests._
+import org.apache.kafka.common.requests.{Resource => RResource, ResourceType 
=> RResourceType, _}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{Node, TopicPartition}
@@ -126,6 +126,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
         case ApiKeys.CREATE_ACLS => handleCreateAcls(request)
         case ApiKeys.DELETE_ACLS => handleDeleteAcls(request)
+        case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request)
+        case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
       }
     } catch {
       case e: FatalExitError => throw e
@@ -1266,7 +1268,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleCreateTopicsRequest(request: RequestChannel.Request) {
     val createTopicsRequest = request.body[CreateTopicsRequest]
 
-    def sendResponseCallback(results: Map[String, 
CreateTopicsResponse.Error]): Unit = {
+    def sendResponseCallback(results: Map[String, ApiError]): Unit = {
       def createResponse(throttleTimeMs: Int): AbstractResponse = {
         val responseBody = new CreateTopicsResponse(throttleTimeMs, 
results.asJava)
         trace(s"Sending create topics response $responseBody for correlation 
id ${request.header.correlationId} to client ${request.header.clientId}.")
@@ -1277,12 +1279,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (!controller.isActive) {
       val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
-        (topic, new CreateTopicsResponse.Error(Errors.NOT_CONTROLLER, null))
+        (topic, new ApiError(Errors.NOT_CONTROLLER, null))
       }
       sendResponseCallback(results)
     } else if (!authorize(request.session, Create, Resource.ClusterResource)) {
       val results = createTopicsRequest.topics.asScala.map { case (topic, _) =>
-        (topic, new 
CreateTopicsResponse.Error(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
+        (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null))
       }
       sendResponseCallback(results)
     } else {
@@ -1291,7 +1293,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       // Special handling to add duplicate topics to the response
-      def sendResponseWithDuplicatesCallback(results: Map[String, 
CreateTopicsResponse.Error]): Unit = {
+      def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): 
Unit = {
 
         val duplicatedTopicsResults =
           if (duplicateTopics.nonEmpty) {
@@ -1300,7 +1302,7 @@ class KafkaApis(val requestChannel: RequestChannel,
             // We can send the error message in the response for version 1, so 
we don't have to log it any more
             if (request.header.apiVersion == 0)
               warn(errorMessage)
-            duplicateTopics.keySet.map((_, new 
CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap
+            duplicateTopics.keySet.map((_, new 
ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap
           } else Map.empty
 
         val completeResults = results ++ duplicatedTopicsResults
@@ -1894,11 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
 
       if (mayThrottle) {
-        val clientId : String =
-          if (request.requestObj.isInstanceOf[ControlledShutdownRequest])
-            
request.requestObj.asInstanceOf[ControlledShutdownRequest].clientId.getOrElse("")
-          else
+        val clientId: String = request.requestObj match {
+          case r: ControlledShutdownRequest => r.clientId.getOrElse("")
+          case _ =>
             throw new IllegalStateException("Old style requests should only be 
used for ControlledShutdownRequest")
+        }
         sendResponseMaybeThrottle(request, clientId, sendResponseCallback)
       } else
         sendResponseExemptThrottle(request, () => sendResponseCallback(0))
@@ -1920,6 +1922,64 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
+  def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = {
+    val alterConfigsRequest = request.body[AlterConfigsRequest]
+    val (authorizedResources, unauthorizedResources) = 
alterConfigsRequest.configs.asScala.partition { case (resource, _) =>
+      resource.`type` match {
+        case RResourceType.BROKER =>
+          authorize(request.session, AlterConfigs, new Resource(Broker, 
resource.name)) ||
+            authorize(request.session, AlterConfigs, Resource.ClusterResource)
+        case RResourceType.TOPIC =>
+          authorize(request.session, AlterConfigs, new Resource(Topic, 
resource.name)) ||
+            authorize(request.session, AlterConfigs, Resource.ClusterResource)
+        case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt")
+      }
+    }
+    val authorizedResult = adminManager.alterConfigs(authorizedResources, 
alterConfigsRequest.validateOnly)
+    val unauthorizedResult = unauthorizedResources.keys.map { resource =>
+      resource -> configsAuthorizationApiError(request.session, resource)
+    }
+    sendResponseMaybeThrottle(request, new AlterConfigsResponse(_, 
(authorizedResult ++ unauthorizedResult).asJava))
+  }
+
+  private def configsAuthorizationApiError(session: RequestChannel.Session, 
resource: RResource): ApiError = {
+    val error = resource.`type` match {
+      case RResourceType.BROKER => Errors.BROKER_AUTHORIZATION_FAILED
+      case RResourceType.TOPIC =>
+        // Don't leak topic name unless the user has describe topic permission
+        if (authorize(session, Describe, new Resource(Topic, resource.name)))
+          Errors.TOPIC_AUTHORIZATION_FAILED
+        else
+          Errors.UNKNOWN_TOPIC_OR_PARTITION
+      case rt => throw new InvalidRequestException(s"Unexpected resource type 
$rt for resource ${resource.name}")
+    }
+    new ApiError(error, null)
+  }
+
+  def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = {
+    val describeConfigsRequest = request.body[DescribeConfigsRequest]
+    val (authorizedResources, unauthorizedResources) = 
describeConfigsRequest.resources.asScala.partition { resource =>
+      resource.`type` match {
+        case RResourceType.BROKER =>
+          authorize(request.session, DescribeConfigs, new Resource(Broker, 
resource.name)) ||
+            authorize(request.session, DescribeConfigs, 
Resource.ClusterResource)
+        case RResourceType.TOPIC =>
+          authorize(request.session, DescribeConfigs, new Resource(Topic, 
resource.name)) ||
+            authorize(request.session, DescribeConfigs, 
Resource.ClusterResource)
+        case rt => throw new InvalidRequestException(s"Unexpected resource 
type $rt for resource ${resource.name}")
+      }
+    }
+    val authorizedConfigs = 
adminManager.describeConfigs(authorizedResources.map { resource =>
+      resource -> 
Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
+    }.toMap)
+    val unauthorizedConfigs = unauthorizedResources.map { resource =>
+      val error = configsAuthorizationApiError(request.session, resource)
+      resource -> new DescribeConfigsResponse.Config(error, 
Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
+    }
+
+    sendResponseMaybeThrottle(request, new DescribeConfigsResponse(_, 
(authorizedConfigs ++ unauthorizedConfigs).asJava))
+  }
+
   def authorizeClusterAction(request: RequestChannel.Request): Unit = {
     if (!authorize(request.session, ClusterAction, Resource.ClusterResource))
       throw new ClusterAuthorizationException(s"Request $request is not 
authorized.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 788f718..94dfa43 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -24,20 +24,18 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 
 import com.yammer.metrics.core.Gauge
-import kafka.admin.AdminUtils
 import kafka.api.KAFKA_0_9_0
 import kafka.cluster.Broker
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
-import kafka.controller.{ControllerStats, KafkaController}
+import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{CleanerConfig, LogConfig, LogManager}
+import kafka.log.{LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
-import org.I0Itec.zkclient.ZkClient
 import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, 
NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}

Reply via email to