C0urante commented on code in PR #13445: URL: https://github.com/apache/kafka/pull/13445#discussion_r1150651312
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -835,29 +835,43 @@ public List<ConfigKeyInfo> connectorPluginConfig(String pluginName) {
         try (LoaderSwap loaderSwap = p.withClassLoader(pluginClass.getClassLoader())) {
             Object plugin = p.newPlugin(pluginName);
             PluginType pluginType = PluginType.from(plugin.getClass());
-            ConfigDef configDefs;
+            // Contains definitions coming from Connect framework
+            ConfigDef baseConfigDefs = null;
+            // Contains definitions specifically declared on the plugin
+            ConfigDef pluginConfigDefs;
             switch (pluginType) {
                 case SINK:
+                    baseConfigDefs = SourceConnectorConfig.configDef();
+                    pluginConfigDefs = ((SinkConnector) plugin).config();
+                    break;
                 case SOURCE:
-                    configDefs = ((Connector) plugin).config();
+                    baseConfigDefs = SourceConnectorConfig.configDef();
+                    pluginConfigDefs = ((SourceConnector) plugin).config();
                     break;
                 case CONVERTER:
-                    configDefs = ((Converter) plugin).config();
+                    pluginConfigDefs = ((Converter) plugin).config();
                     break;
                 case HEADER_CONVERTER:
-                    configDefs = ((HeaderConverter) plugin).config();
+                    pluginConfigDefs = ((HeaderConverter) plugin).config();
                     break;
                 case TRANSFORMATION:
-                    configDefs = ((Transformation<?>) plugin).config();
+                    pluginConfigDefs = ((Transformation<?>) plugin).config();
                     break;
                 case PREDICATE:
-                    configDefs = ((Predicate<?>) plugin).config();
+                    pluginConfigDefs = ((Predicate<?>) plugin).config();
                     break;
                 default:
                     throw new BadRequestException("Invalid plugin type " + pluginType + ". Valid types are sink, source, converter, header_converter, transformation, predicate.");
             }
+
+            // Track config properties by name and, if the same property is defined in multiple places,
+            // give precedence to the one defined by the plugin class
+            Map<String, ConfigKey> configsMap = new HashMap<>();
+            if (baseConfigDefs != null) configsMap.putAll(baseConfigDefs.configKeys());
+            configsMap.putAll(pluginConfigDefs.configKeys());

Review Comment:
I've just realized--I think there's a tiny change in behavior with this change where we lose the ordering of elements in the property list provided by the existing API. We can preserve that ordering with a small tweak:

```suggestion
            // Preserve the ordering of properties as they're returned from each ConfigDef
            Map<String, ConfigKey> configsMap = new LinkedHashMap<>(pluginConfigDefs.configKeys());
            if (baseConfigDefs != null) baseConfigDefs.configKeys().forEach(configsMap::putIfAbsent);
```
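
To illustrate the ordering point, here is a minimal standalone sketch of the same merge pattern using only `java.util`. It is not the Connect code: `OrderedConfigMerge`, `merge`, and the property names are hypothetical, and plain `String` values stand in for `ConfigKey`. It assumes both input maps iterate in definition order, as the suggestion does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the merge in connectorPluginConfig; not Kafka code.
class OrderedConfigMerge {

    // Copies the plugin's entries first (keeping their iteration order), then
    // appends framework entries only for names the plugin did not define.
    static <V> Map<String, V> merge(Map<String, V> pluginKeys, Map<String, V> baseKeys) {
        Map<String, V> merged = new LinkedHashMap<>(pluginKeys);
        if (baseKeys != null) {
            // putIfAbsent leaves existing entries, and their positions, untouched
            baseKeys.forEach(merged::putIfAbsent);
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, String> plugin = new LinkedHashMap<>();
        plugin.put("topics", "plugin");
        plugin.put("file", "plugin");

        Map<String, String> base = new LinkedHashMap<>();
        base.put("name", "framework");
        base.put("topics", "framework");
        base.put("tasks.max", "framework");

        Map<String, String> merged = merge(plugin, base);
        System.out.println(new ArrayList<>(merged.keySet())); // [topics, file, name, tasks.max]
        System.out.println(merged.get("topics"));             // plugin
    }
}
```

With a plain `HashMap` and `putAll`, the plugin definition would still win on a name clash, but the iteration order of the result would depend on hash buckets rather than on the order in which the properties were declared.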