This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.9 by this push:
new 392a56423e0 KAFKA-17078: Add SecurityManagerCompatibility shim
(#16522) (#19221)
392a56423e0 is described below
commit 392a56423e0bc35dad47c8e83fc3a5fcc82709b0
Author: Stig Døssing <[email protected]>
AuthorDate: Mon Apr 7 04:39:24 2025 +0200
KAFKA-17078: Add SecurityManagerCompatibility shim (#16522) (#19221)
Backport for 3.9 for https://github.com/apache/kafka/pull/16522
The reason to do this is that this is necessary in order for 3.9 to support
Java 24.
Please see https://lists.apache.org/thread/6k942pphowd28dh9gn6xbnngk6nxs3n0
where it is being discussed whether to do this.
Co-authored-by: Greg Harris <[email protected]>
Reviewers: Luke Chen <[email protected]>, Greg Harris <[email protected]>
---
.../kafka/common/internals/CompositeStrategy.java | 106 +++++++++
.../kafka/common/internals/LegacyStrategy.java | 96 ++++++++
.../kafka/common/internals/ModernStrategy.java | 64 ++++++
.../kafka/common/internals/ReflectiveStrategy.java | 69 ++++++
.../internals/SecurityManagerCompatibility.java | 102 +++++++++
.../common/internals/UnsupportedStrategy.java | 60 +++++
.../authenticator/SaslClientAuthenticator.java | 14 +-
.../authenticator/SaslClientCallbackHandler.java | 4 +-
.../authenticator/SaslServerAuthenticator.java | 12 +-
.../OAuthBearerSaslClientCallbackHandler.java | 6 +-
.../SecurityManagerCompatibilityTest.java | 252 +++++++++++++++++++++
.../OAuthBearerSaslClientCallbackHandlerTest.java | 15 +-
.../runtime/isolation/ClassLoaderFactory.java | 12 +-
.../connect/runtime/isolation/PluginScanner.java | 7 +-
.../runtime/isolation/SynchronizationTest.java | 13 +-
.../java/kafka/log/remote/RemoteLogManager.java | 42 ++--
16 files changed, 805 insertions(+), 69 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/CompositeStrategy.java
b/clients/src/main/java/org/apache/kafka/common/internals/CompositeStrategy.java
new file mode 100644
index 00000000000..6a96576f54d
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/CompositeStrategy.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedAction;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import javax.security.auth.Subject;
+
+/**
+ * This strategy combines the functionality of the {@link LegacyStrategy},
{@link ModernStrategy}, and
+ * {@link UnsupportedStrategy} strategies to provide the legacy APIs as long
as they are present and not degraded.
+ * If the legacy APIs are missing or degraded, this falls back to the modern
APIs.
+ */
+class CompositeStrategy implements SecurityManagerCompatibility {
+
+ private static final Logger log =
LoggerFactory.getLogger(CompositeStrategy.class);
+ static final CompositeStrategy INSTANCE = new
CompositeStrategy(ReflectiveStrategy.Loader.forName());
+
+ private final SecurityManagerCompatibility fallbackStrategy;
+ private final AtomicReference<SecurityManagerCompatibility> activeStrategy;
+
+ // Visible for testing
+ CompositeStrategy(ReflectiveStrategy.Loader loader) {
+ SecurityManagerCompatibility initial;
+ SecurityManagerCompatibility fallback = null;
+ try {
+ initial = new LegacyStrategy(loader);
+ try {
+ fallback = new ModernStrategy(loader);
+ // This is expected for JRE 18+
+ log.debug("Loaded legacy SecurityManager methods, will fall
back to modern methods after UnsupportedOperationException");
+ } catch (NoSuchMethodException | ClassNotFoundException ex) {
+ // This is expected for JRE <= 17
+ log.debug("Unable to load modern Subject methods, relying only
on legacy methods", ex);
+ }
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ try {
+ initial = new ModernStrategy(loader);
+ // This is expected for JREs after the removal takes place.
+ log.debug("Unable to load legacy SecurityManager methods,
relying only on modern methods", e);
+ } catch (NoSuchMethodException | ClassNotFoundException ex) {
+ initial = new UnsupportedStrategy(e, ex);
+ // This is not expected in normal use, only in test
environments.
+ log.error("Unable to load legacy SecurityManager methods", e);
+ log.error("Unable to load modern Subject methods", ex);
+ }
+ }
+ Objects.requireNonNull(initial, "initial strategy must be defined");
+ activeStrategy = new AtomicReference<>(initial);
+ fallbackStrategy = fallback;
+ }
+
+ private <T> T performAction(Function<SecurityManagerCompatibility, T>
action) {
+ SecurityManagerCompatibility active = activeStrategy.get();
+ try {
+ return action.apply(active);
+ } catch (UnsupportedOperationException e) {
+ // If we chose a fallback strategy during loading, switch to it
and retry this operation.
+ if (active != fallbackStrategy && fallbackStrategy != null) {
+ if (activeStrategy.compareAndSet(active, fallbackStrategy)) {
+ log.debug("Using fallback strategy after encountering
degraded legacy method", e);
+ }
+ return action.apply(fallbackStrategy);
+ }
+ // If we're already using the fallback strategy, then there's
nothing to do to handle these exceptions.
+ throw e;
+ }
+ }
+
+ @Override
+ public <T> T doPrivileged(PrivilegedAction<T> action) {
+ return performAction(compatibility ->
compatibility.doPrivileged(action));
+ }
+
+ @Override
+ public Subject current() {
+ return performAction(SecurityManagerCompatibility::current);
+ }
+
+ @Override
+ public <T> T callAs(Subject subject, Callable<T> action) throws
CompletionException {
+ return performAction(compatibility -> compatibility.callAs(subject,
action));
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/LegacyStrategy.java
b/clients/src/main/java/org/apache/kafka/common/internals/LegacyStrategy.java
new file mode 100644
index 00000000000..de9bf7643d8
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/LegacyStrategy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.lang.reflect.Method;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This class implements reflective access to the deprecated-for-removal
methods of AccessController and Subject.
+ * <p>Instantiating this class may fail if any of the required classes or
methods are not found.
+ * Method invocations for this class may fail with {@link
UnsupportedOperationException} if all methods are found,
+ * but the operation is not permitted to be invoked.
+ * <p>This class is expected to be instantiable in JRE >=8 until the removal
finally takes place.
+ */
+@SuppressWarnings("unchecked")
+class LegacyStrategy implements SecurityManagerCompatibility {
+
+ private final Method doPrivileged;
+ private final Method getContext;
+ private final Method getSubject;
+ private final Method doAs;
+
+ // Visible for testing
+ LegacyStrategy(ReflectiveStrategy.Loader loader) throws
ClassNotFoundException, NoSuchMethodException {
+ Class<?> accessController =
loader.loadClass("java.security.AccessController");
+ doPrivileged = accessController.getDeclaredMethod("doPrivileged",
PrivilegedAction.class);
+ getContext = accessController.getDeclaredMethod("getContext");
+ Class<?> accessControlContext =
loader.loadClass("java.security.AccessControlContext");
+ Class<?> subject = loader.loadClass(Subject.class.getName());
+ getSubject = subject.getDeclaredMethod("getSubject",
accessControlContext);
+ // Note that the Subject class isn't deprecated or removed, so
reference it as an argument type.
+ // This allows for mocking out the method implementation while still
accepting Subject instances as arguments.
+ doAs = subject.getDeclaredMethod("doAs", Subject.class,
PrivilegedExceptionAction.class);
+ }
+
+ @Override
+ public <T> T doPrivileged(PrivilegedAction<T> action) {
+ return (T) ReflectiveStrategy.invoke(doPrivileged, null, action);
+ }
+
+ /**
+ * @return the result of AccessController.getContext(), of type
AccessControlContext
+ */
+ private Object getContext() {
+ return ReflectiveStrategy.invoke(getContext, null);
+ }
+
+ /**
+ * @param context The current AccessControlContext
+ * @return The result of Subject.getSubject(AccessControlContext)
+ */
+ private Subject getSubject(Object context) {
+ return (Subject) ReflectiveStrategy.invoke(getSubject, null, context);
+ }
+
+ @Override
+ public Subject current() {
+ return getSubject(getContext());
+ }
+
+ /**
+ * @return The result of Subject.doAs(Subject, PrivilegedExceptionAction)
+ */
+ private <T> T doAs(Subject subject, PrivilegedExceptionAction<T> action)
throws PrivilegedActionException {
+ return (T) ReflectiveStrategy.invokeChecked(doAs,
PrivilegedActionException.class, null, subject, action);
+ }
+
+ @Override
+ public <T> T callAs(Subject subject, Callable<T> callable) throws
CompletionException {
+ try {
+ return doAs(subject, callable::call);
+ } catch (PrivilegedActionException e) {
+ throw new CompletionException(e.getCause());
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/ModernStrategy.java
b/clients/src/main/java/org/apache/kafka/common/internals/ModernStrategy.java
new file mode 100644
index 00000000000..839196541ea
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/ModernStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.lang.reflect.Method;
+import java.security.PrivilegedAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This class implements reflective access to the methods of Subject added to
replace deprecated methods.
+ * <p>Instantiating this class may fail if any of the required classes or
methods are not found.
+ * Method invocations for this class may fail with {@link
UnsupportedOperationException} if all methods are found,
+ * but the operation is not permitted to be invoked.
+ * <p>This class is expected to be instantiable in JRE >= 18. At the time of
writing, these methods do not have
+ * a sunset date, and are expected to be available past the removal of the
SecurityManager.
+ */
+@SuppressWarnings("unchecked")
+class ModernStrategy implements SecurityManagerCompatibility {
+
+ private final Method current;
+ private final Method callAs;
+
+ // Visible for testing
+ ModernStrategy(ReflectiveStrategy.Loader loader) throws
NoSuchMethodException, ClassNotFoundException {
+ Class<?> subject = loader.loadClass(Subject.class.getName());
+ current = subject.getDeclaredMethod("current");
+ // Note that the Subject class isn't deprecated or removed, so
reference it as an argument type.
+ // This allows for mocking out the method implementation while still
accepting Subject instances as arguments.
+ callAs = subject.getDeclaredMethod("callAs", Subject.class,
Callable.class);
+ }
+
+ @Override
+ public <T> T doPrivileged(PrivilegedAction<T> action) {
+ // This is intentionally a pass-through
+ return action.run();
+ }
+
+ @Override
+ public Subject current() {
+ return (Subject) ReflectiveStrategy.invoke(current, null);
+ }
+
+ @Override
+ public <T> T callAs(Subject subject, Callable<T> action) throws
CompletionException {
+ return (T) ReflectiveStrategy.invokeChecked(callAs,
CompletionException.class, null, subject, action);
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/ReflectiveStrategy.java
b/clients/src/main/java/org/apache/kafka/common/internals/ReflectiveStrategy.java
new file mode 100644
index 00000000000..c7d251a00b9
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/ReflectiveStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Utility methods for strategies which use reflection to access methods
without requiring them at compile-time.
+ */
+class ReflectiveStrategy {
+
+ static Object invoke(Method method, Object obj, Object... args) {
+ try {
+ return method.invoke(obj, args);
+ } catch (IllegalAccessException e) {
+ throw new UnsupportedOperationException(e);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ throw new RuntimeException(cause);
+ }
+ }
+ }
+
+ static <T extends Exception> Object invokeChecked(Method method, Class<T>
ex, Object obj, Object... args) throws T {
+ try {
+ return method.invoke(obj, args);
+ } catch (IllegalAccessException e) {
+ throw new UnsupportedOperationException(e);
+ } catch (InvocationTargetException e) {
+ Throwable cause = e.getCause();
+ if (ex.isInstance(cause)) {
+ throw ex.cast(cause);
+ } else if (cause instanceof RuntimeException) {
+ throw (RuntimeException) cause;
+ } else {
+ throw new RuntimeException(cause);
+ }
+ }
+ }
+
+ /**
+ * Interface to allow mocking out classloading infrastructure. This is
used to test reflective operations.
+ */
+ interface Loader {
+ Class<?> loadClass(String className) throws ClassNotFoundException;
+
+ static Loader forName() {
+ return className -> Class.forName(className, true,
Loader.class.getClassLoader());
+ }
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/SecurityManagerCompatibility.java
b/clients/src/main/java/org/apache/kafka/common/internals/SecurityManagerCompatibility.java
new file mode 100644
index 00000000000..1f7dee3e06c
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/SecurityManagerCompatibility.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.security.PrivilegedAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This is a compatibility class to provide dual-support for JREs with and
without SecurityManager support.
+ * <p>Users should call {@link #get()} to retrieve a singleton instance, and
call instance methods
+ * {@link #doPrivileged(PrivilegedAction)}, {@link #current()}, and {@link
#callAs(Subject, Callable)}.
+ * <p>This class's motivation and expected behavior is defined in
+ * <a
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support">KIP-1006</a>
+ */
+public interface SecurityManagerCompatibility {
+
+ /**
+ * @return an implementation of this interface which conforms to the
functionality available in the current JRE.
+ */
+ static SecurityManagerCompatibility get() {
+ return CompositeStrategy.INSTANCE;
+ }
+
+ /**
+ * Performs the specified {@code PrivilegedAction} with privileges
+ * enabled. The action is performed with <i>all</i> of the permissions
+ * possessed by the caller's protection domain.
+ *
+ * <p> If the action's {@code run} method throws an (unchecked)
+ * exception, it will propagate through this method.
+ *
+ * <p> Note that any DomainCombiner associated with the current
+ * AccessControlContext will be ignored while the action is performed.
+ *
+ * @param <T> the type of the value returned by the PrivilegedAction's
+ * {@code run} method.
+ *
+ * @param action the action to be performed.
+ *
+ * @return the value returned by the action's {@code run} method.
+ *
+ * @exception NullPointerException if the action is {@code null}
+ * @see java.security.AccessController#doPrivileged(PrivilegedAction)
+ */
+ <T> T doPrivileged(PrivilegedAction<T> action);
+
+ /**
+ * Returns the current subject.
+ * <p>
+ * The current subject is installed by the {@link #callAs} method.
+ * When {@code callAs(subject, action)} is called, {@code action} is
+ * executed with {@code subject} as its current subject which can be
+ * retrieved by this method. After {@code action} is finished, the current
+ * subject is reset to its previous value. The current
+ * subject is {@code null} before the first call of {@code callAs()}.
+ *
+ * @return the current subject, or {@code null} if a current subject is
+ * not installed or the current subject is set to {@code null}.
+ * @see #callAs(Subject, Callable)
+ * @see Subject#current()
+ * @see Subject#callAs(Subject, Callable)
+ */
+ Subject current();
+
+ /**
+ * Executes a {@code Callable} with {@code subject} as the
+ * current subject.
+ *
+ * @param subject the {@code Subject} that the specified {@code action}
+ * will run as. This parameter may be {@code null}.
+ * @param action the code to be run with {@code subject} as its current
+ * subject. Must not be {@code null}.
+ * @param <T> the type of value returned by the {@code call} method
+ * of {@code action}
+ * @return the value returned by the {@code call} method of {@code action}
+ * @throws NullPointerException if {@code action} is {@code null}
+ * @throws CompletionException if {@code action.call()} throws an
exception.
+ * The cause of the {@code CompletionException} is set to the
exception
+ * thrown by {@code action.call()}.
+ * @see #current()
+ * @see Subject#current()
+ * @see Subject#callAs(Subject, Callable)
+ */
+ <T> T callAs(Subject subject, Callable<T> action) throws
CompletionException;
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/internals/UnsupportedStrategy.java
b/clients/src/main/java/org/apache/kafka/common/internals/UnsupportedStrategy.java
new file mode 100644
index 00000000000..ca4175badff
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/internals/UnsupportedStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.security.PrivilegedAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This is a fallback strategy to use if no other strategies are available.
+ * <p>This is used to improve control flow and provide detailed error messages
in unusual situations.
+ */
+class UnsupportedStrategy implements SecurityManagerCompatibility {
+
+ private final Throwable e1;
+ private final Throwable e2;
+
+ UnsupportedStrategy(Throwable e1, Throwable e2) {
+ this.e1 = e1;
+ this.e2 = e2;
+ }
+
+ private UnsupportedOperationException createException(String message) {
+ UnsupportedOperationException e = new
UnsupportedOperationException(message);
+ e.addSuppressed(e1);
+ e.addSuppressed(e2);
+ return e;
+ }
+
+ @Override
+ public <T> T doPrivileged(PrivilegedAction<T> action) {
+ throw createException("Unable to find suitable
AccessController#doPrivileged implementation");
+ }
+
+ @Override
+ public Subject current() {
+ throw createException("Unable to find suitable Subject#getCurrent or
Subject#current implementation");
+ }
+
+ @Override
+ public <T> T callAs(Subject subject, Callable<T> action) throws
CompletionException {
+ throw createException("Unable to find suitable Subject#doAs or
Subject#callAs implementation");
+ }
+}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index a327c4b6c1b..45169feb657 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@@ -59,8 +60,6 @@ import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
@@ -70,6 +69,7 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
+import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
@@ -215,7 +215,7 @@ public class SaslClientAuthenticator implements
Authenticator {
// visible for testing
SaslClient createSaslClient() {
try {
- return Subject.doAs(subject,
(PrivilegedExceptionAction<SaslClient>) () -> {
+ return SecurityManagerCompatibility.get().callAs(subject, () -> {
String[] mechs = {mechanism};
log.debug("Creating SaslClient:
client={};service={};serviceHostname={};mechs={}",
clientPrincipalName, servicePrincipal, host,
Arrays.toString(mechs));
@@ -225,7 +225,7 @@ public class SaslClientAuthenticator implements
Authenticator {
}
return retvalSaslClient;
});
- } catch (PrivilegedActionException e) {
+ } catch (CompletionException e) {
throw new SaslAuthenticationException("Failed to create SaslClient
with mechanism " + mechanism, e.getCause());
}
}
@@ -533,8 +533,8 @@ public class SaslClientAuthenticator implements
Authenticator {
if (isInitial && !saslClient.hasInitialResponse())
return saslToken;
else
- return Subject.doAs(subject,
(PrivilegedExceptionAction<byte[]>) () ->
saslClient.evaluateChallenge(saslToken));
- } catch (PrivilegedActionException e) {
+ return SecurityManagerCompatibility.get().callAs(subject, ()
-> saslClient.evaluateChallenge(saslToken));
+ } catch (CompletionException e) {
String error = "An error: (" + e + ") occurred when evaluating
SASL token received from the Kafka Broker.";
KerberosError kerberosError = KerberosError.fromException(e);
// Try to provide hints to use about what went wrong so they can
fix their configuration.
@@ -545,7 +545,7 @@ public class SaslClientAuthenticator implements
Authenticator {
" Users must configure FQDN of kafka brokers when
authenticating using SASL and" +
" `socketChannel.socket().getInetAddress().getHostName()`
must match the hostname in `principal/hostname@realm`";
}
- //Unwrap the SaslException inside `PrivilegedActionException`
+ //Unwrap the SaslException
Throwable cause = e.getCause();
// Treat transient Kerberos errors as non-fatal SaslExceptions
that are processed as I/O exceptions
// and all other failures as fatal SaslAuthenticationException.
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index fc27feb0502..c2257f3b6b8 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,13 +17,13 @@
package org.apache.kafka.common.security.authenticator;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
-import java.security.AccessController;
import java.util.List;
import java.util.Map;
@@ -55,7 +55,7 @@ public class SaslClientCallbackHandler implements
AuthenticateCallbackHandler {
@Override
public void handle(Callback[] callbacks) throws
UnsupportedCallbackException {
- Subject subject = Subject.getSubject(AccessController.getContext());
+ Subject subject = SecurityManagerCompatibility.get().current();
for (Callback callback : callbacks) {
if (callback instanceof NameCallback) {
NameCallback nc = (NameCallback) callback;
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 43c6ed683d9..b8fb7299a8a 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.Authenticator;
@@ -73,8 +74,6 @@ import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
@@ -82,6 +81,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.concurrent.CompletionException;
import java.util.function.Function;
import javax.net.ssl.SSLSession;
@@ -205,12 +205,12 @@ public class SaslServerAuthenticator implements
Authenticator {
saslServer = createSaslKerberosServer(callbackHandler, configs,
subject);
} else {
try {
- saslServer = Subject.doAs(subject,
(PrivilegedExceptionAction<SaslServer>) () ->
+ saslServer =
SecurityManagerCompatibility.get().callAs(subject, () ->
Sasl.createSaslServer(saslMechanism, "kafka",
serverAddress().getHostName(), configs, callbackHandler));
if (saslServer == null) {
throw new SaslException("Kafka Server failed to create a
SaslServer to interact with a client during session authentication with server
mechanism " + saslMechanism);
}
- } catch (PrivilegedActionException e) {
+ } catch (CompletionException e) {
throw new SaslException("Kafka Server failed to create a
SaslServer to interact with a client during session authentication with server
mechanism " + saslMechanism, e.getCause());
}
}
@@ -231,9 +231,9 @@ public class SaslServerAuthenticator implements
Authenticator {
LOG.debug("Creating SaslServer for {} with mechanism {}",
kerberosName, saslMechanism);
try {
- return Subject.doAs(subject,
(PrivilegedExceptionAction<SaslServer>) () ->
+ return SecurityManagerCompatibility.get().callAs(subject, () ->
Sasl.createSaslServer(saslMechanism, servicePrincipalName,
serviceHostname, configs, saslServerCallbackHandler));
- } catch (PrivilegedActionException e) {
+ } catch (CompletionException e) {
throw new SaslException("Kafka Server failed to create a
SaslServer to interact with a client during session authentication",
e.getCause());
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index e4e845a8dea..955ab4c1fac 100644
---
a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++
b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.security.oauthbearer.internals;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.SaslExtensions;
import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
@@ -27,7 +28,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.security.AccessController;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
@@ -83,7 +83,7 @@ public class OAuthBearerSaslClientCallbackHandler implements
AuthenticateCallbac
if (callback instanceof OAuthBearerTokenCallback)
handleCallback((OAuthBearerTokenCallback) callback);
else if (callback instanceof SaslExtensionsCallback)
- handleCallback((SaslExtensionsCallback) callback,
Subject.getSubject(AccessController.getContext()));
+ handleCallback((SaslExtensionsCallback) callback,
SecurityManagerCompatibility.get().current());
else
throw new UnsupportedCallbackException(callback);
}
@@ -97,7 +97,7 @@ public class OAuthBearerSaslClientCallbackHandler implements
AuthenticateCallbac
private void handleCallback(OAuthBearerTokenCallback callback) throws
IOException {
if (callback.token() != null)
throw new IllegalArgumentException("Callback had a token already");
- Subject subject = Subject.getSubject(AccessController.getContext());
+ Subject subject = SecurityManagerCompatibility.get().current();
Set<OAuthBearerToken> privateCredentials = subject != null
? subject.getPrivateCredentials(OAuthBearerToken.class)
: Collections.emptySet();
diff --git
a/clients/src/test/java/org/apache/kafka/common/internals/SecurityManagerCompatibilityTest.java
b/clients/src/test/java/org/apache/kafka/common/internals/SecurityManagerCompatibilityTest.java
new file mode 100644
index 00000000000..7789a924c78
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/common/internals/SecurityManagerCompatibilityTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SecurityManagerCompatibilityTest {
+
+ @EnabledForJreRange(min = JRE.JAVA_8, max = JRE.JAVA_22)
+ @Test
+ public void testLegacyStrategyLoadable() throws ClassNotFoundException,
NoSuchMethodException {
+ new LegacyStrategy(ReflectiveStrategy.Loader.forName());
+ }
+
+ @EnabledForJreRange(min = JRE.JAVA_18)
+ @Test
+ public void testModernStrategyLoadable() throws ClassNotFoundException,
NoSuchMethodException {
+ new ModernStrategy(ReflectiveStrategy.Loader.forName());
+ }
+
+ @Test
+ public void testCompositeStrategyLoadable() {
+ new CompositeStrategy(ReflectiveStrategy.Loader.forName());
+ }
+
+ @Test
+ public void testDefaultStrategyLoadable() {
+ assertNotNull(SecurityManagerCompatibility.get());
+ }
+
+ @Test
+ public void testDefaultStrategyDoPrivilegedReturn() {
+ Object object = new Object();
+ Object returned = SecurityManagerCompatibility.get().doPrivileged(()
-> object);
+ assertSame(object, returned);
+ }
+
+ @Test
+ public void testDefaultStrategyDoPrivilegedThrow() {
+ assertThrows(RuntimeException.class, () ->
+ SecurityManagerCompatibility.get().doPrivileged(() -> {
+ throw new RuntimeException();
+ })
+ );
+ }
+
+ @Test
+ public void testDefaultStrategyCurrentNull() {
+ Subject current = SecurityManagerCompatibility.get().current();
+ assertNull(current);
+ }
+
+ @Test
+ public void testDefaultStrategyCallAsReturn() {
+ Subject subject = new Subject();
+ Object object = new Object();
+ Object returned = SecurityManagerCompatibility.get().callAs(subject,
() -> object);
+ assertSame(object, returned);
+ }
+
+ @Test
+ public void testDefaultStrategyCallAsCurrent() {
+ Subject subject = new Subject();
+ Subject returned = SecurityManagerCompatibility.get().callAs(subject,
SecurityManagerCompatibility.get()::current);
+ assertSame(subject, returned);
+ }
+
+ @Test
+ public void testLegacyStrategyThrowsWhenSecurityManagerRemoved() {
+ ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
+ assertThrows(ClassNotFoundException.class, () -> new
LegacyStrategy(loader));
+ }
+
+ @EnabledForJreRange(min = JRE.JAVA_18)
+ @Test
+ public void testModernStrategyLoadableWhenSecurityManagerRemoved() throws
ClassNotFoundException, NoSuchMethodException {
+ ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
+ new ModernStrategy(loader);
+ }
+
+ @Test
+ public void testCompositeStrategyLoadableWhenSecurityManagerRemoved() {
+ ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
+ new CompositeStrategy(loader);
+ }
+
+ @Test
+ public void
testLegacyStrategyCurrentThrowsWhenSecurityManagerUnsupported() throws
ClassNotFoundException, NoSuchMethodException {
+ ReflectiveStrategy.Loader loader =
simulateMethodsThrowUnsupportedOperationExceptions();
+ SecurityManagerCompatibility legacy = new LegacyStrategy(loader);
+ assertThrows(UnsupportedOperationException.class, legacy::current);
+ }
+
+ @Test
+ public void testLegacyStrategyCallAsThrowsWhenSecurityManagerUnsupported()
throws ClassNotFoundException, NoSuchMethodException {
+ ReflectiveStrategy.Loader loader =
simulateMethodsThrowUnsupportedOperationExceptions();
+ SecurityManagerCompatibility legacy = new LegacyStrategy(loader);
+ assertThrows(UnsupportedOperationException.class, () ->
legacy.callAs(null, () -> null));
+ }
+
+ @Test
+ public void
testCompositeStrategyDoPrivilegedWhenSecurityManagerUnsupported() {
+ ReflectiveStrategy.Loader loader =
simulateMethodsThrowUnsupportedOperationExceptions();
+ CompositeStrategy composite = new CompositeStrategy(loader);
+ Object object = new Object();
+ Object returned = composite.doPrivileged(() -> object);
+ assertSame(object, returned);
+ }
+
+ @Test
+ public void testCompositeStrategyCurrentWhenSecurityManagerUnsupported() {
+ ReflectiveStrategy.Loader loader =
simulateMethodsThrowUnsupportedOperationExceptions();
+ CompositeStrategy composite = new CompositeStrategy(loader);
+ Object returned = composite.current();
+ assertNull(returned);
+ }
+
+ @Test
+ public void testCompositeStrategyCallAsWhenSecurityManagerUnsupported() {
+ ReflectiveStrategy.Loader loader =
simulateMethodsThrowUnsupportedOperationExceptions();
+ CompositeStrategy composite = new CompositeStrategy(loader);
+ Subject subject = new Subject();
+ Subject returned = composite.callAs(subject, composite::current);
+ assertSame(subject, returned);
+ }
+
+ private ReflectiveStrategy.Loader simulateSecurityManagerRemoval() {
+ return name -> {
+ if (name.equals("java.security.AccessController")) {
+ throw new ClassNotFoundException();
+ } else {
+ return ReflectiveStrategy.Loader.forName().loadClass(name);
+ }
+ };
+ }
+
+ private ReflectiveStrategy.Loader
simulateMethodsThrowUnsupportedOperationExceptions() {
+ // WARNING: These assertions are here to prevent warnings about unused
methods.
+ // These methods are used reflectively, and can't be removed.
+ assertThrows(UnsupportedOperationException.class, () ->
UnsupportedOperations.doPrivileged(null));
+ assertThrows(UnsupportedOperationException.class, () ->
UnsupportedOperations.getSubject(null));
+ assertThrows(UnsupportedOperationException.class, () ->
UnsupportedOperations.doAs(null, null));
+ assertNull(UnsupportedOperations.current());
+ assertNull(UnsupportedOperations.callAs(null, () -> null));
+ return name -> {
+ switch (name) {
+ case "java.security.AccessController":
+ case "javax.security.auth.Subject":
+ return UnsupportedOperations.class;
+ case "java.security.AccessControlContext":
+ return UnsupportedOperations.DummyContext.class;
+ default:
+ return ReflectiveStrategy.Loader.forName().loadClass(name);
+ }
+ };
+ }
+
+ /**
+ * This is a class meant to stand-in for the AccessController, and Subject
classes.
+ * It simulates a scenario where all legacy methods throw
UnsupportedOperationException, and only the modern
+ * methods are functional.
+ */
+ public static class UnsupportedOperations {
+
+ /**
+ * This class stands-in for the AccessControlContext in the mocked
signatures below, because we can't have a
+ * compile-time dependency on the real class. This needs no methods
and is just a dummy class.
+ */
+ public static class DummyContext {
+ }
+
+ private static final ThreadLocal<Subject> ACTIVE_SUBJECT = new
ThreadLocal<>();
+
+ /**
+ * Copy of AccessController#doPrivileged
+ */
+ public static <T> void doPrivileged(PrivilegedAction<T> ignored) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Copy of AccessController#getContext
+ */
+ public static DummyContext getContext() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Copy of Subject#getSubject
+ */
+ public static void getSubject(DummyContext ignored) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Copy of Subject#doAs
+ */
+ public static <T> void doAs(Subject ignored1,
PrivilegedExceptionAction<T> ignored2) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Copy of Subject#current
+ */
+ public static Subject current() {
+ return ACTIVE_SUBJECT.get();
+ }
+
+ /**
+ * Copy of Subject#callAs
+ */
+ public static <T> T callAs(Subject subject, Callable<T> action) throws
CompletionException {
+ Subject previous = ACTIVE_SUBJECT.get();
+ ACTIVE_SUBJECT.set(subject);
+ try {
+ return action.call();
+ } catch (Throwable e) {
+ throw new CompletionException(e);
+ } finally {
+ ACTIVE_SUBJECT.set(previous);
+ }
+ }
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
index 98b9544a5b5..29d9a9f792e 100644
---
a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
+++
b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
@@ -16,16 +16,15 @@
*/
package org.apache.kafka.common.security.oauthbearer;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import
org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
import org.junit.jupiter.api.Test;
import java.io.IOException;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.Set;
+import java.util.concurrent.CompletionException;
import javax.security.auth.Subject;
import javax.security.auth.callback.Callback;
@@ -66,8 +65,8 @@ public class OAuthBearerSaslClientCallbackHandlerTest {
@Test
public void testWithZeroTokens() {
OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
- PrivilegedActionException e =
assertThrows(PrivilegedActionException.class, () -> Subject.doAs(new Subject(),
- (PrivilegedExceptionAction<Void>) () -> {
+ CompletionException e = assertThrows(CompletionException.class, () ->
SecurityManagerCompatibility.get().callAs(new Subject(),
+ () -> {
OAuthBearerTokenCallback callback = new
OAuthBearerTokenCallback();
handler.handle(new Callback[] {callback});
return null;
@@ -77,11 +76,11 @@ public class OAuthBearerSaslClientCallbackHandlerTest {
}
@Test()
- public void testWithPotentiallyMultipleTokens() throws Exception {
+ public void testWithPotentiallyMultipleTokens() {
OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
- Subject.doAs(new Subject(), (PrivilegedExceptionAction<Void>) () -> {
+ SecurityManagerCompatibility.get().callAs(new Subject(), () -> {
final int maxTokens = 4;
- final Set<Object> privateCredentials =
Subject.getSubject(AccessController.getContext())
+ final Set<Object> privateCredentials =
SecurityManagerCompatibility.get().current()
.getPrivateCredentials();
privateCredentials.clear();
for (int num = 1; num <= maxTokens; ++num) {
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
index 9de24f796d1..2a480fde851 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
@@ -16,9 +16,9 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
+
import java.net.URL;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
/**
* Factory for {@link DelegatingClassLoader} and {@link PluginClassLoader}
instances.
@@ -27,14 +27,14 @@ import java.security.PrivilegedAction;
public class ClassLoaderFactory implements PluginClassLoaderFactory {
public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
- return AccessController.doPrivileged(
- (PrivilegedAction<DelegatingClassLoader>) () -> new
DelegatingClassLoader(parent)
+ return SecurityManagerCompatibility.get().doPrivileged(
+ () -> new DelegatingClassLoader(parent)
);
}
public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[]
urls, ClassLoader parent) {
- return AccessController.doPrivileged(
- (PrivilegedAction<PluginClassLoader>) () -> new
PluginClassLoader(pluginLocation, urls, parent)
+ return SecurityManagerCompatibility.get().doPrivileged(
+ () -> new PluginClassLoader(pluginLocation, urls, parent)
);
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
index 9ca1e5aad9a..004f78fe1c0 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
@@ -16,14 +16,13 @@
*/
package org.apache.kafka.connect.runtime.isolation;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.connect.components.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.sql.Driver;
import java.util.ArrayList;
import java.util.Arrays;
@@ -93,8 +92,8 @@ public abstract class PluginScanner {
private void loadJdbcDrivers(final ClassLoader loader) {
// Apply here what java.sql.DriverManager does to discover and
register classes
// implementing the java.sql.Driver interface.
- AccessController.doPrivileged(
- (PrivilegedAction<Void>) () -> {
+ SecurityManagerCompatibility.get().doPrivileged(
+ () -> {
ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
Driver.class,
loader
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 6d8d7f71768..1f3d42fd3dc 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
@@ -40,8 +41,6 @@ import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.net.URL;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
@@ -161,9 +160,8 @@ public class SynchronizationTest {
private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
@Override
public DelegatingClassLoader newDelegatingClassLoader(ClassLoader
parent) {
- return AccessController.doPrivileged(
- (PrivilegedAction<DelegatingClassLoader>) () ->
- new SynchronizedDelegatingClassLoader(parent,
dclBreakpoint)
+ return SecurityManagerCompatibility.get().doPrivileged(
+ () -> new SynchronizedDelegatingClassLoader(parent,
dclBreakpoint)
);
}
@@ -173,9 +171,8 @@ public class SynchronizationTest {
URL[] urls,
ClassLoader parent
) {
- return AccessController.doPrivileged(
- (PrivilegedAction<PluginClassLoader>) () ->
- new SynchronizedPluginClassLoader(pluginLocation,
urls, parent, pclBreakpoint)
+ return SecurityManagerCompatibility.get().doPrivileged(
+ () -> new SynchronizedPluginClassLoader(pluginLocation,
urls, parent, pclBreakpoint)
);
}
}
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index c64d76013fa..a4f1ba4cb98 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.OffsetOutOfRangeException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Quota;
@@ -98,7 +99,6 @@ import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
-import java.security.PrivilegedAction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -330,19 +330,15 @@ public class RemoteLogManager implements Closeable {
}
}
- @SuppressWarnings("removal")
RemoteStorageManager createRemoteStorageManager() {
- return java.security.AccessController.doPrivileged(new
PrivilegedAction<RemoteStorageManager>() {
- private final String classPath =
rlmConfig.remoteStorageManagerClassPath();
-
- public RemoteStorageManager run() {
- if (classPath != null && !classPath.trim().isEmpty()) {
- ChildFirstClassLoader classLoader = new
ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
- RemoteStorageManager delegate =
createDelegate(classLoader, rlmConfig.remoteStorageManagerClassName());
- return new ClassLoaderAwareRemoteStorageManager(delegate,
classLoader);
- } else {
- return createDelegate(this.getClass().getClassLoader(),
rlmConfig.remoteStorageManagerClassName());
- }
+ return SecurityManagerCompatibility.get().doPrivileged(() -> {
+ final String classPath = rlmConfig.remoteStorageManagerClassPath();
+ if (classPath != null && !classPath.trim().isEmpty()) {
+ ChildFirstClassLoader classLoader = new
ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
+ RemoteStorageManager delegate = createDelegate(classLoader,
rlmConfig.remoteStorageManagerClassName());
+ return (RemoteStorageManager) new
ClassLoaderAwareRemoteStorageManager(delegate, classLoader);
+ } else {
+ return createDelegate(this.getClass().getClassLoader(),
rlmConfig.remoteStorageManagerClassName());
}
});
}
@@ -353,19 +349,15 @@ public class RemoteLogManager implements Closeable {
remoteLogStorageManager.configure(rsmProps);
}
- @SuppressWarnings("removal")
RemoteLogMetadataManager createRemoteLogMetadataManager() {
- return java.security.AccessController.doPrivileged(new
PrivilegedAction<RemoteLogMetadataManager>() {
- private final String classPath =
rlmConfig.remoteLogMetadataManagerClassPath();
-
- public RemoteLogMetadataManager run() {
- if (classPath != null && !classPath.trim().isEmpty()) {
- ClassLoader classLoader = new
ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
- RemoteLogMetadataManager delegate =
createDelegate(classLoader, rlmConfig.remoteLogMetadataManagerClassName());
- return new
ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader);
- } else {
- return createDelegate(this.getClass().getClassLoader(),
rlmConfig.remoteLogMetadataManagerClassName());
- }
+ return SecurityManagerCompatibility.get().doPrivileged(() -> {
+ final String classPath =
rlmConfig.remoteLogMetadataManagerClassPath();
+ if (classPath != null && !classPath.trim().isEmpty()) {
+ ClassLoader classLoader = new ChildFirstClassLoader(classPath,
this.getClass().getClassLoader());
+ RemoteLogMetadataManager delegate =
createDelegate(classLoader, rlmConfig.remoteLogMetadataManagerClassName());
+ return (RemoteLogMetadataManager) new
ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader);
+ } else {
+ return createDelegate(this.getClass().getClassLoader(),
rlmConfig.remoteLogMetadataManagerClassName());
}
});
}