Repository: kafka Updated Branches: refs/heads/trunk 84de7f175 -> de982ba3f
KAFKA-5472: Eliminated duplicate group names when validating connector results Kafka Connect was adding duplicate group names in the response from the REST API's validation of connector configurations. This fixes the duplicates and maintains the order of the `ConfigDef` objects so that the `ConfigValue` results are in the same order. This is a blocker and should be merged to 0.11.0. Author: Randall Hauch <[email protected]> Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #3379 from rhauch/KAFKA-5472 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/de982ba3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/de982ba3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/de982ba3 Branch: refs/heads/trunk Commit: de982ba3fbf99664f0aaa5aa4b72af8fd1881232 Parents: 84de7f1 Author: Randall Hauch <[email protected]> Authored: Tue Jun 20 17:48:32 2017 -0700 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Jun 20 17:48:32 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/config/ConfigDef.java | 5 +++-- .../apache/kafka/connect/runtime/AbstractHerder.java | 15 +++++++++------ 2 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/de982ba3/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java index 8197b1f..2514e4f 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java +++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java @@ -26,6 +26,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; import java.util.Locale; @@ -82,13 +83,13 @@ public class ConfigDef { private Set<String> configsWithNoParent; public ConfigDef() { - configKeys = new HashMap<>(); + configKeys = new LinkedHashMap<>(); groups = new LinkedList<>(); configsWithNoParent = null; } public ConfigDef(ConfigDef base) { - configKeys = new HashMap<>(base.configKeys); + configKeys = new LinkedHashMap<>(base.configKeys); groups = new LinkedList<>(base.groups); configsWithNoParent = base.configsWithNoParent == null ? null : new HashSet<>(base.configsWithNoParent); } http://git-wip-us.apache.org/repos/asf/kafka/blob/de982ba3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java index 6293b01..cfb8ae0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java @@ -46,9 +46,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** @@ -232,11 +235,11 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type"); List<ConfigValue> configValues = new ArrayList<>(); - Map<String, ConfigKey> configKeys = new HashMap<>(); - List<String> allGroups = new ArrayList<>(); + Map<String, ConfigKey> configKeys = new LinkedHashMap<>(); + Set<String> allGroups = new LinkedHashSet<>(); Connector connector = getConnector(connType); - ClassLoader savedLoader = worker.getPlugins().compareAndSwapLoaders(connector); + ClassLoader savedLoader = plugins().compareAndSwapLoaders(connector); try { // do basic connector validation (name, connector type, etc.) ConfigDef basicConfigDef = (connector instanceof SourceConnector) @@ -271,10 +274,10 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con configKeys.putAll(configDef.configKeys()); allGroups.addAll(configDef.groups()); configValues.addAll(config.configValues()); - return generateResult(connType, configKeys, configValues, allGroups); + return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); } catch (ConfigException e) { // Basic validation must have failed. Return the result. - return generateResult(connType, configKeys, configValues, allGroups); + return generateResult(connType, configKeys, configValues, new ArrayList<>(allGroups)); } finally { Plugins.compareAndSwapLoaders(savedLoader); } @@ -353,7 +356,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con if (tempConnectors.containsKey(connType)) { return tempConnectors.get(connType); } else { - Connector connector = worker.getPlugins().newConnector(connType); + Connector connector = plugins().newConnector(connType); tempConnectors.put(connType, connector); return connector; }
