[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r669806313

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ##

@@ -82,11 +93,19 @@
             Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> MANIFEST_PREFIX + serviceLoaderPlugin.getName())
             .collect(Collectors.toSet());
+    // Although this classloader does not load classes directly but rather delegates loading to a
+    // PluginClassLoader or its parent through its base class, because of the use of inheritance in
+    // in the latter case, this classloader needs to also be declared as parallel capable to use
+    // fine-grain locking when loading classes.
+    static {

Review comment:
Keeping the static block here, because it's a block and that's what we have in `PluginClassLoader`. Our style is not too strict with respect to this ordering. The class overrides the `loadClass` method, but it delegates the loading to different classloaders, and the locking is embedded in those classes.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
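For readers following along, here is a minimal sketch of what "declared as parallel capable" means in plain JDK terms. It is not the code from this PR: `RegisteredLoader`, `UnregisteredLoader` and `locksPerClassName` are hypothetical names chosen for illustration. The sketch only relies on `ClassLoader.registerAsParallelCapable()` and `getClassLoadingLock(String)`, and shows that a subclass which skips the static registration falls back to locking on the loader instance itself.

```java
import java.net.URL;
import java.net.URLClassLoader;

public class ParallelCapableSketch {

    /** Hypothetical loader that registers itself; not Connect's DelegatingClassLoader. */
    static class RegisteredLoader extends URLClassLoader {
        static {
            // Must run in the subclass's own static initializer: registration is
            // per class and is not inherited from URLClassLoader.
            ClassLoader.registerAsParallelCapable();
        }

        RegisteredLoader(ClassLoader parent) {
            super(new URL[0], parent);
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            // For a registered loader this lock is an object dedicated to the class name.
            synchronized (getClassLoadingLock(name)) {
                return super.loadClass(name, resolve);
            }
        }

        boolean locksPerClassName(String name) {
            return getClassLoadingLock(name) != this;
        }
    }

    /** The same loader without registration: every load locks on the loader itself. */
    static class UnregisteredLoader extends URLClassLoader {
        UnregisteredLoader(ClassLoader parent) {
            super(new URL[0], parent);
        }

        boolean locksPerClassName(String name) {
            return getClassLoadingLock(name) != this;
        }
    }

    static boolean registeredLocksPerName() {
        return new RegisteredLoader(ClassLoader.getSystemClassLoader())
            .locksPerClassName("java.lang.String");
    }

    static boolean unregisteredLocksPerName() {
        return new UnregisteredLoader(ClassLoader.getSystemClassLoader())
            .locksPerClassName("java.lang.String");
    }

    public static void main(String[] args) {
        System.out.println("registered uses per-name locks:   " + registeredLocksPerName());
        System.out.println("unregistered uses per-name locks: " + unregisteredLocksPerName());
    }
}
```

The coarse lock in the unregistered case is the situation the code comment in the hunk above is guarding against: two threads that enter the loader for different class names would contend on one monitor.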
[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r669042150

## File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java ##

@@ -48,25 +48,25 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;

 public class DelegatingClassLoader extends URLClassLoader {
     private static final Logger log = LoggerFactory.getLogger(DelegatingClassLoader.class);
     private static final String CLASSPATH_NAME = "classpath";
     private static final String UNDEFINED_VERSION = "undefined";
-    private final Map<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
-    private final Map<String, String> aliases;
+    private final ConcurrentMap<String, SortedMap<PluginDesc<?>, ClassLoader>> pluginLoaders;
+    private final ConcurrentMap<String, String> aliases;

Review comment:
The purpose of this change was to highlight that the data structure is required to be concurrent. Of course, if we used a method that exists in `ConcurrentMap` but not in `Map`, that would be a hard requirement. `putIfAbsent` used to be such a method, but that has not been the case since Java 1.8, where `Map` declares it as well. In any case, using the more accurate interface is valid even if we don't explicitly call methods that are missing from the parent interface, because thread safety is a requirement for this implementation.
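To illustrate the point about declaring the more accurate interface, here is a small sketch under stated assumptions. `ConcurrentAliasSketch`, `register` and `raceOnOneAlias` are hypothetical names, and the alias and class names are made up; none of this is taken from `DelegatingClassLoader`. It shows a field typed as `ConcurrentMap` whose `putIfAbsent` call stays atomic when several threads race on the same key.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ConcurrentAliasSketch {

    // Typing the field as ConcurrentMap states the thread-safety requirement
    // in the declaration itself, not only in the choice of implementation.
    private final ConcurrentMap<String, String> aliases = new ConcurrentHashMap<>();

    /** Registers an alias once; later callers get the class name that won. */
    String register(String alias, String className) {
        String previous = aliases.putIfAbsent(alias, className);
        return previous == null ? className : previous;
    }

    /** Races several callers on one alias and returns the distinct results they saw. */
    static Set<String> raceOnOneAlias(int threads) {
        ConcurrentAliasSketch registry = new ConcurrentAliasSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (int i = 0; i < threads; i++) {
                String className = "com.example.Plugin" + i; // hypothetical class names
                tasks.add(() -> registry.register("MyPlugin", className));
            }
            Set<String> observed = new HashSet<>();
            for (Future<String> result : pool.invokeAll(tasks)) {
                observed.add(result.get());
            }
            return observed;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("distinct winners: " + raceOnOneAlias(8).size());
    }
}
```

Since Java 8 the same call compiles against a field typed as plain `Map`, but the `Map` default implementation makes no atomicity guarantee; only the `ConcurrentMap` contract does, which is the argument for the narrower type.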
[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r585078292

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java ##

@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    private String threadPrefix;
+    private Plugins plugins;
+    private ThreadPoolExecutor exec;
+    private Breakpoint<String> dclBreakpoint;
+    private Breakpoint<String> pclBreakpoint;
+
+    @Before
+    public void setup() {
+        TestPlugins.assertAvailable();
+        Map<String, String> pluginProps = Collections.singletonMap(
+            WorkerConfig.PLUGIN_PATH_CONFIG,
+            String.join(",", TestPlugins.pluginPath())
+        );
+        threadPrefix = SynchronizationTest.class.getSimpleName()
+            + "." + testName.getMethodName() + "-";
+        dclBreakpoint = new Breakpoint<>();
+        pclBreakpoint = new Breakpoint<>();
+        plugins = new Plugins(pluginProps) {
+            @Override
+            protected DelegatingClassLoader newDelegatingClassLoader(List<String> paths) {
+                return AccessController.doPrivileged(
+                    (PrivilegedAction<DelegatingClassLoader>) () ->
+                        new SynchronizedDelegatingClassLoader(paths)
+                );
+            }
+        };
+        exec = new ThreadPoolExecutor(
+            2,
+            2,
+            1000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(),
+            threadFactoryWithNamedThreads(threadPrefix)
+        );
+
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        dclBreakpoint.clear();
+        pclBreakpoint.clear();
+        exec.shutdown();
+        exec.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    private static class Breakpoint<T> {
+
+        private Predicate<T> predicate;
+        private CyclicBarrier barrier;
+
+        public synchronized void clear() {
+            if (barrier != null) {
+                barrier.reset();
+            }
+            predicate = null;
+            barrier = null;
+        }
+
+        public synchronized void set(Predicate<T> predicate) {
+            clear();
+            this.predicate = predicate;
+            // As soon as the barrier is tripped, the barrier will be reset for the next round.
+            barrier = new CyclicBarrier(2);
+        }
+
+        /**
+         * From a thread under test, await for the test orchestrator to continue execution
+         * @param obj Object to test with the breakpoint's current predicate
+         */
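The quoted hunk is cut off before the methods that use the barrier, so the following is only a sketch of the general two-party `CyclicBarrier` rendezvous that the comments describe, not the test's actual implementation. `SimpleBreakpoint`, `pauseIfMatches`, `meet` and the class name `com.example.Target` are hypothetical. A thread under test parks at the breakpoint when the predicate matches, the orchestrator observes that it is parked, and a second rendezvous on the same (automatically reset) barrier releases it.

```java
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;

public class BreakpointSketch {

    /** Hypothetical, simplified breakpoint; names and behavior are assumptions. */
    static class SimpleBreakpoint<T> {
        private Predicate<T> predicate;
        private CyclicBarrier barrier;

        synchronized void set(Predicate<T> predicate) {
            this.predicate = predicate;
            // Two parties: the thread under test and the orchestrator. Once both
            // arrive, the barrier trips and is immediately reusable for the next round.
            this.barrier = new CyclicBarrier(2);
        }

        /** Thread under test: park here if the predicate matches, until released. */
        void pauseIfMatches(T obj) {
            CyclicBarrier current;
            synchronized (this) {
                if (predicate == null || !predicate.test(obj)) {
                    return;
                }
                current = barrier;
            }
            awaitQuietly(current); // round 1: tell the orchestrator we are parked
            awaitQuietly(current); // round 2: wait for the orchestrator to release us
        }

        /** Orchestrator: one rendezvous with the parked thread. */
        void meet() {
            CyclicBarrier current;
            synchronized (this) {
                current = barrier;
            }
            awaitQuietly(current);
        }

        private static void awaitQuietly(CyclicBarrier barrier) {
            try {
                barrier.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException | BrokenBarrierException | TimeoutException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    static List<String> runDemo() {
        List<String> events = new CopyOnWriteArrayList<>();
        SimpleBreakpoint<String> breakpoint = new SimpleBreakpoint<>();
        breakpoint.set(name -> name.equals("com.example.Target"));

        Thread worker = new Thread(() -> {
            events.add("before");
            breakpoint.pauseIfMatches("com.example.Target");
            events.add("after");
        });
        worker.start();

        breakpoint.meet();    // the worker has reached the breakpoint
        events.add("paused"); // the worker cannot continue until the next meet()
        breakpoint.meet();    // release the worker
        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The timeout on `await` is a deliberate choice for test code: if one side never arrives, the other fails with an exception instead of hanging the build.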
[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences
kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r582614443

## File path: connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java ##

@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+    public static final Logger log = LoggerFactory.getLogger(SynchronizationTest.class);
+
+    @Rule
+    public final TestName testName = new TestName();
+
+    private String threadPrefix;
+    private Plugins plugins;
+    private ThreadPoolExecutor exec;
+    private Breakpoint<String> dclBreakpoint;
+    private Breakpoint<String> pclBreakpoint;
+
+    @Before
+    public void setup() {
+        TestPlugins.assertAvailable();
+        Map<String, String> pluginProps = Collections.singletonMap(
+            WorkerConfig.PLUGIN_PATH_CONFIG,
+            String.join(",", TestPlugins.pluginPath())
+        );
+        threadPrefix = SynchronizationTest.class.getSimpleName()
+            + "." + testName.getMethodName() + "-";
+        dclBreakpoint = new Breakpoint<>();
+        pclBreakpoint = new Breakpoint<>();
+        plugins = new Plugins(pluginProps) {
+            @Override
+            protected DelegatingClassLoader newDelegatingClassLoader(List<String> paths) {
+                return AccessController.doPrivileged(
+                    (PrivilegedAction<DelegatingClassLoader>) () ->
+                        new SynchronizedDelegatingClassLoader(paths)
+                );
+            }
+        };
+        exec = new ThreadPoolExecutor(
+            2,
+            2,
+            1000L,
+            TimeUnit.MILLISECONDS,
+            new LinkedBlockingDeque<>(),
+            threadFactoryWithNamedThreads(threadPrefix)
+        );
+
+    }
+
+    @After
+    public void tearDown() throws InterruptedException {
+        dclBreakpoint.clear();
+        pclBreakpoint.clear();
+        exec.shutdown();
+        exec.awaitTermination(1L, TimeUnit.SECONDS);
+    }
+
+    private static class Breakpoint<T> {
+
+        private Predicate<T> predicate;
+        private CyclicBarrier barrier;
+
+        public synchronized void clear() {
+            if (barrier != null) {
+                barrier.reset();
+            }
+            predicate = null;
+            barrier = null;
+        }
+
+        public synchronized void set(Predicate<T> predicate) {
+            clear();
+            this.predicate = predicate;
+            // As soon as the barrier is tripped, the barrier will be reset for the next round.
+            barrier = new CyclicBarrier(2);
+        }
+
+        /**
+         * From a thread under test, await for the test orchestrator to continue execution
+         * @param obj Object to test with the breakpoint's current predicate
+         */