KAFKA-3315: Add REST and Connector API to expose connector configuration Author: Liquan Pei <[email protected]>
Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #964 from Ishiihara/expose-connector-config Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c07d0172 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c07d0172 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c07d0172 Branch: refs/heads/trunk Commit: c07d017227f319250e5c373f8a6f504874ecfbf2 Parents: a1eb12d Author: Liquan Pei <[email protected]> Authored: Thu Mar 17 13:26:02 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Mar 17 13:26:02 2016 -0700 ---------------------------------------------------------------------- build.gradle | 1 + .../kafka/common/config/AbstractConfig.java | 26 +- .../org/apache/kafka/common/config/Config.java | 28 + .../apache/kafka/common/config/ConfigDef.java | 652 +++++++++++++++++-- .../apache/kafka/common/config/ConfigValue.java | 113 ++++ .../kafka/common/config/ConfigDefTest.java | 202 +++++- .../kafka/connect/connector/Connector.java | 21 + .../connector/ConnectorReconfigurationTest.java | 9 +- .../connect/file/FileStreamSinkConnector.java | 11 + .../connect/file/FileStreamSourceConnector.java | 12 + .../kafka/connect/runtime/AbstractHerder.java | 109 +++- .../kafka/connect/runtime/ConnectorConfig.java | 18 +- .../apache/kafka/connect/runtime/Herder.java | 7 + .../apache/kafka/connect/runtime/Worker.java | 5 + .../runtime/distributed/DistributedHerder.java | 6 +- .../kafka/connect/runtime/rest/RestServer.java | 8 +- .../runtime/rest/entities/ConfigInfo.java | 66 ++ .../runtime/rest/entities/ConfigInfos.java | 102 +++ .../runtime/rest/entities/ConfigKeyInfo.java | 171 +++++ .../runtime/rest/entities/ConfigValueInfo.java | 106 +++ .../runtime/rest/entities/ConnectorInfo.java | 5 +- .../resources/ConnectorPluginsResource.java | 49 ++ .../rest/resources/ConnectorsResource.java | 16 +- .../runtime/standalone/StandaloneHerder.java | 4 +- 
.../connect/tools/VerifiableSinkConnector.java | 6 + .../tools/VerifiableSourceConnector.java | 6 + .../connect/runtime/AbstractHerderTest.java | 10 +- .../kafka/connect/runtime/WorkerTest.java | 12 +- .../resources/ConnectorPluginsResourceTest.java | 165 +++++ .../rest/resources/ConnectorsResourceTest.java | 4 +- 30 files changed, 1836 insertions(+), 114 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 321fc3f..4b84ba5 100644 --- a/build.gradle +++ b/build.gradle @@ -543,6 +543,7 @@ project(':clients') { include "**/org/apache/kafka/common/*" include "**/org/apache/kafka/common/errors/*" include "**/org/apache/kafka/common/serialization/*" + include "**/org/apache/kafka/common/config/*" } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index b44f72c..f833d7e 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -12,6 +12,13 @@ */ package org.apache.kafka.common.config; +import org.apache.kafka.common.Configurable; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -20,13 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import 
org.apache.kafka.common.Configurable; -import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.utils.Utils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * A convenient base class for configurations to extend. * <p> @@ -46,7 +46,7 @@ public class AbstractConfig { private final Map<String, Object> values; @SuppressWarnings("unchecked") - public AbstractConfig(ConfigDef definition, Map<?, ?> originals, Boolean doLog) { + public AbstractConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) { /* check that all the keys are really strings */ for (Object key : originals.keySet()) if (!(key instanceof String)) @@ -62,6 +62,12 @@ public class AbstractConfig { this(definition, originals, true); } + public AbstractConfig(Map<String, Object> parsedConfig) { + this.values = parsedConfig; + this.originals = new HashMap<>(); + this.used = Collections.synchronizedSet(new HashSet<String>()); + } + protected Object get(String key) { if (!values.containsKey(key)) throw new ConfigException(String.format("Unknown configuration '%s'", key)); @@ -94,7 +100,7 @@ public class AbstractConfig { return (List<String>) get(key); } - public boolean getBoolean(String key) { + public Boolean getBoolean(String key) { return (Boolean) get(key); } @@ -125,7 +131,7 @@ public class AbstractConfig { /** * Get all the original settings, ensuring that all values are of type String. 
* @return the original settings - * @throw ClassCastException if any of the values are not strings + * @throws ClassCastException if any of the values are not strings */ public Map<String, String> originalsStrings() { Map<String, String> copy = new RecordingMap<>(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/Config.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/Config.java b/clients/src/main/java/org/apache/kafka/common/config/Config.java new file mode 100644 index 0000000..ce5ee17 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/Config.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ +package org.apache.kafka.common.config; + +import java.util.List; + +public class Config { + private List<ConfigValue> configValues; + + public Config(List<ConfigValue> configValues) { + this.configValues = configValues; + } + + public List<ConfigValue> configValues() { + return configValues; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 703eb7c..881cb0b 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -12,45 +12,66 @@ */ package org.apache.kafka.common.config; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.utils.Utils; + import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; -import org.apache.kafka.common.config.types.Password; -import org.apache.kafka.common.utils.Utils; - /** - * This class is used for specifying the set of expected configurations, their type, their defaults, their - * documentation, and any special validation logic used for checking the correctness of the values the user provides. + * This class is used for specifying the set of expected configurations. For each configuration, you can specify + * the name, the type, the default value, the documentation, the group information, the order in the group, + * the width of the configuration value and the name suitable for display in the UI. 
+ * + * You can provide special validation logic used for single configuration validation by implementing {@link Validator}. + * + * Moreover, you can specify the dependents of a configuration. The valid values and visibility of a configuration + * may change according to the values of other configurations. You can implement {@link Recommender} to get valid + * values and set visibility of a configuration given the current configuration values. + * * <p/> - * Usage of this class looks something like this: + * To use the class: * <p/> * <pre> * ConfigDef defs = new ConfigDef(); - * defs.define("config_name", Type.STRING, "default string value", "This configuration is used for blah blah blah."); - * defs.define("another_config_name", Type.INT, 42, Range.atLeast(0), "More documentation on this config"); * - * Properties props = new Properties(); - * props.setProperty("config_name", "some value"); + * defs.define("config_with_default", Type.STRING, "default string value", "Configuration with default value."); + * defs.define("config_with_validator", Type.INT, 42, Range.atLeast(0), "Configuration with user provided validator."); + * defs.define("config_with_dependents", Type.INT, "Configuration with dependents.", "group", 1, "Config With Dependents", Arrays.asList("config_with_default", "config_with_validator")); + * + * Map<String, String> props = new HashMap<>(); + * props.put("config_with_default", "some value"); + * props.put("config_with_dependents", "some other value"); + * // will return "some value" * Map<String, Object> configs = defs.parse(props); + * String someConfig = (String) configs.get("config_with_default"); + * // will return default value of 42 + * int anotherConfig = (Integer) configs.get("config_with_validator"); * - * String someConfig = (String) configs.get("config_name"); // will return "some value" - * int anotherConfig = (Integer) configs.get("another_config_name"); // will return default value of 42 + * To validate the full configuration, use: +
* List<ConfigValue> configValues = defs.validate(props); + * Each {@link ConfigValue} contains updated configuration information given the current configuration values. * </pre> * <p/> - * This class can be used stand-alone or in combination with {@link AbstractConfig} which provides some additional + * This class can be used standalone or in combination with {@link AbstractConfig} which provides some additional * functionality for accessing configs. */ public class ConfigDef { public static final Object NO_DEFAULT_VALUE = new String(""); - private final Map<String, ConfigKey> configKeys = new HashMap<String, ConfigKey>(); + private final Map<String, ConfigKey> configKeys = new HashMap<>(); + private final List<String> groups = new LinkedList<>(); + private Set<String> configsWithNoParent; /** * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef} @@ -63,26 +84,256 @@ public class ConfigDef { /** * Define a new configuration - * - * @param name The name of the config parameter - * @param type The type of the config - * @param defaultValue The default value to use if this config isn't present - * @param validator A validator to use in checking the correctness of the config - * @param importance The importance of this config: is this something you will likely need to change.
- * @param documentation The documentation string for the config + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @param recommender the recommender provides valid values given the parent configuration values * @return This ConfigDef so you can chain calls */ - public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { - if (configKeys.containsKey(name)) + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) { + if (configKeys.containsKey(name)) { throw new ConfigException("Configuration " + name + " is defined twice."); + } + if (group != null && !groups.contains(group)) { + groups.add(group); + } Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? 
NO_DEFAULT_VALUE : parseType(name, defaultValue, type); - configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation)); + configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender)); return this; } /** + * Define a new configuration with no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName, List<String> dependents) { + return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, null); + } + + /** + * Define a new configuration with no dependents + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the 
order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param recommender the recommender provides valid values given the parent configuration values + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName, Recommender recommender) { + return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender); + } + + /** + * Define a new configuration with no dependents and no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName) { + return define(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList()); + } + + /** + * Define a new configuration with no special validation logic + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present 
+ * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @param recommender the recommender provides valid values given the parent configuration values + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) { + return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender); + } + + /** + * Define a new configuration with no special validation logic and no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName, List<String> dependents) { + return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, 
dependents, null); + } + + /** + * Define a new configuration with no special validation logic and no dependents + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param recommender the recommender provides valid values given the parent configuration values + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName, Recommender recommender) { + return define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender); + } + + /** + * Define a new configuration with no special validation logic, no dependents and no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Importance importance, String documentation, + String group, int orderInGroup, Width width, String displayName) { + return
define(name, type, defaultValue, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList()); + } + + /** + * Define a new configuration with no default value and no special validation logic + * @param name the name of the config parameter + * @param type the type of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @param recommender the recommender provides valid values given the parent configuration value + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup, + Width width, String displayName, List<String> dependents, Recommender recommender) { + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender); + } + + /** + * Define a new configuration with no default value, no special validation logic and no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param dependents the configurations that are dependents of this configuration + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Importance importance, String 
documentation, String group, int orderInGroup, + Width width, String displayName, List<String> dependents) { + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, dependents, null); + } + + /** + * Define a new configuration with no default value, no special validation logic and no dependents + * @param name the name of the config parameter + * @param type the type of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @param recommender the recommender provides valid values given the parent configuration value + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Importance importance, String documentation, String group, int orderInGroup, + Width width, String displayName, Recommender recommender) { + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList(), recommender); + } + + /** + * Define a new configuration with no default value, no special validation logic, no dependents and no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @param group the group this config belongs to + * @param orderInGroup the order of this config in the group + * @param width the width of the config + * @param displayName the name suitable for display + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Importance importance, String documentation, String group,
int orderInGroup, + Width width, String displayName) { + return define(name, type, NO_DEFAULT_VALUE, null, importance, documentation, group, orderInGroup, width, displayName, Collections.<String>emptyList()); + } + + /** + * Define a new configuration with no group, no order in group, no width, no display name, no dependents and no custom recommender + * @param name the name of the config parameter + * @param type the type of the config + * @param defaultValue the default value to use if this config isn't present + * @param validator the validator to use in checking the correctness of the config + * @param importance the importance of this config + * @param documentation the documentation string for the config + * @return This ConfigDef so you can chain calls + */ + public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { + return define(name, type, defaultValue, validator, importance, documentation, null, -1, Width.NONE, name); + } + + /** * Define a new configuration with no special validation logic - * * @param name The name of the config parameter * @param type The type of the config * @param defaultValue The default value to use if this config isn't present @@ -96,7 +347,6 @@ public class ConfigDef { /** * Define a new configuration with no default value and no special validation logic - * * @param name The name of the config parameter * @param type The type of the config * @param importance The importance of this config: is this something you will likely need to change. @@ -108,6 +358,22 @@ public class ConfigDef { } /** + * Get the configuration keys + * @return a map containing all configuration keys + */ + public Map<String, ConfigKey> configKeys() { + return configKeys; + } + + /** + * Get the groups for the configuration + * @return a list of group names + */ + public List<String> groups() { + return groups; + } + + /** * Add standard SSL client configuration options. 
* @return this */ @@ -131,34 +397,188 @@ public class ConfigDef { * appropriate type (int, string, etc). This will work equally well with either java.util.Properties instances or a * programmatically constructed map. * - * @param props The configs to parse and validate + * @param props The configs to parse and validate. * @return Parsed and validated configs. The key will be the config name and the value will be the value parsed into - * the appropriate type (int, string, etc) + * the appropriate type (int, string, etc). */ public Map<String, Object> parse(Map<?, ?> props) { - /* parse all known keys */ - Map<String, Object> values = new HashMap<String, Object>(); + // Check all configurations are defined + List<String> undefinedConfigKeys = undefinedDependentConfigs(); + if (!undefinedConfigKeys.isEmpty()) { + String joined = Utils.join(undefinedConfigKeys, ","); + throw new ConfigException("Some configurations in are referred in the dependents, but not defined: " + joined); + } + // parse all known keys + Map<String, Object> values = new HashMap<>(); for (ConfigKey key : configKeys.values()) { Object value; // props map contains setting - assign ConfigKey value - if (props.containsKey(key.name)) + if (props.containsKey(key.name)) { value = parseType(key.name, props.get(key.name), key.type); - // props map doesn't contain setting, the key is required because no default value specified - its an error - else if (key.defaultValue == NO_DEFAULT_VALUE) + // props map doesn't contain setting, the key is required because no default value specified - its an error + } else if (key.defaultValue == NO_DEFAULT_VALUE) { throw new ConfigException("Missing required configuration \"" + key.name + "\" which has no default value."); - // otherwise assign setting its default value - else + } else { + // otherwise assign setting its default value value = key.defaultValue; - if (key.validator != null) + } + if (key.validator != null) { key.validator.ensureValid(key.name, value); + } 
values.put(key.name, value); } return values; } /** + * Validate the current configuration values with the configuration definition. + * @param props the current configuration values + * @return List of ConfigValue, each ConfigValue contains the updated configuration information given + * the current configuration values. + */ + public List<ConfigValue> validate(Map<String, String> props) { + Map<String, ConfigValue> configValues = new HashMap<>(); + for (String name: configKeys.keySet()) { + configValues.put(name, new ConfigValue(name)); + } + + List<String> undefinedConfigKeys = undefinedDependentConfigs(); + for (String undefinedConfigKey: undefinedConfigKeys) { + ConfigValue undefinedConfigValue = new ConfigValue(undefinedConfigKey); + undefinedConfigValue.addErrorMessage(undefinedConfigKey + " is referred in the dependents, but not defined."); + undefinedConfigValue.visible(false); + configValues.put(undefinedConfigKey, undefinedConfigValue); + } + + Map<String, Object> parsed = parseForValidate(props, configValues); + return validate(parsed, configValues); + } + + // package accessible for testing + Map<String, Object> parseForValidate(Map<String, String> props, Map<String, ConfigValue> configValues) { + Map<String, Object> parsed = new HashMap<>(); + Set<String> configsWithNoParent = getConfigsWithNoParent(); + for (String name: configsWithNoParent) { + parseForValidate(name, props, parsed, configValues); + } + return parsed; + } + + + private List<ConfigValue> validate(Map<String, Object> parsed, Map<String, ConfigValue> configValues) { + Set<String> configsWithNoParent = getConfigsWithNoParent(); + for (String name: configsWithNoParent) { + validate(name, parsed, configValues); + } + return new LinkedList<>(configValues.values()); + } + + private List<String> undefinedDependentConfigs() { + Set<String> undefinedConfigKeys = new HashSet<>(); + for (String configName: configKeys.keySet()) { + ConfigKey configKey = configKeys.get(configName); + List<String> dependents
= configKey.dependents; + for (String dependent: dependents) { + if (!configKeys.containsKey(dependent)) { + undefinedConfigKeys.add(dependent); + } + } + } + return new LinkedList<>(undefinedConfigKeys); + } + + private Set<String> getConfigsWithNoParent() { + if (this.configsWithNoParent != null) { + return this.configsWithNoParent; + } + Set<String> configsWithParent = new HashSet<>(); + + for (ConfigKey configKey: configKeys.values()) { + List<String> dependents = configKey.dependents; + configsWithParent.addAll(dependents); + } + + Set<String> configs = new HashSet<>(configKeys.keySet()); + configs.removeAll(configsWithParent); + this.configsWithNoParent = configs; + return configs; + } + + private void parseForValidate(String name, Map<String, String> props, Map<String, Object> parsed, Map<String, ConfigValue> configs) { + if (!configKeys.containsKey(name)) { + return; + } + ConfigKey key = configKeys.get(name); + ConfigValue config = configs.get(name); + + Object value = null; + if (props.containsKey(key.name)) { + try { + value = parseType(key.name, props.get(key.name), key.type); + } catch (ConfigException e) { + config.addErrorMessage(e.getMessage()); + } + } else if (key.defaultValue == NO_DEFAULT_VALUE) { + config.addErrorMessage("Missing required configuration \"" + key.name + "\" which has no default value."); + } else { + value = key.defaultValue; + } + + if (key.validator != null) { + try { + key.validator.ensureValid(key.name, value); + } catch (ConfigException e) { + config.addErrorMessage(e.getMessage()); + } + } + config.value(value); + parsed.put(name, value); + for (String dependent: key.dependents) { + parseForValidate(dependent, props, parsed, configs); + } + } + + private void validate(String name, Map<String, Object> parsed, Map<String, ConfigValue> configs) { + if (!configKeys.containsKey(name)) { + return; + } + ConfigKey key = configKeys.get(name); + ConfigValue config = configs.get(name); + Object value = parsed.get(name); + 
List<Object> recommendedValues; + if (key.recommender != null) { + try { + recommendedValues = key.recommender.validValues(name, parsed); + List<Object> originalRecommendedValues = config.recommendedValues(); + + if (!originalRecommendedValues.isEmpty()) { + Set<Object> originalRecommendedValueSet = new HashSet<>(originalRecommendedValues); + Iterator<Object> it = recommendedValues.iterator(); + while (it.hasNext()) { + Object o = it.next(); + if (!originalRecommendedValueSet.contains(o)) { + it.remove(); + } + } + } + config.recommendedValues(recommendedValues); + if (value != null && !recommendedValues.isEmpty() && !recommendedValues.contains(value)) { + config.addErrorMessage("Invalid value for configuration " + key.name); + } + config.visible(key.recommender.visible(name, parsed)); + } catch (ConfigException e) { + config.addErrorMessage(e.getMessage()); + } + } + + configs.put(name, config); + for (String dependent: key.dependents) { + validate(dependent, parsed, configs); + } + } + + /** * Parse a value according to its expected type. - * * @param name The config name * @param value The config value * @param type The expected type @@ -263,15 +683,56 @@ public class ConfigDef { BOOLEAN, STRING, INT, SHORT, LONG, DOUBLE, LIST, CLASS, PASSWORD } + /** + * The importance level for a configuration + */ public enum Importance { HIGH, MEDIUM, LOW } /** - * Validation logic the user may provide + * The width of a configuration value + */ + public enum Width { + NONE, SHORT, MEDIUM, LONG + } + + /** + * This is used by the {@link #validate(Map)} to get valid values for a configuration given the current + * configuration values in order to perform full configuration validation and visibility modification. + * In case that there are dependencies between configurations, the valid values and visibility + * for a configuration may change given the values of other configurations. 
+ */ + public interface Recommender { + + /** + * The valid values for the configuration given the current configuration values. + * @param name The name of the configuration + * @param parsedConfig The parsed configuration values + * @return The list of valid values. To function properly, the returned objects should have the type + * defined for the configuration using the recommender. + */ + List<Object> validValues(String name, Map<String, Object> parsedConfig); + + /** + * Set the visibility of the configuration given the current configuration values. + * @param name The name of the configuration + * @param parsedConfig The parsed configuration values + * @return The visibility of the configuration + */ + boolean visible(String name, Map<String, Object> parsedConfig); + } + + /** + * Validation logic the user may provide to perform single configuration validation. */ public interface Validator { - public void ensureValid(String name, Object o); + /** + * Perform single configuration validation. 
+ * @param name The name of the configuration + * @param value The value of the configuration + */ + void ensureValid(String name, Object value); } /** @@ -345,16 +806,24 @@ public class ConfigDef { } } - private static class ConfigKey { + public static class ConfigKey { public final String name; public final Type type; public final String documentation; public final Object defaultValue; public final Validator validator; public final Importance importance; - - public ConfigKey(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation) { - super(); + public final String group; + public final int orderInGroup; + public final Width width; + public final String displayName; + public final List<String> dependents; + public final Recommender recommender; + + public ConfigKey(String name, Type type, Object defaultValue, Validator validator, + Importance importance, String documentation, String group, + int orderInGroup, Width width, String displayName, + List<String> dependents, Recommender recommender) { this.name = name; this.type = type; this.defaultValue = defaultValue; @@ -363,34 +832,21 @@ public class ConfigDef { if (this.validator != null && this.hasDefault()) this.validator.ensureValid(name, defaultValue); this.documentation = documentation; + this.dependents = dependents; + this.group = group; + this.orderInGroup = orderInGroup; + this.width = width; + this.displayName = displayName; + this.recommender = recommender; } public boolean hasDefault() { return this.defaultValue != NO_DEFAULT_VALUE; } - } public String toHtmlTable() { - // sort first required fields, then by importance, then name - List<ConfigDef.ConfigKey> configs = new ArrayList<ConfigDef.ConfigKey>(this.configKeys.values()); - Collections.sort(configs, new Comparator<ConfigDef.ConfigKey>() { - public int compare(ConfigDef.ConfigKey k1, ConfigDef.ConfigKey k2) { - // first take anything with no default value (therefore required) - if 
(!k1.hasDefault() && k2.hasDefault()) - return -1; - else if (!k2.hasDefault() && k1.hasDefault()) - return 1; - - // then sort by importance - int cmp = k1.importance.compareTo(k2.importance); - if (cmp == 0) - // then sort in alphabetical order - return k1.name.compareTo(k2.name); - else - return cmp; - } - }); + List<ConfigKey> configs = sortedConfigs(); StringBuilder b = new StringBuilder(); b.append("<table class=\"data-table\"><tbody>\n"); b.append("<tr>\n"); @@ -434,4 +890,74 @@ public class ConfigDef { b.append("</tbody></table>"); return b.toString(); } + + /** + * Get the configs formatted with reStructuredText, suitable for embedding in Sphinx + * documentation. + */ + public String toRst() { + List<ConfigKey> configs = sortedConfigs(); + StringBuilder b = new StringBuilder(); + + for (ConfigKey def : configs) { + b.append("``"); + b.append(def.name); + b.append("``\n"); + for (String docLine : def.documentation.split("\n")) { + if (docLine.length() == 0) { + continue; + } + b.append(" "); + b.append(docLine); + b.append("\n\n"); + } + b.append(" * Type: "); + b.append(def.type.toString().toLowerCase()); + b.append("\n"); + if (def.defaultValue != null) { + b.append(" * Default: "); + if (def.type == Type.STRING) { + b.append("\""); + b.append(def.defaultValue); + b.append("\""); + } else { + b.append(def.defaultValue); + } + b.append("\n"); + } + b.append(" * Importance: "); + b.append(def.importance.toString().toLowerCase()); + b.append("\n\n"); + } + return b.toString(); + } + + /** + * Get a list of configs sorted into "natural" order: listing required fields first, then + * ordering by importance, and finally by name. 
+ */ + private List<ConfigKey> sortedConfigs() { + // sort first required fields, then by importance, then name + List<ConfigKey> configs = new ArrayList<>(this.configKeys.values()); + Collections.sort(configs, new Comparator<ConfigKey>() { + public int compare(ConfigKey k1, ConfigKey k2) { + // first take anything with no default value + if (!k1.hasDefault() && k2.hasDefault()) { + return -1; + } else if (!k2.hasDefault() && k1.hasDefault()) { + return 1; + } + + // then sort by importance + int cmp = k1.importance.compareTo(k2.importance); + if (cmp == 0) { + // then sort in alphabetical order + return k1.name.compareTo(k2.name); + } else { + return cmp; + } + } + }); + return configs; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java new file mode 100644 index 0000000..c9a4a34 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigValue.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.common.config; + +import java.util.LinkedList; +import java.util.List; +import java.util.Objects; + +public class ConfigValue { + + private String name; + private Object value; + private List<Object> recommendedValues; + private List<String> errorMessages; + private boolean visible; + + public ConfigValue(String name) { + this(name, null, new LinkedList<Object>(), new LinkedList<String>()); + } + + public ConfigValue(String name, Object value, List<Object> recommendedValues, List<String> errorMessages) { + this.name = name; + this.value = value; + this.recommendedValues = recommendedValues; + this.errorMessages = errorMessages; + this.visible = true; + } + + public String name() { + return name; + } + + public Object value() { + return value; + } + + public List<Object> recommendedValues() { + return recommendedValues; + } + + public List<String> errorMessages() { + return errorMessages; + } + + public boolean visible() { + return visible; + } + + public void value(Object value) { + this.value = value; + } + + public void recommendedValues(List<Object> recommendedValues) { + this.recommendedValues = recommendedValues; + } + + public void addErrorMessage(String errorMessage) { + this.errorMessages.add(errorMessage); + } + + public void visible(boolean visible) { + this.visible = visible; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ConfigValue that = (ConfigValue) o; + return Objects.equals(name, that.name) && + Objects.equals(value, that.value) && + Objects.equals(recommendedValues, that.recommendedValues) && + Objects.equals(errorMessages, that.errorMessages) && + Objects.equals(visible, that.visible); + } + + @Override + public int hashCode() { + return Objects.hash(name, value, recommendedValues, errorMessages, visible); + 
} + + @Override + public String toString() { + StringBuffer sb = new StringBuffer(); + sb.append("[") + .append(name) + .append(",") + .append(value) + .append(",") + .append(recommendedValues) + .append(",") + .append(errorMessages) + .append(",") + .append(visible) + .append("]"); + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java index fa0370b..022fb6b 100644 --- a/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java +++ b/clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java @@ -12,22 +12,27 @@ */ package org.apache.kafka.common.config; -import static java.util.Arrays.asList; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - import org.apache.kafka.common.config.ConfigDef.Importance; -import org.apache.kafka.common.config.ConfigDef.Validator; import org.apache.kafka.common.config.ConfigDef.Range; -import org.apache.kafka.common.config.ConfigDef.ValidString; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.ValidString; +import org.apache.kafka.common.config.ConfigDef.Validator; +import org.apache.kafka.common.config.ConfigDef.Width; import org.apache.kafka.common.config.types.Password; import org.junit.Test; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import static java.util.Arrays.asList; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + 
public class ConfigDefTest { @Test @@ -156,7 +161,8 @@ public class ConfigDefTest { final String key = "enum_test"; ConfigDef def = new ConfigDef(); - def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); + def.define(key, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, + ValidString.in("ONE", "TWO", "THREE"), Importance.HIGH, "docs"); Properties props = new Properties(); props.put(key, "ONE"); @@ -164,6 +170,180 @@ public class ConfigDefTest { assertEquals("ONE", vals.get(key)); } + @Test + public void testGroupInference() { + List<String> expected1 = Arrays.asList("group1", "group2"); + ConfigDef def1 = new ConfigDef() + .define("a", Type.INT, Importance.HIGH, "docs", "group1", 1, Width.SHORT, "a") + .define("b", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "b") + .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c"); + + assertEquals(expected1, def1.groups()); + + List<String> expected2 = Arrays.asList("group2", "group1"); + ConfigDef def2 = new ConfigDef() + .define("a", Type.INT, Importance.HIGH, "docs", "group2", 1, Width.SHORT, "a") + .define("b", Type.INT, Importance.HIGH, "docs", "group2", 2, Width.SHORT, "b") + .define("c", Type.INT, Importance.HIGH, "docs", "group1", 2, Width.SHORT, "c"); + + assertEquals(expected2, def2.groups()); + } + + @Test + public void testParseForValidate() { + Map<String, Object> expectedParsed = new HashMap<>(); + expectedParsed.put("a", 1); + expectedParsed.put("b", null); + expectedParsed.put("c", null); + expectedParsed.put("d", 10); + + Map<String, ConfigValue> expected = new HashMap<>(); + String errorMessageB = "Missing required configuration \"b\" which has no default value."; + String errorMessageC = "Missing required configuration \"c\" which has no default value."; + ConfigValue configA = new ConfigValue("a", 1, Collections.<Object>emptyList(), Collections.<String>emptyList()); + ConfigValue configB = new 
ConfigValue("b", null, Collections.<Object>emptyList(), Arrays.asList(errorMessageB, errorMessageB)); + ConfigValue configC = new ConfigValue("c", null, Collections.<Object>emptyList(), Arrays.asList(errorMessageC)); + ConfigValue configD = new ConfigValue("d", 10, Collections.<Object>emptyList(), Collections.<String>emptyList()); + expected.put("a", configA); + expected.put("b", configB); + expected.put("c", configC); + expected.put("d", configD); + + ConfigDef def = new ConfigDef() + .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false)) + .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) + .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)) + .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", Arrays.asList("b"), new IntegerRecommender(false)); + + Map<String, String> props = new HashMap<>(); + props.put("a", "1"); + props.put("d", "10"); + + Map<String, ConfigValue> configValues = new HashMap<>(); + + for (String name: def.configKeys().keySet()) { + configValues.put(name, new ConfigValue(name)); + } + + Map<String, Object> parsed = def.parseForValidate(props, configValues); + + assertEquals(expectedParsed, parsed); + assertEquals(expected, configValues); + } + + @Test + public void testValidate() { + Map<String, ConfigValue> expected = new HashMap<>(); + String errorMessageB = "Missing required configuration \"b\" which has no default value."; + String errorMessageC = "Missing required configuration \"c\" which has no default value."; + String errorMessageD = "Invalid value for configuration d"; + + ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList()); + ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB, errorMessageB)); + ConfigValue 
configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC)); + ConfigValue configD = new ConfigValue("d", 10, Arrays.<Object>asList(1, 2, 3), Arrays.asList(errorMessageD)); + + expected.put("a", configA); + expected.put("b", configB); + expected.put("c", configC); + expected.put("d", configD); + + ConfigDef def = new ConfigDef() + .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c"), new IntegerRecommender(false)) + .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) + .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)) + .define("d", Type.INT, Importance.HIGH, "docs", "group", 4, Width.SHORT, "d", Arrays.asList("b"), new IntegerRecommender(false)); + + Map<String, String> props = new HashMap<>(); + props.put("a", "1"); + props.put("d", "10"); + + List<ConfigValue> configs = def.validate(props); + for (ConfigValue config : configs) { + String name = config.name(); + ConfigValue expectedConfig = expected.get(name); + assertEquals(expectedConfig, config); + } + } + + @Test + public void testValidateMissingConfigKey() { + Map<String, ConfigValue> expected = new HashMap<>(); + String errorMessageB = "Missing required configuration \"b\" which has no default value."; + String errorMessageC = "Missing required configuration \"c\" which has no default value."; + String errorMessageD = "d is referred in the dependents, but not defined."; + + ConfigValue configA = new ConfigValue("a", 1, Arrays.<Object>asList(1, 2, 3), Collections.<String>emptyList()); + ConfigValue configB = new ConfigValue("b", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageB)); + ConfigValue configC = new ConfigValue("c", null, Arrays.<Object>asList(4, 5), Arrays.asList(errorMessageC)); + ConfigValue configD = new ConfigValue("d", null, Collections.emptyList(), Arrays.asList(errorMessageD)); + 
configD.visible(false); + + expected.put("a", configA); + expected.put("b", configB); + expected.put("c", configC); + expected.put("d", configD); + + ConfigDef def = new ConfigDef() + .define("a", Type.INT, Importance.HIGH, "docs", "group", 1, Width.SHORT, "a", Arrays.asList("b", "c", "d"), new IntegerRecommender(false)) + .define("b", Type.INT, Importance.HIGH, "docs", "group", 2, Width.SHORT, "b", new IntegerRecommender(true)) + .define("c", Type.INT, Importance.HIGH, "docs", "group", 3, Width.SHORT, "c", new IntegerRecommender(true)); + + Map<String, String> props = new HashMap<>(); + props.put("a", "1"); + + List<ConfigValue> configs = def.validate(props); + for (ConfigValue config: configs) { + String name = config.name(); + ConfigValue expectedConfig = expected.get(name); + assertEquals(expectedConfig, config); + } + } + + @Test + public void testValidateCannotParse() { + Map<String, ConfigValue> expected = new HashMap<>(); + String errorMessageB = "Invalid value non_integer for configuration a: Not a number of type INT"; + ConfigValue configA = new ConfigValue("a", null, Collections.emptyList(), Arrays.asList(errorMessageB)); + expected.put("a", configA); + + ConfigDef def = new ConfigDef().define("a", Type.INT, Importance.HIGH, "docs"); + Map<String, String> props = new HashMap<>(); + props.put("a", "non_integer"); + + List<ConfigValue> configs = def.validate(props); + for (ConfigValue config: configs) { + String name = config.name(); + ConfigValue expectedConfig = expected.get(name); + assertEquals(expectedConfig, config); + } + } + + private static class IntegerRecommender implements ConfigDef.Recommender { + + private boolean hasParent; + + public IntegerRecommender(boolean hasParent) { + this.hasParent = hasParent; + } + + @Override + public List<Object> validValues(String name, Map<String, Object> parsedConfig) { + List<Object> values = new LinkedList<>(); + if (!hasParent) { + values.addAll(Arrays.asList(1, 2, 3)); + } else { + 
values.addAll(Arrays.asList(4, 5)); + } + return values; + } + + @Override + public boolean visible(String name, Map<String, Object> parsedConfig) { + return true; + } + } + private void testValidators(Type type, Validator validator, Object defaultVal, Object[] okValues, Object[] badValues) { ConfigDef def = new ConfigDef().define("name", type, defaultVal, validator, Importance.HIGH, "docs"); http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java ---------------------------------------------------------------------- diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java index 934cdbd..1370156 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/Connector.java @@ -18,6 +18,9 @@ package org.apache.kafka.connect.connector; import org.apache.kafka.common.annotation.InterfaceStability; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigValue; import java.util.List; import java.util.Map; @@ -121,4 +124,22 @@ public abstract class Connector { * Stop this connector. */ public abstract void stop(); + + /** + * Validate the connector configuration values against configuration definitions. + * @param connectorConfigs the provided configuration values + * @return List of Config, each Config contains the updated configuration information given + * the current configuration values. + */ + public Config validate(Map<String, String> connectorConfigs) { + ConfigDef configDef = config(); + List<ConfigValue> configValues = configDef.validate(connectorConfigs); + return new Config(configValues); + } + + /** + * Define the configuration for the connector. + * @return The ConfigDef for this connector. 
+ */ + public abstract ConfigDef config(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java ---------------------------------------------------------------------- diff --git a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java index 7ea1de2..0517b66 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/connector/ConnectorReconfigurationTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.connector; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.errors.ConnectException; import org.junit.Test; @@ -43,6 +44,7 @@ public class ConnectorReconfigurationTest { } private static class TestConnector extends Connector { + private boolean stopException; private int order = 0; public int stopOrder = -1; @@ -78,5 +80,10 @@ public class ConnectorReconfigurationTest { if (stopException) throw new ConnectException("error"); } + + @Override + public ConfigDef config() { + return new ConfigDef(); + } } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java ---------------------------------------------------------------------- diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java index a73153f..d423313 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSinkConnector.java @@ -17,6 +17,9 @@ package 
org.apache.kafka.connect.file; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.sink.SinkConnector; @@ -31,7 +34,10 @@ import java.util.Map; * sink modes via its 'mode' setting. */ public class FileStreamSinkConnector extends SinkConnector { + public static final String FILE_CONFIG = "file"; + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Destination filename."); private String filename; @@ -66,4 +72,9 @@ public class FileStreamSinkConnector extends SinkConnector { public void stop() { // Nothing to do since FileStreamSinkConnector has no background monitoring. } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java ---------------------------------------------------------------------- diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java index 843e999..4fb33b7 100644 --- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java +++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceConnector.java @@ -17,6 +17,9 @@ package org.apache.kafka.connect.file; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; @@ -35,6 +38,10 @@ public class 
FileStreamSourceConnector extends SourceConnector { public static final String TOPIC_CONFIG = "topic"; public static final String FILE_CONFIG = "file"; + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.") + .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to"); + private String filename; private String topic; @@ -74,4 +81,9 @@ public class FileStreamSourceConnector extends SourceConnector { public void stop() { // Nothing to do since FileStreamSourceConnector has no background monitoring. } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index ca85d87..8d83644 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -16,7 +16,16 @@ **/ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.ConfigKey; +import org.apache.kafka.common.config.ConfigValue; +import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; +import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; +import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo; import 
org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.storage.StatusBackingStore; import org.apache.kafka.connect.util.ConnectorTaskId; @@ -27,7 +36,11 @@ import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; /** * Abstract Herder implementation which handles connector/task lifecycle tracking. Extensions @@ -52,10 +65,14 @@ import java.util.List; */ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener { + protected final Worker worker; protected final StatusBackingStore statusBackingStore; private final String workerId; - public AbstractHerder(StatusBackingStore statusBackingStore, String workerId) { + protected Map<String, Connector> tempConnectors = new ConcurrentHashMap<>(); + + public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId) { + this.worker = worker; this.statusBackingStore = statusBackingStore; this.workerId = workerId; } @@ -143,6 +160,95 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con status.workerId(), status.trace()); } + + @Override + public ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig) { + ConfigDef connectorConfigDef = ConnectorConfig.configDef(); + List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig); + ConfigInfos result = generateResult(connType, connectorConfigDef.configKeys(), connectorConfigValues, Collections.<String>emptyList()); + + if (result.errorCount() != 0) { + return result; + } + + Connector connector = getConnector(connType); + + Config config = connector.validate(connectorConfig); + ConfigDef configDef = connector.config(); + Map<String, ConfigKey> configKeys = 
configDef.configKeys(); + List<ConfigValue> configValues = config.configValues(); + + Map<String, ConfigKey> resultConfigKeys = new HashMap<>(configKeys); + resultConfigKeys.putAll(connectorConfigDef.configKeys()); + configValues.addAll(connectorConfigValues); + + List<String> allGroups = new LinkedList<>(connectorConfigDef.groups()); + List<String> groups = configDef.groups(); + allGroups.addAll(groups); + + return generateResult(connType, resultConfigKeys, configValues, allGroups); + } + + // public for testing + public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) { + int errorCount = 0; + List<ConfigInfo> configInfoList = new LinkedList<>(); + + Map<String, ConfigValue> configValueMap = new HashMap<>(); + for (ConfigValue configValue: configValues) { + String configName = configValue.name(); + configValueMap.put(configName, configValue); + if (!configKeys.containsKey(configName)) { + configValue.addErrorMessage("Configuration is not defined: " + configName); + configInfoList.add(new ConfigInfo(null, convertConfigValue(configValue))); + } + } + + for (String configName: configKeys.keySet()) { + ConfigKeyInfo configKeyInfo = convertConfigKey(configKeys.get(configName)); + ConfigValueInfo configValueInfo = null; + if (configValueMap.containsKey(configName)) { + ConfigValue configValue = configValueMap.get(configName); + configValueInfo = convertConfigValue(configValue); + errorCount += configValue.errorMessages().size(); + } + configInfoList.add(new ConfigInfo(configKeyInfo, configValueInfo)); + } + return new ConfigInfos(connType, errorCount, groups, configInfoList); + } + + private static ConfigKeyInfo convertConfigKey(ConfigKey configKey) { + String name = configKey.name; + String type = configKey.type.name(); + Object defaultValue = configKey.defaultValue; + boolean required = false; + if (defaultValue == ConfigDef.NO_DEFAULT_VALUE) { + required = true; + } + String 
importance = configKey.importance.name(); + String documentation = configKey.documentation; + String group = configKey.group; + int orderInGroup = configKey.orderInGroup; + String width = configKey.width.name(); + String displayName = configKey.displayName; + List<String> dependents = configKey.dependents; + return new ConfigKeyInfo(name, type, required, defaultValue, importance, documentation, group, orderInGroup, width, displayName, dependents); + } + + private static ConfigValueInfo convertConfigValue(ConfigValue configValue) { + return new ConfigValueInfo(configValue.name(), configValue.value(), configValue.recommendedValues(), configValue.errorMessages(), configValue.visible()); + } + + private Connector getConnector(String connType) { + if (tempConnectors.containsKey(connType)) { + return tempConnectors.get(connType); + } else { + Connector connector = worker.getConnector(connType); + tempConnectors.put(connType, connector); + return connector; + } + } + private String trace(Throwable t) { ByteArrayOutputStream output = new ByteArrayOutputStream(); t.printStackTrace(new PrintStream(output)); @@ -152,5 +258,4 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con return null; } } - } http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 4824acd..e21faf6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import 
org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; import java.util.HashMap; import java.util.Map; @@ -37,32 +38,41 @@ import java.util.Map; * </p> */ public class ConnectorConfig extends AbstractConfig { + private static final String COMMON_GROUP = "Common"; public static final String NAME_CONFIG = "name"; private static final String NAME_DOC = "Globally unique name to use for this connector."; + private static final String NAME_DISPLAY = "Connector name"; public static final String CONNECTOR_CLASS_CONFIG = "connector.class"; private static final String CONNECTOR_CLASS_DOC = "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " + "If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " + " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter"; + private static final String CONNECTOR_CLASS_DISPLAY = "Connector class"; public static final String TASKS_MAX_CONFIG = "tasks.max"; private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; public static final int TASKS_MAX_DEFAULT = 1; + private static final String TASK_MAX_DISPLAY = "Tasks max"; public static final String TOPICS_CONFIG = "topics"; private static final String TOPICS_DOC = ""; public static final String TOPICS_DEFAULT = ""; + private static final String TOPICS_DISPLAY = "Topics"; private static ConfigDef config; static { config = new ConfigDef() - .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC) - .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC) - .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC) - .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC); + .define(NAME_CONFIG, 
Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) + .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY) + .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY) + .define(TOPICS_CONFIG, Type.LIST, TOPICS_DEFAULT, Importance.HIGH, TOPICS_DOC, COMMON_GROUP, 4, Width.LONG, TOPICS_DISPLAY); + } + + public static ConfigDef configDef() { + return config; } public ConnectorConfig() { http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java index 95c7700..3ea4a81 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; @@ -127,6 +128,12 @@ public interface Herder { */ ConnectorStateInfo.TaskState taskStatus(ConnectorTaskId id); + /** + * Validate the provided connector config values against the configuration definition. 
+ * @param connType the connector class + * @param connectorConfig the provided connector config values + */ + ConfigInfos validateConfigs(String connType, Map<String, String> connectorConfig); class Created<T> { private final boolean created; http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index aa57493..1a9ff11 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -189,6 +189,11 @@ public class Worker { return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass()); } + public Connector getConnector(String connType) { + Class<? extends Connector> connectorClass = getConnectorClass(connType); + return instantiateConnector(connectorClass); + } + @SuppressWarnings("unchecked") private Class<? 
extends Connector> getConnectorClass(String connectorAlias) { // Avoid the classpath scan if the full class name was provided http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index 16b950b..2fc8297 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -84,7 +84,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable { private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250; - private final Worker worker; private final KafkaConfigStorage configStorage; private ClusterConfigState configState; private final Time time; @@ -130,9 +129,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable { WorkerGroupMember member, String restUrl, Time time) { - super(statusBackingStore, workerId); + super(worker, statusBackingStore, workerId); - this.worker = worker; if (configStorage != null) { // For testing. 
Assume configuration has already been performed this.configStorage = configStorage; @@ -551,6 +549,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable { return generation; } + // Should only be called from work thread, so synchronization should not be needed private boolean isLeader() { return assignment != null && member.memberId().equals(assignment.leader()); @@ -701,7 +700,6 @@ public class DistributedHerder extends AbstractHerder implements Runnable { String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); ConnectorContext ctx = new HerderConnectorContext(DistributedHerder.this, connName); worker.startConnector(connConfig, ctx, this); - // Immediately request configuration since this could be a brand new connector. However, also only update those // task configs if they are actually different from the existing ones to avoid unnecessary updates when this is // just restoring an existing connector. http://git-wip-us.apache.org/repos/asf/kafka/blob/c07d0172/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index dbac58f..7e4279a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -20,12 +20,14 @@ package org.apache.kafka.connect.runtime.rest; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; + import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.WorkerConfig; import 
org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; import org.apache.kafka.connect.runtime.rest.resources.RootResource; import org.eclipse.jetty.server.Connector; @@ -44,8 +46,6 @@ import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -55,6 +55,9 @@ import java.net.URL; import java.util.List; import java.util.Map; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; + /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. */ @@ -95,6 +98,7 @@ public class RestServer { resourceConfig.register(RootResource.class); resourceConfig.register(new ConnectorsResource(herder)); + resourceConfig.register(new ConnectorPluginsResource(herder)); resourceConfig.register(ConnectExceptionMapper.class);
