snehashisp commented on code in PR #17743:
URL: https://github.com/apache/kafka/pull/17743#discussion_r1966255974
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -549,7 +549,7 @@ public HeaderConverter newHeaderConverter(AbstractConfig config, String classPro
     }
     private HeaderConverter newHeaderConverter(AbstractConfig config, String classPropertyName, String versionPropertyName, ClassLoaderUsage classLoaderUsage) {
-        if (!config.originals().containsKey(classPropertyName) && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
+        if (config.getClass(classPropertyName) == null && classLoaderUsage == ClassLoaderUsage.CURRENT_CLASSLOADER) {
Review Comment:
The header converter config has no default defined in
[ConnectorConfig](https://github.com/apache/kafka/blob/84caaa6e9da06435411510a81fa321d4f99c351f/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java#L106);
the SimpleHeaderConverter default is set only in WorkerConfig.
This method is called first with the connector config and then, if the
connector config does not define a header converter, with the worker config.
One thing I have changed is that the worker no longer passes the class loader
(CURRENT or PLUGINS) during converter instantiation. If a version is provided,
we should not use the current class loader, as that would always bring in the
latest version of the converter packaged with the connector plugin. Checking
whether a version is provided in the configs and passing the appropriate class
loader was getting a bit messy in the worker code, and I think it's better
abstracted away in Plugins (load with CURRENT if no version is provided and
with PLUGINS otherwise). Right now, when the worker instantiates converters
from the worker config, it explicitly uses the PLUGINS loader. With this
change, if the worker config provides no version for the converter, we will
use the CURRENT class loader. This should not change the behavior: both class
loading flows go through the delegating class loader, because it is swapped in
before the worker config is instantiated.
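The selection rule described above can be sketched roughly as follows. This is only an illustration, not the code in the PR: `LoaderChoiceSketch`, `loaderFor`, and the local `ClassLoaderUsage` enum are hypothetical stand-ins, and the property name used in `main` is an assumption.

```java
import java.util.Map;

// Hypothetical sketch of "CURRENT if no version is provided, PLUGINS otherwise".
// None of these names are taken from the actual Plugins class.
class LoaderChoiceSketch {

    // Local stand-in for the ClassLoaderUsage enum mentioned in the diff.
    enum ClassLoaderUsage { CURRENT_CLASSLOADER, PLUGINS }

    // Pick the loader based on whether a version is configured for the plugin.
    static ClassLoaderUsage loaderFor(Map<String, ?> props, String versionPropertyName) {
        Object version = props.get(versionPropertyName);
        return version == null
                ? ClassLoaderUsage.CURRENT_CLASSLOADER
                : ClassLoaderUsage.PLUGINS;
    }

    public static void main(String[] args) {
        // Assumed property name, used here for illustration only.
        String versionKey = "header.converter.plugin.version";
        System.out.println(loaderFor(Map.of(), versionKey));
        System.out.println(loaderFor(Map.of(versionKey, "1.0.0"), versionKey));
    }
}
```

Keeping this decision in one place means callers such as the worker only pass the config and property names, not a loader choice.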
With this change, the existing condition
`!config.originals().containsKey(classPropertyName)` would make this method
return null even when the worker config has a default, because
`config.originals()` contains only the user-supplied values and disregards the
defaults specified in the config def. Checking
`config.getClass(classPropertyName) == null` takes those defaults into account.
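To make the difference between the two checks concrete, here is a minimal sketch. `SketchConfig` is a hypothetical stand-in for `AbstractConfig` that models only the two behaviors relevant here (user-supplied originals versus values resolved with defaults); it is not Kafka's implementation, and the key and class names are plain strings for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of originals() vs. default-aware lookup; not Kafka code.
class OriginalsVsDefaultsSketch {

    static final class SketchConfig {
        private final Map<String, Object> defaults;
        private final Map<String, Object> originals;

        SketchConfig(Map<String, Object> defaults, Map<String, Object> originals) {
            this.defaults = new HashMap<>(defaults);
            this.originals = new HashMap<>(originals);
        }

        // Only what the user supplied; defaults are not included.
        Map<String, Object> originals() {
            return new HashMap<>(originals);
        }

        // Resolved value: the user-supplied value if present, else the default.
        Object get(String key) {
            return originals.containsKey(key) ? originals.get(key) : defaults.get(key);
        }
    }

    static final String KEY = "header.converter";

    // Worker-like config: a default is defined, the user supplied nothing.
    static SketchConfig workerLike() {
        return new SketchConfig(Map.of(KEY, "SimpleHeaderConverter"), Map.of());
    }

    // Connector-like config: no default, the user supplied nothing.
    static SketchConfig connectorLike() {
        return new SketchConfig(Map.of(), Map.of());
    }

    // Old condition: true means "treat the converter as not configured".
    static boolean oldCheckSkips(SketchConfig config) {
        return !config.originals().containsKey(KEY);
    }

    // New condition: only skips when no value resolves, defaults included.
    static boolean newCheckSkips(SketchConfig config) {
        return config.get(KEY) == null;
    }

    public static void main(String[] args) {
        System.out.println("worker, old check skips: " + oldCheckSkips(workerLike()));
        System.out.println("worker, new check skips: " + newCheckSkips(workerLike()));
        System.out.println("connector, old check skips: " + oldCheckSkips(connectorLike()));
        System.out.println("connector, new check skips: " + newCheckSkips(connectorLike()));
    }
}
```

In this model the old check skips the worker-like config despite its default, while the new check skips only the connector-like config, where nothing resolves.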