[
https://issues.apache.org/jira/browse/KAFKA-18119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901880#comment-17901880
]
Snehashis Pal commented on KAFKA-18119:
---------------------------------------
With some more investigation, I think this happens because Java's service
loading internally loads every implementation of a service that it finds
[here|https://github.com/corretto/corretto-19/blob/45cd4e2504ec9006bbbd957af65890ac7fa7854b/src/java.base/share/classes/java/util/ServiceLoader.java#L1217].
This gets invoked when the service loader scanner tries to find implementations
of converter classes
[here|https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ServiceLoaderScanner.java#L61].
The class loader being used is the plugin class loader of the plugin that is
currently being scanned, and the service loader invokes the *getResources*
method on this loader to find the list of META-INF services in all the packaged
artifacts on its path. However, in addition to its own path, the loader also
scans for implementations on the [parent
path|https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java#L96-L102],
which ends up loading all the plugin implementations on the class path.
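The parent lookup described above can be reproduced with plain JDK class loaders. The following is only a minimal sketch, not Kafka's PluginClassLoader: the service name demo.Converter, the provider names, and the temporary directories are hypothetical and exist purely for illustration. It shows that calling getResources on a child loader also returns the provider-configuration file that is visible through its parent.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;

class ParentResourceLookupDemo {
    // Hypothetical service name, used only for this illustration.
    static final String SERVICE_FILE = "META-INF/services/demo.Converter";

    // Creates a temporary directory holding one provider-configuration file.
    static Path dirWithServiceFile(String provider) throws IOException {
        Path root = Files.createTempDirectory("svc-demo");
        Path file = root.resolve(SERVICE_FILE);
        Files.createDirectories(file.getParent());
        Files.writeString(file, provider + "\n");
        return root;
    }

    // Returns every provider-configuration file visible through the child loader.
    static List<URL> visibleServiceFiles() {
        try {
            // Stands in for the worker class path (assumption for this sketch).
            Path classpathDir = dirWithServiceFile("demo.BundledConverter");
            // Stands in for a single plugin location (assumption for this sketch).
            Path pluginDir = dirWithServiceFile("demo.UpdatedConverter");
            try (URLClassLoader parent =
                     new URLClassLoader(new URL[] {classpathDir.toUri().toURL()}, null);
                 URLClassLoader plugin =
                     new URLClassLoader(new URL[] {pluginDir.toUri().toURL()}, parent)) {
                // ClassLoader.getResources consults the parent as well as the
                // loader's own path, so both files are returned.
                return Collections.list(plugin.getResources(SERVICE_FILE));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        visibleServiceFiles().forEach(System.out::println);
    }
}
```

Since ServiceLoader discovers providers from these files, a scan that is meant to cover one plugin location can also see providers that are only declared on the parent path.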
When the connector is initialized, the converter (and other plugin classes) are
loaded through *Utils.loadClass*, which tries to load the converter class via
the plugin class loader of the connector plugin. This call eventually ends up
finding the class that was already loaded in the service loading step
[here|https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginClassLoader.java#L112].
When service loading is not used, *findLoadedClass* does not find the
requested class (unless it is also packaged with the plugin), so the loader
delegates to its parent ({*}DelegatingClassLoader{*}), and the current plugin
loading implementation in Connect loads the latest version of the class found
across all the plugin paths. This also explains why, if we don't provide the
converter plugin name as part of the connector config, the correct latest
version is loaded: in that case it defaults to using the [Delegating
loader|https://github.com/apache/kafka/blob/b9a45546a7918799b6fb3c0fe63b56f47d8fcba9/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java#L316]
to load the plugin.
I believe this is not isolated to converters and affects transformations as
well, though I have not tested them. Also, this seems to be the case when
*LazyClassPathLookupIterator* is used in service loading; I am not sure what
the behavior would be if module-based loading is used. I tested this on the
corretto-19 JDK, though the service loading implementation seems to be the same
in OpenJDK as well, so it is likely common across Java implementations.
> Service loading loads incorrect plugin version.
> -----------------------------------------------
>
> Key: KAFKA-18119
> URL: https://issues.apache.org/jira/browse/KAFKA-18119
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 3.6.0, 3.7.0, 3.6.1, 3.6.2, 3.8.0, 3.7.1, 3.9.0, 3.8.1
> Reporter: Snehashis Pal
> Priority: Major
>
> Kafka Connect seems to load an incorrect version of Connect plugins (such
> as connectors) if run in SERVICE_LOAD mode. If the worker is started with
> only service loading enabled, it does not seem to load the latest
> version of a plugin available in its plugins path. Rather, it always seems to
> default to the plugin version provided in the classpath.
> I observed this when I placed an updated json-converter in the plugins path
> and a newly instantiated connector still defaulted to using the one provided
> in the Connect (Kafka) distribution. It does not happen when the older
> reflections-based plugin scanner is used. This can be reproduced by following
> the process noted below:
> * Set {*}plugin.discovery{*}=SERVICE_LOAD
> * Install a newer version of json-converter
> ({*}org.apache.kafka.connect.json.JsonConverter{*}) than the one provided in
> the distribution. Usually, the bundled version is the same as the current
> Kafka distribution.
> * Start a connector with either *key.converter* or *value.converter* set to
> the json converter. Without this in the config the latest version is loaded.
> It might be hard to judge which version of the converter is loaded. For
> testing, it might be good to build a new json converter with some log lines
> indicating the version in use. Another way would be to run Connect in debug
> mode and add some breakpoints in the startTask method in Worker, where the
> converters are initialised.
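
As a possible complement to the log-line and breakpoint approaches quoted above, the origin of a loaded class can be inspected through standard JDK calls. This is only a sketch: the helper name describeOrigin is hypothetical, and where it would be called from inside Connect (for example, while paused at a breakpoint) is an assumption.

```java
import java.security.CodeSource;

class LoadedFromDemo {
    // Describes which class loader defined the class and where it was loaded from.
    // Hypothetical helper, not part of Kafka.
    static String describeOrigin(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // Classes defined by the bootstrap loader have no code source.
        String location = source == null
            ? "<bootstrap or unknown>"
            : String.valueOf(source.getLocation());
        return clazz.getName() + " loaded by " + clazz.getClassLoader()
            + " from " + location;
    }

    public static void main(String[] args) {
        System.out.println(describeOrigin(String.class));
        System.out.println(describeOrigin(LoadedFromDemo.class));
    }
}
```

Passing the class of the instantiated converter to such a helper would show the jar location and the defining class loader, which should be enough to tell the bundled converter apart from the one in the plugins path.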