[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-07-14 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r669806313



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -82,11 +93,19 @@
 Arrays.stream(SERVICE_LOADER_PLUGINS).map(serviceLoaderPlugin -> 
MANIFEST_PREFIX + serviceLoaderPlugin.getName())
 .collect(Collectors.toSet());
 
+// Although this classloader does not load classes directly but rather 
delegates loading to a
+// PluginClassLoader or its parent through its base class, because of the 
use of inheritance in
+// in the latter case, this classloader needs to also be declared as 
parallel capable to use
+// fine-grain locking when loading classes.
+static {

Review comment:
   Keeping the static block here, because it's a block and that's what we 
have in `PluginClassLoader`. Our style is not too strict with respect of this 
ordering. 
   
   The class overwrites the `loadClass` method. But it's delegating the loading 
to different classloaders and the locking is embedded in these classes. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-07-13 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r669042150



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/DelegatingClassLoader.java
##
@@ -48,25 +48,25 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Enumeration;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.ServiceLoader;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.stream.Collectors;
 
 public class DelegatingClassLoader extends URLClassLoader {
 private static final Logger log = 
LoggerFactory.getLogger(DelegatingClassLoader.class);
 private static final String CLASSPATH_NAME = "classpath";
 private static final String UNDEFINED_VERSION = "undefined";
 
-private final Map, ClassLoader>> 
pluginLoaders;
-private final Map aliases;
+private final ConcurrentMap, ClassLoader>> 
pluginLoaders;
+private final ConcurrentMap aliases;

Review comment:
   The purpose of this change was to highlight that the data structure is 
required to be concurrent. 
   
   Of course if a method that existed in `ConcurrentMap` and not in `Map` was 
used, that would be a hard requirement. 
   `putIfAbsent` used to be such a method but that's not the case after 1.8. In 
any case, the use of the more accurate interface is valid even if we don't 
explicitly use methods that don't exist in the parent. That's because the need 
for this implementation to be thread safe is a requirement here. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-03-01 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r585078292



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+public static final Logger log = 
LoggerFactory.getLogger(SynchronizationTest.class);
+
+@Rule
+public final TestName testName = new TestName();
+
+private String threadPrefix;
+private Plugins plugins;
+private ThreadPoolExecutor exec;
+private Breakpoint dclBreakpoint;
+private Breakpoint pclBreakpoint;
+
+@Before
+public void setup() {
+TestPlugins.assertAvailable();
+Map pluginProps = Collections.singletonMap(
+WorkerConfig.PLUGIN_PATH_CONFIG,
+String.join(",", TestPlugins.pluginPath())
+);
+threadPrefix = SynchronizationTest.class.getSimpleName()
++ "." + testName.getMethodName() + "-";
+dclBreakpoint = new Breakpoint<>();
+pclBreakpoint = new Breakpoint<>();
+plugins = new Plugins(pluginProps) {
+@Override
+protected DelegatingClassLoader 
newDelegatingClassLoader(List paths) {
+return AccessController.doPrivileged(
+(PrivilegedAction) () ->
+new SynchronizedDelegatingClassLoader(paths)
+);
+}
+};
+exec = new ThreadPoolExecutor(
+2,
+2,
+1000L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingDeque<>(),
+threadFactoryWithNamedThreads(threadPrefix)
+);
+
+}
+
+@After
+public void tearDown() throws InterruptedException {
+dclBreakpoint.clear();
+pclBreakpoint.clear();
+exec.shutdown();
+exec.awaitTermination(1L, TimeUnit.SECONDS);
+}
+
+private static class Breakpoint {
+
+private Predicate predicate;
+private CyclicBarrier barrier;
+
+public synchronized void clear() {
+if (barrier != null) {
+barrier.reset();
+}
+predicate = null;
+barrier = null;
+}
+
+public synchronized void set(Predicate predicate) {
+clear();
+this.predicate = predicate;
+// As soon as the barrier is tripped, the barrier will be reset 
for the next round.
+barrier = new CyclicBarrier(2);
+}
+
+/**
+ * From a thread under test, await for the test orchestrator to 
continue execution
+ * @param obj Object to test with the breakpoint's current predicate
+ 

[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-02-24 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r582614443



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+public static final Logger log = 
LoggerFactory.getLogger(SynchronizationTest.class);
+
+@Rule
+public final TestName testName = new TestName();
+
+private String threadPrefix;
+private Plugins plugins;
+private ThreadPoolExecutor exec;
+private Breakpoint dclBreakpoint;
+private Breakpoint pclBreakpoint;
+
+@Before
+public void setup() {
+TestPlugins.assertAvailable();
+Map pluginProps = Collections.singletonMap(
+WorkerConfig.PLUGIN_PATH_CONFIG,
+String.join(",", TestPlugins.pluginPath())
+);
+threadPrefix = SynchronizationTest.class.getSimpleName()
++ "." + testName.getMethodName() + "-";
+dclBreakpoint = new Breakpoint<>();
+pclBreakpoint = new Breakpoint<>();
+plugins = new Plugins(pluginProps) {
+@Override
+protected DelegatingClassLoader 
newDelegatingClassLoader(List paths) {
+return AccessController.doPrivileged(
+(PrivilegedAction) () ->
+new SynchronizedDelegatingClassLoader(paths)
+);
+}
+};
+exec = new ThreadPoolExecutor(
+2,
+2,
+1000L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingDeque<>(),
+threadFactoryWithNamedThreads(threadPrefix)
+);
+
+}
+
+@After
+public void tearDown() throws InterruptedException {
+dclBreakpoint.clear();
+pclBreakpoint.clear();
+exec.shutdown();
+exec.awaitTermination(1L, TimeUnit.SECONDS);
+}
+
+private static class Breakpoint {
+
+private Predicate predicate;
+private CyclicBarrier barrier;
+
+public synchronized void clear() {
+if (barrier != null) {
+barrier.reset();
+}
+predicate = null;
+barrier = null;
+}
+
+public synchronized void set(Predicate predicate) {
+clear();
+this.predicate = predicate;
+// As soon as the barrier is tripped, the barrier will be reset 
for the next round.
+barrier = new CyclicBarrier(2);
+}
+
+/**
+ * From a thread under test, await for the test orchestrator to 
continue execution
+ * @param obj Object to test with the breakpoint's current predicate
+