loserwang1024 commented on code in PR #2263:
URL: https://github.com/apache/fluss/pull/2263#discussion_r2763778173
##########
fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/security/acl/FlinkAuthorizationITCase.java:
##########
@@ -132,6 +141,152 @@ void after() throws ExecutionException,
InterruptedException {
dropAcl(Resource.any(), OperationType.ANY);
}
+ /** Tests Kerberos (GSSAPI) authorization. */
+ @Test
+ void testKerberosAuthorization() throws Exception {
+ // Initialize and start a MiniKDC
+ Properties kdcConf = MiniKdc.createConf();
+ FlussMiniKdc kdc = new FlussMiniKdc(kdcConf);
+ kdc.start();
+
+ // Prepare workspace for keytab and krb5.conf
+ Path tempDir = Files.createTempDirectory("fluss-gssapi-test-" +
UUID.randomUUID());
+ File workDir = tempDir.toFile();
+ File keytab = new File(workDir, "fluss.keytab");
+ File krb5Conf = kdc.getKrb5Conf();
+
+ try {
+ // Create principal for server and client
+ // Format
+ // - server: service/hostname
+ // - client: username
+ kdc.createPrincipal(keytab, "fluss/127.0.0.1", "client");
+
+ // Customize krb5.conf to enforce TCP and correct realm settings
if necessary.
+ if (krb5Conf.exists()) {
+ String krb5Content =
+ "[libdefaults]\n"
+ + " default_realm = "
+ + kdc.getRealm()
+ + "\n"
+ + " udp_preference_limit = 1\n"
+ + " kdc_tcp_port = "
+ + kdc.getPort()
+ + "\n"
+ + "\n"
+ + "[realms]\n"
+ + " "
+ + kdc.getRealm()
+ + " = {\n"
+ + " kdc = 127.0.0.1:"
+ + kdc.getPort()
+ + "\n"
+ + " admin_server = 127.0.0.1:"
+ + kdc.getPort()
+ + "\n"
+ + " }\n";
+
+ File customKrb5Conf =
+ new File(workDir, "krb5-custom-" + UUID.randomUUID() +
".conf");
+ Files.write(customKrb5Conf.toPath(), krb5Content.getBytes());
+ System.setProperty("java.security.krb5.conf",
customKrb5Conf.getAbsolutePath());
+ }
+
+ String realm = kdc.getRealm();
+ String serverPrincipal = String.format("fluss/127.0.0.1@%s",
realm);
+ String clientPrincipal = String.format("client@%s", realm);
+
+ // Configure Fluss Cluster
+ Configuration serverConf = initConfig();
+
serverConf.setString(ConfigOptions.SERVER_SECURITY_PROTOCOL_MAP.key(),
"CLIENT:sasl");
+ serverConf.setString(
+ ConfigOptions.SERVER_SASL_ENABLED_MECHANISMS_CONFIG.key(),
"GSSAPI");
+
+ String serverJaas =
+ String.format(
+ "com.sun.security.auth.module.Krb5LoginModule
required "
+ + "useKeyTab=true storeKey=true
useTicketCache=false "
+ + "keyTab=\"%s\" principal=\"%s\";",
+ keytab.getAbsolutePath(), serverPrincipal);
+ serverConf.setString("security.sasl.gssapi.jaas.config",
serverJaas);
+
+ // Grant Super User privileges to client principal
+ serverConf.set(
+ ConfigOptions.SUPER_USERS,
+ "User:"
+ + clientPrincipal
+ + ";User:"
+ + clientPrincipal.toLowerCase()
+ + ";User:client");
+ // Enable authorization to ensure permissions are actually checked.
+ serverConf.set(ConfigOptions.AUTHORIZER_ENABLED, true);
+
+ // Create Fluss cluster with the secure configuration
+ final FlussClusterExtension kerberosFluss =
+ FlussClusterExtension.builder()
+ .setNumOfTabletServers(3)
+ // Use 127.0.0.1 listeners to match kerberos
service principal hostname
+ .setCoordinatorServerListeners(
+ "FLUSS://127.0.0.1:0,
CLIENT://127.0.0.1:0")
+ .setTabletServerListeners("FLUSS://127.0.0.1:0,
CLIENT://127.0.0.1:0")
+ .setClusterConf(serverConf)
+ .build();
+ try {
+ // Start Fluss cluster
+ kerberosFluss.start();
+
+ Configuration clientConf =
kerberosFluss.getClientConfig("CLIENT");
+ String bootstrapServers =
+ String.join(",",
clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+
+ TableEnvironment tEnv =
+
TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+
+ String clientJaas =
+ String.format(
+ "com.sun.security.auth.module.Krb5LoginModule
required "
+ + "useKeyTab=true storeKey=true
useTicketCache=false "
+ + "keyTab=\"%s\" principal=\"%s\";",
+ keytab.getAbsolutePath(), clientPrincipal);
+
+ // Create a Flink catalog configured with Kerberos credentials.
+ String createCatalogDDL =
Review Comment:
Would you like to also test what happens if the client.security.sasl.jaas.config
is not valid for the server? Also, we can test with client.security.kerberos.keytab
and client.security.kerberos.principal.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]