http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java new file mode 100644 index 0000000..26034eb --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiError.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.errors.ApiException; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; + +/** + * Encapsulates an error code (via the Errors enum) and an optional message. Generally, the optional message is only + * defined if it adds information over the default message associated with the error code. + * + * This is an internal class (like every class in the requests package). + */ +public class ApiError { + + private static final String CODE_KEY_NAME = "error_code"; + private static final String MESSAGE_KEY_NAME = "error_message"; + + private final Errors error; + private final String message; + + public static ApiError fromThrowable(Throwable t) { + // Avoid populating the error message if it's a generic one + Errors error = Errors.forException(t); + String message = error.message().equals(t.getMessage()) ? null : t.getMessage(); + return new ApiError(error, message); + } + + public ApiError(Struct struct) { + error = Errors.forCode(struct.getShort(CODE_KEY_NAME)); + // In some cases, the error message field was introduced in newer version + if (struct.hasField(MESSAGE_KEY_NAME)) + message = struct.getString(MESSAGE_KEY_NAME); + else + message = null; + } + + public ApiError(Errors error, String message) { + this.error = error; + this.message = message; + } + + public void write(Struct struct) { + struct.set(CODE_KEY_NAME, error.code()); + // In some cases, the error message field was introduced in a newer protocol API version + if (struct.hasField(MESSAGE_KEY_NAME) && message != null && error != Errors.NONE) + struct.set(MESSAGE_KEY_NAME, message); + } + + public boolean is(Errors error) { + return this.error == error; + } + + public Errors error() { + return error; + } + + /** + * Return the optional error message or null. Consider using {@link #messageWithFallback()} instead. + */ + public String message() { + return message; + } + + /** + * If `message` is defined, return it. Otherwise fallback to the default error message associated with the error + * code. + */ + public String messageWithFallback() { + if (message == null) + return error.message(); + return message; + } + + public ApiException exception() { + return error.exception(message); + } + + @Override + public String toString() { + return "ApiError(error=" + error + ", message=" + message + ")"; + } +}
http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java index a0626cc..def4c85 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsRequest.java @@ -18,7 +18,6 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -42,9 +41,9 @@ public class CreateTopicsRequest extends AbstractRequest { private static final String REPLICA_ASSIGNMENT_PARTITION_ID_KEY_NAME = "partition_id"; private static final String REPLICA_ASSIGNMENT_REPLICAS_KEY_NAME = "replicas"; - private static final String CONFIG_KEY_KEY_NAME = "config_key"; + private static final String CONFIG_KEY_KEY_NAME = "config_name"; private static final String CONFIG_VALUE_KEY_NAME = "config_value"; - private static final String CONFIGS_KEY_NAME = "configs"; + private static final String CONFIGS_KEY_NAME = "config_entries"; public static final class TopicDetails { public final int numPartitions; @@ -210,12 +209,9 @@ public class CreateTopicsRequest extends AbstractRequest { @Override public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<String, CreateTopicsResponse.Error> topicErrors = new HashMap<>(); + Map<String, ApiError> topicErrors = new HashMap<>(); for (String topic : topics.keySet()) { - Errors error = Errors.forException(e); - // Avoid populating the error message if it's a generic one - String message = error.message().equals(e.getMessage()) ? null : e.getMessage(); - topicErrors.put(topic, new CreateTopicsResponse.Error(error, message)); + topicErrors.put(topic, ApiError.fromThrowable(e)); } short versionId = version(); http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java index 2c2b2dd..e46e7a1 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateTopicsResponse.java @@ -17,9 +17,7 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.protocol.types.Struct; import java.nio.ByteBuffer; @@ -32,45 +30,6 @@ public class CreateTopicsResponse extends AbstractResponse { private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; private static final String TOPIC_ERRORS_KEY_NAME = "topic_errors"; private static final String TOPIC_KEY_NAME = "topic"; - private static final String ERROR_CODE_KEY_NAME = "error_code"; - private static final String ERROR_MESSAGE_KEY_NAME = "error_message"; - - public static class Error { - private final Errors error; - private final String message; // introduced in V1 - - public Error(Errors error, String message) { - this.error = error; - this.message = message; - } - - public boolean is(Errors error) { - return this.error == error; - } - - public Errors error() { - return error; - } - - public String message() { - return message; - } - - public String messageWithFallback() { - if (message == null) - return error.message(); - return message; - } - - public ApiException exception() { - return error.exception(message); - } - - @Override - public String toString() { - return "Error(error=" + error + ", message=" + message + ")"; - } - } /** * Possible error codes: @@ -87,29 +46,25 @@ public class CreateTopicsResponse extends AbstractResponse { * INVALID_REQUEST(42) */ - private final Map<String, Error> errors; + private final Map<String, ApiError> errors; private final int throttleTimeMs; - public CreateTopicsResponse(Map<String, Error> errors) { + public CreateTopicsResponse(Map<String, ApiError> errors) { this(DEFAULT_THROTTLE_TIME, errors); } - public CreateTopicsResponse(int throttleTimeMs, Map<String, Error> errors) { + public CreateTopicsResponse(int throttleTimeMs, Map<String, ApiError> errors) { this.throttleTimeMs = throttleTimeMs; this.errors = errors; } public CreateTopicsResponse(Struct struct) { Object[] topicErrorStructs = struct.getArray(TOPIC_ERRORS_KEY_NAME); - Map<String, Error> errors = new HashMap<>(); + Map<String, ApiError> errors = new HashMap<>(); for (Object topicErrorStructObj : topicErrorStructs) { - Struct topicErrorCodeStruct = (Struct) topicErrorStructObj; - String topic = topicErrorCodeStruct.getString(TOPIC_KEY_NAME); - Errors error = Errors.forCode(topicErrorCodeStruct.getShort(ERROR_CODE_KEY_NAME)); - String errorMessage = null; - if (topicErrorCodeStruct.hasField(ERROR_MESSAGE_KEY_NAME)) - errorMessage = topicErrorCodeStruct.getString(ERROR_MESSAGE_KEY_NAME); - errors.put(topic, new Error(error, errorMessage)); + Struct topicErrorStruct = (Struct) topicErrorStructObj; + String topic = topicErrorStruct.getString(TOPIC_KEY_NAME); + errors.put(topic, new ApiError(topicErrorStruct)); } this.throttleTimeMs = struct.hasField(THROTTLE_TIME_KEY_NAME) ? struct.getInt(THROTTLE_TIME_KEY_NAME) : DEFAULT_THROTTLE_TIME; @@ -123,13 +78,10 @@ public class CreateTopicsResponse extends AbstractResponse { struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); List<Struct> topicErrorsStructs = new ArrayList<>(errors.size()); - for (Map.Entry<String, Error> topicError : errors.entrySet()) { + for (Map.Entry<String, ApiError> topicError : errors.entrySet()) { Struct topicErrorsStruct = struct.instance(TOPIC_ERRORS_KEY_NAME); topicErrorsStruct.set(TOPIC_KEY_NAME, topicError.getKey()); - Error error = topicError.getValue(); - topicErrorsStruct.set(ERROR_CODE_KEY_NAME, error.error.code()); - if (version >= 1) - topicErrorsStruct.set(ERROR_MESSAGE_KEY_NAME, error.message()); + topicError.getValue().write(topicErrorsStruct); topicErrorsStructs.add(topicErrorsStruct); } struct.set(TOPIC_ERRORS_KEY_NAME, topicErrorsStructs.toArray()); @@ -140,7 +92,7 @@ public class CreateTopicsResponse extends AbstractResponse { return throttleTimeMs; } - public Map<String, Error> errors() { + public Map<String, ApiError> errors() { return errors; } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java new file mode 100644 index 0000000..64fae0e --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DescribeConfigsRequest extends AbstractRequest { + + private static final String RESOURCES_KEY_NAME = "resources"; + private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; + private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; + private static final String CONFIG_NAMES_KEY_NAME = "config_names"; + + public static class Builder extends AbstractRequest.Builder { + private final Map<Resource, Collection<String>> resourceToConfigNames; + + public Builder(Map<Resource, Collection<String>> resourceToConfigNames) { + super(ApiKeys.DESCRIBE_CONFIGS); + this.resourceToConfigNames = resourceToConfigNames; + } + + public Builder(Collection<Resource> resources) { + this(toResourceToConfigNames(resources)); + } + + private static Map<Resource, Collection<String>> toResourceToConfigNames(Collection<Resource> resources) { + Map<Resource, Collection<String>> result = new HashMap<>(resources.size()); + for (Resource resource : resources) + result.put(resource, null); + return result; + } + + @Override + public DescribeConfigsRequest build(short version) { + return new DescribeConfigsRequest(version, resourceToConfigNames); + } + } + + private final Map<Resource, Collection<String>> resourceToConfigNames; + + public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames) { + super(version); + this.resourceToConfigNames = resourceToConfigNames; + + } + + public DescribeConfigsRequest(Struct struct, short version) { + super(version); + Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); + resourceToConfigNames = new HashMap<>(resourcesArray.length); + for (Object resourceObj : resourcesArray) { + Struct resourceStruct = (Struct) resourceObj; + ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME)); + String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME); + + Object[] configNamesArray = resourceStruct.getArray(CONFIG_NAMES_KEY_NAME); + List<String> configNames = null; + if (configNamesArray != null) { + configNames = new ArrayList<>(configNamesArray.length); + for (Object configNameObj : configNamesArray) + configNames.add((String) configNameObj); + } + + resourceToConfigNames.put(new Resource(resourceType, resourceName), configNames); + } + } + + public Collection<Resource> resources() { + return resourceToConfigNames.keySet(); + } + + /** + * Return null if all config names should be returned. + */ + public Collection<String> configNames(Resource resource) { + return resourceToConfigNames.get(resource); + } + + @Override + protected Struct toStruct() { + Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version())); + List<Struct> resourceStructs = new ArrayList<>(resources().size()); + for (Map.Entry<Resource, Collection<String>> entry : resourceToConfigNames.entrySet()) { + Resource resource = entry.getKey(); + Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); + resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id()); + resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name()); + + String[] configNames = entry.getValue() == null ? null : entry.getValue().toArray(new String[0]); + resourceStruct.set(CONFIG_NAMES_KEY_NAME, configNames); + + resourceStructs.add(resourceStruct); + } + struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0])); + return struct; + } + + @Override + public DescribeConfigsResponse getErrorResponse(int throttleTimeMs, Throwable e) { + short version = version(); + switch (version) { + case 0: + ApiError error = ApiError.fromThrowable(e); + Map<Resource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size()); + DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error, + Collections.<DescribeConfigsResponse.ConfigEntry>emptyList()); + for (Resource resource : resources()) + errors.put(resource, config); + return new DescribeConfigsResponse(throttleTimeMs, errors); + default: + throw new IllegalArgumentException(String.format("Version %d is not valid. Valid versions for %s are 0 to %d", + version, this.getClass().getSimpleName(), ApiKeys.DESCRIBE_CONFIGS.latestVersion())); + } + } + + public static DescribeConfigsRequest parse(ByteBuffer buffer, short version) { + return new DescribeConfigsRequest(ApiKeys.DESCRIBE_CONFIGS.parseRequest(version, buffer), version); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java new file mode 100644 index 0000000..05bf88d --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.types.Struct; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DescribeConfigsResponse extends AbstractResponse { + + private static final String THROTTLE_TIME_KEY_NAME = "throttle_time_ms"; + + private static final String RESOURCES_KEY_NAME = "resources"; + + private static final String RESOURCE_TYPE_KEY_NAME = "resource_type"; + private static final String RESOURCE_NAME_KEY_NAME = "resource_name"; + + private static final String CONFIG_ENTRIES_KEY_NAME = "config_entries"; + + private static final String CONFIG_NAME = "config_name"; + private static final String CONFIG_VALUE = "config_value"; + private static final String IS_SENSITIVE = "is_sensitive"; + private static final String IS_DEFAULT = "is_default"; + private static final String READ_ONLY = "read_only"; + + public static class Config { + private final ApiError error; + private final Collection<ConfigEntry> entries; + + public Config(ApiError error, Collection<ConfigEntry> entries) { + this.error = error; + this.entries = entries; + } + + public ApiError error() { + return error; + } + + public Collection<ConfigEntry> entries() { + return entries; + } + } + + public static class ConfigEntry { + private final String name; + private final String value; + private final boolean isSensitive; + private final boolean isDefault; + private final boolean readOnly; + + public ConfigEntry(String name, String value, boolean isSensitive, boolean isDefault, boolean readOnly) { + this.name = name; + this.value = value; + this.isSensitive = isSensitive; + this.isDefault = isDefault; + this.readOnly = readOnly; + } + + public String name() { + return name; + } + + public String value() { + return value; + } + + public boolean isSensitive() { + return isSensitive; + } + + public boolean isDefault() { + return isDefault; + } + + public boolean isReadOnly() { + return readOnly; + } + } + + private final int throttleTimeMs; + private final Map<Resource, Config> configs; + + public DescribeConfigsResponse(int throttleTimeMs, Map<Resource, Config> configs) { + this.throttleTimeMs = throttleTimeMs; + this.configs = configs; + } + + public DescribeConfigsResponse(Struct struct) { + throttleTimeMs = struct.getInt(THROTTLE_TIME_KEY_NAME); + Object[] resourcesArray = struct.getArray(RESOURCES_KEY_NAME); + configs = new HashMap<>(resourcesArray.length); + for (Object resourceObj : resourcesArray) { + Struct resourceStruct = (Struct) resourceObj; + + ApiError error = new ApiError(resourceStruct); + ResourceType resourceType = ResourceType.forId(resourceStruct.getByte(RESOURCE_TYPE_KEY_NAME)); + String resourceName = resourceStruct.getString(RESOURCE_NAME_KEY_NAME); + Resource resource = new Resource(resourceType, resourceName); + + Object[] configEntriesArray = resourceStruct.getArray(CONFIG_ENTRIES_KEY_NAME); + List<ConfigEntry> configEntries = new ArrayList<>(configEntriesArray.length); + for (Object configEntriesObj: configEntriesArray) { + Struct configEntriesStruct = (Struct) configEntriesObj; + String configName = configEntriesStruct.getString(CONFIG_NAME); + String configValue = configEntriesStruct.getString(CONFIG_VALUE); + boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE); + boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT); + boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY); + configEntries.add(new ConfigEntry(configName, configValue, isSensitive, isDefault, readOnly)); + } + Config config = new Config(error, configEntries); + configs.put(resource, config); + } + } + + public Map<Resource, Config> configs() { + return configs; + } + + public Config config(Resource resource) { + return configs.get(resource); + } + + public int throttleTimeMs() { + return throttleTimeMs; + } + + @Override + protected Struct toStruct(short version) { + Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.responseSchema(version)); + struct.set(THROTTLE_TIME_KEY_NAME, throttleTimeMs); + List<Struct> resourceStructs = new ArrayList<>(configs.size()); + for (Map.Entry<Resource, Config> entry : configs.entrySet()) { + Struct resourceStruct = struct.instance(RESOURCES_KEY_NAME); + + Resource resource = entry.getKey(); + resourceStruct.set(RESOURCE_TYPE_KEY_NAME, resource.type().id()); + resourceStruct.set(RESOURCE_NAME_KEY_NAME, resource.name()); + + Config config = entry.getValue(); + config.error.write(resourceStruct); + + List<Struct> configEntryStructs = new ArrayList<>(config.entries.size()); + for (ConfigEntry configEntry : config.entries) { + Struct configEntriesStruct = resourceStruct.instance(CONFIG_ENTRIES_KEY_NAME); + configEntriesStruct.set(CONFIG_NAME, configEntry.name); + configEntriesStruct.set(CONFIG_VALUE, configEntry.value); + configEntriesStruct.set(IS_SENSITIVE, configEntry.isSensitive); + configEntriesStruct.set(IS_DEFAULT, configEntry.isDefault); + configEntriesStruct.set(READ_ONLY, configEntry.readOnly); + configEntryStructs.add(configEntriesStruct); + } + resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0])); + + resourceStructs.add(resourceStruct); + } + struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0])); + return struct; + } + + public static DescribeConfigsResponse parse(ByteBuffer buffer, short version) { + return new DescribeConfigsResponse(ApiKeys.DESCRIBE_CONFIGS.parseResponse(version, buffer)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/Resource.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/Resource.java b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java new file mode 100644 index 0000000..6a360a5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/Resource.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +public final class Resource { + private final ResourceType type; + private final String name; + + public Resource(ResourceType type, String name) { + this.type = type; + this.name = name; + } + + public ResourceType type() { + return type; + } + + public String name() { + return name; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + Resource resource = (Resource) o; + + return type == resource.type && name.equals(resource.name); + } + + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + name.hashCode(); + return result; + } + + @Override + public String toString() { + return "Resource(type=" + type + ", name='" + name + "'}"; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java new file mode 100644 index 0000000..2c11772 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/requests/ResourceType.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.requests; + +public enum ResourceType { + UNKNOWN((byte) 0), ANY((byte) 1), TOPIC((byte) 2), GROUP((byte) 3), BROKER((byte) 4); + + private static final ResourceType[] VALUES = values(); + + private final byte id; + + ResourceType(byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static ResourceType forId(byte id) { + if (id < 0) + throw new IllegalArgumentException("id should be positive, id: " + id); + if (id >= VALUES.length) + return UNKNOWN; + return VALUES[id]; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java index 4ff0dc6..e623d73 100644 --- a/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java +++ b/clients/src/main/java/org/apache/kafka/server/policy/CreateTopicPolicy.java @@ -106,7 +106,7 @@ public interface CreateTopicPolicy extends Configurable, AutoCloseable { @Override public String toString() { - return "RequestMetadata(topic=" + topic + + return "CreateTopicPolicy.RequestMetadata(topic=" + topic + ", numPartitions=" + numPartitions + ", replicationFactor=" + replicationFactor + ", replicasAssignments=" + replicasAssignments + @@ -116,12 +116,12 @@ public interface CreateTopicPolicy extends Configurable, AutoCloseable { /** * Validate the request parameters and throw a <code>PolicyViolationException</code> with a suitable error - * message if the create request parameters for the provided topic do not satisfy this policy. + * message if the create topics request parameters for the provided topic do not satisfy this policy. * * Clients will receive the POLICY_VIOLATION error code along with the exception's message. Note that validation * failure only affects the relevant topic, other topics in the request will still be processed. * - * @param requestMetadata the create request parameters for the provided topic. + * @param requestMetadata the create topics request parameters for the provided topic. * @throws PolicyViolationException if the request parameters do not satisfy this policy. */ void validate(RequestMetadata requestMetadata) throws PolicyViolationException; http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java index 2d7c546..06ace63 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/AclOperationTest.java @@ -45,7 +45,9 @@ public class AclOperationTest { new AclOperationTestInfo(AclOperation.DELETE, 6, "delete", false), new AclOperationTestInfo(AclOperation.ALTER, 7, "alter", false), new AclOperationTestInfo(AclOperation.DESCRIBE, 8, "describe", false), - new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false) + new AclOperationTestInfo(AclOperation.CLUSTER_ACTION, 9, "cluster_action", false), + new AclOperationTestInfo(AclOperation.DESCRIBE_CONFIGS, 10, "describe_configs", false), + new AclOperationTestInfo(AclOperation.ALTER_CONFIGS, 11, "alter_configs", false) }; @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 6d01b0a..e432c0a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -27,9 +27,9 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.ApiError; import org.apache.kafka.common.requests.CreateAclsResponse; import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse; -import org.apache.kafka.common.requests.CreateTopicsResponse.Error; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse; import org.apache.kafka.common.requests.DeleteAclsResponse.AclDeletionResult; @@ -60,8 +60,7 @@ import static org.junit.Assert.fail; /** * A unit test for KafkaAdminClient. * - * See for an integration test of the KafkaAdminClient. - * Also see KafkaAdminClientIntegrationTest for a unit test of the admin client. + * See KafkaAdminClientIntegrationTest for an integration test of the KafkaAdminClient. */ public class KafkaAdminClientTest { @Rule @@ -160,8 +159,7 @@ public class KafkaAdminClientTest { @Test public void testCloseAdminClient() throws Exception { - try (MockKafkaAdminClientContext ctx = new MockKafkaAdminClientContext(newStrMap())) { - } + new MockKafkaAdminClientContext(newStrMap()).close(); } private static void assertFutureError(Future<?> future, Class<? extends Throwable> exceptionClass) @@ -186,12 +184,12 @@ public class KafkaAdminClientTest { AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10"))) { ctx.mockClient.setNodeApiVersions(NodeApiVersions.create()); ctx.mockClient.setNode(new Node(0, "localhost", 8121)); - ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{ - put("myTopic", new Error(Errors.NONE, "")); + ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, ApiError>() {{ + put("myTopic", new ApiError(Errors.NONE, "")); }})); KafkaFuture<Void> future = ctx.client. createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{ - put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})); + put(0, Arrays.asList(new Integer[]{0, 1, 2})); }})), new CreateTopicsOptions().timeoutMs(1000)).all(); assertFutureError(future, TimeoutException.class); } @@ -203,12 +201,12 @@ public class KafkaAdminClientTest { ctx.mockClient.setNodeApiVersions(NodeApiVersions.create()); ctx.mockClient.prepareMetadataUpdate(ctx.cluster, Collections.<String>emptySet()); ctx.mockClient.setNode(ctx.nodes.get(0)); - ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, Error>() {{ - put("myTopic", new Error(Errors.NONE, "")); + ctx.mockClient.prepareResponse(new CreateTopicsResponse(new HashMap<String, ApiError>() {{ + put("myTopic", new ApiError(Errors.NONE, "")); }})); KafkaFuture<Void> future = ctx.client. createTopics(Collections.singleton(new NewTopic("myTopic", new HashMap<Integer, List<Integer>>() {{ - put(Integer.valueOf(0), Arrays.asList(new Integer[]{0, 1, 2})); + put(0, Arrays.asList(new Integer[]{0, 1, 2})); }})), new CreateTopicsOptions().timeoutMs(10000)).all(); future.get(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java index 8f6f670..af72de2 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/ResourceTypeTest.java @@ -40,7 +40,8 @@ public class ResourceTypeTest { new AclResourceTypeTestInfo(ResourceType.ANY, 1, "any", false), new AclResourceTypeTestInfo(ResourceType.TOPIC, 2, "topic", false), new AclResourceTypeTestInfo(ResourceType.GROUP, 3, "group", false), - new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false) + new AclResourceTypeTestInfo(ResourceType.CLUSTER, 4, "cluster", false), + new AclResourceTypeTestInfo(ResourceType.BROKER, 5, "broker", false) }; @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index ede55a5..9142c90 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -60,6 +60,7 @@ import java.nio.ByteBuffer; import java.nio.channels.GatheringByteChannel; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -230,6 +231,13 @@ public class RequestResponseTest { checkRequest(createDeleteAclsRequest()); checkErrorResponse(createDeleteAclsRequest(), new SecurityDisabledException("Security is not enabled.")); checkResponse(createDeleteAclsResponse(), ApiKeys.DELETE_ACLS.latestVersion()); + checkRequest(createAlterConfigsRequest()); + checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException()); + checkResponse(createAlterConfigsResponse(), 0); + checkRequest(createDescribeConfigsRequest()); + checkRequest(createDescribeConfigsRequestWithConfigEntries()); + checkErrorResponse(createDescribeConfigsRequest(), new UnknownServerException()); + checkResponse(createDescribeConfigsResponse(), 0); } @Test @@ -887,9 +895,9 @@ public class RequestResponseTest { } private CreateTopicsResponse createCreateTopicResponse() { - Map<String, CreateTopicsResponse.Error> errors = new HashMap<>(); - errors.put("t1", new CreateTopicsResponse.Error(Errors.INVALID_TOPIC_EXCEPTION, null)); - errors.put("t2", new CreateTopicsResponse.Error(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available.")); + Map<String, ApiError> errors = new HashMap<>(); + errors.put("t1", new ApiError(Errors.INVALID_TOPIC_EXCEPTION, null)); + errors.put("t2", new ApiError(Errors.LEADER_NOT_AVAILABLE, "Leader with id 5 is not available.")); return new CreateTopicsResponse(errors); } @@ -1085,4 +1093,50 @@ public class RequestResponseTest { closed = true; } } + + private DescribeConfigsRequest createDescribeConfigsRequest() { + return new DescribeConfigsRequest.Builder(asList( + new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), + new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"))).build((short) 0); + } + + private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries() { + Map<org.apache.kafka.common.requests.Resource, Collection<String>> resources = new HashMap<>(); + resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), asList("foo", "bar")); + resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), null); + resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic a"), Collections.<String>emptyList()); + return new DescribeConfigsRequest.Builder(resources).build((short) 0); + } + + private DescribeConfigsResponse createDescribeConfigsResponse() { + Map<org.apache.kafka.common.requests.Resource, DescribeConfigsResponse.Config> configs = new HashMap<>(); + List<DescribeConfigsResponse.ConfigEntry> configEntries = asList( + new DescribeConfigsResponse.ConfigEntry("config_name", "config_value", false, true, false), + new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true) + ); + configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config( + new ApiError(Errors.NONE, null), configEntries)); + configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new DescribeConfigsResponse.Config( + new ApiError(Errors.NONE, null), Collections.<DescribeConfigsResponse.ConfigEntry>emptyList())); + return new DescribeConfigsResponse(200, configs); + } + + private AlterConfigsRequest createAlterConfigsRequest() { + Map<org.apache.kafka.common.requests.Resource, AlterConfigsRequest.Config> configs = new HashMap<>(); + List<AlterConfigsRequest.ConfigEntry> configEntries = asList( + new AlterConfigsRequest.ConfigEntry("config_name", "config_value"), + new AlterConfigsRequest.ConfigEntry("another_name", "another value") + ); + configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new AlterConfigsRequest.Config(configEntries)); + configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), + new AlterConfigsRequest.Config(Collections.<AlterConfigsRequest.ConfigEntry>emptyList())); + return new AlterConfigsRequest((short) 0, configs, false); + } + + private AlterConfigsResponse createAlterConfigsResponse() { + Map<org.apache.kafka.common.requests.Resource, ApiError> errors = new HashMap<>(); + errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new ApiError(Errors.NONE, null)); + errors.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), new ApiError(Errors.INVALID_REQUEST, "This request is invalid")); + return new AlterConfigsResponse(20, errors); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/AclCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AclCommand.scala b/core/src/main/scala/kafka/admin/AclCommand.scala index 0bedee3..925c407 100644 --- a/core/src/main/scala/kafka/admin/AclCommand.scala +++ b/core/src/main/scala/kafka/admin/AclCommand.scala @@ -31,9 +31,10 @@ object AclCommand { val Newline = scala.util.Properties.lineSeparator val ResourceTypeToValidOperations = Map[ResourceType, Set[Operation]] ( - Topic -> Set(Read, Write, Describe, All, Delete), + Broker -> Set(DescribeConfigs), + Topic -> Set(Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All), Group -> Set(Read, Describe, All), - Cluster -> Set(Create, ClusterAction, All) + Cluster -> Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, All) ) def main(args: Array[String]) { @@ -237,6 +238,9 @@ object AclCommand { if (opts.options.has(opts.groupOpt)) opts.options.valuesOf(opts.groupOpt).asScala.foreach(group => resources += new Resource(Group, group.trim)) + if (opts.options.has(opts.brokerOpt)) + opts.options.valuesOf(opts.brokerOpt).asScala.foreach(broker => resources += new Resource(Broker, broker.toString)) + if (resources.isEmpty && dieIfNoResourceFound) CommandLineUtils.printUsageAndDie(opts.parser, "You must provide at least one resource: --topic <topic> or --cluster or --group <group>") @@ -285,6 +289,12 @@ object AclCommand { .describedAs("group") .ofType(classOf[String]) + val brokerOpt = parser.accepts("broker", "broker to which the ACLs should be added or removed. " + + "A value of * indicates the ACLs should apply to all brokers.") + .withRequiredArg + .describedAs("broker") + .ofType(classOf[Int]) + val addOpt = parser.accepts("add", "Indicates you are trying to add ACLs.") val removeOpt = parser.accepts("remove", "Indicates you are trying to remove ACLs.") val listOpt = parser.accepts("list", "List ACLs for the specified resource, use --topic <topic> or --group <group> or --cluster to specify a resource.") http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/AdminUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala index 49d249b..bd8771b 100644 --- a/core/src/main/scala/kafka/admin/AdminUtils.scala +++ b/core/src/main/scala/kafka/admin/AdminUtils.scala @@ -46,6 +46,26 @@ trait AdminUtilities { def changeClientIdConfig(zkUtils: ZkUtils, clientId: String, configs: Properties) def changeUserOrUserClientIdConfig(zkUtils: ZkUtils, sanitizedEntityName: String, configs: Properties) def changeBrokerConfig(zkUtils: ZkUtils, brokerIds: Seq[Int], configs: Properties) + + def changeConfigs(zkUtils: ZkUtils, entityType: String, entityName: String, configs: Properties): Unit = { + + def parseBroker(broker: String): Int = { + try broker.toInt + catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value") + } + } + + entityType match { + case ConfigType.Topic => changeTopicConfig(zkUtils, entityName, configs) + case ConfigType.Client => changeClientIdConfig(zkUtils, entityName, configs) + case ConfigType.User => changeUserOrUserClientIdConfig(zkUtils, entityName, configs) + case ConfigType.Broker => changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs) + case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}") + } + } + def fetchEntityConfig(zkUtils: ZkUtils,entityType: String, entityName: String): Properties } @@ -527,6 +547,14 @@ object AdminUtils extends Logging with AdminUtilities { changeEntityConfig(zkUtils, ConfigType.User, sanitizedEntityName, configs) } + def validateTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties): Unit = { + Topic.validate(topic) + if (!topicExists(zkUtils, topic)) + throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) + // remove the topic overrides + LogConfig.validate(configs) + } + /** * Update the config for an existing topic and create a change notification so the change will propagate to other brokers * @@ -537,10 +565,7 @@ object AdminUtils extends Logging with AdminUtilities { * */ def changeTopicConfig(zkUtils: ZkUtils, topic: String, configs: Properties) { - if (!topicExists(zkUtils, topic)) - throw new AdminOperationException("Topic \"%s\" does not exist.".format(topic)) - // remove the topic overrides - LogConfig.validate(configs) + validateTopicConfig(zkUtils, topic, configs) changeEntityConfig(zkUtils, ConfigType.Topic, topic, configs) } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/admin/ConfigCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 3985490..f74d31d 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -98,13 +98,8 @@ object ConfigCommand extends Config { configs.putAll(configsToBeAdded) configsToBeDeleted.foreach(configs.remove(_)) - entityType match { - case ConfigType.Topic => utils.changeTopicConfig(zkUtils, entityName, configs) - case ConfigType.Client => utils.changeClientIdConfig(zkUtils, entityName, configs) - case ConfigType.User => utils.changeUserOrUserClientIdConfig(zkUtils, entityName, configs) - case ConfigType.Broker => utils.changeBrokerConfig(zkUtils, Seq(parseBroker(entityName)), configs) - case _ => throw new IllegalArgumentException(s"$entityType is not a known entityType. Should be one of ${ConfigType.Topic}, ${ConfigType.Client}, ${ConfigType.Broker}") - } + utils.changeConfigs(zkUtils, entityType, entityName, configs) + println(s"Completed Updating config for entity: $entity.") } @@ -129,14 +124,6 @@ object ConfigCommand extends Config { } } - private def parseBroker(broker: String): Int = { - try broker.toInt - catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"Error parsing broker $broker. The broker's Entity Name must be a single integer value") - } - } - private def describeConfig(zkUtils: ZkUtils, opts: ConfigCommandOptions) { val configEntity = parseEntity(opts) val describeAllUsers = configEntity.root.entityType == ConfigType.User && !configEntity.root.sanitizedName.isDefined && !configEntity.child.isDefined http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/log/LogConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala index 30bc26b..6a329d8 100755 --- a/core/src/main/scala/kafka/log/LogConfig.scala +++ b/core/src/main/scala/kafka/log/LogConfig.scala @@ -321,7 +321,7 @@ object LogConfig { val names = configNames for(name <- props.asScala.keys) if (!names.contains(name)) - throw new InvalidConfigurationException(s"Unknown Log configuration $name.") + throw new InvalidConfigurationException(s"Unknown topic config name: $name") } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 0b094df..d8cdf90 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -44,7 +44,7 @@ import scala.collection._ */ @threadsafe class LogManager(val logDirs: Array[File], - val topicConfigs: Map[String, LogConfig], + val topicConfigs: Map[String, LogConfig], // note that this doesn't get updated after creation val defaultConfig: LogConfig, val cleanerConfig: CleanerConfig, ioThreads: Int, http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/Operation.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/Operation.scala b/core/src/main/scala/kafka/security/auth/Operation.scala index 7d292d2..f65d9f0 100644 --- a/core/src/main/scala/kafka/security/auth/Operation.scala +++ b/core/src/main/scala/kafka/security/auth/Operation.scala @@ -57,16 +57,25 @@ case object ClusterAction extends Operation { val name = "ClusterAction" val toJava = AclOperation.CLUSTER_ACTION } +case object DescribeConfigs extends Operation { + val name = "DescribeConfigs" + val toJava = AclOperation.DESCRIBE_CONFIGS +} +case object AlterConfigs extends Operation { + val name = "AlterConfigs" + val toJava = AclOperation.ALTER_CONFIGS +} case object All extends Operation { val name = "All" val toJava = AclOperation.ALL } object Operation { - def fromString(operation: String): Operation = { - val op = values.find(op => op.name.equalsIgnoreCase(operation)) - op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(","))) - } + + def fromString(operation: String): Operation = { + val op = values.find(op => op.name.equalsIgnoreCase(operation)) + op.getOrElse(throw new KafkaException(operation + " not a valid operation name. The valid names are " + values.mkString(","))) + } def fromJava(operation: AclOperation): Try[Operation] = { try { @@ -76,5 +85,6 @@ object Operation { } } - def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, All) + def values: Seq[Operation] = List(Read, Write, Create, Delete, Alter, Describe, ClusterAction, AlterConfigs, + DescribeConfigs, All) } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/ResourceType.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/ResourceType.scala b/core/src/main/scala/kafka/security/auth/ResourceType.scala index e58d8ec..ea7ce3c 100644 --- a/core/src/main/scala/kafka/security/auth/ResourceType.scala +++ b/core/src/main/scala/kafka/security/auth/ResourceType.scala @@ -31,6 +31,11 @@ case object Cluster extends ResourceType { val error = Errors.CLUSTER_AUTHORIZATION_FAILED } +case object Broker extends ResourceType { + val name = "Broker" + val error = Errors.BROKER_AUTHORIZATION_FAILED +} + case object Topic extends ResourceType { val name = "Topic" val error = Errors.TOPIC_AUTHORIZATION_FAILED @@ -58,5 +63,5 @@ object ResourceType { rType.getOrElse(throw new KafkaException(resourceType + " not a valid resourceType name. The valid names are " + values.mkString(","))) } - def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource) + def values: Seq[ResourceType] = List(Cluster, Topic, Group, ProducerTransactionalId, ProducerIdResource, Broker) } http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala index eaacd6a..19fbdc4 100644 --- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala +++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala @@ -252,7 +252,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging { /** * Safely updates the resources ACLs by ensuring reads and writes respect the expected zookeeper version. - * Continues to retry until it succesfully updates zookeeper. + * Continues to retry until it successfully updates zookeeper. * * Returns a boolean indicating if the content of the ACLs was actually changed. * http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/AdminManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala index 2f60cbd..c147593 100644 --- a/core/src/main/scala/kafka/server/AdminManager.scala +++ b/core/src/main/scala/kafka/server/AdminManager.scala @@ -16,18 +16,20 @@ */ package kafka.server -import java.util.Properties +import java.util.{Collections, Properties} import kafka.admin.AdminUtils import kafka.common.TopicAlreadyMarkedForDeletionException import kafka.log.LogConfig import kafka.metrics.KafkaMetricsGroup import kafka.utils._ +import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException} import org.apache.kafka.common.errors.{ApiException, InvalidRequestException, PolicyViolationException} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.requests.CreateTopicsRequest._ -import org.apache.kafka.common.requests.CreateTopicsResponse +import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType} import org.apache.kafka.server.policy.CreateTopicPolicy import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata @@ -63,7 +65,7 @@ class AdminManager(val config: KafkaConfig, def createTopics(timeout: Int, validateOnly: Boolean, createInfo: Map[String, TopicDetails], - responseCallback: Map[String, CreateTopicsResponse.Error] => Unit) { + responseCallback: Map[String, ApiError] => Unit) { // 1. map over topics creating assignment and calling zookeeper val brokers = metadataCache.getAliveBrokers.map { b => kafka.admin.BrokerMetadata(b.id, b.rack) } @@ -114,15 +116,15 @@ class AdminManager(val config: KafkaConfig, else AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignments, configs, update = false) } - CreateTopicMetadata(topic, assignments, new CreateTopicsResponse.Error(Errors.NONE, null)) + CreateTopicMetadata(topic, assignments, new ApiError(Errors.NONE, null)) } catch { // Log client errors at a lower level than unexpected exceptions case e@ (_: PolicyViolationException | _: ApiException) => info(s"Error processing create topic request for topic $topic with arguments $arguments", e) - CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage)) + CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e)) case e: Throwable => error(s"Error processing create topic request for topic $topic with arguments $arguments", e) - CreateTopicMetadata(topic, Map(), new CreateTopicsResponse.Error(Errors.forException(e), e.getMessage)) + CreateTopicMetadata(topic, Map(), ApiError.fromThrowable(e)) } } @@ -131,7 +133,7 @@ class AdminManager(val config: KafkaConfig, val results = metadata.map { createTopicMetadata => // ignore topics that already have errors if (createTopicMetadata.error.is(Errors.NONE) && !validateOnly) { - (createTopicMetadata.topic, new CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null)) + (createTopicMetadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) } else { (createTopicMetadata.topic, createTopicMetadata.error) } @@ -189,6 +191,99 @@ class AdminManager(val config: KafkaConfig, } } + def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]]): Map[Resource, DescribeConfigsResponse.Config] = { + resourceToConfigNames.map { case (resource, configNames) => + + def createResponseConfig(config: AbstractConfig, isReadOnly: Boolean, isDefault: String => Boolean): DescribeConfigsResponse.Config = { + val filteredConfigPairs = config.values.asScala.filter { case (configName, _) => + /* Always returns true if configNames is None */ + configNames.map(_.contains(configName)).getOrElse(true) + }.toIndexedSeq + + val configEntries = filteredConfigPairs.map { case (name, value) => + val configEntryType = config.typeOf(name) + val isSensitive = configEntryType == ConfigDef.Type.PASSWORD + val valueAsString = + if (isSensitive) null + else ConfigDef.convertToString(value, configEntryType) + new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly) + } + + new DescribeConfigsResponse.Config(new ApiError(Errors.NONE, null), configEntries.asJava) + } + + try { + val resourceConfig = resource.`type` match { + + case ResourceType.TOPIC => + val topic = resource.name + Topic.validate(topic) + // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible + val topicProps = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic) + val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps) + createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name)) + + case ResourceType.BROKER => + val brokerId = try resource.name.toInt catch { + case _: NumberFormatException => + throw new InvalidRequestException(s"Broker id must be an integer, but it is: ${resource.name}") + } + if (brokerId == config.brokerId) + createResponseConfig(config, isReadOnly = true, name => !config.originals.containsKey(name)) + else + throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId") + + case resourceType => throw new InvalidRequestException(s"Unsupported resource type: $resourceType") + } + resource -> resourceConfig + } catch { + case e: Throwable => + // Log client errors at a lower level than unexpected exceptions + val message = s"Error processing describe configs request for resource $resource" + if (e.isInstanceOf[ApiException]) + info(message, e) + else + error(message, e) + resource -> new DescribeConfigsResponse.Config(ApiError.fromThrowable(e), Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) + } + }.toMap + } + + def alterConfigs(configs: Map[Resource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[Resource, ApiError] = { + configs.map { case (resource, config) => + try { + resource.`type` match { + case ResourceType.TOPIC => + val topic = resource.name + val properties = new Properties + config.entries.asScala.foreach { configEntry => + properties.setProperty(configEntry.name(), configEntry.value()) + } + if (validateOnly) + AdminUtils.validateTopicConfig(zkUtils, topic, properties) + else + AdminUtils.changeTopicConfig(zkUtils, topic, properties) + resource -> new ApiError(Errors.NONE, null) + case resourceType => + throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType") + } + } catch { + case e: ConfigException => + val message = s"Invalid config value for resource $resource: ${e.getMessage}" + info(message) + resource -> ApiError.fromThrowable(new InvalidRequestException(message, e)) + case e: Throwable => + // Log client errors at a lower level than unexpected exceptions + val message = s"Error processing alter configs request for resource $resource" + if (e.isInstanceOf[ApiException]) + info(message, e) + else + error(message, e) + resource -> ApiError.fromThrowable(e) + } + }.toMap + } + def shutdown() { topicPurgatory.shutdown() CoreUtils.swallow(createTopicPolicy.foreach(_.close())) http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/DelayedCreateTopics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala index 32f844c..abf6bc0 100644 --- a/core/src/main/scala/kafka/server/DelayedCreateTopics.scala +++ b/core/src/main/scala/kafka/server/DelayedCreateTopics.scala @@ -19,7 +19,7 @@ package kafka.server import kafka.api.LeaderAndIsr import org.apache.kafka.common.protocol.Errors -import org.apache.kafka.common.requests.CreateTopicsResponse +import org.apache.kafka.common.requests.{ApiError, CreateTopicsResponse} import scala.collection._ @@ -29,7 +29,7 @@ import scala.collection._ * TODO: local state doesn't count, need to know state of all relevant brokers * */ -case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: CreateTopicsResponse.Error) +case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[Int]], error: ApiError) /** * A delayed create topics operation that can be created by the admin manager and watched @@ -38,7 +38,7 @@ case class CreateTopicMetadata(topic: String, replicaAssignments: Map[Int, Seq[I class DelayedCreateTopics(delayMs: Long, createMetadata: Seq[CreateTopicMetadata], adminManager: AdminManager, - responseCallback: Map[String, CreateTopicsResponse.Error] => Unit) + responseCallback: Map[String, ApiError] => Unit) extends DelayedOperation(delayMs) { /** @@ -70,7 +70,7 @@ class DelayedCreateTopics(delayMs: Long, val results = createMetadata.map { metadata => // ignore topics that already have errors if (metadata.error.is(Errors.NONE) && missingLeaderCount(metadata.topic, metadata.replicaAssignments.keySet) > 0) - (metadata.topic, new CreateTopicsResponse.Error(Errors.REQUEST_TIMED_OUT, null)) + (metadata.topic, new ApiError(Errors.REQUEST_TIMED_OUT, null)) else (metadata.topic, metadata.error) }.toMap http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/KafkaApis.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bf7d4c1..02a1103 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -43,10 +43,10 @@ import org.apache.kafka.common.internals.Topic.{GROUP_METADATA_TOPIC_NAME, TRANS import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.ListenerName import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol} -import org.apache.kafka.common.record._ +import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch} import org.apache.kafka.common.requests.CreateAclsResponse.AclCreationResponse import org.apache.kafka.common.requests.DeleteAclsResponse.{AclDeletionResult, AclFilterResponse} -import org.apache.kafka.common.requests._ +import org.apache.kafka.common.requests.{Resource => RResource, ResourceType => RResourceType, _} import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.{Node, TopicPartition} @@ -126,6 +126,8 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => handleCreateAcls(request) case ApiKeys.DELETE_ACLS => handleDeleteAcls(request) + case ApiKeys.ALTER_CONFIGS => handleAlterConfigsRequest(request) + case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request) } } catch { case e: FatalExitError => throw e @@ -1266,7 +1268,7 @@ class KafkaApis(val requestChannel: RequestChannel, def handleCreateTopicsRequest(request: RequestChannel.Request) { val createTopicsRequest = request.body[CreateTopicsRequest] - def sendResponseCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = { + def sendResponseCallback(results: Map[String, ApiError]): Unit = { def createResponse(throttleTimeMs: Int): AbstractResponse = { val responseBody = new CreateTopicsResponse(throttleTimeMs, results.asJava) trace(s"Sending create topics response $responseBody for correlation id ${request.header.correlationId} to client ${request.header.clientId}.") @@ -1277,12 +1279,12 @@ class KafkaApis(val requestChannel: RequestChannel, if (!controller.isActive) { val results = createTopicsRequest.topics.asScala.map { case (topic, _) => - (topic, new CreateTopicsResponse.Error(Errors.NOT_CONTROLLER, null)) + (topic, new ApiError(Errors.NOT_CONTROLLER, null)) } sendResponseCallback(results) } else if (!authorize(request.session, Create, Resource.ClusterResource)) { val results = createTopicsRequest.topics.asScala.map { case (topic, _) => - (topic, new CreateTopicsResponse.Error(Errors.CLUSTER_AUTHORIZATION_FAILED, null)) + (topic, new ApiError(Errors.CLUSTER_AUTHORIZATION_FAILED, null)) } sendResponseCallback(results) } else { @@ -1291,7 +1293,7 @@ class KafkaApis(val requestChannel: RequestChannel, } // Special handling to add duplicate topics to the response - def sendResponseWithDuplicatesCallback(results: Map[String, CreateTopicsResponse.Error]): Unit = { + def sendResponseWithDuplicatesCallback(results: Map[String, ApiError]): Unit = { val duplicatedTopicsResults = if (duplicateTopics.nonEmpty) { @@ -1300,7 +1302,7 @@ class KafkaApis(val requestChannel: RequestChannel, // We can send the error message in the response for version 1, so we don't have to log it any more if (request.header.apiVersion == 0) warn(errorMessage) - duplicateTopics.keySet.map((_, new CreateTopicsResponse.Error(Errors.INVALID_REQUEST, errorMessage))).toMap + duplicateTopics.keySet.map((_, new ApiError(Errors.INVALID_REQUEST, errorMessage))).toMap } else Map.empty val completeResults = results ++ duplicatedTopicsResults @@ -1894,11 +1896,11 @@ class KafkaApis(val requestChannel: RequestChannel, } if (mayThrottle) { - val clientId : String = - if (request.requestObj.isInstanceOf[ControlledShutdownRequest]) - request.requestObj.asInstanceOf[ControlledShutdownRequest].clientId.getOrElse("") - else + val clientId: String = request.requestObj match { + case r: ControlledShutdownRequest => r.clientId.getOrElse("") + case _ => throw new IllegalStateException("Old style requests should only be used for ControlledShutdownRequest") + } sendResponseMaybeThrottle(request, clientId, sendResponseCallback) } else sendResponseExemptThrottle(request, () => sendResponseCallback(0)) @@ -1920,6 +1922,64 @@ class KafkaApis(val requestChannel: RequestChannel, } } + def handleAlterConfigsRequest(request: RequestChannel.Request): Unit = { + val alterConfigsRequest = request.body[AlterConfigsRequest] + val (authorizedResources, unauthorizedResources) = alterConfigsRequest.configs.asScala.partition { case (resource, _) => + resource.`type` match { + case RResourceType.BROKER => + authorize(request.session, AlterConfigs, new Resource(Broker, resource.name)) || + authorize(request.session, AlterConfigs, Resource.ClusterResource) + case RResourceType.TOPIC => + authorize(request.session, AlterConfigs, new Resource(Topic, resource.name)) || + authorize(request.session, AlterConfigs, Resource.ClusterResource) + case rt => throw new InvalidRequestException(s"Unexpected resource type $rt") + } + } + val authorizedResult = adminManager.alterConfigs(authorizedResources, alterConfigsRequest.validateOnly) + val unauthorizedResult = unauthorizedResources.keys.map { resource => + resource -> configsAuthorizationApiError(request.session, resource) + } + sendResponseMaybeThrottle(request, new AlterConfigsResponse(_, (authorizedResult ++ unauthorizedResult).asJava)) + } + + private def configsAuthorizationApiError(session: RequestChannel.Session, resource: RResource): ApiError = { + val error = resource.`type` match { + case RResourceType.BROKER => Errors.BROKER_AUTHORIZATION_FAILED + case RResourceType.TOPIC => + // Don't leak topic name unless the user has describe topic permission + if (authorize(session, Describe, new Resource(Topic, resource.name))) + Errors.TOPIC_AUTHORIZATION_FAILED + else + Errors.UNKNOWN_TOPIC_OR_PARTITION + case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") + } + new ApiError(error, null) + } + + def handleDescribeConfigsRequest(request: RequestChannel.Request): Unit = { + val describeConfigsRequest = request.body[DescribeConfigsRequest] + val (authorizedResources, unauthorizedResources) = describeConfigsRequest.resources.asScala.partition { resource => + resource.`type` match { + case RResourceType.BROKER => + authorize(request.session, DescribeConfigs, new Resource(Broker, resource.name)) || + authorize(request.session, DescribeConfigs, Resource.ClusterResource) + case RResourceType.TOPIC => + authorize(request.session, DescribeConfigs, new Resource(Topic, resource.name)) || + authorize(request.session, DescribeConfigs, Resource.ClusterResource) + case rt => throw new InvalidRequestException(s"Unexpected resource type $rt for resource ${resource.name}") + } + } + val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource => + resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet) + }.toMap) + val unauthorizedConfigs = unauthorizedResources.map { resource => + val error = configsAuthorizationApiError(request.session, resource) + resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry]) + } + + sendResponseMaybeThrottle(request, new DescribeConfigsResponse(_, (authorizedConfigs ++ unauthorizedConfigs).asJava)) + } + def authorizeClusterAction(request: RequestChannel.Request): Unit = { if (!authorize(request.session, ClusterAction, Resource.ClusterResource)) throw new ClusterAuthorizationException(s"Request $request is not authorized.") http://git-wip-us.apache.org/repos/asf/kafka/blob/972b7545/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 788f718..94dfa43 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -24,20 +24,18 @@ import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} import com.yammer.metrics.core.Gauge -import kafka.admin.AdminUtils import kafka.api.KAFKA_0_9_0 import kafka.cluster.Broker import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException} -import kafka.controller.{ControllerStats, KafkaController} +import kafka.controller.KafkaController import kafka.coordinator.group.GroupCoordinator import kafka.coordinator.transaction.TransactionCoordinator -import kafka.log.{CleanerConfig, LogConfig, LogManager} +import kafka.log.{LogConfig, LogManager} import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter} import kafka.network.{BlockingChannel, SocketServer} import kafka.security.CredentialProvider import kafka.security.auth.Authorizer import kafka.utils._ -import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.ClusterResourceListeners import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}