This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 596efc8c2 [fs/azure] Set auth.type=Custom for token delegation (#2947)
596efc8c2 is described below
commit 596efc8c2423728136eae41e405213b56378c114
Author: Lorenzo Affetti <[email protected]>
AuthorDate: Tue Mar 31 11:56:00 2026 +0200
[fs/azure] Set auth.type=Custom for token delegation (#2947)
* [fs/azure] Set auth.type=Custom for token delegation
* address
---
.../azure/token/AzureDelegationTokenReceiver.java | 5 ++
.../fs/azure/AbfsFileSystemBehaviorITCase.java | 2 +-
.../azure/AbfsFileSystemDelegationTokenITCase.java | 99 ++++++++++++++++++++++
.../fluss/fs/azure/AzureFileSystemPluginTest.java | 7 +-
.../token/AzureDelegationTokenReceiverTest.java | 3 +
website/docs/maintenance/filesystems/azure.md | 2 +-
6 files changed, 115 insertions(+), 3 deletions(-)
diff --git
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
index f59444e83..cf9fb43b3 100644
---
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
+++
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
@@ -56,6 +56,11 @@ public abstract class AzureDelegationTokenReceiver
implements SecurityTokenRecei
LOG.debug("Provider already exists");
}
+ // Tell the ABFS driver to use the custom token provider instead of
defaulting to SharedKey.
+ // DynamicTemporaryAzureCredentialsProvider implements
CustomTokenProviderAdaptee, which
+ // requires auth.type=Custom to be activated.
+ hadoopConfig.set("fs.azure.account.auth.type", "Custom");
+
// then, set addition info
if (additionInfos == null) {
// if addition info is null, it also means we have not received
any token,
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
index 6ea045436..e2f163288 100644
---
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
@@ -39,7 +39,7 @@ class AbfsFileSystemBehaviorITCase extends
FileSystemBehaviorTestSuite {
private static final String AZURE_ACCOUNT_KEY = "ZmFrZS1rZXkK";
private static final String ENDPOINT_PREFIX = "http://localhost:";
- public static final String ABFS_FS_PATH =
"abfs://[email protected]/test";
+ public static final String ABFS_FS_PATH =
"abfs://[email protected]/test";
private static MockAuthServer mockAuthServer;
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java
new file mode 100644
index 000000000..453121dc0
--- /dev/null
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FileSystemBehaviorTestSuite;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver;
+import org.apache.fluss.fs.azure.token.AzureDelegationTokenReceiver;
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests that validate the Azure File System Plugin initializes correctly via the delegation token
+ * path — i.e., when no {@code fs.azure.account.key} is configured and the client receives an OAuth
+ * token from the server instead. This is the code path exercised by Fluss clients reading KV
+ * snapshots from remote Azure storage.
+ *
+ * <p>The class plugs that setup into {@link FileSystemBehaviorTestSuite}, so the inherited
+ * behavior tests run against the file system obtained for {@code ABFS_FS_PATH}.
+ *
+ * <p>How the fixture is wired:
+ *
+ * <ul>
+ *   <li>{@code setup()} builds fake {@link Credentials}, serializes them with {@link
+ *       CredentialsJsonSerde} into an {@link ObtainedSecurityToken} for the {@code "abfs"} scheme
+ *       (carrying one addition-info entry keyed by {@code AzureFileSystemOptions.ENDPOINT_KEY}),
+ *       hands it to {@code AbfsDelegationTokenReceiver#onNewTokensObtained}, and only afterwards
+ *       calls {@code FileSystem.initialize} with an empty {@link Configuration}. The order
+ *       matters: the token has to be in place before the file system is initialized.
+ *   <li>{@code getBasePath()} creates the {@link FsPath} for {@code ABFS_FS_PATH} and, through
+ *       {@code applyMockStorage}, overwrites the {@code fs} field of the file system returned by
+ *       {@code FsPath#getFileSystem()} with a {@code MemoryFileSystem} using force-access
+ *       reflection — presumably so the suite never contacts a real Azure endpoint (confirm
+ *       against the plugin's file system class, which is not visible here). A new {@code
+ *       MemoryFileSystem} is created on every call; {@code getFileSystem()} delegates to {@code
+ *       getBasePath()}, so it triggers the same replacement.
+ *   <li>{@code applyMockStorage} declares {@code IOException} but rethrows a reflective {@code
+ *       IllegalAccessException} wrapped in a {@code RuntimeException}.
+ *   <li>{@code tearDown()} writes {@code null} into the static {@code additionInfos}, {@code
+ *       credentials} and {@code validUntil} fields of {@link AzureDelegationTokenReceiver} via
+ *       reflection. NOTE(review): this assumes all three are reference-typed static fields —
+ *       confirm against the receiver class.
+ * </ul>
+ */
+class AbfsFileSystemDelegationTokenITCase extends FileSystemBehaviorTestSuite {
+
+ private static final String ABFS_FS_PATH =
"abfs://[email protected]/test";
+
+ @BeforeAll
+ static void setup() throws Exception {
+ // Simulate the client receiving a delegation token from the server by feeding a fake one to the receiver.
+ // No fs.azure.account.key is set — this is the client-side path. The timestamp passed below is now + 1 hour.
+ Credentials credentials = new Credentials(null, null,
"fake-oauth-access-token");
+ Map<String, String> additionInfos = new HashMap<>();
+ additionInfos.put(
+ AzureFileSystemOptions.ENDPOINT_KEY.key(),
+ "https://login.microsoftonline.com/fake-tenant/oauth2/token");
+ ObtainedSecurityToken token =
+ new ObtainedSecurityToken(
+ "abfs",
+ CredentialsJsonSerde.toJson(credentials),
+ System.currentTimeMillis() + 3_600_000L,
+ additionInfos);
+ new AbfsDelegationTokenReceiver().onNewTokensObtained(token);
+
+ // Initialize without account key: empty Configuration, null second argument; must run after the token is set.
+ FileSystem.initialize(new Configuration(), null);
+ }
+
+ @AfterAll
+ static void tearDown() throws Exception {
+ // Reset static token state so other tests in the same JVM are not
affected.
+ FieldUtils.writeStaticField(
+ AzureDelegationTokenReceiver.class, "additionInfos", null,
true);
+ FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class,
"credentials", null, true);
+ FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class,
"validUntil", null, true);
+ }
+
+ @Override
+ protected FileSystem getFileSystem() throws IOException {
+ return getBasePath().getFileSystem();
+ }
+
+ @Override
+ protected FsPath getBasePath() throws IOException {
+ FsPath fsPath = new FsPath(ABFS_FS_PATH);
+ applyMockStorage(fsPath.getFileSystem());
+ return fsPath;
+ }
+
+ private static void applyMockStorage(FileSystem fileSystem) throws
IOException {
+ try {
+ MemoryFileSystem memoryFileSystem = new
MemoryFileSystem(URI.create(ABFS_FS_PATH));
+ FieldUtils.writeField(fileSystem, "fs", memoryFileSystem, true);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
index 4a25bb4bd..6bf66c6cc 100644
---
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
@@ -25,6 +25,8 @@ import org.apache.fluss.fs.token.ObtainedSecurityToken;
import org.junit.jupiter.api.Test;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -97,7 +99,10 @@ public class AzureFileSystemPluginTest {
try {
plugin.create(uri, flussConfig);
} catch (Exception e) {
- // expected or ignored
+ // If the plugin creation fails, it must NOT fail with an "init
configuration" error.
+ StringWriter sw = new StringWriter();
+ e.printStackTrace(new PrintWriter(sw));
+ assertThat(sw.toString()).doesNotContain("Failure to initialize
configuration");
}
}
diff --git
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
index c17ebc6c0..f839b90a5 100644
---
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
+++
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
@@ -61,6 +61,7 @@ class AzureDelegationTokenReceiverTest {
AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key()))
.isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
+
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
}
@Test
@@ -73,6 +74,7 @@ class AzureDelegationTokenReceiverTest {
assertThat(providers.length).isEqualTo(2);
assertThat(providers[0]).isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
assertThat(providers[1]).isEqualTo(PROVIDER_CLASS_NAME);
+
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
}
@Test
@@ -84,5 +86,6 @@ class AzureDelegationTokenReceiverTest {
AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key()))
.isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
+
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
}
}
diff --git a/website/docs/maintenance/filesystems/azure.md
b/website/docs/maintenance/filesystems/azure.md
index 4570d9b29..8fc1c1852 100644
--- a/website/docs/maintenance/filesystems/azure.md
+++ b/website/docs/maintenance/filesystems/azure.md
@@ -29,7 +29,7 @@ To enabled Azure Blob Storage as remote storage, there are
some required configu
```yaml
# The dir that used to be as the remote storage of Fluss, use the Azure Data
Lake Storage URI
-remote.data.dir: abfs://[email protected]/path
+remote.data.dir: abfs://[email protected]/path
# the access key for the azure blob storage account
fs.azure.account.key: 09a295d5-3da5-4435-a660-f438b331ade8
# The oauth account provider type for Token-based Authentication