[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1263122416 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java: ## @@ -20,79 +20,114 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PluginScannerTest { +private enum ScannerType { Reflection, ServiceLoader }; + @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); +public PluginScanner scanner; + +@Parameterized.Parameters +public static Collection parameters() { +List values = new ArrayList<>(); +for (ScannerType type : ScannerType.values()) { +values.add(new Object[]{type}); +} +return values; +} + +public PluginScannerTest(ScannerType scannerType) { +switch (scannerType) { +case Reflection: +this.scanner = new ReflectionScanner(); +break; +case ServiceLoader: +this.scanner = new ServiceLoaderScanner(); +break; +default: +throw new IllegalArgumentException("Unknown type " + scannerType); +} +} + @Test -public void testLoadingUnloadedPluginClass() { -DelegatingClassLoader classLoader = initClassLoader( +public void testScanningEmptyPluginPath() { +PluginScanResult result = scan( Collections.emptyList() ); -for (String pluginClassName : TestPlugins.pluginClasses()) { -assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); -} +assertTrue(result.isEmpty()); } @Test -public void testLoadingPluginClass() throws ClassNotFoundException { -DelegatingClassLoader classLoader = 
initClassLoader( +public void testScanningPluginClasses() { +PluginScanResult result = scan( TestPlugins.pluginPath() ); +Set classes = new HashSet<>(); +result.forEach(pluginDesc -> classes.add(pluginDesc.className())); for (String pluginClassName : TestPlugins.pluginClasses()) { -assertNotNull(classLoader.loadClass(pluginClassName)); -assertNotNull(classLoader.pluginClassLoader(pluginClassName)); +assertTrue("Expected " + pluginClassName + "to be discovered but it was not", +classes.contains(pluginClassName));

Review Comment: Thanks, I added both of your suggested fixes. The missing manifest and incorrect includeByDefault were oversights on my part, so the stronger test is a good idea.

Your suggestion also made me realize that I missed some ServiceLoader entries, meaning that the ServiceLoaderScanner tests were incomplete. After I added the entries, two tests fail with an error (not an assertion) because the ServiceLoaderScanner doesn't catch the NoClassDefFoundError (a LinkageError) that the ReflectionScanner does.

I added the failing MissingSuperclassConverter and NonExistentInterface test cases in KAFKA-14649 #13182 to assert that they no longer crashed the worker, which I thought was reasonable at the time. It appears that the ServiceLoader takes the opinion that LinkageError (like other Errors) should not be caught due to the severity of the cause, which is making me second-guess catching it in the ReflectionScanner.

I think there are some options here:

1. Delete the MissingSuperclassConverter manifest to disable the test case for the ServiceLoaderScanner.
2. Catch the LinkageError in ServiceLoaderScanner, making it shadow other plugins in the same PluginLocation.
3. Make ServiceLoaderScanner behave like ReflectionScanner by forking the ServiceLoader implementation and changing one catch clause.
4. Make ReflectionScanner behave like ServiceLoaderScanner and kill the worker for missing dependencies for plugin classes, restoring the pre-KAFKA-14649 behavior for LinkageError.

Of those options, I think I'm going to implement (4), as I'm loath to fork ServiceLoader this early in the project, and the severity and breadth of causes for LinkageError mean we probably shouldn't handle it. What do you think?

I've pushed the MissingSuperclassConverter manifest, so the PluginScannerTest is currently failing.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to
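Editorial sketch, not code from the PR: the comment above hinges on the fact that `NoClassDefFoundError` is a `LinkageError`, which is a sibling of `ServiceConfigurationError` under `Error`, so a catch clause for one does not cover the other. The class and method names below (`CatchClauseSketch`, `tryLoad`, `tryLoadTolerant`) are hypothetical and only illustrate the difference between the stricter and the more tolerant catch clause.

```java
import java.util.ServiceConfigurationError;
import java.util.function.Supplier;

// Hypothetical illustration; none of these names come from Kafka.
final class CatchClauseSketch {

    // Catches only ServiceConfigurationError, so a LinkageError escapes to the caller.
    static String tryLoad(Supplier<?> next) {
        try {
            next.get();
            return "loaded";
        } catch (ServiceConfigurationError e) {
            return "skipped: " + e.getMessage();
        }
    }

    // Also catches LinkageError (which includes NoClassDefFoundError).
    static String tryLoadTolerant(Supplier<?> next) {
        try {
            next.get();
            return "loaded";
        } catch (ServiceConfigurationError | LinkageError e) {
            return "skipped: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        Supplier<?> broken = () -> {
            throw new NoClassDefFoundError("test/plugins/Missing");
        };
        System.out.println(tryLoadTolerant(broken));
        try {
            tryLoad(broken);
        } catch (LinkageError e) {
            System.out.println("propagated: " + e);
        }
    }
}
```

Whether a scanner should use the tolerant form is exactly the design question raised in the options above; the sketch only shows what each choice lets through.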
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1264075402 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java: ## @@ -20,79 +20,114 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PluginScannerTest { +private enum ScannerType { Reflection, ServiceLoader }; + @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); +public PluginScanner scanner; + +@Parameterized.Parameters +public static Collection parameters() { +List values = new ArrayList<>(); +for (ScannerType type : ScannerType.values()) { +values.add(new Object[]{type}); +} +return values; +} + +public PluginScannerTest(ScannerType scannerType) { +switch (scannerType) { +case Reflection: +this.scanner = new ReflectionScanner(); +break; +case ServiceLoader: +this.scanner = new ServiceLoaderScanner(); +break; +default: +throw new IllegalArgumentException("Unknown type " + scannerType); +} +} + @Test -public void testLoadingUnloadedPluginClass() { -DelegatingClassLoader classLoader = initClassLoader( +public void testScanningEmptyPluginPath() { +PluginScanResult result = scan( Collections.emptyList() ); -for (String pluginClassName : TestPlugins.pluginClasses()) { -assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); -} +assertTrue(result.isEmpty()); } @Test -public void testLoadingPluginClass() throws ClassNotFoundException { -DelegatingClassLoader classLoader = 
initClassLoader( +public void testScanningPluginClasses() { +PluginScanResult result = scan( TestPlugins.pluginPath() ); +Set classes = new HashSet<>(); +result.forEach(pluginDesc -> classes.add(pluginDesc.className())); for (String pluginClassName : TestPlugins.pluginClasses()) { -assertNotNull(classLoader.loadClass(pluginClassName)); -assertNotNull(classLoader.pluginClassLoader(pluginClassName)); +assertTrue("Expected " + pluginClassName + "to be discovered but it was not", +classes.contains(pluginClassName));

Review Comment: I looked more into how the Reflections library handles this, and it just logs these classes at WARN and never surfaces them to us, so we don't even get the opportunity to log the error ourselves:

```
[2023-07-14 11:54:26,418] WARN could not get type for name test.plugins.MissingSuperclassConverter from any class loader (org.reflections.Reflections:318)
org.reflections.ReflectionsException: could not get type for name test.plugins.MissingSuperclassConverter
    at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
    at org.reflections.ReflectionUtils.lambda$forNames$22(ReflectionUtils.java:330)
    at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
    at java.base/java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1621)
    at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
    at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
    at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
    at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
    at org.reflections.ReflectionUtils.forNames(ReflectionUtils.java:332)
    at org.reflections.Reflections.getSubTypesOf(Reflections.java:404)
    at org.apache.kafka.connect.runtime.isolation.ReflectionScanner.getPluginDesc(ReflectionScanner.java:118)
    at org.apache.kafka.connect.runtime.isolation.ReflectionScanner.scanPlugins(ReflectionScanner.java:91)
    at org.apache.kafka.connect.runtime.isolation.PluginScanner.scanUrlsAndAddPlugins(PluginScanner.java:78)
    at org.apache.kafka.connect.runtime.isolation.PluginScanner.discoverPlugins(PluginScanner.java:66)
Caused by: java.lang.NoClassDefFoundError: test/plugins/NonExis
```
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1264131073 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java: ## @@ -20,79 +20,114 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PluginScannerTest { +private enum ScannerType { Reflection, ServiceLoader }; + @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); +public PluginScanner scanner; + +@Parameterized.Parameters +public static Collection parameters() { +List values = new ArrayList<>(); +for (ScannerType type : ScannerType.values()) { +values.add(new Object[]{type}); +} +return values; +} + +public PluginScannerTest(ScannerType scannerType) { +switch (scannerType) { +case Reflection: +this.scanner = new ReflectionScanner(); +break; +case ServiceLoader: +this.scanner = new ServiceLoaderScanner(); +break; +default: +throw new IllegalArgumentException("Unknown type " + scannerType); +} +} + @Test -public void testLoadingUnloadedPluginClass() { -DelegatingClassLoader classLoader = initClassLoader( +public void testScanningEmptyPluginPath() { +PluginScanResult result = scan( Collections.emptyList() ); -for (String pluginClassName : TestPlugins.pluginClasses()) { -assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); -} +assertTrue(result.isEmpty()); } @Test -public void testLoadingPluginClass() throws ClassNotFoundException { -DelegatingClassLoader classLoader = 
initClassLoader( +public void testScanningPluginClasses() { +PluginScanResult result = scan( TestPlugins.pluginPath() ); +Set classes = new HashSet<>(); +result.forEach(pluginDesc -> classes.add(pluginDesc.className())); for (String pluginClassName : TestPlugins.pluginClasses()) { -assertNotNull(classLoader.loadClass(pluginClassName)); -assertNotNull(classLoader.pluginClassLoader(pluginClassName)); +assertTrue("Expected " + pluginClassName + "to be discovered but it was not", +classes.contains(pluginClassName));

Review Comment: Oh, and just to motivate the tweaks I made to your implementation: if the ServiceLoader implementation can skip over some LinkageErrors, I think it's possible for two different plugins to throw the same exception and accidentally trigger the equals condition. For example, if two plugins implemented NonExistentInterface and appeared consecutively in the ServiceLoader manifest, I think the logic should show at least one of the errors, but not fail the worker or shadow other plugins. I think this would typically happen if a plugin packages both a Source and a Sink that have the same missing dependency, but the ServiceLoader implementation could continue to make progress.

There's a heuristic for 100 consecutive failures: if you package 100 consecutive faulty plugins (or have them all on the classpath), then it falls back to failing the worker. I don't think I've seen more than ~5-10 connectors and ~20-30 Transforms packaged together. If the errors are non-consecutive then the counter resets as well, so the actual number of tolerated hasNext calls could be quite large, covering most use cases. The heuristic is just there to prevent infinite loops in the ServiceLoader-not-making-progress case, for example if the exception message happens to contain a memory address, or the exception alternates between two different messages.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1264131073 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java: ## @@ -20,79 +20,114 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PluginScannerTest { +private enum ScannerType { Reflection, ServiceLoader }; + @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); +public PluginScanner scanner; + +@Parameterized.Parameters +public static Collection parameters() { +List values = new ArrayList<>(); +for (ScannerType type : ScannerType.values()) { +values.add(new Object[]{type}); +} +return values; +} + +public PluginScannerTest(ScannerType scannerType) { +switch (scannerType) { +case Reflection: +this.scanner = new ReflectionScanner(); +break; +case ServiceLoader: +this.scanner = new ServiceLoaderScanner(); +break; +default: +throw new IllegalArgumentException("Unknown type " + scannerType); +} +} + @Test -public void testLoadingUnloadedPluginClass() { -DelegatingClassLoader classLoader = initClassLoader( +public void testScanningEmptyPluginPath() { +PluginScanResult result = scan( Collections.emptyList() ); -for (String pluginClassName : TestPlugins.pluginClasses()) { -assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); -} +assertTrue(result.isEmpty()); } @Test -public void testLoadingPluginClass() throws ClassNotFoundException { -DelegatingClassLoader classLoader = 
initClassLoader( +public void testScanningPluginClasses() { +PluginScanResult result = scan( TestPlugins.pluginPath() ); +Set classes = new HashSet<>(); +result.forEach(pluginDesc -> classes.add(pluginDesc.className())); for (String pluginClassName : TestPlugins.pluginClasses()) { -assertNotNull(classLoader.loadClass(pluginClassName)); -assertNotNull(classLoader.pluginClassLoader(pluginClassName)); +assertTrue("Expected " + pluginClassName + "to be discovered but it was not", +classes.contains(pluginClassName));

Review Comment: Oh, and just to motivate the tweaks I made to your implementation: if the ServiceLoader implementation can skip over some LinkageErrors, I think it's possible for two different plugins to throw the same exception and accidentally trigger the equals condition. For example, if two plugins implemented NonExistentInterface and appeared consecutively in the ServiceLoader manifest, I think the logic should show at least one of the errors, but not fail the worker or shadow other plugins. I think this would typically happen if a plugin packages both a Source and a Sink that have the same missing dependency, but the ServiceLoader implementation could continue to make progress.

There's a heuristic for 100 consecutive failures: if you package 100 consecutive faulty plugins (or have them all on the classpath), then it falls back to failing the worker. I don't think I've seen more than ~5-10 connectors and ~20-30 Transforms packaged together. If the errors are non-consecutive then the counter resets as well, so the actual number of tolerated hasNext calls could be quite large, covering most use cases. The heuristic is just there to prevent infinite loops in the ServiceLoader-not-making-progress case. If the exception message happens to contain a memory address, or the exception alternates between two different messages, we should still fail the worker.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1265841495 ## connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginScannerTest.java: ## @@ -20,79 +20,114 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +@RunWith(Parameterized.class) public class PluginScannerTest { +private enum ScannerType { Reflection, ServiceLoader }; + @Rule public TemporaryFolder pluginDir = new TemporaryFolder(); +public PluginScanner scanner; + +@Parameterized.Parameters +public static Collection parameters() { +List values = new ArrayList<>(); +for (ScannerType type : ScannerType.values()) { +values.add(new Object[]{type}); +} +return values; +} + +public PluginScannerTest(ScannerType scannerType) { +switch (scannerType) { +case Reflection: +this.scanner = new ReflectionScanner(); +break; +case ServiceLoader: +this.scanner = new ServiceLoaderScanner(); +break; +default: +throw new IllegalArgumentException("Unknown type " + scannerType); +} +} + @Test -public void testLoadingUnloadedPluginClass() { -DelegatingClassLoader classLoader = initClassLoader( +public void testScanningEmptyPluginPath() { +PluginScanResult result = scan( Collections.emptyList() ); -for (String pluginClassName : TestPlugins.pluginClasses()) { -assertThrows(ClassNotFoundException.class, () -> classLoader.loadClass(pluginClassName)); -} +assertTrue(result.isEmpty()); } @Test -public void testLoadingPluginClass() throws ClassNotFoundException { -DelegatingClassLoader classLoader = 
initClassLoader( +public void testScanningPluginClasses() { +PluginScanResult result = scan( TestPlugins.pluginPath() ); +Set classes = new HashSet<>(); +result.forEach(pluginDesc -> classes.add(pluginDesc.className())); for (String pluginClassName : TestPlugins.pluginClasses()) { -assertNotNull(classLoader.loadClass(pluginClassName)); -assertNotNull(classLoader.pluginClassLoader(pluginClassName)); +assertTrue("Expected " + pluginClassName + "to be discovered but it was not", +classes.contains(pluginClassName));

Review Comment:

> But the current implementation still goes through the 100 tries before giving up. I agree with the implementation (better to err on the side of not unnecessarily killing the worker), but just wanted to double check--this is intentional, right?

Yes, it is intentional. This edit should be clearer:

> The heuristic is just there to prevent infinite loops in the ServiceLoader-not-making-progress case. If the exception message happens to contain a memory address, or the exception alternates between two different messages, we should still **eventually** fail the worker **and not be stuck in an infinite loop**.

> Also, I notice that we've added LinkageError to the catch clause around invoking Iterator::next. Do we think it's worth adding similar logic to prevent infinite loops in the event that a call to Iterator::next that throws an exception also fails to advance the iterator? Or is this unreasonable paranoia?

I suppose if we're being paranoid about hasNext, being paranoid about next() for JDK8 implementations would also make sense. I've made the error-handling function more generic and applied it to all of the ServiceLoader operations (including load and iterator, which I've never seen fail).
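Editorial sketch of the heuristic discussed above, written as a standalone helper rather than the PR's actual method. It assumes a generic `Supplier`-based wrapper: retry up to a fixed limit (100 here, matching the number mentioned in the thread), log an error only when it differs in class or message from the previous one, and rethrow the last error once the limit is reached. The names `BoundedRetrySketch`, `retryOnLinkageError`, and `loggedErrors` are hypothetical, and `System.err` stands in for a real logger.

```java
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical illustration; not the implementation merged in the PR.
final class BoundedRetrySketch {
    static final int MAX_ATTEMPTS = 100;

    // Counts emitted log lines so the de-duplication can be observed in the demo.
    static int loggedErrors = 0;

    static <U> U retryOnLinkageError(Supplier<U> operation) {
        LinkageError lastError = null;
        // The attempt counter is local, so it starts from zero on every call.
        for (int attempt = 0; attempt < MAX_ATTEMPTS; attempt++) {
            try {
                return operation.get();
            } catch (LinkageError e) {
                // Only log when this error does not look like the previous one.
                if (lastError == null
                        || !Objects.equals(lastError.getClass(), e.getClass())
                        || !Objects.equals(lastError.getMessage(), e.getMessage())) {
                    loggedErrors++;
                    System.err.println("Failed to discover plugin: " + e);
                }
                lastError = e;
            }
        }
        // Every attempt failed: give up and surface the most recent error.
        throw lastError;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String value = retryOnLinkageError(() -> {
            if (calls[0]++ < 3) {
                throw new NoClassDefFoundError("test/plugins/Missing");
            }
            return "ok";
        });
        System.out.println(value + " after " + calls[0] + " calls, " + loggedErrors + " log line(s)");
    }
}
```

Because the limit applies per wrapped call, an iterator that does advance past a faulty provider gets a fresh budget on the next `hasNext` or `next`, which is consistent with the "counter resets" behavior described in the thread.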
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267082036 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ## @@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) { } @SuppressWarnings({"rawtypes", "unchecked"}) -protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { -return new PluginDesc(plugin, version, loader); +protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) { +return new PluginDesc(plugin, version, source.loader()); } @SuppressWarnings("unchecked") -protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { +protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) { SortedSet> result = new TreeSet<>(); -ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); -for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { -try (LoaderSwap loaderSwap = withClassLoader(loader)) { +ServiceLoader serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment: In case an implementation of ServiceLoader eagerly evaluates the first provider and causes a LinkageError to appear, I wanted to re-use the handleLinkageError logging. I don't think that retries for these operations would be effective, and the later retries would probably be wasteful. However, I thought re-using the same function in both places was simpler than customizing the behavior of each call. Since this code is probably never going to be exercised, I tried to keep it as simple as possible.

Do you think that we should log errors from load and iterator? I can catch the error or just let it propagate to the caller.
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971: URL: https://github.com/apache/kafka/pull/13971#discussion_r1267094589 ## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ## @@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) { } @SuppressWarnings({"rawtypes", "unchecked"}) -protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) { -return new PluginDesc(plugin, version, loader); +protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) { +return new PluginDesc(plugin, version, source.loader()); } @SuppressWarnings("unchecked") -protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) { +protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) { SortedSet> result = new TreeSet<>(); -ServiceLoader serviceLoader = ServiceLoader.load(klass, loader); -for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) { -try (LoaderSwap loaderSwap = withClassLoader(loader)) { +ServiceLoader serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader())); +Iterator iterator = handleLinkageError(klass, source, serviceLoader::iterator); +while (handleLinkageError(klass, source, iterator::hasNext)) { +try (LoaderSwap loaderSwap = withClassLoader(source.loader())) { T pluginImpl; try { -pluginImpl = iterator.next(); +pluginImpl = handleLinkageError(klass, source, iterator::next); } catch (ServiceConfigurationError t) { -log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t); +log.error("Failed to discover {} in {}{}", +klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t); Review Comment: > Since that field (and its accessor method) are currently only used for log messages, what do you think about altering PluginSource::location to return a string, and using "System classpath" in that 
case?

I can't do that, because I need the Path object later in the migration script. Instead, I've used a non-null sentinel path, so that all of the logging call sites are improved and the Path object is still available.
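Editorial sketch of the "non-null sentinel path" idea mentioned above. It is an assumption about the general shape, not the PR's actual `PluginSource`: a shared `Path` constant stands in for the classpath, so log messages always have something printable and callers that need a `Path` still receive one. The names `SourceLocationSketch`, `CLASSPATH`, and `isClasspath` are hypothetical.

```java
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;

// Hypothetical illustration; not the PluginSource class from the PR.
final class SourceLocationSketch {
    // Sentinel used in place of null when plugins come from the classpath.
    static final Path CLASSPATH = Paths.get("classpath");

    private final Path location;

    SourceLocationSketch(Path location) {
        this.location = Objects.requireNonNull(location, "location");
    }

    // Always non-null, so it can be passed straight to a log statement.
    Path location() {
        return location;
    }

    // Identity comparison: a real directory that happens to be named
    // "classpath" is a different Path instance and is not mistaken for the sentinel.
    boolean isClasspath() {
        return location == CLASSPATH;
    }

    public static void main(String[] args) {
        SourceLocationSketch fromClasspath = new SourceLocationSketch(CLASSPATH);
        SourceLocationSketch fromDir = new SourceLocationSketch(Paths.get("plugins", "my-connector"));
        System.out.println("Failed to discover Converter in " + fromClasspath.location());
        System.out.println("Failed to discover Converter in " + fromDir.location());
    }
}
```

Compared with returning a `String`, this keeps the accessor's type unchanged for code that needs to work with the location as a path, at the cost of callers having to know that one value is special.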
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267103768

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ##

@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }

     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }

     @SuppressWarnings("unchecked")
-    protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) {
+    protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) {
         SortedSet> result = new TreeSet<>();
-        ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));
+        Iterator iterator = handleLinkageError(klass, source, serviceLoader::iterator);
+        while (handleLinkageError(klass, source, iterator::hasNext)) {
+            try (LoaderSwap loaderSwap = withClassLoader(source.loader())) {
                 T pluginImpl;
                 try {
-                    pluginImpl = iterator.next();
+                    pluginImpl = handleLinkageError(klass, source, iterator::next);
                 } catch (ServiceConfigurationError t) {
-                    log.error("Failed to discover {}{}", klass.getSimpleName(), reflectiveErrorDescription(t.getCause()), t);
+                    log.error("Failed to discover {} in {}{}",
+                            klass.getSimpleName(), source.location(), reflectiveErrorDescription(t.getCause()), t);
                     continue;
                 }
                 Class pluginKlass = (Class) pluginImpl.getClass();
-                if (pluginKlass.getClassLoader() != loader) {
+                if (pluginKlass.getClassLoader() != source.loader()) {
                     log.debug("{} from other classloader {} is visible from {}, excluding to prevent isolated loading",
-                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), loader);
+                            pluginKlass.getSimpleName(), pluginKlass.getClassLoader(), source.location());
                     continue;
                 }
-                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), loader));
+                result.add(pluginDesc(pluginKlass, versionFor(pluginImpl), source));
             }
         }
         return result;
     }

+    /**
+     * Helper to evaluate a {@link ServiceLoader} operation while handling {@link LinkageError}s.
+     *
+     * @param klass The plugin superclass which is being loaded
+     * @param function A function on a {@link ServiceLoader} which may throw {@link LinkageError}
+     * @return the return value of function
+     * @throws Error errors thrown by the passed-in function
+     * @param Type being iterated over by the ServiceLoader
+     * @param Return value of the passed-in function
+     */
+    private U handleLinkageError(Class klass, PluginSource source, Supplier function) {
+        // It's difficult to know for sure if the iterator was able to advance past the first broken
+        // plugin class, or if it will continue to fail on that broken class for any subsequent calls
+        // to Iterator::hasNext or Iterator::next
+        // For reference, see https://bugs.openjdk.org/browse/JDK-8196182, which describes
+        // the behavior we are trying to mitigate with this logic as buggy, but indicates that a fix
+        // in the JDK standard library ServiceLoader implementation is unlikely to land
+        LinkageError lastError = null;
+        // Try a fixed maximum number of times in case the ServiceLoader cannot move past a faulty plugin,
+        // but the LinkageError varies between calls. This limit is chosen to be higher than the typical number
+        // of plugins in a single plugin location, and to limit the amount of log-spam on startup.
+        for (int i = 0; i < 100; i++) {
+            try {
+                return function.get();
+            } catch (LinkageError t) {
+                // As an optimization, hide subsequent error logs if two consecutive errors look similar.
+                // This reduces log-spam for iterators which cannot advance and rethrow the same exception.
+                if (lastError == null
+                        || !Objects.equals(lastError.getClass(), t.getClass())
+                        || !Objects.equa
[GitHub] [kafka] gharris1727 commented on a diff in pull request #13971: KAFKA-15150: Add ServiceLoaderScanner implementation
gharris1727 commented on code in PR #13971:
URL: https://github.com/apache/kafka/pull/13971#discussion_r1267248169

## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java: ##

@@ -118,35 +120,80 @@ private void loadJdbcDrivers(final ClassLoader loader) {
     }

     @SuppressWarnings({"rawtypes", "unchecked"})
-    protected PluginDesc pluginDesc(Class plugin, String version, ClassLoader loader) {
-        return new PluginDesc(plugin, version, loader);
+    protected PluginDesc pluginDesc(Class plugin, String version, PluginSource source) {
+        return new PluginDesc(plugin, version, source.loader());
     }

     @SuppressWarnings("unchecked")
-    protected SortedSet> getServiceLoaderPluginDesc(Class klass, ClassLoader loader) {
+    protected SortedSet> getServiceLoaderPluginDesc(Class klass, PluginSource source) {
         SortedSet> result = new TreeSet<>();
-        ServiceLoader serviceLoader = ServiceLoader.load(klass, loader);
-        for (Iterator iterator = serviceLoader.iterator(); iterator.hasNext(); ) {
-            try (LoaderSwap loaderSwap = withClassLoader(loader)) {
+        ServiceLoader serviceLoader = handleLinkageError(klass, source, () -> ServiceLoader.load(klass, source.loader()));

Review Comment:
I've removed these usages, so the error propagates normally.
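
For readers following the thread, here is a minimal, self-contained sketch of the bounded-retry idea described in the comments of the diff above: retry an iterator operation that may throw `LinkageError`, and record an error only when it differs from the previous one. This is not the code from the PR. The class `LinkageRetrySketch`, the methods `retryOnLinkageError`, `flakyIterator`, and `scan`, the `logged` list that stands in for a logger, and the comparison on `getMessage()` are all assumptions made for illustration; only the limit of 100 attempts and the "suppress similar consecutive errors" behavior come from the diff.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.function.Supplier;

public class LinkageRetrySketch {
    // Fixed upper bound on attempts, mirroring the limit of 100 used in the diff.
    public static final int MAX_ATTEMPTS = 100;

    // Stand-in for a logger (hypothetical), so the sketch has no logging dependency.
    public static final List<String> logged = new ArrayList<>();

    // Retries the operation while it throws LinkageError, up to MAX_ATTEMPTS times.
    public static <U> U retryOnLinkageError(Supplier<U> operation) {
        LinkageError lastError = null;
        for (int i = 0; i < MAX_ATTEMPTS; i++) {
            try {
                return operation.get();
            } catch (LinkageError t) {
                // Record the error only if it differs from the previous one.
                // Comparing class and message is an assumption of this sketch.
                if (lastError == null
                        || !Objects.equals(lastError.getClass(), t.getClass())
                        || !Objects.equals(lastError.getMessage(), t.getMessage())) {
                    logged.add(t.getClass().getSimpleName() + ": " + t.getMessage());
                }
                lastError = t;
            }
        }
        // The operation never succeeded; propagate the most recent error.
        throw lastError;
    }

    // Hypothetical iterator whose next() fails twice with the same error before advancing.
    public static Iterator<String> flakyIterator() {
        return new Iterator<String>() {
            private int failuresLeft = 2;
            private final Iterator<String> delegate =
                    Arrays.asList("PluginA", "PluginB").iterator();

            @Override
            public boolean hasNext() {
                return delegate.hasNext();
            }

            @Override
            public String next() {
                if (failuresLeft > 0) {
                    failuresLeft--;
                    throw new NoClassDefFoundError("com/example/MissingDependency");
                }
                return delegate.next();
            }
        };
    }

    // Iterates with both hasNext and next wrapped in the retry helper.
    public static List<String> scan() {
        logged.clear();
        List<String> found = new ArrayList<>();
        Iterator<String> iterator = flakyIterator();
        while (retryOnLinkageError(iterator::hasNext)) {
            found.add(retryOnLinkageError(iterator::next));
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println("found=" + scan() + " logged=" + logged);
    }
}
```

In this sketch only the `hasNext` and `next` calls are wrapped, so an error thrown while creating the loader would propagate to the caller unchanged.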