[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-24 Thread via GitHub


C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1204337446


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I think it's brittle to change the context classloader back. Currently 
there's no additional logic that requires it, but we have a choice between 
adding the potential for bugs related to the context classloader and not adding 
it.
   
   I get that the approach on trunk requires special treatment for integration 
tests, but since that's already a solved problem, I'd prefer to keep things as 
they are, especially since it's preferable to keep the risk in the testing 
portions of the code base over the main parts.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
 private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment:
   (Discussed above)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-24 Thread via GitHub


C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1204336495


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
 private final ClassLoader savedLoader;
 
-public LoaderSwap(ClassLoader savedLoader) {
+public static LoaderSwap use(ClassLoader loader) {
+ClassLoader savedLoader = compareAndSwapLoaders(loader);
+try {
+return new LoaderSwap(savedLoader);
+} catch (Throwable t) {
+compareAndSwapLoaders(savedLoader);
+throw t;
+}
+}

Review Comment:
   This does still involve static logic for classloader swapping, though. And 
the comment about internal use doesn't seem very helpful since the way we use 
that term ("internal") has to do with public vs. private API; it's not really 
clear to people that (or why) they shouldn't just upgrade the visibility to 
public.
   
   Ultimately I'd prefer to see this logic duplicated in two places 
(`DelegatingClassLoader::withClassLoader` and `Plugins::withClassLoader`) 
rather than introduce a new API that might be misused in the future.
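
To illustrate the instance-method shape preferred in the comment above, here is a minimal sketch. `SavedLoaderSwap`, `PluginsSketch`, and `RecordingPlugins` are hypothetical stand-ins, not the real Connect classes, and the swap is done with plain `Thread::setContextClassLoader` calls rather than Connect's own helper. The point is only that an instance method can be overridden or mocked in a test, which a static factory cannot.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for LoaderSwap: it only remembers the loader to restore.
class SavedLoaderSwap implements AutoCloseable {
    private final ClassLoader savedLoader;

    SavedLoaderSwap(ClassLoader savedLoader) {
        this.savedLoader = savedLoader;
    }

    @Override
    public void close() {
        // Restore the loader that was active before the swap.
        Thread.currentThread().setContextClassLoader(savedLoader);
    }
}

// Hypothetical stand-in for a class that owns the swap logic as an instance method.
class PluginsSketch {
    SavedLoaderSwap withClassLoader(ClassLoader loader) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return new SavedLoaderSwap(saved);
        } catch (Throwable t) {
            // Undo the swap if the handle could not be created.
            thread.setContextClassLoader(saved);
            throw t;
        }
    }
}

// Test double: records which loaders were requested, then delegates.
class RecordingPlugins extends PluginsSketch {
    final List<ClassLoader> requested = new ArrayList<>();

    @Override
    SavedLoaderSwap withClassLoader(ClassLoader loader) {
        requested.add(loader);
        return super.withClassLoader(loader);
    }
}
```

A test can pass a `RecordingPlugins` instance wherever a `PluginsSketch` is expected and then inspect `requested`, without touching any static state. A second class that needs the same behavior would carry its own copy of `withClassLoader`, which is the duplication the comment accepts as the cost.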






[GitHub] [kafka] C0urante commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


C0urante commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202903632


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map<String, String> workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   This is actually incorrect; we want the delegating loader to remain the 
classloader even after this method exits (normally or exceptionally).
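
The difference between a scoped swap and a permanent one can be sketched as follows. `ScopedLoaderSwap` and `StartupSketch` are hypothetical names used only for illustration, and the swap uses plain `Thread::setContextClassLoader` instead of Connect's own helpers. The sketch shows why wrapping startup in try-with-resources changes behavior: the context classloader is reverted when the block exits, whether normally or exceptionally.

```java
// Hypothetical stand-in for a scoped loader swap; not the real LoaderSwap.
final class ScopedLoaderSwap implements AutoCloseable {
    private final ClassLoader savedLoader;

    ScopedLoaderSwap(ClassLoader newLoader) {
        Thread thread = Thread.currentThread();
        this.savedLoader = thread.getContextClassLoader();
        thread.setContextClassLoader(newLoader);
    }

    @Override
    public void close() {
        // Runs on normal and exceptional exit from the try block.
        Thread.currentThread().setContextClassLoader(savedLoader);
    }
}

final class StartupSketch {
    // Scoped variant: the previous loader is back in place once the block ends.
    static ClassLoader startScoped(ClassLoader delegating) {
        try (ScopedLoaderSwap swap = new ScopedLoaderSwap(delegating)) {
            // startup work would run here with the delegating loader active
        }
        return Thread.currentThread().getContextClassLoader();
    }

    // Permanent variant: the delegating loader stays active after the method returns.
    static ClassLoader startPermanent(ClassLoader delegating) {
        Thread.currentThread().setContextClassLoader(delegating);
        // startup work would run here
        return Thread.currentThread().getContextClassLoader();
    }
}
```

With these definitions, `startScoped` returns whatever loader was active before the call, while `startPermanent` returns the delegating loader, which is the behavior the comment asks to preserve.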



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
 private final ClassLoader savedLoader;
 
-public LoaderSwap(ClassLoader savedLoader) {
+public static LoaderSwap use(ClassLoader loader) {
+ClassLoader savedLoader = compareAndSwapLoaders(loader);
+try {
+return new LoaderSwap(savedLoader);
+} catch (Throwable t) {
+compareAndSwapLoaders(savedLoader);
+throw t;
+}
+}

Review Comment:
   Adding static logic that invokes `compareAndSwapLoaders` is difficult to 
test, which was the motivation for KAFKA-14346. Can we try not to re-introduce 
that kind of static logic?



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -360,17 +360,19 @@ private PluginScanResult scanPluginPath(
 builder.useParallelExecutor();
 Reflections reflections = new InternalReflections(builder);
 
-return new PluginScanResult(
-getPluginDesc(reflections, SinkConnector.class, loader),
-getPluginDesc(reflections, SourceConnector.class, loader),
-getPluginDesc(reflections, Converter.class, loader),
-getPluginDesc(reflections, HeaderConverter.class, loader),
-getTransformationPluginDesc(loader, reflections),
-getPredicatePluginDesc(loader, reflections),
-getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-);
+try (LoaderSwap loaderSwap = LoaderSwap.use(loader)) {

Review Comment:
   If static initialization logic for a plugin class changes the context 
classloader, then that will remain the classloader for the rest of the plugin 
scanning that takes place in this method.
   
   I don't think we have to accommodate this case, but if there's an easy way 
to, we might try.
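
One possible way to accommodate that case, sketched under assumptions: re-apply the plugin loader before each scanning step instead of once for the whole method. `ScanSketch::scanEach` is a hypothetical helper, and each `Supplier` stands in for one of the `getPluginDesc`-style calls; none of this is taken from the PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

final class ScanSketch {
    // Runs each step with pluginLoader as the context classloader, resetting it
    // before every step so that a step which changes the context classloader
    // (for example through static initialization) cannot affect later steps.
    static <T> List<T> scanEach(ClassLoader pluginLoader, List<Supplier<T>> steps) {
        Thread thread = Thread.currentThread();
        ClassLoader saved = thread.getContextClassLoader();
        List<T> results = new ArrayList<>();
        try {
            for (Supplier<T> step : steps) {
                thread.setContextClassLoader(pluginLoader);
                results.add(step.get());
            }
        } finally {
            // Restore the caller's loader even if a step throws.
            thread.setContextClassLoader(saved);
        }
        return results;
    }
}
```

This only isolates steps from one another. A class that changes the loader partway through a single step would still affect the remainder of that step.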



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
 private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();

Review Comment:
   I think we need to keep this since the change to 
`AbstractConnectCli::startConnect` is incorrect.


