C0urante commented on code in PR #13445:
URL: https://github.com/apache/kafka/pull/13445#discussion_r1150651312


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java:
##########
@@ -835,29 +835,43 @@ public List<ConfigKeyInfo> connectorPluginConfig(String 
pluginName) {
         try (LoaderSwap loaderSwap = 
p.withClassLoader(pluginClass.getClassLoader())) {
             Object plugin = p.newPlugin(pluginName);
             PluginType pluginType = PluginType.from(plugin.getClass());
-            ConfigDef configDefs;
+            // Contains definitions coming from Connect framework
+            ConfigDef baseConfigDefs = null;
+            // Contains definitions specifically declared on the plugin
+            ConfigDef pluginConfigDefs;
             switch (pluginType) {
                 case SINK:
+                    baseConfigDefs = SourceConnectorConfig.configDef();
+                    pluginConfigDefs = ((SinkConnector) plugin).config();
+                    break;
                 case SOURCE:
-                    configDefs = ((Connector) plugin).config();
+                    baseConfigDefs = SourceConnectorConfig.configDef();
+                    pluginConfigDefs = ((SourceConnector) plugin).config();
                     break;
                 case CONVERTER:
-                    configDefs = ((Converter) plugin).config();
+                    pluginConfigDefs = ((Converter) plugin).config();
                     break;
                 case HEADER_CONVERTER:
-                    configDefs = ((HeaderConverter) plugin).config();
+                    pluginConfigDefs = ((HeaderConverter) plugin).config();
                     break;
                 case TRANSFORMATION:
-                    configDefs = ((Transformation<?>) plugin).config();
+                    pluginConfigDefs = ((Transformation<?>) plugin).config();
                     break;
                 case PREDICATE:
-                    configDefs = ((Predicate<?>) plugin).config();
+                    pluginConfigDefs = ((Predicate<?>) plugin).config();
                     break;
                 default:
                     throw new BadRequestException("Invalid plugin type " + 
pluginType + ". Valid types are sink, source, converter, header_converter, 
transformation, predicate.");
             }
+
+            // Track config properties by name and, if the same property is 
defined in multiple places,
+            // give precedence to the one defined by the plugin class
+            Map<String, ConfigKey> configsMap = new HashMap<>();
+            if (baseConfigDefs != null) 
configsMap.putAll(baseConfigDefs.configKeys());
+            configsMap.putAll(pluginConfigDefs.configKeys());

Review Comment:
   I've just realized--I think there's a tiny change in behavior with this 
change where we lose the ordering of elements in the property list provided by 
the existing API.
   
   We can preserve that ordering with a small tweak:
   ```suggestion
               // Preserve the ordering of properties as they're returned from 
each ConfigDef
               Map<String, ConfigKey> configsMap = new 
LinkedHashMap<>(pluginConfigDefs.configKeys());
               if (baseConfigDefs != null)
                   baseConfigDefs.configKeys().forEach(configsMap::putIfAbsent);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to