This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.9
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.9 by this push:
     new 392a56423e0 KAFKA-17078: Add SecurityManagerCompatibility shim 
(#16522) (#19221)
392a56423e0 is described below

commit 392a56423e0bc35dad47c8e83fc3a5fcc82709b0
Author: Stig Døssing <[email protected]>
AuthorDate: Mon Apr 7 04:39:24 2025 +0200

    KAFKA-17078: Add SecurityManagerCompatibility shim (#16522) (#19221)
    
    Backport for 3.9 for https://github.com/apache/kafka/pull/16522
    
    This backport is necessary for the 3.9 branch to support Java 24.
    
    See https://lists.apache.org/thread/6k942pphowd28dh9gn6xbnngk6nxs3n0
    for the discussion of whether to do this backport.
    
    Co-authored-by: Greg Harris <[email protected]>
    
    Reviewers: Luke Chen <[email protected]>, Greg Harris <[email protected]>
---
 .../kafka/common/internals/CompositeStrategy.java  | 106 +++++++++
 .../kafka/common/internals/LegacyStrategy.java     |  96 ++++++++
 .../kafka/common/internals/ModernStrategy.java     |  64 ++++++
 .../kafka/common/internals/ReflectiveStrategy.java |  69 ++++++
 .../internals/SecurityManagerCompatibility.java    | 102 +++++++++
 .../common/internals/UnsupportedStrategy.java      |  60 +++++
 .../authenticator/SaslClientAuthenticator.java     |  14 +-
 .../authenticator/SaslClientCallbackHandler.java   |   4 +-
 .../authenticator/SaslServerAuthenticator.java     |  12 +-
 .../OAuthBearerSaslClientCallbackHandler.java      |   6 +-
 .../SecurityManagerCompatibilityTest.java          | 252 +++++++++++++++++++++
 .../OAuthBearerSaslClientCallbackHandlerTest.java  |  15 +-
 .../runtime/isolation/ClassLoaderFactory.java      |  12 +-
 .../connect/runtime/isolation/PluginScanner.java   |   7 +-
 .../runtime/isolation/SynchronizationTest.java     |  13 +-
 .../java/kafka/log/remote/RemoteLogManager.java    |  42 ++--
 16 files changed, 805 insertions(+), 69 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/CompositeStrategy.java
 
b/clients/src/main/java/org/apache/kafka/common/internals/CompositeStrategy.java
new file mode 100644
index 00000000000..6a96576f54d
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/CompositeStrategy.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedAction;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
+
+import javax.security.auth.Subject;
+
+/**
+ * This strategy combines the functionality of the {@link LegacyStrategy}, 
{@link ModernStrategy}, and
+ * {@link UnsupportedStrategy} strategies to provide the legacy APIs as long 
as they are present and not degraded.
+ * If the legacy APIs are missing or degraded, this falls back to the modern 
APIs.
+ */
+class CompositeStrategy implements SecurityManagerCompatibility {
+
+    private static final Logger log = 
LoggerFactory.getLogger(CompositeStrategy.class);
+    static final CompositeStrategy INSTANCE = new 
CompositeStrategy(ReflectiveStrategy.Loader.forName());
+
+    private final SecurityManagerCompatibility fallbackStrategy;
+    private final AtomicReference<SecurityManagerCompatibility> activeStrategy;
+
+    // Visible for testing
+    CompositeStrategy(ReflectiveStrategy.Loader loader) {
+        SecurityManagerCompatibility initial;
+        SecurityManagerCompatibility fallback = null;
+        try {
+            initial = new LegacyStrategy(loader);
+            try {
+                fallback = new ModernStrategy(loader);
+                // This is expected for JRE 18+
+                log.debug("Loaded legacy SecurityManager methods, will fall 
back to modern methods after UnsupportedOperationException");
+            } catch (NoSuchMethodException | ClassNotFoundException ex) {
+                // This is expected for JRE <= 17
+                log.debug("Unable to load modern Subject methods, relying only 
on legacy methods", ex);
+            }
+        } catch (ClassNotFoundException | NoSuchMethodException e) {
+            try {
+                initial = new ModernStrategy(loader);
+                // This is expected for JREs after the removal takes place.
+                log.debug("Unable to load legacy SecurityManager methods, 
relying only on modern methods", e);
+            } catch (NoSuchMethodException | ClassNotFoundException ex) {
+                initial = new UnsupportedStrategy(e, ex);
+                // This is not expected in normal use, only in test 
environments.
+                log.error("Unable to load legacy SecurityManager methods", e);
+                log.error("Unable to load modern Subject methods", ex);
+            }
+        }
+        Objects.requireNonNull(initial, "initial strategy must be defined");
+        activeStrategy = new AtomicReference<>(initial);
+        fallbackStrategy = fallback;
+    }
+
+    private <T> T performAction(Function<SecurityManagerCompatibility, T> 
action) {
+        SecurityManagerCompatibility active = activeStrategy.get();
+        try {
+            return action.apply(active);
+        } catch (UnsupportedOperationException e) {
+            // If we chose a fallback strategy during loading, switch to it 
and retry this operation.
+            if (active != fallbackStrategy && fallbackStrategy != null) {
+                if (activeStrategy.compareAndSet(active, fallbackStrategy)) {
+                    log.debug("Using fallback strategy after encountering 
degraded legacy method", e);
+                }
+                return action.apply(fallbackStrategy);
+            }
+            // If we're already using the fallback strategy, then there's 
nothing to do to handle these exceptions.
+            throw e;
+        }
+    }
+
+    @Override
+    public <T> T doPrivileged(PrivilegedAction<T> action) {
+        return performAction(compatibility -> 
compatibility.doPrivileged(action));
+    }
+
+    @Override
+    public Subject current() {
+        return performAction(SecurityManagerCompatibility::current);
+    }
+
+    @Override
+    public <T> T callAs(Subject subject, Callable<T> action) throws 
CompletionException {
+        return performAction(compatibility -> compatibility.callAs(subject, 
action));
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/LegacyStrategy.java 
b/clients/src/main/java/org/apache/kafka/common/internals/LegacyStrategy.java
new file mode 100644
index 00000000000..de9bf7643d8
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/LegacyStrategy.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.lang.reflect.Method;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedActionException;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This class implements reflective access to the deprecated-for-removal 
methods of AccessController and Subject.
+ * <p>Instantiating this class may fail if any of the required classes or 
methods are not found.
+ * Method invocations for this class may fail with {@link 
UnsupportedOperationException} if all methods are found,
+ * but the operation is not permitted to be invoked.
+ * <p>This class is expected to be instantiable in JRE >=8 until the removal 
finally takes place.
+ */
+@SuppressWarnings("unchecked")
+class LegacyStrategy implements SecurityManagerCompatibility {
+
+    private final Method doPrivileged;
+    private final Method getContext;
+    private final Method getSubject;
+    private final Method doAs;
+
+    // Visible for testing
+    LegacyStrategy(ReflectiveStrategy.Loader loader) throws 
ClassNotFoundException, NoSuchMethodException {
+        Class<?> accessController = 
loader.loadClass("java.security.AccessController");
+        doPrivileged = accessController.getDeclaredMethod("doPrivileged", 
PrivilegedAction.class);
+        getContext = accessController.getDeclaredMethod("getContext");
+        Class<?> accessControlContext = 
loader.loadClass("java.security.AccessControlContext");
+        Class<?> subject = loader.loadClass(Subject.class.getName());
+        getSubject = subject.getDeclaredMethod("getSubject", 
accessControlContext);
+        // Note that the Subject class isn't deprecated or removed, so 
reference it as an argument type.
+        // This allows for mocking out the method implementation while still 
accepting Subject instances as arguments.
+        doAs = subject.getDeclaredMethod("doAs", Subject.class, 
PrivilegedExceptionAction.class);
+    }
+
+    @Override
+    public <T> T doPrivileged(PrivilegedAction<T> action) {
+        return (T) ReflectiveStrategy.invoke(doPrivileged, null, action);
+    }
+
+    /**
+     * @return the result of AccessController.getContext(), of type 
AccessControlContext
+     */
+    private Object getContext() {
+        return ReflectiveStrategy.invoke(getContext, null);
+    }
+
+    /**
+     * @param context The current AccessControlContext
+     * @return The result of Subject.getSubject(AccessControlContext)
+     */
+    private Subject getSubject(Object context) {
+        return (Subject) ReflectiveStrategy.invoke(getSubject, null, context);
+    }
+
+    @Override
+    public Subject current() {
+        return getSubject(getContext());
+    }
+
+    /**
+     * @return The result of Subject.doAs(Subject, PrivilegedExceptionAction)
+     */
+    private <T> T doAs(Subject subject, PrivilegedExceptionAction<T> action) 
throws PrivilegedActionException {
+        return (T) ReflectiveStrategy.invokeChecked(doAs, 
PrivilegedActionException.class, null, subject, action);
+    }
+
+    @Override
+    public <T> T callAs(Subject subject, Callable<T> callable) throws 
CompletionException {
+        try {
+            return doAs(subject, callable::call);
+        } catch (PrivilegedActionException e) {
+            throw new CompletionException(e.getCause());
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/ModernStrategy.java 
b/clients/src/main/java/org/apache/kafka/common/internals/ModernStrategy.java
new file mode 100644
index 00000000000..839196541ea
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/ModernStrategy.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.lang.reflect.Method;
+import java.security.PrivilegedAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This class implements reflective access to the methods of Subject added to 
replace deprecated methods.
+ * <p>Instantiating this class may fail if any of the required classes or 
methods are not found.
+ * Method invocations for this class may fail with {@link 
UnsupportedOperationException} if all methods are found,
+ * but the operation is not permitted to be invoked.
+ * <p>This class is expected to be instantiable in JRE >= 18. At the time of 
writing, these methods do not have
+ * a sunset date, and are expected to be available past the removal of the 
SecurityManager.
+ */
+@SuppressWarnings("unchecked")
+class ModernStrategy implements SecurityManagerCompatibility {
+
+    private final Method current;
+    private final Method callAs;
+
+    // Visible for testing
+    ModernStrategy(ReflectiveStrategy.Loader loader) throws 
NoSuchMethodException, ClassNotFoundException {
+        Class<?> subject = loader.loadClass(Subject.class.getName());
+        current = subject.getDeclaredMethod("current");
+        // Note that the Subject class isn't deprecated or removed, so 
reference it as an argument type.
+        // This allows for mocking out the method implementation while still 
accepting Subject instances as arguments.
+        callAs = subject.getDeclaredMethod("callAs", Subject.class, 
Callable.class);
+    }
+
+    @Override
+    public <T> T doPrivileged(PrivilegedAction<T> action) {
+        // This is intentionally a pass-through
+        return action.run();
+    }
+
+    @Override
+    public Subject current() {
+        return (Subject) ReflectiveStrategy.invoke(current, null);
+    }
+
+    @Override
+    public <T> T callAs(Subject subject, Callable<T> action) throws 
CompletionException {
+        return (T) ReflectiveStrategy.invokeChecked(callAs, 
CompletionException.class, null, subject, action);
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/ReflectiveStrategy.java
 
b/clients/src/main/java/org/apache/kafka/common/internals/ReflectiveStrategy.java
new file mode 100644
index 00000000000..c7d251a00b9
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/ReflectiveStrategy.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+/**
+ * Utility methods for strategies which use reflection to access methods 
without requiring them at compile-time.
+ */
+class ReflectiveStrategy {
+
+    static Object invoke(Method method, Object obj, Object... args) {
+        try {
+            return method.invoke(obj, args);
+        } catch (IllegalAccessException e) {
+            throw new UnsupportedOperationException(e);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            } else {
+                throw new RuntimeException(cause);
+            }
+        }
+    }
+
+    static <T extends Exception> Object invokeChecked(Method method, Class<T> 
ex, Object obj, Object... args) throws T {
+        try {
+            return method.invoke(obj, args);
+        } catch (IllegalAccessException e) {
+            throw new UnsupportedOperationException(e);
+        } catch (InvocationTargetException e) {
+            Throwable cause = e.getCause();
+            if (ex.isInstance(cause)) {
+                throw ex.cast(cause);
+            } else if (cause instanceof RuntimeException) {
+                throw (RuntimeException) cause;
+            } else {
+                throw new RuntimeException(cause);
+            }
+        }
+    }
+
+    /**
+     * Interface to allow mocking out classloading infrastructure. This is 
used to test reflective operations.
+     */
+    interface Loader {
+        Class<?> loadClass(String className) throws ClassNotFoundException;
+
+        static Loader forName() {
+            return className -> Class.forName(className, true, 
Loader.class.getClassLoader());
+        }
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/SecurityManagerCompatibility.java
 
b/clients/src/main/java/org/apache/kafka/common/internals/SecurityManagerCompatibility.java
new file mode 100644
index 00000000000..1f7dee3e06c
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/SecurityManagerCompatibility.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.security.PrivilegedAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This is a compatibility class to provide dual-support for JREs with and 
without SecurityManager support.
+ * <p>Users should call {@link #get()} to retrieve a singleton instance, and 
call instance methods
+ * {@link #doPrivileged(PrivilegedAction)}, {@link #current()}, and {@link 
#callAs(Subject, Callable)}.
+ * <p>This class's motivation and expected behavior is defined in
+     * <a 
href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-1006%3A+Remove+SecurityManager+Support">KIP-1006</a>
+ */
+public interface SecurityManagerCompatibility {
+
+    /**
+     * @return an implementation of this interface which conforms to the 
functionality available in the current JRE.
+     */
+    static SecurityManagerCompatibility get() {
+        return CompositeStrategy.INSTANCE;
+    }
+
+    /**
+     * Performs the specified {@code PrivilegedAction} with privileges
+     * enabled. The action is performed with <i>all</i> of the permissions
+     * possessed by the caller's protection domain.
+     *
+     * <p> If the action's {@code run} method throws an (unchecked)
+     * exception, it will propagate through this method.
+     *
+     * <p> Note that any DomainCombiner associated with the current
+     * AccessControlContext will be ignored while the action is performed.
+     *
+     * @param <T> the type of the value returned by the PrivilegedAction's
+     *            {@code run} method.
+     *
+     * @param action the action to be performed.
+     *
+     * @return the value returned by the action's {@code run} method.
+     *
+     * @exception NullPointerException if the action is {@code null}
+     * @see java.security.AccessController#doPrivileged(PrivilegedAction)
+     */
+    <T> T doPrivileged(PrivilegedAction<T> action);
+
+    /**
+     * Returns the current subject.
+     * <p>
+     * The current subject is installed by the {@link #callAs} method.
+     * When {@code callAs(subject, action)} is called, {@code action} is
+     * executed with {@code subject} as its current subject which can be
+     * retrieved by this method. After {@code action} is finished, the current
+     * subject is reset to its previous value. The current
+     * subject is {@code null} before the first call of {@code callAs()}.
+     *
+     * @return the current subject, or {@code null} if a current subject is
+     *         not installed or the current subject is set to {@code null}.
+     * @see #callAs(Subject, Callable)
+     * @see Subject#current()
+     * @see Subject#callAs(Subject, Callable)
+     */
+    Subject current();
+
+    /**
+     * Executes a {@code Callable} with {@code subject} as the
+     * current subject.
+     *
+     * @param subject the {@code Subject} that the specified {@code action}
+     *                will run as.  This parameter may be {@code null}.
+     * @param action the code to be run with {@code subject} as its current
+     *               subject. Must not be {@code null}.
+     * @param <T> the type of value returned by the {@code call} method
+     *            of {@code action}
+     * @return the value returned by the {@code call} method of {@code action}
+     * @throws NullPointerException if {@code action} is {@code null}
+     * @throws CompletionException if {@code action.call()} throws an 
exception.
+     *      The cause of the {@code CompletionException} is set to the 
exception
+     *      thrown by {@code action.call()}.
+     * @see #current()
+     * @see Subject#current()
+     * @see Subject#callAs(Subject, Callable)
+     */
+    <T> T callAs(Subject subject, Callable<T> action) throws 
CompletionException;
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/internals/UnsupportedStrategy.java
 
b/clients/src/main/java/org/apache/kafka/common/internals/UnsupportedStrategy.java
new file mode 100644
index 00000000000..ca4175badff
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/internals/UnsupportedStrategy.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import java.security.PrivilegedAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+/**
+ * This is a fallback strategy to use if no other strategies are available.
+ * <p>This is used to improve control flow and provide detailed error messages 
in unusual situations.
+ */
+class UnsupportedStrategy implements SecurityManagerCompatibility {
+
+    private final Throwable e1;
+    private final Throwable e2;
+
+    UnsupportedStrategy(Throwable e1, Throwable e2) {
+        this.e1 = e1;
+        this.e2 = e2;
+    }
+
+    private UnsupportedOperationException createException(String message) {
+        UnsupportedOperationException e = new 
UnsupportedOperationException(message);
+        e.addSuppressed(e1);
+        e.addSuppressed(e2);
+        return e;
+    }
+
+    @Override
+    public <T> T doPrivileged(PrivilegedAction<T> action) {
+        throw createException("Unable to find suitable 
AccessController#doPrivileged implementation");
+    }
+
+    @Override
+    public Subject current() {
+        throw createException("Unable to find suitable Subject#getCurrent or 
Subject#current implementation");
+    }
+
+    @Override
+    public <T> T callAs(Subject subject, Callable<T> action) throws 
CompletionException {
+        throw createException("Unable to find suitable Subject#doAs or 
Subject#callAs implementation");
+    }
+}
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
index a327c4b6c1b..45169feb657 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.errors.IllegalSaslStateException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.message.ApiVersionsResponseData.ApiVersion;
 import org.apache.kafka.common.message.RequestHeaderData;
 import org.apache.kafka.common.message.SaslAuthenticateRequestData;
@@ -59,8 +60,6 @@ import java.nio.BufferUnderflowException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.security.Principal;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -70,6 +69,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CompletionException;
 
 import javax.security.auth.Subject;
 import javax.security.sasl.Sasl;
@@ -215,7 +215,7 @@ public class SaslClientAuthenticator implements 
Authenticator {
     // visible for testing
     SaslClient createSaslClient() {
         try {
-            return Subject.doAs(subject, 
(PrivilegedExceptionAction<SaslClient>) () -> {
+            return SecurityManagerCompatibility.get().callAs(subject, () -> {
                 String[] mechs = {mechanism};
                 log.debug("Creating SaslClient: 
client={};service={};serviceHostname={};mechs={}",
                     clientPrincipalName, servicePrincipal, host, 
Arrays.toString(mechs));
@@ -225,7 +225,7 @@ public class SaslClientAuthenticator implements 
Authenticator {
                 }
                 return retvalSaslClient;
             });
-        } catch (PrivilegedActionException e) {
+        } catch (CompletionException e) {
             throw new SaslAuthenticationException("Failed to create SaslClient 
with mechanism " + mechanism, e.getCause());
         }
     }
@@ -533,8 +533,8 @@ public class SaslClientAuthenticator implements 
Authenticator {
             if (isInitial && !saslClient.hasInitialResponse())
                 return saslToken;
             else
-                return Subject.doAs(subject, 
(PrivilegedExceptionAction<byte[]>) () -> 
saslClient.evaluateChallenge(saslToken));
-        } catch (PrivilegedActionException e) {
+                return SecurityManagerCompatibility.get().callAs(subject, () 
-> saslClient.evaluateChallenge(saslToken));
+        } catch (CompletionException e) {
             String error = "An error: (" + e + ") occurred when evaluating 
SASL token received from the Kafka Broker.";
             KerberosError kerberosError = KerberosError.fromException(e);
             // Try to provide hints to use about what went wrong so they can 
fix their configuration.
@@ -545,7 +545,7 @@ public class SaslClientAuthenticator implements 
Authenticator {
                     " Users must configure FQDN of kafka brokers when 
authenticating using SASL and" +
                     " `socketChannel.socket().getInetAddress().getHostName()` 
must match the hostname in `principal/hostname@realm`";
             }
-            //Unwrap the SaslException inside `PrivilegedActionException`
+            //Unwrap the SaslException
             Throwable cause = e.getCause();
             // Treat transient Kerberos errors as non-fatal SaslExceptions 
that are processed as I/O exceptions
             // and all other failures as fatal SaslAuthenticationException.
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index fc27feb0502..c2257f3b6b8 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,13 +17,13 @@
 package org.apache.kafka.common.security.authenticator;
 
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.SaslExtensions;
 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
 import org.apache.kafka.common.security.scram.ScramExtensionsCallback;
 import org.apache.kafka.common.security.scram.internals.ScramMechanism;
 
-import java.security.AccessController;
 import java.util.List;
 import java.util.Map;
 
@@ -55,7 +55,7 @@ public class SaslClientCallbackHandler implements 
AuthenticateCallbackHandler {
 
     @Override
     public void handle(Callback[] callbacks) throws 
UnsupportedCallbackException {
-        Subject subject = Subject.getSubject(AccessController.getContext());
+        Subject subject = SecurityManagerCompatibility.get().current();
         for (Callback callback : callbacks) {
             if (callback instanceof NameCallback) {
                 NameCallback nc = (NameCallback) callback;
diff --git 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
index 43c6ed683d9..b8fb7299a8a 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.errors.InvalidRequestException;
 import org.apache.kafka.common.errors.SaslAuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.message.SaslAuthenticateResponseData;
 import org.apache.kafka.common.message.SaslHandshakeResponseData;
 import org.apache.kafka.common.network.Authenticator;
@@ -73,8 +74,6 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashSet;
@@ -82,6 +81,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.CompletionException;
 import java.util.function.Function;
 
 import javax.net.ssl.SSLSession;
@@ -205,12 +205,12 @@ public class SaslServerAuthenticator implements Authenticator {
             saslServer = createSaslKerberosServer(callbackHandler, configs, subject);
         } else {
             try {
-                saslServer = Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
+                saslServer = SecurityManagerCompatibility.get().callAs(subject, () ->
                     Sasl.createSaslServer(saslMechanism, "kafka", serverAddress().getHostName(), configs, callbackHandler));
                 if (saslServer == null) {
                     throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + saslMechanism);
                 }
-            } catch (PrivilegedActionException e) {
+            } catch (CompletionException e) {
                 throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + saslMechanism, e.getCause());
             }
         }
@@ -231,9 +231,9 @@ public class SaslServerAuthenticator implements Authenticator {
         LOG.debug("Creating SaslServer for {} with mechanism {}", kerberosName, saslMechanism);
 
         try {
-            return Subject.doAs(subject, (PrivilegedExceptionAction<SaslServer>) () ->
+            return SecurityManagerCompatibility.get().callAs(subject, () ->
                     Sasl.createSaslServer(saslMechanism, servicePrincipalName, serviceHostname, configs, saslServerCallbackHandler));
-        } catch (PrivilegedActionException e) {
+        } catch (CompletionException e) {
             throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
index e4e845a8dea..955ab4c1fac 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/internals/OAuthBearerSaslClientCallbackHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.security.oauthbearer.internals;
 
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
 import org.apache.kafka.common.security.auth.SaslExtensions;
 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
@@ -27,7 +28,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.security.AccessController;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Date;
@@ -83,7 +83,7 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
             if (callback instanceof OAuthBearerTokenCallback)
                 handleCallback((OAuthBearerTokenCallback) callback);
             else if (callback instanceof SaslExtensionsCallback)
-                handleCallback((SaslExtensionsCallback) callback, Subject.getSubject(AccessController.getContext()));
+                handleCallback((SaslExtensionsCallback) callback, SecurityManagerCompatibility.get().current());
             else
                 throw new UnsupportedCallbackException(callback);
         }
@@ -97,7 +97,7 @@ public class OAuthBearerSaslClientCallbackHandler implements AuthenticateCallbac
     private void handleCallback(OAuthBearerTokenCallback callback) throws IOException {
         if (callback.token() != null)
             throw new IllegalArgumentException("Callback had a token already");
-        Subject subject = Subject.getSubject(AccessController.getContext());
+        Subject subject = SecurityManagerCompatibility.get().current();
         Set<OAuthBearerToken> privateCredentials = subject != null
             ? subject.getPrivateCredentials(OAuthBearerToken.class)
             : Collections.emptySet();
diff --git a/clients/src/test/java/org/apache/kafka/common/internals/SecurityManagerCompatibilityTest.java b/clients/src/test/java/org/apache/kafka/common/internals/SecurityManagerCompatibilityTest.java
new file mode 100644
index 00000000000..7789a924c78
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/internals/SecurityManagerCompatibilityTest.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.internals;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledForJreRange;
+import org.junit.jupiter.api.condition.JRE;
+
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionException;
+
+import javax.security.auth.Subject;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class SecurityManagerCompatibilityTest {
+
+    @EnabledForJreRange(min = JRE.JAVA_8, max = JRE.JAVA_22)
+    @Test
+    public void testLegacyStrategyLoadable() throws ClassNotFoundException, NoSuchMethodException {
+        new LegacyStrategy(ReflectiveStrategy.Loader.forName());
+    }
+
+    @EnabledForJreRange(min = JRE.JAVA_18)
+    @Test
+    public void testModernStrategyLoadable() throws ClassNotFoundException, NoSuchMethodException {
+        new ModernStrategy(ReflectiveStrategy.Loader.forName());
+    }
+
+    @Test
+    public void testCompositeStrategyLoadable() {
+        new CompositeStrategy(ReflectiveStrategy.Loader.forName());
+    }
+
+    @Test
+    public void testDefaultStrategyLoadable() {
+        assertNotNull(SecurityManagerCompatibility.get());
+    }
+
+    @Test
+    public void testDefaultStrategyDoPrivilegedReturn() {
+        Object object = new Object();
+        Object returned = SecurityManagerCompatibility.get().doPrivileged(() -> object);
+        assertSame(object, returned);
+    }
+
+    @Test
+    public void testDefaultStrategyDoPrivilegedThrow() {
+        assertThrows(RuntimeException.class, () ->
+                SecurityManagerCompatibility.get().doPrivileged(() -> {
+                    throw new RuntimeException();
+                })
+        );
+    }
+
+    @Test
+    public void testDefaultStrategyCurrentNull() {
+        Subject current = SecurityManagerCompatibility.get().current();
+        assertNull(current);
+    }
+
+    @Test
+    public void testDefaultStrategyCallAsReturn() {
+        Subject subject = new Subject();
+        Object object = new Object();
+        Object returned = SecurityManagerCompatibility.get().callAs(subject, () -> object);
+        assertSame(object, returned);
+    }
+
+    @Test
+    public void testDefaultStrategyCallAsCurrent() {
+        Subject subject = new Subject();
+        Subject returned = SecurityManagerCompatibility.get().callAs(subject, SecurityManagerCompatibility.get()::current);
+        assertSame(subject, returned);
+    }
+
+    @Test
+    public void testLegacyStrategyThrowsWhenSecurityManagerRemoved() {
+        ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
+        assertThrows(ClassNotFoundException.class, () -> new LegacyStrategy(loader));
+    }
+
+    @EnabledForJreRange(min = JRE.JAVA_18)
+    @Test
+    public void testModernStrategyLoadableWhenSecurityManagerRemoved() throws ClassNotFoundException, NoSuchMethodException {
+        ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
+        new ModernStrategy(loader);
+    }
+
+    @Test
+    public void testCompositeStrategyLoadableWhenSecurityManagerRemoved() {
+        ReflectiveStrategy.Loader loader = simulateSecurityManagerRemoval();
+        new CompositeStrategy(loader);
+    }
+
+    @Test
+    public void testLegacyStrategyCurrentThrowsWhenSecurityManagerUnsupported() throws ClassNotFoundException, NoSuchMethodException {
+        ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
+        SecurityManagerCompatibility legacy = new LegacyStrategy(loader);
+        assertThrows(UnsupportedOperationException.class, legacy::current);
+    }
+
+    @Test
+    public void testLegacyStrategyCallAsThrowsWhenSecurityManagerUnsupported() throws ClassNotFoundException, NoSuchMethodException {
+        ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
+        SecurityManagerCompatibility legacy = new LegacyStrategy(loader);
+        assertThrows(UnsupportedOperationException.class, () -> legacy.callAs(null, () -> null));
+    }
+
+    @Test
+    public void testCompositeStrategyDoPrivilegedWhenSecurityManagerUnsupported() {
+        ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
+        CompositeStrategy composite = new CompositeStrategy(loader);
+        Object object = new Object();
+        Object returned = composite.doPrivileged(() -> object);
+        assertSame(object, returned);
+    }
+
+    @Test
+    public void testCompositeStrategyCurrentWhenSecurityManagerUnsupported() {
+        ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
+        CompositeStrategy composite = new CompositeStrategy(loader);
+        Object returned = composite.current();
+        assertNull(returned);
+    }
+
+    @Test
+    public void testCompositeStrategyCallAsWhenSecurityManagerUnsupported() {
+        ReflectiveStrategy.Loader loader = simulateMethodsThrowUnsupportedOperationExceptions();
+        CompositeStrategy composite = new CompositeStrategy(loader);
+        Subject subject = new Subject();
+        Subject returned = composite.callAs(subject, composite::current);
+        assertSame(subject, returned);
+    }
+
+    private ReflectiveStrategy.Loader simulateSecurityManagerRemoval() {
+        return name -> {
+            if (name.equals("java.security.AccessController")) {
+                throw new ClassNotFoundException();
+            } else {
+                return ReflectiveStrategy.Loader.forName().loadClass(name);
+            }
+        };
+    }
+
+    private ReflectiveStrategy.Loader simulateMethodsThrowUnsupportedOperationExceptions() {
+        // WARNING: These assertions are here to prevent warnings about unused methods.
+        // These methods are used reflectively, and can't be removed.
+        assertThrows(UnsupportedOperationException.class, () -> UnsupportedOperations.doPrivileged(null));
+        assertThrows(UnsupportedOperationException.class, () -> UnsupportedOperations.getSubject(null));
+        assertThrows(UnsupportedOperationException.class, () -> UnsupportedOperations.doAs(null, null));
+        assertNull(UnsupportedOperations.current());
+        assertNull(UnsupportedOperations.callAs(null, () -> null));
+        return name -> {
+            switch (name) {
+                case "java.security.AccessController":
+                case "javax.security.auth.Subject":
+                    return UnsupportedOperations.class;
+                case "java.security.AccessControlContext":
+                    return UnsupportedOperations.DummyContext.class;
+                default:
+                    return ReflectiveStrategy.Loader.forName().loadClass(name);
+            }
+        };
+    }
+
+    /**
+     * This is a class meant to stand-in for the AccessController, and Subject classes.
+     * It simulates a scenario where all legacy methods throw UnsupportedOperationException, and only the modern
+     * methods are functional.
+     */
+    public static class UnsupportedOperations {
+
+        /**
+         * This class stands-in for the AccessControlContext in the mocked signatures below, because we can't have a
+         * compile-time dependency on the real class. This needs no methods and is just a dummy class.
+         */
+        public static class DummyContext {
+        }
+
+        private static final ThreadLocal<Subject> ACTIVE_SUBJECT = new ThreadLocal<>();
+
+        /**
+         * Copy of AccessController#doPrivileged
+         */
+        public static <T> void doPrivileged(PrivilegedAction<T> ignored) {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Copy of AccessController#getContext
+         */
+        public static DummyContext getContext() {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Copy of Subject#getSubject
+         */
+        public static void getSubject(DummyContext ignored) {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Copy of Subject#doAs
+         */
+        public static <T> void doAs(Subject ignored1, PrivilegedExceptionAction<T> ignored2) {
+            throw new UnsupportedOperationException();
+        }
+
+        /**
+         * Copy of Subject#current
+         */
+        public static Subject current() {
+            return ACTIVE_SUBJECT.get();
+        }
+
+        /**
+         * Copy of Subject#callAs
+         */
+        public static <T> T callAs(Subject subject, Callable<T> action) throws CompletionException {
+            Subject previous = ACTIVE_SUBJECT.get();
+            ACTIVE_SUBJECT.set(subject);
+            try {
+                return action.call();
+            } catch (Throwable e) {
+                throw new CompletionException(e);
+            } finally {
+                ACTIVE_SUBJECT.set(previous);
+            }
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
index 98b9544a5b5..29d9a9f792e 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerSaslClientCallbackHandlerTest.java
@@ -16,16 +16,15 @@
  */
 package org.apache.kafka.common.security.oauthbearer;
 
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClientCallbackHandler;
 
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
-import java.security.AccessController;
-import java.security.PrivilegedActionException;
-import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
 import java.util.Set;
+import java.util.concurrent.CompletionException;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -66,8 +65,8 @@ public class OAuthBearerSaslClientCallbackHandlerTest {
     @Test
     public void testWithZeroTokens() {
         OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
-        PrivilegedActionException e = assertThrows(PrivilegedActionException.class, () -> Subject.doAs(new Subject(),
-            (PrivilegedExceptionAction<Void>) () -> {
+        CompletionException e = assertThrows(CompletionException.class, () -> SecurityManagerCompatibility.get().callAs(new Subject(),
+            () -> {
                 OAuthBearerTokenCallback callback = new OAuthBearerTokenCallback();
                 handler.handle(new Callback[] {callback});
                 return null;
@@ -77,11 +76,11 @@ public class OAuthBearerSaslClientCallbackHandlerTest {
     }
 
     @Test()
-    public void testWithPotentiallyMultipleTokens() throws Exception {
+    public void testWithPotentiallyMultipleTokens() {
         OAuthBearerSaslClientCallbackHandler handler = createCallbackHandler();
-        Subject.doAs(new Subject(), (PrivilegedExceptionAction<Void>) () -> {
+        SecurityManagerCompatibility.get().callAs(new Subject(), () -> {
             final int maxTokens = 4;
-            final Set<Object> privateCredentials = Subject.getSubject(AccessController.getContext())
+            final Set<Object> privateCredentials = SecurityManagerCompatibility.get().current()
                     .getPrivateCredentials();
             privateCredentials.clear();
             for (int num = 1; num <= maxTokens; ++num) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
index 9de24f796d1..2a480fde851 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/ClassLoaderFactory.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
+
 import java.net.URL;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 
 /**
  * Factory for {@link DelegatingClassLoader} and {@link PluginClassLoader} instances.
@@ -27,14 +27,14 @@ import java.security.PrivilegedAction;
 public class ClassLoaderFactory implements PluginClassLoaderFactory {
 
     public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
-        return AccessController.doPrivileged(
-                (PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(parent)
+        return SecurityManagerCompatibility.get().doPrivileged(
+                () -> new DelegatingClassLoader(parent)
         );
     }
 
     public PluginClassLoader newPluginClassLoader(URL pluginLocation, URL[] urls, ClassLoader parent) {
-        return AccessController.doPrivileged(
-                (PrivilegedAction<PluginClassLoader>) () -> new PluginClassLoader(pluginLocation, urls, parent)
+        return SecurityManagerCompatibility.get().doPrivileged(
+                () -> new PluginClassLoader(pluginLocation, urls, parent)
         );
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
index 9ca1e5aad9a..004f78fe1c0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/PluginScanner.java
@@ -16,14 +16,13 @@
  */
 package org.apache.kafka.connect.runtime.isolation;
 
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.connect.components.Versioned;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.lang.reflect.InvocationTargetException;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 import java.sql.Driver;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -93,8 +92,8 @@ public abstract class PluginScanner {
     private void loadJdbcDrivers(final ClassLoader loader) {
         // Apply here what java.sql.DriverManager does to discover and register classes
         // implementing the java.sql.Driver interface.
-        AccessController.doPrivileged(
-            (PrivilegedAction<Void>) () -> {
+        SecurityManagerCompatibility.get().doPrivileged(
+            () -> {
                 ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(
                         Driver.class,
                         loader
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
index 6d8d7f71768..1f3d42fd3dc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
@@ -40,8 +41,6 @@ import java.lang.management.ManagementFactory;
 import java.lang.management.MonitorInfo;
 import java.lang.management.ThreadInfo;
 import java.net.URL;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
@@ -161,9 +160,8 @@ public class SynchronizationTest {
     private class SynchronizedClassLoaderFactory extends ClassLoaderFactory {
         @Override
         public DelegatingClassLoader newDelegatingClassLoader(ClassLoader parent) {
-            return AccessController.doPrivileged(
-                    (PrivilegedAction<DelegatingClassLoader>) () ->
-                            new SynchronizedDelegatingClassLoader(parent, dclBreakpoint)
+            return SecurityManagerCompatibility.get().doPrivileged(
+                    () -> new SynchronizedDelegatingClassLoader(parent, dclBreakpoint)
             );
         }
 
@@ -173,9 +171,8 @@ public class SynchronizationTest {
                 URL[] urls,
                 ClassLoader parent
         ) {
-            return AccessController.doPrivileged(
-                    (PrivilegedAction<PluginClassLoader>) () ->
-                            new SynchronizedPluginClassLoader(pluginLocation, urls, parent, pclBreakpoint)
+            return SecurityManagerCompatibility.get().doPrivileged(
+                    () -> new SynchronizedPluginClassLoader(pluginLocation, urls, parent, pclBreakpoint)
             );
         }
     }
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index c64d76013fa..a4f1ba4cb98 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.internals.SecurityManagerCompatibility;
 import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Quota;
@@ -98,7 +99,6 @@ import java.lang.reflect.InvocationTargetException;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Path;
-import java.security.PrivilegedAction;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -330,19 +330,15 @@ public class RemoteLogManager implements Closeable {
         }
     }
 
-    @SuppressWarnings("removal")
     RemoteStorageManager createRemoteStorageManager() {
-        return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteStorageManager>() {
-            private final String classPath = rlmConfig.remoteStorageManagerClassPath();
-
-            public RemoteStorageManager run() {
-                if (classPath != null && !classPath.trim().isEmpty()) {
-                    ChildFirstClassLoader classLoader = new ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
-                    RemoteStorageManager delegate = createDelegate(classLoader, rlmConfig.remoteStorageManagerClassName());
-                    return new ClassLoaderAwareRemoteStorageManager(delegate, classLoader);
-                } else {
-                    return createDelegate(this.getClass().getClassLoader(), rlmConfig.remoteStorageManagerClassName());
-                }
+        return SecurityManagerCompatibility.get().doPrivileged(() -> {
+            final String classPath = rlmConfig.remoteStorageManagerClassPath();
+            if (classPath != null && !classPath.trim().isEmpty()) {
+                ChildFirstClassLoader classLoader = new ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
+                RemoteStorageManager delegate = createDelegate(classLoader, rlmConfig.remoteStorageManagerClassName());
+                return (RemoteStorageManager) new ClassLoaderAwareRemoteStorageManager(delegate, classLoader);
+            } else {
+                return createDelegate(this.getClass().getClassLoader(), rlmConfig.remoteStorageManagerClassName());
             }
         });
     }
@@ -353,19 +349,15 @@ public class RemoteLogManager implements Closeable {
         remoteLogStorageManager.configure(rsmProps);
     }
 
-    @SuppressWarnings("removal")
     RemoteLogMetadataManager createRemoteLogMetadataManager() {
-        return java.security.AccessController.doPrivileged(new PrivilegedAction<RemoteLogMetadataManager>() {
-            private final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
-
-            public RemoteLogMetadataManager run() {
-                if (classPath != null && !classPath.trim().isEmpty()) {
-                    ClassLoader classLoader = new ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
-                    RemoteLogMetadataManager delegate = createDelegate(classLoader, rlmConfig.remoteLogMetadataManagerClassName());
-                    return new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader);
-                } else {
-                    return createDelegate(this.getClass().getClassLoader(), rlmConfig.remoteLogMetadataManagerClassName());
-                }
+        return SecurityManagerCompatibility.get().doPrivileged(() -> {
+            final String classPath = rlmConfig.remoteLogMetadataManagerClassPath();
+            if (classPath != null && !classPath.trim().isEmpty()) {
+                ClassLoader classLoader = new ChildFirstClassLoader(classPath, this.getClass().getClassLoader());
+                RemoteLogMetadataManager delegate = createDelegate(classLoader, rlmConfig.remoteLogMetadataManagerClassName());
+                return (RemoteLogMetadataManager) new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader);
+            } else {
+                return createDelegate(this.getClass().getClassLoader(), rlmConfig.remoteLogMetadataManagerClassName());
             }
         });
     }

Reply via email to