gharris1727 commented on code in PR #16984:
URL: https://github.com/apache/kafka/pull/16984#discussion_r1870852425
##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##########
@@ -434,50 +516,93 @@ public Converter newInternalConverter(boolean isKey,
String className, Map<Strin
* @throws ConnectException if the {@link HeaderConverter} implementation
class could not be found
*/
public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, ClassLoaderUsage classLoaderUsage) {
- Class<? extends HeaderConverter> klass = null;
+ return getHeaderConverter(config, classPropertyName, null,
classLoaderUsage);
+ }
+
+ public HeaderConverter newHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName) {
+ ClassLoaderUsage classLoader = config.getString(versionPropertyName)
== null ? ClassLoaderUsage.CURRENT_CLASSLOADER: ClassLoaderUsage.PLUGINS;
+ return getHeaderConverter(config, classPropertyName,
versionPropertyName, classLoader);
+ }
+
+ private HeaderConverter getHeaderConverter(AbstractConfig config, String
classPropertyName, String versionPropertyName, ClassLoaderUsage
classLoaderUsage) {
+ if (!config.originals().containsKey(classPropertyName)) {
+ // This configuration does not define the Header Converter via the
specified property name
+ return null;
+ }
+
+ HeaderConverter plugin = getVersionedPlugin(config, classPropertyName,
versionPropertyName,
+ HeaderConverter.class, classLoaderUsage,
scanResult.headerConverters());
+
+ String configPrefix = classPropertyName + ".";
+ Map<String, Object> converterConfig =
config.originalsWithPrefix(configPrefix);
+ converterConfig.put(ConverterConfig.TYPE_CONFIG,
ConverterType.HEADER.getName());
+ log.debug("Configuring the header converter with configuration
keys:{}{}", System.lineSeparator(), converterConfig.keySet());
+
+ try (LoaderSwap loaderSwap =
withClassLoader(plugin.getClass().getClassLoader())) {
+ plugin.configure(converterConfig);
+ }
+ return plugin;
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private <U> U getVersionedPlugin(
+ AbstractConfig config,
+ String classPropertyName,
+ String versionPropertyName,
+ Class basePluginClass,
+ ClassLoaderUsage classLoaderUsage,
+ SortedSet<PluginDesc<U>> availablePlugins
+ ) {
+
+ String version = versionPropertyName == null ? null :
config.getString(versionPropertyName);
+ VersionRange range = null;
+ if (version != null) {
+ try {
+ range = VersionRange.createFromVersionSpec(version);
+ } catch (InvalidVersionSpecificationException e) {
+ throw new ConnectException(String.format("Invalid version
range for %s: %s %s", classPropertyName, version, e));
+ }
+ }
+
+ Class<? extends U> klass = null;
+ String basePluginClassName = basePluginClass.getSimpleName();
switch (classLoaderUsage) {
case CURRENT_CLASSLOADER:
- if (!config.originals().containsKey(classPropertyName)) {
- // This connector configuration does not define the header
converter via the specified property name
- return null;
- }
Review Comment:
> Although we don't do it, there is nothing stopping us from passing a
> WorkerConfig with current class loader or vice versa.

Yeah, we don't do that because it doesn't make much sense.
CURRENT_CLASSLOADER implies that there's a particular PluginClassLoader which
is given precedence. The WorkerConfig doesn't have a preferred
PluginClassLoader, so it only ever makes sense to use PLUGINS.
Although, as far as I understand, CURRENT_CLASSLOADER should be equivalent to
PLUGINS when the TCCL (thread context class loader) is the
DelegatingClassLoader...
I agree that we don't have to fully clean up the ClassLoaderUsage stuff
immediately, just enough to keep the existing behaviors compatible.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]