[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-24 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1204482479


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
 private final ClassLoader savedLoader;
 
-public LoaderSwap(ClassLoader savedLoader) {
+public static LoaderSwap use(ClassLoader loader) {
+ClassLoader savedLoader = compareAndSwapLoaders(loader);
+try {
+return new LoaderSwap(savedLoader);
+} catch (Throwable t) {
+compareAndSwapLoaders(savedLoader);
+throw t;
+}
+}

Review Comment:
   I've reverted this change and left Plugins.compareAndSwapLoader unchanged.



##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map 
workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = 
plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I've reverted this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1203138122


##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map 
workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = 
plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   Also here is some of the context for this change: 
https://github.com/apache/kafka/pull/13165#discussion_r1161929533
   
   Since the elimination of compareAndSwap is technically unrelated to the 
title change, it could be moved out to it's own PR. Let me know if you'd like 
me to separate the two changes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-05-23 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1202962972


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/LoaderSwap.java:
##
@@ -24,13 +24,32 @@ public class LoaderSwap implements AutoCloseable {
 
 private final ClassLoader savedLoader;
 
-public LoaderSwap(ClassLoader savedLoader) {
+public static LoaderSwap use(ClassLoader loader) {
+ClassLoader savedLoader = compareAndSwapLoaders(loader);
+try {
+return new LoaderSwap(savedLoader);
+} catch (Throwable t) {
+compareAndSwapLoaders(savedLoader);
+throw t;
+}
+}

Review Comment:
   This is not re-introducing the static logic, it is just refactoring to 
eliminate the open-ended Plugins.compareAndSwap* methods.
   
   This method is only called in two places: by 
DelegatingClassLoader.scanPluginPath (before scanning is finished) and 
Plugins.withClassLoader (after scanning is finished).
   
   I've dropped the visibility and made the DCL call-site mock-able.



##
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java:
##
@@ -89,9 +88,6 @@ public class EmbeddedConnectCluster {
 private final String workerNamePrefix;
 private final AtomicInteger nextWorkerId = new AtomicInteger(0);
 private final EmbeddedConnectClusterAssertions assertions;
-// we should keep the original class loader and set it back after 
connector stopped since the connector will change the class loader,
-// and then, the Mockito will use the unexpected class loader to generate 
the wrong proxy instance, which makes mock failed
-private final ClassLoader originalClassLoader = 
Thread.currentThread().getContextClassLoader();

Review Comment:
   I disagree. I think that this is a symptom of the open-ended context 
classloader swap having unintended downstream effects. The existing fix is 
adequate, but is mostly addressing the symptom rather than the problem.



##
connect/runtime/src/main/java/org/apache/kafka/connect/cli/AbstractConnectCli.java:
##
@@ -119,36 +120,37 @@ public Connect startConnect(Map 
workerProps, String... extraArgs
 
 log.info("Scanning for plugin classes. This might take a moment ...");
 Plugins plugins = new Plugins(workerProps);
-plugins.compareAndSwapWithDelegatingLoader();
-T config = createConfig(workerProps);
-log.debug("Kafka cluster ID: {}", config.kafkaClusterId());
+try (LoaderSwap loaderSwap = 
plugins.withClassLoader(plugins.delegatingLoader())) {

Review Comment:
   I understand that this is a change in semantics, but that change is 
intentional. After this method completes, operations should not require the 
delegating loader and should be performed via the Connect handle. That handle 
only has methods for starting, stopping, and interacting with the REST API, all 
of which should internally handle setting the context classloader when 
appropriate.
   
   The reason that I'm changing this is that I think the open-ended swap 
methods are an anti-pattern, and lead to unexpected behavior later in the 
caller thread.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-04-11 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1163148455


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java:
##
@@ -159,13 +143,7 @@ public ClassLoader compareAndSwapWithDelegatingLoader() {
  * @return A {@link LoaderSwap} handle which restores the prior 
classloader on {@link LoaderSwap#close()}.
  */
 public LoaderSwap withClassLoader(ClassLoader loader) {

Review Comment:
   I think I would be in favor of this change if we could eliminate or reduce 
the visibility of the delegatingLoader(), but there are a couple of non-trivial 
usages in tests which need a ClassLoader to use when constructing worker tasks. 
I don't think that two call-sites are enough to justify an extra method that 
makes the delegatingLoader into a public method that is only used in tests.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13165: KAFKA-14654: Connector classes should statically initialize with plugin classloader

2023-04-10 Thread via GitHub


gharris1727 commented on code in PR #13165:
URL: https://github.com/apache/kafka/pull/13165#discussion_r1161929533


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java:
##
@@ -360,17 +360,22 @@ private PluginScanResult scanPluginPath(
 builder.useParallelExecutor();
 Reflections reflections = new InternalReflections(builder);
 
-return new PluginScanResult(
-getPluginDesc(reflections, SinkConnector.class, loader),
-getPluginDesc(reflections, SourceConnector.class, loader),
-getPluginDesc(reflections, Converter.class, loader),
-getPluginDesc(reflections, HeaderConverter.class, loader),
-getTransformationPluginDesc(loader, reflections),
-getPredicatePluginDesc(loader, reflections),
-getServiceLoaderPluginDesc(ConfigProvider.class, loader),
-getServiceLoaderPluginDesc(ConnectRestExtension.class, loader),
-
getServiceLoaderPluginDesc(ConnectorClientConfigOverridePolicy.class, loader)
-);
+ClassLoader savedLoader = Plugins.compareAndSwapLoaders(loader);

Review Comment:
   > I don't see a strong reason why it's not [static].
   
   It's non-static to make mocking easier. Rather than having to mock a static 
method of a class, you mock the Plugins instance, and stub out the loader 
swapping functionality.
   
   It appears that there are only a handful of places where 
compareAndSwapLoaders (and compareAndSwapWithDelegatingLoader) is used:
   * In DelegatingClassLoader, during initialization
   * In AbstractConnectCli and MirrorMaker to swap to the delegating classloader
   * In EmbeddedConnectCluster to swap back to the saved loader (KAFKA-12229)
   
   I think that the EmbeddedConnectCluster call-site is just a result of the 
open-ended delegating swaps. I'll refactor all of these call-sites to use 
LoaderSwap, and hide the more dangerous compareAndSwapLoaders now that only 
LoaderSwap is using it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org