Repository: kafka Updated Branches: refs/heads/trunk 18226ff0b -> 9575e9307
KAFKA-3684: SinkConnectorConfig does not return topics in config validation. Author: Liquan Pei <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #1356 from Ishiihara/bug-fix-validate Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9575e930 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9575e930 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9575e930 Branch: refs/heads/trunk Commit: 9575e93070a480eb1ef1e136a67ce0226914b937 Parents: 18226ff Author: Liquan Pei <[email protected]> Authored: Mon May 9 17:37:17 2016 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Mon May 9 17:37:17 2016 -0700 ---------------------------------------------------------------------- .../kafka/connect/runtime/AbstractHerder.java | 2 +- .../kafka/connect/runtime/ConnectorConfig.java | 16 +++++----------- .../kafka/connect/runtime/SinkConnectorConfig.java | 4 ++++ .../connect/runtime/SourceConnectorConfig.java | 4 ++++ 4 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9575e930/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 43fc4d1..a29d216 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -241,7 +241,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con connectorConfigDef = SinkConnectorConfig.configDef(); } List<ConfigValue> connectorConfigValues = connectorConfigDef.validate(connectorConfig); - + Config config = connector.validate(connectorConfig); ConfigDef configDef = connector.config(); Map<String, ConfigKey> configKeys = configDef.configKeys(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9575e930/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 0cbfe21..9569b4b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -60,17 +60,11 @@ public class ConnectorConfig extends AbstractConfig { private static final String TASK_MAX_DISPLAY = "Tasks max"; - protected static ConfigDef config; - - static { - config = new ConfigDef() - .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) - .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY) - .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY); - } - public static ConfigDef configDef() { - return config; + return new ConfigDef() + .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY) + .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY) + .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY); } public ConnectorConfig() { @@ -78,7 +72,7 @@ public class ConnectorConfig extends AbstractConfig { } public ConnectorConfig(Map<String, String> props) { - super(config, props); + super(configDef(), props); } public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) { http://git-wip-us.apache.org/repos/asf/kafka/blob/9575e930/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java index cbfc6d1..7de3b02 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SinkConnectorConfig.java @@ -40,6 +40,10 @@ public class SinkConnectorConfig extends ConnectorConfig { this(new HashMap<String, String>()); } + public static ConfigDef configDef() { + return config; + } + public SinkConnectorConfig(Map<String, String> props) { super(config, props); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9575e930/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java index ca9219f..27b0408 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java @@ -17,10 +17,14 @@ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.config.ConfigDef; + import java.util.Map; public class SourceConnectorConfig extends ConnectorConfig { + private static ConfigDef config = configDef(); + public SourceConnectorConfig(Map<String, String> props) { super(config, props); }
