This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 88ee6d84 [client] Fluss client shouldn't load plugin by thread context
classloader (#1267)
88ee6d84 is described below
commit 88ee6d84ba987891343d83624b23b8a68b12b1ec
Author: Hongshun Wang <[email protected]>
AuthorDate: Mon Jul 7 18:34:40 2025 +0800
[client] Fluss client shouldn't load plugin by thread context classloader
(#1267)
---
.../token/SecurityTokenReceiverRepository.java | 5 +-
.../main/java/com/alibaba/fluss/fs/FileSystem.java | 6 ++-
.../lake/lakestorage/LakeStoragePluginSetUp.java | 4 +-
.../fluss/security/auth/AuthenticationFactory.java | 7 ++-
.../security/auth/sasl/jaas/DefaultLogin.java | 29 +++++++-----
.../security/auth/AuthenticationFactoryTest.java | 16 +++++++
.../utils/ParentResourceBlockingClassLoader.java | 54 ++++++++++++++++++++++
.../security/acl/FlinkAuthorizationITCase.java | 26 +++++++++++
.../fluss/rpc/netty/server/NettyServer.java | 5 +-
.../fluss/server/authorizer/AuthorizerLoader.java | 7 ++-
10 files changed, 141 insertions(+), 18 deletions(-)
diff --git
a/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
b/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
index 898e3ff0..ba18f44b 100644
---
a/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
+++
b/fluss-client/src/main/java/com/alibaba/fluss/client/token/SecurityTokenReceiverRepository.java
@@ -61,7 +61,10 @@ class SecurityTokenReceiverRepository {
receiver.scheme());
};
-
ServiceLoader.load(SecurityTokenReceiver.class).iterator().forEachRemaining(loadReceiver);
+ ServiceLoader.load(
+ SecurityTokenReceiver.class,
SecurityTokenReceiver.class.getClassLoader())
+ .iterator()
+ .forEachRemaining(loadReceiver);
LOG.info("Security token receivers loaded successfully");
return receivers;
diff --git a/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
b/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
index f6c7ceb6..f5b0c62b 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/fs/FileSystem.java
@@ -266,7 +266,11 @@ public abstract class FileSystem {
Collection<Supplier<Iterator<FileSystemPlugin>>>
pluginSuppliers =
new ArrayList<>(2);
pluginSuppliers.add(
- () ->
ServiceLoader.load(FileSystemPlugin.class).iterator());
+ () ->
+ ServiceLoader.load(
+ FileSystemPlugin.class,
+
FileSystem.class.getClassLoader())
+ .iterator());
if (pluginManager != null) {
pluginSuppliers.add(
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
index 8f226eed..0e77baca 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/lake/lakestorage/LakeStoragePluginSetUp.java
@@ -54,7 +54,9 @@ public class LakeStoragePluginSetUp {
private static Iterator<LakeStoragePlugin> getAllLakeStoragePlugins(
@Nullable PluginManager pluginManager) {
final Iterator<LakeStoragePlugin> pluginIteratorSPI =
- ServiceLoader.load(LakeStoragePlugin.class).iterator();
+ ServiceLoader.load(
+ LakeStoragePlugin.class,
LakeStoragePlugin.class.getClassLoader())
+ .iterator();
if (pluginManager == null) {
return pluginIteratorSPI;
} else {
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
index 8f9ebbcd..8a31dbca 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/AuthenticationFactory.java
@@ -118,7 +118,12 @@ public class AuthenticationFactory {
String protocol, Class<T> pluginInterface, @Nullable PluginManager
pluginManager) {
Collection<Supplier<Iterator<AuthenticationPlugin>>> pluginSuppliers =
new ArrayList<>(2);
- pluginSuppliers.add(() ->
ServiceLoader.load(AuthenticationPlugin.class).iterator());
+ pluginSuppliers.add(
+ () ->
+ ServiceLoader.load(
+ AuthenticationPlugin.class,
+
AuthenticationPlugin.class.getClassLoader())
+ .iterator());
if (pluginManager != null) {
pluginSuppliers.add(() ->
pluginManager.load(AuthenticationPlugin.class));
}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
index 2d41ef14..f32a0a7a 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/jaas/DefaultLogin.java
@@ -17,6 +17,8 @@
package com.alibaba.fluss.security.auth.sasl.jaas;
+import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,18 +47,21 @@ public class DefaultLogin implements Login {
@Override
public LoginContext login() throws LoginException {
- loginContext =
- new LoginContext(
- contextName,
- null,
- callbacks -> {
- // Nothing here until we support some mechanisms
such as sasl/GSSAPI
- // later.
- throw new UnsupportedCallbackException(
- callbacks[0], "Unrecognized SASL
mechanism.");
- },
- jaasConfig);
- loginContext.login();
+ try (TemporaryClassLoaderContext ignored =
+
TemporaryClassLoaderContext.of(DefaultLogin.class.getClassLoader())) {
+ loginContext =
+ new LoginContext(
+ contextName,
+ null,
+ callbacks -> {
+ // Nothing here until we support some
mechanisms such as sasl/GSSAPI
+ // later.
+ throw new UnsupportedCallbackException(
+ callbacks[0], "Unrecognized SASL
mechanism.");
+ },
+ jaasConfig);
+ loginContext.login();
+ }
LOG.info("Successfully logged in.");
return loginContext;
}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
index fb21d0d6..3bf5c57f 100644
---
a/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
+++
b/fluss-common/src/test/java/com/alibaba/fluss/security/auth/AuthenticationFactoryTest.java
@@ -21,9 +21,13 @@ import com.alibaba.fluss.config.Configuration;
import com.alibaba.fluss.metadata.ValidationException;
import
com.alibaba.fluss.security.auth.TestIdentifierAuthenticationPlugin.TestIdentifierClientAuthenticator;
import
com.alibaba.fluss.security.auth.TestIdentifierAuthenticationPlugin.TestIdentifierServerAuthenticator;
+import com.alibaba.fluss.utils.ParentResourceBlockingClassLoader;
+import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
import org.junit.jupiter.api.Test;
+import java.net.URL;
+
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -95,4 +99,16 @@ public class AuthenticationFactoryTest {
.get())
.isInstanceOf(TestIdentifierServerAuthenticator.class);
}
+
+ @Test
+ void testNotIncludedInThreadContextClassloader() {
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(new
ParentResourceBlockingClassLoader(new URL[0]))) {
+ Configuration configuration = new Configuration();
+ configuration.setString("client.security.protocol", "SSL_TEST");
+ configuration.setString("security.protocol.map", "FLUSS:SSL_TEST");
+
assertThat(AuthenticationFactory.loadClientAuthenticatorSupplier(configuration).get())
+ .isInstanceOf(TestIdentifierClientAuthenticator.class);
+ }
+ }
}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/utils/ParentResourceBlockingClassLoader.java
b/fluss-common/src/test/java/com/alibaba/fluss/utils/ParentResourceBlockingClassLoader.java
new file mode 100644
index 00000000..cd68ebde
--- /dev/null
+++
b/fluss-common/src/test/java/com/alibaba/fluss/utils/ParentResourceBlockingClassLoader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.utils;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Enumeration;
+
+import static org.apache.logging.log4j.util.LoaderUtil.findResources;
+
+/**
+ * A class loader that blocks resource loading from parent class loader.
Designed to simulate SPI
+ * loading behavior without delegation to parent.
+ */
+public class ParentResourceBlockingClassLoader extends URLClassLoader {
+ public ParentResourceBlockingClassLoader(URL[] urls) {
+ super(urls);
+ }
+
+ @Override
+ protected Class<?> loadClass(String name, boolean resolve) throws
ClassNotFoundException {
+ synchronized (getClassLoadingLock(name)) {
+ // First, check if the class has already been loaded
+ Class<?> c = findLoadedClass(name);
+ c = findClass(name);
+ if (resolve) {
+ resolveClass(c);
+ }
+ return c;
+ }
+ }
+
+ @Override
+ public Enumeration<URL> getResources(String name) throws IOException {
+ // Skip parent class loader resource loading during Service.load
+ return findResources(name);
+ }
+}
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
index 15f84f5e..3d0e5bec 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/com/alibaba/fluss/flink/security/acl/FlinkAuthorizationITCase.java
@@ -27,7 +27,10 @@ import com.alibaba.fluss.metadata.TablePath;
import com.alibaba.fluss.security.acl.FlussPrincipal;
import com.alibaba.fluss.security.acl.OperationType;
import com.alibaba.fluss.security.acl.Resource;
+import com.alibaba.fluss.security.auth.sasl.jaas.LoginManager;
import com.alibaba.fluss.server.testutils.FlussClusterExtension;
+import com.alibaba.fluss.utils.ParentResourceBlockingClassLoader;
+import com.alibaba.fluss.utils.TemporaryClassLoaderContext;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.table.api.EnvironmentSettings;
@@ -43,6 +46,7 @@ import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
+import java.net.URL;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
@@ -385,6 +389,28 @@ abstract class FlinkAuthorizationITCase extends
AbstractTestBase {
Arrays.asList("+I[1, beijing, zhangsan]", "+I[2, shanghai,
lisi]"));
}
+ // this test is to mock `--jar` which is not loaded by the app classloader.
+ @Test
+ void testNotIncludedInThreadContextClassloader() throws Exception {
+ try (TemporaryClassLoaderContext ignored =
+ TemporaryClassLoaderContext.of(new
ParentResourceBlockingClassLoader(new URL[0]))) {
+ // clear the cache of login context to make sure load the class
again.
+ LoginManager.closeAll();
+ tEnv.executeSql(
+ String.format(
+ "create catalog test_classloader_catalog
with ('type' = 'fluss', "
+ + "'bootstrap.servers' = '%s',"
+ + "'client.security.protocol' =
'sasl',"
+ +
"'client.security.sasl.mechanism' = 'PLAIN', \n"
+ + "'client.security.sasl.username'
= 'guest', \n"
+ + "'client.security.sasl.password'
= 'password2' \n"
+ + ")",
+ String.join(
+ ",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS))))
+ .await();
+ }
+ }
+
void addAcl(Resource resource, OperationType operationType)
throws ExecutionException, InterruptedException {
tEnv.executeSql(
diff --git
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
index e1245d85..9e641239 100644
---
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
+++
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServer.java
@@ -256,7 +256,10 @@ public final class NettyServer implements RpcServer {
protocol.name());
protocols.put(protocol.name(), protocol);
};
-
ServiceLoader.load(NetworkProtocolPlugin.class).iterator().forEachRemaining(loadProtocol);
+ ServiceLoader.load(
+ NetworkProtocolPlugin.class,
NetworkProtocolPlugin.class.getClassLoader())
+ .iterator()
+ .forEachRemaining(loadProtocol);
if (protocols.containsKey(protocolName)) {
LOG.info("Protocol plugin {} loaded successfully", protocolName);
diff --git
a/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
b/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
index d69f131b..08f1f1c5 100644
---
a/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
+++
b/fluss-server/src/main/java/com/alibaba/fluss/server/authorizer/AuthorizerLoader.java
@@ -49,7 +49,12 @@ public class AuthorizerLoader {
}
String authorizerType = configuration.get(AUTHORIZER_TYPE);
Collection<Supplier<Iterator<AuthorizationPlugin>>> pluginSuppliers =
new ArrayList<>(2);
- pluginSuppliers.add(() ->
ServiceLoader.load(AuthorizationPlugin.class).iterator());
+ pluginSuppliers.add(
+ () ->
+ ServiceLoader.load(
+ AuthorizationPlugin.class,
+
AuthorizationPlugin.class.getClassLoader())
+ .iterator());
if (pluginManager != null) {
pluginSuppliers.add(() ->
pluginManager.load(AuthorizationPlugin.class));