[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123470094


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   I don't think it's especially likely for connectors to continually invoke 
`requestTaskReconfiguration` given the automatic retry logic in distributed 
mode, and as of https://github.com/apache/kafka/pull/13276, the impact of 
ongoing retries for that operation is drastically reduced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123467255


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   Okay, a lot to unpack here!
   
   The more I think about it, the more I like the existing behavior for 
handling failures in task config generation. We automatically retry in 
distributed mode in order to absorb the risk of writing to the config topic or 
issuing a REST request to the leader. Neither of those takes place in 
standalone mode, so it's fine to just throw the exception back to the caller 
(either a connector invoking `ConnectorContext::requestTaskReconfiguration`, or 
a REST API call to restart the connector): the likeliest cause is a failed 
call to `Connector::taskConfigs`, and automatic retries are less likely to be 
useful.
   
   I think we should basically just preserve existing behavior here, with the 
one exception of fixing how we handle failed calls to 
`requestTaskReconfiguration` that occur during a call to `restartConnector`. 
Right now we don't handle any of those and, IIUC, just cause the REST request 
to time out after 90 seconds. Instead of timing out, we should return a 500 
response in that case.
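
A minimal sketch of the idea, not the actual herder code: if the restart path always completes its callback, with the error when task config generation fails, the REST layer can answer right away instead of waiting for a timeout. `ResultCallback`, `RestartSketch`, `restartStatus`, and the 204 success code are hypothetical names and values chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for a herder callback; not the real Kafka Connect interface.
interface ResultCallback<T> {
    void onCompletion(Throwable error, T result);
}

class RestartSketch {
    // Stand-in for task config generation, which may fail inside Connector::taskConfigs.
    static void updateConnectorTasks(String connName) {
        if (connName.startsWith("bad")) {
            throw new IllegalStateException("taskConfigs failed for " + connName);
        }
    }

    // Always completes the callback, so the caller never has to wait for a timeout.
    static void restartConnector(String connName, ResultCallback<Void> callback) {
        try {
            updateConnectorTasks(connName);
            callback.onCompletion(null, null);
        } catch (Exception e) {
            callback.onCompletion(e, null);
        }
    }

    // Maps the callback outcome to an HTTP-like status (assumed: 204 on success, 500 on failure).
    static int restartStatus(String connName) {
        CompletableFuture<Integer> status = new CompletableFuture<>();
        restartConnector(connName, (error, result) -> status.complete(error == null ? 204 : 500));
        // The callback fires synchronously in this sketch, so join() does not block.
        return status.join();
    }

    public static void main(String[] args) {
        System.out.println(restartStatus("good-connector"));
        System.out.println(restartStatus("bad-connector"));
    }
}
```

The point of the sketch is only that the failure reaches the caller through the callback; how the real REST resource translates that into a response is outside what this thread shows.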






[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120754565


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java:
##
@@ -243,7 +248,11 @@ public synchronized void requestTaskReconfiguration(String connName) {
             log.error("Task that requested reconfiguration does not exist: {}", connName);
             return;
         }
-        updateConnectorTasks(connName);
+        try {
+            updateConnectorTasks(connName);
+        } catch (Exception e) {
+            log.error("Unable to generate task configs for {}", connName, e);
+        }

Review Comment:
   This is a change in behavior too, right? We no longer throw in 
`ConnectorContext::requestTaskReconfiguration` if we encounter any errors.
   
   This also seems reasonable (it aligns the behavior across standalone and 
distributed modes), but it does have consequences for the REST API, where 
restarting a connector no longer fails if we're unable to generate task configs 
for it (which is currently the case for both distributed and standalone modes).
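
To illustrate the difference being discussed, here is a small sketch under simplified assumptions: `ReconfigurationSketch` and its methods are hypothetical stand-ins, and an in-memory list plays the role of the logger. It shows that once the exception is caught and logged, a caller such as a REST restart handler can no longer tell that task config generation failed.

```java
import java.util.ArrayList;
import java.util.List;

class ReconfigurationSketch {
    // Stand-in for log.error(...) so the effect is observable in the example.
    static final List<String> errorLog = new ArrayList<>();

    // Stand-in for task config generation that always fails.
    static void updateConnectorTasks(String connName) {
        throw new IllegalStateException("unable to generate task configs for " + connName);
    }

    // Behavior before the diff: the exception reaches the caller.
    static void requestTaskReconfigurationThrowing(String connName) {
        updateConnectorTasks(connName);
    }

    // Behavior in the diff: the exception is logged and the method returns normally.
    static void requestTaskReconfigurationLogging(String connName) {
        try {
            updateConnectorTasks(connName);
        } catch (Exception e) {
            errorLog.add("Unable to generate task configs for " + connName);
        }
    }

    // Reports whether the caller observed a failure.
    static boolean callerSeesFailure(Runnable request) {
        try {
            request.run();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(callerSeesFailure(() -> requestTaskReconfigurationThrowing("c1")));
        System.out.println(callerSeesFailure(() -> requestTaskReconfigurationLogging("c1")));
        System.out.println(errorLog);
    }
}
```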
   
   






[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-03-02 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1123318824


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.isolation;
+
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+public abstract class IsolatedPlugin<P> {
+
+    private final Plugins plugins;
+    private final Class<?> pluginClass;
+    protected final P delegate;
+    private final ClassLoader classLoader;
+    private final PluginType type;
+
+    IsolatedPlugin(Plugins plugins, P delegate, PluginType type) {
+        this.plugins = Objects.requireNonNull(plugins, "plugins must be non-null");
+        this.delegate = Objects.requireNonNull(delegate, "delegate plugin must be non-null");
+        this.pluginClass = delegate.getClass();
+        ClassLoader classLoader = pluginClass.getClassLoader();
+        this.classLoader = Objects.requireNonNull(classLoader, "delegate plugin must not be a boostrap class");
+        this.type = Objects.requireNonNull(type, "plugin type must be non-null");
+    }
+
+    public PluginType type() {
+        return type;
+    }
+
+    @SuppressWarnings("unchecked")
+    public Class<? extends P> pluginClass() {
+        return (Class<? extends P>) pluginClass;
+    }
+
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }
+    }
+
+    protected <T> void isolateV(Consumer<T> consumer, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t);
+        }
+    }
+
+    protected <T, U> void isolateV(BiConsumer<T, U> consumer, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            consumer.accept(t, u);
+        }
+    }
+
+    protected <T, R> R isolate(Function<T, R> function, T t) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t);
+        }
+    }
+
+    protected <T, U, R> R isolate(BiFunction<T, U, R> function, T t, U u) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return function.apply(t, u);
+        }
+    }

Review Comment:
   Ah, I misunderstood--I thought you'd found a counterexample that couldn't be 
covered by the minimal set of `isolate`/`isolateV` variants, instead of one 
that couldn't be covered by the expanded set of variants originally present in 
the PR. Was really scratching my head over that one!
   
   Thanks for the changes, this class LGTM now 👍 






[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120889645


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@

Review Comment:
   If it comes down to not being able to completely satisfy all use cases with 
just the `isolate(Callable)` (and possibly `isolateV(ThrowingRunnable)`) 
methods, then I think it could be alright to add more variants. But the 
additional stack frames, and even ugly class names, don't seem likely to be 
super important to users or plugin developers. In both cases (though probably 
more often the latter), aren't they more likely to look for the first stack 
frame that originates with their plugin class?
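
As a rough illustration of why a single `isolate(Callable)` can stand in for the `Function`/`BiFunction` variants: the call site captures its arguments in a lambda, so the number of arguments no longer matters. This sketch assumes the loader swap amounts to setting the thread's context class loader; `CallableIsolationSketch` and `convert` are hypothetical names, not part of the PR.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

class CallableIsolationSketch {
    private final ClassLoader pluginLoader;

    CallableIsolationSketch(ClassLoader pluginLoader) {
        this.pluginLoader = pluginLoader;
    }

    // Runs the callable with the plugin loader as the context class loader, then restores the old one.
    <V> V isolate(Callable<V> callable) throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(pluginLoader);
        try {
            return callable.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    // Hypothetical multi-argument plugin method; reports whether it ran under the expected loader.
    static String convert(String topic, int partition, ClassLoader expected) {
        boolean isolated = Thread.currentThread().getContextClassLoader() == expected;
        return topic + "-" + partition + (isolated ? " (isolated)" : " (not isolated)");
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = new URLClassLoader(new URL[0]);
        CallableIsolationSketch sketch = new CallableIsolationSketch(loader);
        // Three arguments, no dedicated variant: the lambda captures them all.
        System.out.println(sketch.isolate(() -> convert("orders", 3, loader)));
    }
}
```

The cost is the extra synthetic lambda frame in stack traces, which is the trade-off weighed in the comment above.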






[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120887378


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@

Review Comment:
   I would be really interested in knowing which case can't be covered here. If 
you can dig up the `Converter` counterexample, would you mind sharing it?






[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120886025


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+
+public interface ThrowingRunnable {
+void run() throws Exception;
+}
+
+@Override
+public int hashCode() {
+try {
+return isolate(delegate::hashCode);
+} catch (Throwable e) {
+throw new RuntimeException("unable to evaluate plugin hashCode", 
e);
+}
+}
+
+@Override
+public boolean equals(Object obj) {
+if (obj == null || this.getClass() != obj.getClass()) {
+return false;
+}

Review Comment:
   Given that equality across `Connector` instances is likely not a developer 
priority, I think you're right regarding reference equality for delegates. Good 
point 👍
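
One possible reading of "reference equality for delegates", sketched with a hypothetical `IdentityWrapperSketch` class (this is not the code that ended up in the PR): two wrappers are equal only when they wrap the very same delegate instance, and the hash code is derived from the delegate's identity so it stays consistent with `equals`.

```java
import java.util.Objects;

class IdentityWrapperSketch {
    private final Object delegate;

    IdentityWrapperSketch(Object delegate) {
        this.delegate = Objects.requireNonNull(delegate, "delegate must be non-null");
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || this.getClass() != obj.getClass()) {
            return false;
        }
        // Reference comparison: the delegate's own equals() is never invoked.
        return this.delegate == ((IdentityWrapperSketch) obj).delegate;
    }

    @Override
    public int hashCode() {
        // Identity-based hash, consistent with the reference comparison above.
        return System.identityHashCode(delegate);
    }

    public static void main(String[] args) {
        String shared = new String("plugin");
        String lookalike = new String("plugin"); // equal by value, different instance
        System.out.println(new IdentityWrapperSketch(shared).equals(new IdentityWrapperSketch(shared)));
        System.out.println(new IdentityWrapperSketch(shared).equals(new IdentityWrapperSketch(lookalike)));
    }
}
```

A side effect of this approach is that no plugin code runs during `equals` or `hashCode`, so neither needs to be wrapped in a class loader swap.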






[GitHub] [kafka] C0urante commented on a diff in pull request #13185: KAFKA-14670: (part 1) Wrap Connectors in IsolatedConnector objects

2023-02-28 Thread via GitHub


C0urante commented on code in PR #13185:
URL: https://github.com/apache/kafka/pull/13185#discussion_r1120685793


##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+    protected <V> V isolate(Callable<V> callable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            return callable.call();
+        }
+    }
+
+    protected void isolateV(ThrowingRunnable runnable) throws Exception {
+        try (LoaderSwap loaderSwap = plugins.withClassLoader(classLoader)) {
+            runnable.run();
+        }

Review Comment:
   We can reduce duplication here by piggybacking off of the non-void `isolate` 
method:
   ```suggestion
   isolate(() -> {
   runnable.run();
   return null;
   });
   ```
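
For reference, a self-contained sketch of that suggestion in context. `VoidIsolationSketch` and its nested `LoaderSwap` are simplified, hypothetical stand-ins for the runtime classes, and the swap is assumed to be a change of the thread's context class loader.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

class VoidIsolationSketch {
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Hypothetical stand-in for the runtime's LoaderSwap: restores the saved loader on close.
    static final class LoaderSwap implements AutoCloseable {
        private final ClassLoader saved;

        LoaderSwap(ClassLoader target) {
            saved = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(target);
        }

        @Override
        public void close() {
            Thread.currentThread().setContextClassLoader(saved);
        }
    }

    private final ClassLoader pluginLoader;

    VoidIsolationSketch(ClassLoader pluginLoader) {
        this.pluginLoader = pluginLoader;
    }

    // The only method that touches the class loader.
    <V> V isolate(Callable<V> callable) throws Exception {
        try (LoaderSwap swap = new LoaderSwap(pluginLoader)) {
            return callable.call();
        }
    }

    // The void variant reuses isolate() by returning a dummy null.
    void isolateV(ThrowingRunnable runnable) throws Exception {
        isolate(() -> {
            runnable.run();
            return null;
        });
    }

    public static void main(String[] args) throws Exception {
        ClassLoader loader = new URLClassLoader(new URL[0]);
        VoidIsolationSketch sketch = new VoidIsolationSketch(loader);
        sketch.isolateV(() ->
            System.out.println(Thread.currentThread().getContextClassLoader() == loader));
    }
}
```

With this shape, any future change to how the loader is swapped only needs to be made in `isolate`.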



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java:
##
@@ -419,8 +420,14 @@ public ConnectorMetricsGroup(ConnectMetrics connectMetrics, AbstractStatus.State
             metricGroup.close();
 
             metricGroup.addImmutableValueMetric(registry.connectorType, connectorType());
-            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.getClass().getName());
-            metricGroup.addImmutableValueMetric(registry.connectorVersion, connector.version());
+            metricGroup.addImmutableValueMetric(registry.connectorClass, connector.pluginClass().getName());
+            String version;
+            try {
+                version = connector.version();
+            } catch (Exception e) {
+                version = DelegatingClassLoader.UNDEFINED_VERSION;
+            }

Review Comment:
   This technically changes behavior, right? We go from failing the connector 
on startup to just assigning it an undefined version.
   
   I think this is probably fine (the benefits outweigh the costs, and I have a 
hard time imagining any use case that would favor failing here), just want to 
make sure I'm gauging the impact here correctly.



##
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/IsolatedPlugin.java:
##
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, eith