[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-13 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1263122416


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   Thanks, I added both of your suggested fixes. The missing manifest and
   incorrect includeByDefault were oversights on my part, so the stronger test
   is a good idea.
   
   Your suggestion also made me realize that I had missed some ServiceLoader
   entries, meaning that the ServiceLoaderScanner tests were incomplete. After I
   added the entries, two tests fail with an error (not an assertion failure)
   because the ServiceLoaderScanner doesn't catch the NoClassDefFoundError (a
   LinkageError) that the ReflectionScanner does.
   
   I added the failing MissingSuperclassConverter and NonExistentInterface test
   cases in KAFKA-14649 #13182 to assert that they no longer crashed the worker,
   which I thought was reasonable at the time. It appears that the ServiceLoader
   takes the position that LinkageError (like other Errors) should not be caught
   due to the severity of the cause, which is making me second-guess catching it
   in the ReflectionScanner.
   
   I think there are some options here:
   1. Delete the MissingSuperclassConverter manifest to disable the test case
   for the ServiceLoaderScanner.
   2. Catch the LinkageError in ServiceLoaderScanner, which makes it shadow
   other plugins in the same PluginLocation.
   3. Make ServiceLoaderScanner behave like ReflectionScanner by forking the
   ServiceLoader implementation and changing one catch clause.
   4. Make ReflectionScanner behave like ServiceLoaderScanner and kill the
   worker for missing dependencies for plugin classes, restoring the
   pre-KAFKA-14649 behavior for LinkageError.
   
   Of those options, I think I'm going to implement (4), as I'm loath to fork
   ServiceLoader this early in the project, and the severity and breadth of
   causes for LinkageError mean we probably shouldn't handle it.
   
   What do you think? I've pushed the MissingSuperclassConverter manifest, so
   the PluginScannerTest is currently failing.
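
   To illustrate the gap described in this comment: ServiceConfigurationError
   and LinkageError are sibling subclasses of Error, so a catch clause for one
   does not cover the other. The following is a minimal sketch only; the class
   name CatchScopeSketch, the method discover, and the plugin name
   test.plugins.GoodConverter are hypothetical and are not taken from the
   scanner code in this PR.

```java
import java.util.ServiceConfigurationError;
import java.util.function.Supplier;

class CatchScopeSketch {
    // Hypothetical stand-in for a scanner step that only handles
    // ServiceConfigurationError, as a plain ServiceLoader loop would.
    static String discover(Supplier<String> nextProvider) {
        try {
            return nextProvider.get();
        } catch (ServiceConfigurationError e) {
            // Handled: the provider is skipped and scanning can continue.
            return "skipped: " + e.getMessage();
        }
        // A LinkageError (for example NoClassDefFoundError) is not caught
        // above, so it propagates to the caller.
    }

    public static void main(String[] args) {
        System.out.println(discover(() -> "test.plugins.GoodConverter"));
        System.out.println(discover(() -> {
            throw new ServiceConfigurationError("bad manifest entry");
        }));
        try {
            discover(() -> {
                throw new NoClassDefFoundError("test/plugins/NonExistentInterface");
            });
        } catch (LinkageError e) {
            System.out.println("propagated: " + e);
        }
    }
}
```

   Under these assumptions, options (2) and (3) amount to widening the catch
   clause, while option (4) leaves the error uncaught in both scanners.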



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1264075402


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   I looked more into how the Reflections library handles this, and it actually 
just WARN logs these classes and never shows them to us, so we don't even get 
the opportunity to log the error ourselves:
   ```
   [2023-07-14 11:54:26,418] WARN could not get type for name 
test.plugins.MissingSuperclassConverter from any class loader 
(org.reflections.Reflections:318)
   org.reflections.ReflectionsException: could not get type for name 
test.plugins.MissingSuperclassConverter
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at 
org.reflections.ReflectionUtils.lambda$forNames$22(ReflectionUtils.java:330)
at 
java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
at 
java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1621)
at 
java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
at 
java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
at 
java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
at 
java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at 
java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:332)
at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
at 
org.apache.kafka.connect.runtime.isolation.ReflectionScanner.getPluginDesc(ReflectionScanner.java:118)
at 
org.apache.kafka.connect.runtime.isolation.ReflectionScanner.scanPlugins(ReflectionScanner.java:91)
at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:78)
at 
org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:66)
   Caused by: java.lang.NoClassDefFoundError: test/plugins/NonExis

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1264131073


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   Oh and just to motivate the tweaks I made to your implementation:
   If the ServiceLoader implementation can skip over some LinkageErrors, I 
think it's possible to have two different plugins throw the same exception and 
accidentally trigger the equals condition.
   
   For example, if two plugins implemented NonExistentInterface and appeared 
consecutively in the ServiceLoader manifest, I think the logic should show at 
least one of the errors, but not fail the worker or shadow other plugins. I 
think this would typically happen if a plugin packages both a Source and Sink 
that both have the same missing dependency, but the ServiceLoader 
implementation could continue to make progress.
   
   There's a heuristic for 100 consecutive failures: if you package 100 
consecutive faulty plugins (or have them all on the classpath) then it falls 
back to failing the worker. I don't think I've seen more than ~5-10 connectors 
or ~20-30 Transforms packaged together. If the errors are non-consecutive then 
the counter resets as well, so the actual number of tolerated hasNext calls 
could be quite large, covering most use-cases.
   
   The heuristic is just there to prevent infinite loops if the exception 
message happens to contain a memory address, or the exception alternates 
between two different messages in the ServiceLoader-not-making-progress case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-14 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1264131073


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   Oh and just to motivate the tweaks I made to your implementation:
   If the ServiceLoader implementation can skip over some LinkageErrors, I 
think it's possible to have two different plugins throw the same exception and 
accidentally trigger the equals condition.
   
   For example, if two plugins implemented NonExistentInterface and appeared 
consecutively in the ServiceLoader manifest, I think the logic should show at 
least one of the errors, but not fail the worker or shadow other plugins. I 
think this would typically happen if a plugin packages both a Source and Sink 
that both have the same missing dependency, but the ServiceLoader 
implementation could continue to make progress.
   
   There's a heuristic for 100 consecutive failures: if you package 100 
consecutive faulty plugins (or have them all on the classpath) then it falls 
back to failing the worker. I don't think I've seen more than ~5-10 connectors 
or ~20-30 Transforms packaged together. If the errors are non-consecutive then 
the counter resets as well, so the actual number of tolerated hasNext calls 
could be quite large, covering most use-cases.
   
   The heuristic is just there to prevent infinite loops in the 
ServiceLoader-not-making-progress case. If the exception message happens to 
contain a memory address, or the exception alternates between two different 
messages, we should still fail the worker.
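
   The heuristic described in this comment can be sketched roughly as follows.
   This is an illustration under stated assumptions, not the code in the PR:
   the class ConsecutiveFailureSketch and the method retryOnLinkageError are
   hypothetical names, and only the limit of 100 consecutive failures comes
   from the comment itself.

```java
import java.util.function.Supplier;

class ConsecutiveFailureSketch {
    // The comment above mentions a limit of 100 consecutive failures.
    static final int MAX_CONSECUTIVE_FAILURES = 100;

    // Hypothetical helper: retries op while it throws LinkageError and gives
    // up once the limit is reached. A success returns immediately, so the
    // count only ever covers consecutive failures.
    static <U> U retryOnLinkageError(Supplier<U> op) {
        LinkageError lastError = null;
        for (int failures = 0; failures < MAX_CONSECUTIVE_FAILURES; failures++) {
            try {
                return op.get();
            } catch (LinkageError e) {
                lastError = e; // a real scanner would log this error
            }
        }
        // No progress after the maximum number of attempts: fail loudly
        // instead of looping forever.
        throw lastError;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Simulates two faulty providers followed by a healthy one.
        boolean hasNext = retryOnLinkageError(() -> {
            if (calls[0]++ < 2) {
                throw new NoClassDefFoundError("test/plugins/NonExistentInterface");
            }
            return true;
        });
        System.out.println(hasNext + " after " + calls[0] + " calls");
    }
}
```

   Counting attempts rather than comparing exception messages means the loop
   terminates even if every message is different, which is the
   not-making-progress case the comment is concerned with.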






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-17 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1265841495


##
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java:
##
@@ -20,79 +20,114 @@
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.file.Files;
 import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
+@RunWith(Parameterized.class)
 public class PluginScannerTest {
 
+private enum ScannerType { Reflection, ServiceLoader };
+
 @Rule
 public TemporaryFolder pluginDir = new TemporaryFolder();
 
+public PluginScanner scanner;
+
+@Parameterized.Parameters
+public static Collection parameters() {
+List values = new ArrayList<>();
+for (ScannerType type : ScannerType.values()) {
+values.add(new Object[]{type});
+}
+return values;
+}
+
+public PluginScannerTest(ScannerType scannerType) {
+switch (scannerType) {
+case Reflection:
+this.scanner = new ReflectionScanner();
+break;
+case ServiceLoader:
+this.scanner = new ServiceLoaderScanner();
+break;
+default:
+throw new IllegalArgumentException("Unknown type " + 
scannerType);
+}
+}
+
 @Test
-public void testLoadingUnloadedPluginClass() {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningEmptyPluginPath() {
+PluginScanResult result = scan(
 Collections.emptyList()
 );
-for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertThrows(ClassNotFoundException.class, () -> 
classLoader.loadClass(pluginClassName));
-}
+assertTrue(result.isEmpty());
 }
 
 @Test
-public void testLoadingPluginClass() throws ClassNotFoundException {
-DelegatingClassLoader classLoader = initClassLoader(
+public void testScanningPluginClasses() {
+PluginScanResult result = scan(
 TestPlugins.pluginPath()
 );
+Set classes = new HashSet<>();
+result.forEach(pluginDesc -> classes.add(pluginDesc.className()));
 for (String pluginClassName : TestPlugins.pluginClasses()) {
-assertNotNull(classLoader.loadClass(pluginClassName));
-assertNotNull(classLoader.pluginClassLoader(pluginClassName));
+assertTrue("Expected " + pluginClassName + "to be discovered but 
it was not",
+classes.contains(pluginClassName));

Review Comment:
   > But the current implementation still goes through the 100 tries before 
giving up. I agree with the implementation (better to err on the side of not 
unnecessarily killing the worker), but just wanted to double check--this is 
intentional, right?
   
   Yes it is intentional. This edit should be more clear:
   
   > The heuristic is just there to prevent infinite loops in the 
ServiceLoader-not-making-progress case.  If the exception message happens to 
contain a memory address, or the exception alternates between two different 
messages, we should still **eventually** fail the worker **and not be stuck in 
an infinite loop**.
   
   > Also, I notice that we've added LinkageError to the catch clause around 
invoking Iterator::next. Do we think it's worth adding similar logic to prevent 
infinite loops in the event that a call to Iterator::next that throws an 
exception also fails to advance the iterator? Or is this unreasonable paranoia?
   
   I suppose if we're being paranoid about hasNext, being paranoid about next() 
for JDK8 implementations would also make sense. I've made the error-handling 
function more generic and applied it to all of the ServiceLoader operations 
(including load and iterator, which I've never seen fail).
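
   As a rough sketch of what applying one generic error-handling function to
   every ServiceLoader operation could look like: the names
   GuardedServiceLoaderSketch, DemoPlugin, guard and discover are hypothetical,
   and the guard here only logs and rethrows, leaving out the retry limit
   discussed earlier in the thread. Only the ServiceLoader and Iterator calls
   are real JDK API.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.function.Supplier;

class GuardedServiceLoaderSketch {
    // Hypothetical service interface; no providers are registered for it.
    public interface DemoPlugin { }

    // Simplified, hypothetical guard: logs a LinkageError and rethrows it.
    static <U> U guard(String operation, Supplier<U> op) {
        try {
            return op.get();
        } catch (LinkageError e) {
            System.err.println("LinkageError during " + operation + ": " + e);
            throw e;
        }
    }

    // Wraps load, iterator, hasNext and next with the same guard.
    static <T> List<T> discover(Class<T> klass, ClassLoader loader) {
        List<T> result = new ArrayList<>();
        ServiceLoader<T> serviceLoader =
                guard("load", () -> ServiceLoader.load(klass, loader));
        Iterator<T> iterator = guard("iterator", serviceLoader::iterator);
        while (guard("hasNext", iterator::hasNext)) {
            try {
                result.add(guard("next", iterator::next));
            } catch (ServiceConfigurationError e) {
                // Skip a provider that could not be instantiated.
                System.err.println("Failed to discover "
                        + klass.getSimpleName() + ": " + e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        ClassLoader loader = GuardedServiceLoaderSketch.class.getClassLoader();
        System.out.println(discover(DemoPlugin.class, loader).size()
                + " providers found");
    }
}
```

   Passing each operation as a Supplier keeps the four call sites uniform, at
   the cost of treating load and iterator the same way as hasNext and next.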
   






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267082036


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-protected  PluginDesc pluginDesc(Class plugin, String 
version, ClassLoader loader) {
-return new PluginDesc(plugin, version, loader);
+protected  PluginDesc pluginDesc(Class plugin, String 
version, PluginSource source) {
+return new PluginDesc(plugin, version, source.loader());
 }
 
 @SuppressWarnings("unchecked")
-protected  SortedSet> getServiceLoaderPluginDesc(Class 
klass, ClassLoader loader) {
+protected  SortedSet> getServiceLoaderPluginDesc(Class 
klass, PluginSource source) {
 SortedSet> result = new TreeSet<>();
-ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
-for (Iterator iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+ServiceLoader serviceLoader = handleLinkageError(klass, source, () 
-> ServiceLoader.load(klass, source.loader()));

Review Comment:
   In case an implementation of ServiceLoader eagerly evaluates the first 
provider and causes a LinkageError to appear, I wanted to re-use the 
handleLinkageError logging.
   
   I don't think that retries for these operations would be effective, and 
the later retries would probably be wasteful. However, I thought re-using 
the same function in both places was simpler than customizing the behavior of 
each call. Since this code is probably never going to be exercised, I tried 
to keep it as simple as possible.
   
   Do you think that we should log errors from load and iterator? I can catch 
the error or just let it propagate to the caller.






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267094589


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-protected  PluginDesc pluginDesc(Class plugin, String 
version, ClassLoader loader) {
-return new PluginDesc(plugin, version, loader);
+protected  PluginDesc pluginDesc(Class plugin, String 
version, PluginSource source) {
+return new PluginDesc(plugin, version, source.loader());
 }
 
 @SuppressWarnings("unchecked")
-protected  SortedSet> getServiceLoaderPluginDesc(Class 
klass, ClassLoader loader) {
+protected  SortedSet> getServiceLoaderPluginDesc(Class 
klass, PluginSource source) {
 SortedSet> result = new TreeSet<>();
-ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
-for (Iterator iterator = serviceLoader.iterator(); 
iterator.hasNext(); ) {
-try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+ServiceLoader serviceLoader = handleLinkageError(klass, source, () 
-> ServiceLoader.load(klass, source.loader()));
+Iterator iterator = handleLinkageError(klass, source, 
serviceLoader::iterator);
+while (handleLinkageError(klass, source, iterator::hasNext)) {
+try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
 T pluginImpl;
 try {
-pluginImpl = iterator.next();
+pluginImpl = handleLinkageError(klass, source, 
iterator::next);
 } catch (ServiceConfigurationError t) {
-log.error("Failed to discover {}{}", 
klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+log.error("Failed to discover {} in {}{}",
+klass.getSimpleName(), source.location(), 
reflectiveErrorDescription(t.getCause()), t);

Review Comment:
   > Since that field (and its accessor method) are currently only used for log 
messages, what do you think about altering PluginSource::location to return a 
string, and using "System classpath" in that case?
   
   I can't do that, because I need the Path object later in the migration 
script. I've used a non-null sentinel path instead, so that all of the logging 
call-sites are improved but the Path object is still available.
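
   A minimal sketch of the sentinel idea, assuming a simplified source type:
   PluginSourceSketch, its CLASSPATH constant and isClasspath are hypothetical
   names chosen for illustration and may not match the PluginSource class in
   this PR.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

class PluginSourceSketch {
    // Hypothetical sentinel standing in for "plugins found on the classpath".
    static final Path CLASSPATH = Paths.get("classpath");

    private final Path location;

    PluginSourceSketch(Path location) {
        // The location is never null, so log statements never print "null".
        this.location = Objects.requireNonNull(location, "location must be non-null");
    }

    // Still a Path, so later tooling can use it for file operations.
    Path location() {
        return location;
    }

    // Identity comparison, so a real directory that happens to be named
    // "classpath" is not mistaken for the sentinel.
    boolean isClasspath() {
        return location == CLASSPATH;
    }

    public static void main(String[] args) {
        PluginSourceSketch classpath = new PluginSourceSketch(CLASSPATH);
        PluginSourceSketch isolated = new PluginSourceSketch(Paths.get("plugins", "my-connector"));
        System.out.println("Failed to discover Converter in " + classpath.location());
        System.out.println("Failed to discover Converter in " + isolated.location());
    }
}
```

   With a non-null sentinel, every logging call site can print the location
   directly without a null check.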






[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267103768


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-return new PluginDesc(plugin, version, loader);
+protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+return new PluginDesc(plugin, version, source.loader());
 }
 
 @SuppressWarnings("unchecked")
-protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
 SortedSet<PluginDesc<T>> result = new TreeSet<>();
-ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+Iterator<T> iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+while (handleLinkageError(klass, source, iterator::hasNext)) {
+try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
 T pluginImpl;
 try {
-pluginImpl = iterator.next();
+pluginImpl = handleLinkageError(klass, source, iterator::next);
 } catch (ServiceConfigurationError t) {
-log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+log.error("Failed to discover {} in {}{}",
+klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
 continue;
 }
 Class<? extends T> pluginKlass = (Class<? extends T>) pluginImpl.getClass();
-if (pluginKlass.getClassLoader() != loader) {
+if (pluginKlass.getClassLoader() != source.loader()) {
 log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
-pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
+pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
 continue;
 }
-result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
+result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
 }
 }
 return result;
 }
 
+/**
+ * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
+ *
+ * @param klass The plugin superclass which is being loaded
+ * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError}
+ * @return the return value of function
+ * @throws Error errors thrown by the passed-in function
+ * @param <T> Type being iterated over by the ServiceLoader
+ * @param <U> Return value of the passed-in function
+ */
+private <T, U> U handleLinkageError(Class<T> klass, PluginSource source, Supplier<U> function) {
+// It's difficult to know for sure if the iterator was able to advance past the first broken
+// plugin class, or if it will continue to fail on that broken class for any subsequent calls
+// to Iterator::hasNext or Iterator::next
+// For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
+// the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
+// in the JDK standard library ServiceLoader implementation is unlikely to land
+LinkageError lastError = null;
+// Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
+// but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
+// of plugins in a single plugin location, and to limit the amount of log-spam on startup.
+for (int i = 0; i < 100; i++) {
+try {
+return function.get();
+} catch (LinkageError t) {
+// As an optimization, hide subsequent error logs if two consecutive errors look similar.
+// This reduces log-spam for iterators which cannot advance and rethrow the same exception.
+if (lastError == null
+|| !Objects.equals(lastError.getClass(), t.getClass())
+|| !Objects.equa
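
The quoted helper is cut off mid-statement here. As a rough, self-contained sketch of the pattern its comments describe (a bounded number of attempts, with repeated log lines suppressed when two consecutive errors look similar), something like the following could work. The names `RetrySketch` and `retryOnLinkageError`, the list standing in for a logger, the comparison on the error message, and rethrowing the last error once the attempts run out are all assumptions for illustration, not taken from the PR.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical illustration of a bounded retry with duplicate-log suppression.
final class RetrySketch {
    // Same limit as the loop in the quoted diff.
    static final int MAX_ATTEMPTS = 100;

    // "logged" collects messages in place of a real logger to keep the sketch self-contained.
    static <U> U retryOnLinkageError(Supplier<U> function, List<String> logged) {
        LinkageError lastError = null;
        for (int i = 0; i < MAX_ATTEMPTS; i++) {
            try {
                return function.get();
            } catch (LinkageError t) {
                // Assumption: two errors "look similar" if class and message both match.
                boolean similar = lastError != null
                        && Objects.equals(lastError.getClass(), t.getClass())
                        && Objects.equals(lastError.getMessage(), t.getMessage());
                if (!similar) {
                    logged.add(t.getClass().getSimpleName() + ": " + t.getMessage());
                }
                lastError = t;
            }
        }
        // Assumption: give up and surface the most recent error after the final attempt.
        throw lastError;
    }
}
```

A supplier that fails a few times with the same error and then succeeds would produce a single log entry and return normally; a supplier that never recovers would surface its last `LinkageError` after 100 attempts.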

[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation

2023-07-18 Thread via GitHub


gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267248169


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java:
##
@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
 }
 
 @SuppressWarnings({"rawtypes", "unchecked"})
-protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, ClassLoader loader) {
-return new PluginDesc(plugin, version, loader);
+protected <T> PluginDesc<T> pluginDesc(Class<? extends T> plugin, String version, PluginSource source) {
+return new PluginDesc(plugin, version, source.loader());
 }
 
 @SuppressWarnings("unchecked")
-protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, ClassLoader loader) {
+protected <T> SortedSet<PluginDesc<T>> getServiceLoaderPluginDesc(Class<T> klass, PluginSource source) {
 SortedSet<PluginDesc<T>> result = new TreeSet<>();
-ServiceLoader<T> serviceLoader = ServiceLoader.load(klass, loader);
-for (Iterator<T> iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+ServiceLoader<T> serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment:
   I've removed these usages, so the error propagates normally.


