This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 596efc8c2 [fs/azure] Set auth.type=Custom for token delegation (#2947)
596efc8c2 is described below

commit 596efc8c2423728136eae41e405213b56378c114
Author: Lorenzo Affetti <[email protected]>
AuthorDate: Tue Mar 31 11:56:00 2026 +0200

    [fs/azure] Set auth.type=Custom for token delegation (#2947)
    
    * [fs/azure] Set auth.type=Custom for token delegation
    
    * address
---
 .../azure/token/AzureDelegationTokenReceiver.java  |  5 ++
 .../fs/azure/AbfsFileSystemBehaviorITCase.java     |  2 +-
 .../azure/AbfsFileSystemDelegationTokenITCase.java | 99 ++++++++++++++++++++++
 .../fluss/fs/azure/AzureFileSystemPluginTest.java  |  7 +-
 .../token/AzureDelegationTokenReceiverTest.java    |  3 +
 website/docs/maintenance/filesystems/azure.md      |  2 +-
 6 files changed, 115 insertions(+), 3 deletions(-)

diff --git 
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
 
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
index f59444e83..cf9fb43b3 100644
--- 
a/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
+++ 
b/fluss-filesystems/fluss-fs-azure/src/main/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiver.java
@@ -56,6 +56,11 @@ public abstract class AzureDelegationTokenReceiver 
implements SecurityTokenRecei
             LOG.debug("Provider already exists");
         }
 
+        // Tell the ABFS driver to use the custom token provider instead of 
defaulting to SharedKey.
+        // DynamicTemporaryAzureCredentialsProvider implements 
CustomTokenProviderAdaptee, which
+        // requires auth.type=Custom to be activated.
+        hadoopConfig.set("fs.azure.account.auth.type", "Custom");
+
         // then, set addition info
         if (additionInfos == null) {
             // if addition info is null, it also means we have not received 
any token,
diff --git 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
index 6ea045436..e2f163288 100644
--- 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
+++ 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemBehaviorITCase.java
@@ -39,7 +39,7 @@ class AbfsFileSystemBehaviorITCase extends 
FileSystemBehaviorTestSuite {
 
     private static final String AZURE_ACCOUNT_KEY = "ZmFrZS1rZXkK";
     private static final String ENDPOINT_PREFIX = "http://localhost:";
-    public static final String ABFS_FS_PATH = 
"abfs://[email protected]/test";
+    public static final String ABFS_FS_PATH = 
"abfs://[email protected]/test";
 
     private static MockAuthServer mockAuthServer;
 
diff --git 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java
 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java
new file mode 100644
index 000000000..453121dc0
--- /dev/null
+++ 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AbfsFileSystemDelegationTokenITCase.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.fs.azure;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.fs.FileSystemBehaviorTestSuite;
+import org.apache.fluss.fs.FsPath;
+import org.apache.fluss.fs.azure.token.AbfsDelegationTokenReceiver;
+import org.apache.fluss.fs.azure.token.AzureDelegationTokenReceiver;
+import org.apache.fluss.fs.token.Credentials;
+import org.apache.fluss.fs.token.CredentialsJsonSerde;
+import org.apache.fluss.fs.token.ObtainedSecurityToken;
+
+import org.apache.commons.lang3.reflect.FieldUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests that validate the Azure File System Plugin initializes correctly via 
the delegation token
+ * path — i.e., when no {@code fs.azure.account.key} is configured and the 
client receives an OAuth
+ * token from the server instead. This is the code path exercised by Fluss 
clients reading KV
+ * snapshots from remote Azure storage.
+ */
+class AbfsFileSystemDelegationTokenITCase extends FileSystemBehaviorTestSuite {
+
+    private static final String ABFS_FS_PATH = 
"abfs://[email protected]/test";
+
+    @BeforeAll
+    static void setup() throws Exception {
+        // Simulate the client receiving a delegation token from the server.
+        // No fs.azure.account.key is set — this is the client-side path.
+        Credentials credentials = new Credentials(null, null, 
"fake-oauth-access-token");
+        Map<String, String> additionInfos = new HashMap<>();
+        additionInfos.put(
+                AzureFileSystemOptions.ENDPOINT_KEY.key(),
+                "https://login.microsoftonline.com/fake-tenant/oauth2/token");
+        ObtainedSecurityToken token =
+                new ObtainedSecurityToken(
+                        "abfs",
+                        CredentialsJsonSerde.toJson(credentials),
+                        System.currentTimeMillis() + 3_600_000L,
+                        additionInfos);
+        new AbfsDelegationTokenReceiver().onNewTokensObtained(token);
+
+        // Initialize without account key.
+        FileSystem.initialize(new Configuration(), null);
+    }
+
+    @AfterAll
+    static void tearDown() throws Exception {
+        // Reset static token state so other tests in the same JVM are not 
affected.
+        FieldUtils.writeStaticField(
+                AzureDelegationTokenReceiver.class, "additionInfos", null, 
true);
+        FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class, 
"credentials", null, true);
+        FieldUtils.writeStaticField(AzureDelegationTokenReceiver.class, 
"validUntil", null, true);
+    }
+
+    @Override
+    protected FileSystem getFileSystem() throws IOException {
+        return getBasePath().getFileSystem();
+    }
+
+    @Override
+    protected FsPath getBasePath() throws IOException {
+        FsPath fsPath = new FsPath(ABFS_FS_PATH);
+        applyMockStorage(fsPath.getFileSystem());
+        return fsPath;
+    }
+
+    private static void applyMockStorage(FileSystem fileSystem) throws 
IOException {
+        try {
+            MemoryFileSystem memoryFileSystem = new 
MemoryFileSystem(URI.create(ABFS_FS_PATH));
+            FieldUtils.writeField(fileSystem, "fs", memoryFileSystem, true);
+        } catch (IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
index 4a25bb4bd..6bf66c6cc 100644
--- 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
+++ 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/AzureFileSystemPluginTest.java
@@ -25,6 +25,8 @@ import org.apache.fluss.fs.token.ObtainedSecurityToken;
 
 import org.junit.jupiter.api.Test;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
@@ -97,7 +99,10 @@ public class AzureFileSystemPluginTest {
         try {
             plugin.create(uri, flussConfig);
         } catch (Exception e) {
-            // expected or ignored
+            // If the plugin creation fails, it must NOT fail with an "init 
configuration" error.
+            StringWriter sw = new StringWriter();
+            e.printStackTrace(new PrintWriter(sw));
+            assertThat(sw.toString()).doesNotContain("Failure to initialize 
configuration");
         }
     }
 
diff --git 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
index c17ebc6c0..f839b90a5 100644
--- 
a/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
+++ 
b/fluss-filesystems/fluss-fs-azure/src/test/java/org/apache/fluss/fs/azure/token/AzureDelegationTokenReceiverTest.java
@@ -61,6 +61,7 @@ class AzureDelegationTokenReceiverTest {
         AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key()))
                 .isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
+        
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
     }
 
     @Test
@@ -73,6 +74,7 @@ class AzureDelegationTokenReceiverTest {
         assertThat(providers.length).isEqualTo(2);
         
assertThat(providers[0]).isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
         assertThat(providers[1]).isEqualTo(PROVIDER_CLASS_NAME);
+        
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
     }
 
     @Test
@@ -84,5 +86,6 @@ class AzureDelegationTokenReceiverTest {
         AzureDelegationTokenReceiver.updateHadoopConfig(hadoopConfiguration);
         assertThat(hadoopConfiguration.get(PROVIDER_CONFIG_NAME.key()))
                 .isEqualTo(DynamicTemporaryAzureCredentialsProvider.NAME);
+        
assertThat(hadoopConfiguration.get("fs.azure.account.auth.type")).isEqualTo("Custom");
     }
 }
diff --git a/website/docs/maintenance/filesystems/azure.md 
b/website/docs/maintenance/filesystems/azure.md
index 4570d9b29..8fc1c1852 100644
--- a/website/docs/maintenance/filesystems/azure.md
+++ b/website/docs/maintenance/filesystems/azure.md
@@ -29,7 +29,7 @@ To enabled Azure Blob Storage as remote storage, there are 
some required configu
 
 ```yaml
 # The dir that used to be as the remote storage of Fluss, use the Azure Data 
Lake Storage URI
-remote.data.dir: abfs://[email protected]/path
+remote.data.dir: abfs://[email protected]/path
 # the access key for the azure blob storage account
 fs.azure.account.key: 09a295d5-3da5-4435-a660-f438b331ade8
 # The oauth account provider type for Token-based Authentication

Reply via email to