mbutrovich commented on code in PR #4783:
URL: https://github.com/apache/datafusion-comet/pull/4783#discussion_r3508600343


##########
native/core/src/parquet/parquet_support.rs:
##########
@@ -434,6 +434,14 @@ pub fn is_hdfs_scheme(url: &Url, object_store_configs: 
&HashMap<String, String>)
     }
 }
 
+/// Check if the scheme is an Azure ABFS / WASB / other legacy schemes.
+fn is_azure_scheme(scheme: &str) -> bool {
+    matches!(
+        scheme,
+        "abfs" | "abfss" | "wasb" | "wasbs" | "az" | "azure" | "adl"

Review Comment:
   `is_azure_scheme` includes `wasb` and `wasbs`, but `object_store` 0.13.2 
does not recognize `wasb` anywhere. `ObjectStoreScheme::parse` only matches `az 
| adl | azure | abfs | abfss` for Azure 
([parse.rs](https://docs.rs/object_store/0.13.2/src/object_store/parse.rs.html)),
 so `create_store` returns `Scheme of URL is not Azure` on its first line for 
any `wasb[s]://` URL. This is not a regression since the old `parse_url` path 
failed the same way, but the PR advertises support it does not deliver. Could 
we drop `wasb`/`wasbs` here (and the WASB mention in the module doc at the top 
of `azure.rs`), or note them as unsupported by the current `object_store`?
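   
   A minimal sketch of what the narrowed check could look like, assuming we drop `wasb`/`wasbs` and keep only the schemes listed above as recognized by `ObjectStoreScheme::parse` (the list is taken from this comment, not re-verified against other `object_store` versions):

```rust
/// Check if the scheme is one that `object_store` maps to Azure.
///
/// `wasb` / `wasbs` are intentionally absent: per the review comment,
/// `object_store` 0.13.2 does not recognize them, so accepting them here
/// would only defer the failure to `create_store`.
fn is_azure_scheme(scheme: &str) -> bool {
    matches!(scheme, "abfs" | "abfss" | "az" | "azure" | "adl")
}
```

   With this version a `wasbs://` URL is rejected by the scheme check itself instead of failing later with `Scheme of URL is not Azure`.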



##########
native/core/src/parquet/objectstore/azure.rs:
##########
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Construct a `MicrosoftAzure` object store from a Hadoop ABFS/WASB 
configuration.
+//!
+//! Comet's native scans run outside the JVM, so they bypass Hadoop's
+//! `AzureBlobFileSystem` driver entirely. This module bridges the gap by 
translating the
+//! Hadoop `fs.azure.*` configuration namespace (the same keys users already 
put in
+//! `core-site.xml` or `spark.hadoop.*`) into the `object_store` crate's 
`AzureConfigKey`
+//! options, then layering them on top of any AZURE_* environment variables 
that AKS's
+//! Workload Identity webhook injects.
+//!
+//! Supported authentication, in the priority order applied to the builder:
+//!
+//! 1. `MicrosoftAzureBuilder::from_env()` — picks up `AZURE_CLIENT_ID`, 
`AZURE_TENANT_ID`,
+//!    `AZURE_FEDERATED_TOKEN_FILE`, `AZURE_AUTHORITY_HOST`, 
`AZURE_STORAGE_*`, etc.
+//!    This is what makes Workload Identity work out of the box in AKS pods.
+//! 2. Account-scoped Hadoop keys 
(`fs.azure.account.X.<account>.dfs.core.windows.net`).
+//! 3. Global Hadoop keys (`fs.azure.account.X`).
+//!
+//! Items 2 and 3 are forwarded via `MicrosoftAzureBuilder::with_config`, 
which overrides
+//! whatever `from_env()` produced. The account-scoped variant wins over the 
global one,
+//! mirroring Hadoop ABFS's own `AbfsConfiguration` precedence.
+//!
+//! The translated keys cover the auth schemes that ABFS users actually 
configure:
+//!
+//! | Hadoop key (account-scoped suffix omitted)             | 
`AzureConfigKey`       |
+//! | ------------------------------------------------------- | 
---------------------- |
+//! | `fs.azure.account.key`                                   | `AccessKey`   
         |
+//! | `fs.azure.account.oauth2.client.id`                      | `ClientId`    
         |
+//! | `fs.azure.account.oauth2.client.secret`                  | 
`ClientSecret`         |
+//! | `fs.azure.account.oauth2.client.endpoint`                | `AuthorityId` 
(from URL) |
+//! | `fs.azure.account.oauth2.msi.tenant`                     | `AuthorityId` 
         |
+//! | `fs.azure.account.oauth2.msi.endpoint`                   | `MsiEndpoint` 
         |
+//! | `fs.azure.account.oauth2.msi.authority`                  | 
`AuthorityHost`        |
+//! | `fs.azure.account.oauth2.token.file`                     | 
`FederatedTokenFile`   |
+//! | `fs.azure.sas.<container>.<account>`                     | `SasKey`      
         |
+//!
+//! Anything beyond these falls through to whatever `from_env()` or the URL 
itself provided.
+
+use log::debug;
+use std::collections::HashMap;
+use url::Url;
+
+use object_store::{
+    azure::{AzureConfigKey, MicrosoftAzureBuilder},
+    path::Path,
+    ObjectStore, ObjectStoreScheme,
+};
+
+const HADOOP_KEY: &str = "fs.azure.account.key";
+const HADOOP_OAUTH_CLIENT_ID: &str = "fs.azure.account.oauth2.client.id";
+const HADOOP_OAUTH_CLIENT_SECRET: &str = 
"fs.azure.account.oauth2.client.secret";
+const HADOOP_OAUTH_CLIENT_ENDPOINT: &str = 
"fs.azure.account.oauth2.client.endpoint";
+const HADOOP_MSI_TENANT: &str = "fs.azure.account.oauth2.msi.tenant";
+const HADOOP_MSI_ENDPOINT: &str = "fs.azure.account.oauth2.msi.endpoint";
+const HADOOP_MSI_AUTHORITY: &str = "fs.azure.account.oauth2.msi.authority";
+const HADOOP_WI_TOKEN_FILE: &str = "fs.azure.account.oauth2.token.file";
+const HADOOP_SAS_PREFIX: &str = "fs.azure.sas.";
+
+const ENDPOINT_SUFFIXES: &[&str] = &[
+    "dfs.core.windows.net",
+    "blob.core.windows.net",
+];
+
+/// Build a `MicrosoftAzure` `ObjectStore` for `url` using `configs`.
+///
+/// The returned `Path` is the URL's resource path (container-relative for 
ABFS / WASB,
+/// container+key for `az://`), suitable for direct use with 
`ObjectStore::get`.
+pub fn create_store(
+    url: &Url,
+    configs: &HashMap<String, String>,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let (scheme, path) = ObjectStoreScheme::parse(url)?;
+    if scheme != ObjectStoreScheme::MicrosoftAzure {
+        return Err(object_store::Error::Generic {
+            store: "MicrosoftAzure",
+            source: format!("Scheme of URL is not Azure: {url}").into(),
+        });
+    }
+    let path = Path::parse(path)?;
+
+    let account = extract_account(url);
+    let container = extract_container(url);
+
+    // Start from the environment so AKS Workload Identity (AZURE_CLIENT_ID,
+    // AZURE_TENANT_ID, AZURE_FEDERATED_TOKEN_FILE, AZURE_AUTHORITY_HOST) and 
any
+    // explicit AZURE_STORAGE_* variables are honoured without further 
configuration.
+    // `with_url` then fills in account/container from the URL itself.
+    let mut builder = 
MicrosoftAzureBuilder::from_env().with_url(url.to_string());
+
+    let translated = translate_hadoop_configs(configs, account.as_deref(), 
container.as_deref());
+    debug!(
+        "Azure configs for account={:?}, container={:?}: keys={:?}",
+        account,
+        container,
+        translated
+            .iter()
+            .map(|(k, _)| k.as_ref())
+            .collect::<Vec<_>>()
+    );
+    for (key, value) in translated {
+        builder = builder.with_config(key, value);
+    }
+
+    let store = builder.build()?;
+    Ok((Box::new(store), path))
+}
+
+/// Translate a Hadoop ABFS/WASB configuration map into `(AzureConfigKey, 
value)` pairs.
+///
+/// `account` and `container` are extracted from the URL and used to resolve 
account-scoped
+/// keys (`fs.azure.X.<account>.<endpoint-suffix>`) and the SAS namespace
+/// (`fs.azure.sas.<container>.<account>`). Account-scoped keys win over 
global ones.
+fn translate_hadoop_configs(
+    configs: &HashMap<String, String>,
+    account: Option<&str>,
+    container: Option<&str>,
+) -> Vec<(AzureConfigKey, String)> {
+    let mut out: Vec<(AzureConfigKey, String)> = Vec::new();
+
+    let mappings: &[(&str, AzureConfigKey)] = &[
+        (HADOOP_KEY, AzureConfigKey::AccessKey),
+        (HADOOP_OAUTH_CLIENT_ID, AzureConfigKey::ClientId),
+        (HADOOP_OAUTH_CLIENT_SECRET, AzureConfigKey::ClientSecret),
+        (HADOOP_MSI_TENANT, AzureConfigKey::AuthorityId),
+        (HADOOP_MSI_ENDPOINT, AzureConfigKey::MsiEndpoint),
+        (HADOOP_MSI_AUTHORITY, AzureConfigKey::AuthorityHost),
+        (HADOOP_WI_TOKEN_FILE, AzureConfigKey::FederatedTokenFile),
+    ];
+
+    for (hadoop_base, azure_key) in mappings {
+        if let Some(value) = account_scoped_value(configs, hadoop_base, 
account) {
+            out.push((*azure_key, value));
+        }
+    }
+
+    // `fs.azure.account.oauth2.client.endpoint` is a full token URL of the 
form
+    // `https://login.microsoftonline.com/<tenant>/oauth2/token`. object_store 
wants the
+    // tenant id directly (`AuthorityId`), so extract it if AuthorityId hasn't 
already
+    // been set from `fs.azure.account.oauth2.msi.tenant`.
+    let has_authority_id = out
+        .iter()
+        .any(|(k, _)| matches!(k, AzureConfigKey::AuthorityId));
+    if !has_authority_id {
+        if let Some(endpoint) = account_scoped_value(configs, 
HADOOP_OAUTH_CLIENT_ENDPOINT, account)
+        {
+            if let Some(tenant) = tenant_from_oauth_endpoint(&endpoint) {
+                out.push((AzureConfigKey::AuthorityId, tenant));
+            }
+        }
+    }
+
+    // SAS tokens are scoped to 
`fs.azure.sas.<container>.<account>[.<endpoint-suffix>]`.
+    if let (Some(container), Some(account)) = (container, account) {
+        if let Some(sas) = sas_value(configs, container, account) {
+            out.push((AzureConfigKey::SasKey, sas));
+        }
+    }
+
+    out
+}
+
+/// Look up `base_key`, preferring account-scoped variants.
+///
+/// Probes (in order): `<base>.<account>.<endpoint-suffix>`, 
`<base>.<account>`,
+/// then the unscoped `<base>`. Returns the first hit.
+fn account_scoped_value(
+    configs: &HashMap<String, String>,
+    base_key: &str,
+    account: Option<&str>,
+) -> Option<String> {
+    if let Some(acc) = account {
+        for suffix in ENDPOINT_SUFFIXES {
+            let scoped = format!("{base_key}.{acc}.{suffix}");
+            if let Some(v) = configs.get(&scoped) {
+                return Some(v.clone());
+            }
+        }
+        let bare = format!("{base_key}.{acc}");
+        if let Some(v) = configs.get(&bare) {
+            return Some(v.clone());
+        }
+    }
+    configs.get(base_key).cloned()
+}
+
+/// Resolve the SAS token for `(container, account)`, accepting any of the
+/// `fs.azure.sas.<container>.<account>[.<endpoint-suffix>]` variants.
+fn sas_value(configs: &HashMap<String, String>, container: &str, account: 
&str) -> Option<String> {
+    for suffix in ENDPOINT_SUFFIXES {
+        let key = format!("{HADOOP_SAS_PREFIX}{container}.{account}.{suffix}");
+        if let Some(v) = configs.get(&key) {
+            return Some(v.clone());
+        }
+    }
+    let bare = format!("{HADOOP_SAS_PREFIX}{container}.{account}");
+    configs.get(&bare).cloned()
+}
+
+/// Extract the storage account name from an Azure URL.
+///
+/// Handles ABFS/WASB hostnames of the form `<account>.<endpoint-suffix>` and 
the
+/// shorter `az://<account>/...` form.
+fn extract_account(url: &Url) -> Option<String> {

Review Comment:
   For `az://myacct/data/...`, `extract_account` returns `myacct` and 
`extract_container` (line 231) returns `data`. But `object_store`'s own 
`MicrosoftAzureBuilder::parse_url` treats the host as the container for these 
schemes (`adl | azure => container = host`, and `az` with an empty username 
also sets `container = host`, see 
[builder.rs](https://docs.rs/object_store/0.13.2/src/object_store/azure/builder.rs.html)).
 So the store ends up with `container = myacct, account = None` while the 
translation looks up account-scoped keys under `account = myacct, container = 
data`, and `build()` would then hit `MissingAccount`. In practice Spark and 
Hadoop only emit `abfss://container@account.dfs.core.windows.net/...`, and 
`NativeConfig` only forwards configs for `abfs[s]` and `wasb[s]`, so the 
working and tested path is `abfs[s]` with the `container@account` host form. 
Would it be cleaner to scope this to `abfs[s]` and correct the 
`az://account/container` example in the module doc, which does not match how 
`object_store` interprets that URL?
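   
   A rough sketch of what scoping the extraction to `abfs[s]` could look like. The helper name `abfs_container_and_account` is hypothetical, and it works on the raw URL string rather than `url::Url` only to keep the example dependency-free; the real code would presumably keep using `Url`:

```rust
/// Hypothetical helper: split an `abfs[s]` URL of the form
/// `abfs[s]://<container>@<account>.<endpoint-suffix>/<path>` into
/// `(container, account)`.
///
/// Returns `None` for any other scheme, or when the authority does not use
/// the `container@account` form, so the caller never looks up config keys
/// under an account/container pair that `object_store` would interpret
/// differently.
fn abfs_container_and_account(url: &str) -> Option<(String, String)> {
    let (scheme, rest) = url.split_once("://")?;
    if !matches!(scheme, "abfs" | "abfss") {
        return None;
    }
    // The authority ends at the first '/', or at the end of the string.
    let authority = rest.split('/').next()?;
    let (container, host) = authority.split_once('@')?;
    // The account is the first DNS label of the host.
    let account = host.split('.').next()?;
    if container.is_empty() || account.is_empty() {
        return None;
    }
    Some((container.to_string(), account.to_string()))
}
```

   Under this sketch, `az://myacct/data/...` yields `None` instead of an `(account, container)` pair that disagrees with what `MicrosoftAzureBuilder::parse_url` derives from the same URL.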



##########
native/core/src/parquet/objectstore/azure.rs:
##########
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Construct a `MicrosoftAzure` object store from a Hadoop ABFS/WASB 
configuration.
+//!
+//! Comet's native scans run outside the JVM, so they bypass Hadoop's
+//! `AzureBlobFileSystem` driver entirely. This module bridges the gap by 
translating the
+//! Hadoop `fs.azure.*` configuration namespace (the same keys users already 
put in
+//! `core-site.xml` or `spark.hadoop.*`) into the `object_store` crate's 
`AzureConfigKey`
+//! options, then layering them on top of any AZURE_* environment variables 
that AKS's
+//! Workload Identity webhook injects.
+//!
+//! Supported authentication, in the priority order applied to the builder:
+//!
+//! 1. `MicrosoftAzureBuilder::from_env()` — picks up `AZURE_CLIENT_ID`, 
`AZURE_TENANT_ID`,
+//!    `AZURE_FEDERATED_TOKEN_FILE`, `AZURE_AUTHORITY_HOST`, 
`AZURE_STORAGE_*`, etc.
+//!    This is what makes Workload Identity work out of the box in AKS pods.
+//! 2. Account-scoped Hadoop keys 
(`fs.azure.account.X.<account>.dfs.core.windows.net`).
+//! 3. Global Hadoop keys (`fs.azure.account.X`).
+//!
+//! Items 2 and 3 are forwarded via `MicrosoftAzureBuilder::with_config`, 
which overrides
+//! whatever `from_env()` produced. The account-scoped variant wins over the 
global one,
+//! mirroring Hadoop ABFS's own `AbfsConfiguration` precedence.
+//!
+//! The translated keys cover the auth schemes that ABFS users actually 
configure:
+//!
+//! | Hadoop key (account-scoped suffix omitted)             | 
`AzureConfigKey`       |
+//! | ------------------------------------------------------- | 
---------------------- |
+//! | `fs.azure.account.key`                                   | `AccessKey`   
         |
+//! | `fs.azure.account.oauth2.client.id`                      | `ClientId`    
         |
+//! | `fs.azure.account.oauth2.client.secret`                  | 
`ClientSecret`         |
+//! | `fs.azure.account.oauth2.client.endpoint`                | `AuthorityId` 
(from URL) |
+//! | `fs.azure.account.oauth2.msi.tenant`                     | `AuthorityId` 
         |
+//! | `fs.azure.account.oauth2.msi.endpoint`                   | `MsiEndpoint` 
         |
+//! | `fs.azure.account.oauth2.msi.authority`                  | 
`AuthorityHost`        |
+//! | `fs.azure.account.oauth2.token.file`                     | 
`FederatedTokenFile`   |
+//! | `fs.azure.sas.<container>.<account>`                     | `SasKey`      
         |
+//!
+//! Anything beyond these falls through to whatever `from_env()` or the URL 
itself provided.
+
+use log::debug;
+use std::collections::HashMap;
+use url::Url;
+
+use object_store::{
+    azure::{AzureConfigKey, MicrosoftAzureBuilder},
+    path::Path,
+    ObjectStore, ObjectStoreScheme,
+};
+
+const HADOOP_KEY: &str = "fs.azure.account.key";
+const HADOOP_OAUTH_CLIENT_ID: &str = "fs.azure.account.oauth2.client.id";
+const HADOOP_OAUTH_CLIENT_SECRET: &str = 
"fs.azure.account.oauth2.client.secret";
+const HADOOP_OAUTH_CLIENT_ENDPOINT: &str = 
"fs.azure.account.oauth2.client.endpoint";
+const HADOOP_MSI_TENANT: &str = "fs.azure.account.oauth2.msi.tenant";
+const HADOOP_MSI_ENDPOINT: &str = "fs.azure.account.oauth2.msi.endpoint";
+const HADOOP_MSI_AUTHORITY: &str = "fs.azure.account.oauth2.msi.authority";
+const HADOOP_WI_TOKEN_FILE: &str = "fs.azure.account.oauth2.token.file";

Review Comment:
   This maps `fs.azure.account.oauth2.token.file` to `FederatedTokenFile` (used 
in the mapping table at line 143), and the Scala test uses that key. The issue 
repro, though, used `fs.azure.account.oauth2.client.federated.token.file`. 
Those are different strings. If the canonical Hadoop 
`WorkloadIdentityTokenProvider` key is the longer one, the account-scoped 
lookup silently misses and Workload Identity breaks, which is the exact failure 
this PR targets. The `client.id` + `msi.tenant` + `token.file` triple looks 
right to me, but could you confirm the exact constant against Hadoop's 
`org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys` and the [ABFS OAuth 
docs](https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html)? This is 
the one mapping I would most like nailed down since it is central to the 
reported bug.
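   
   To make the failure mode concrete, here is a small sketch that copies the probing order of `account_scoped_value` from this PR and feeds it a config map containing only the longer key from the issue repro. The account name `myacct` and the token path are made up for illustration:

```rust
use std::collections::HashMap;

const ENDPOINT_SUFFIXES: &[&str] = &["dfs.core.windows.net", "blob.core.windows.net"];

/// Same probing order as `account_scoped_value` in this PR:
/// `<base>.<account>.<endpoint-suffix>`, then `<base>.<account>`, then `<base>`.
fn account_scoped_value(
    configs: &HashMap<String, String>,
    base_key: &str,
    account: Option<&str>,
) -> Option<String> {
    if let Some(acc) = account {
        for suffix in ENDPOINT_SUFFIXES {
            let scoped = format!("{base_key}.{acc}.{suffix}");
            if let Some(v) = configs.get(&scoped) {
                return Some(v.clone());
            }
        }
        let bare = format!("{base_key}.{acc}");
        if let Some(v) = configs.get(&bare) {
            return Some(v.clone());
        }
    }
    configs.get(base_key).cloned()
}

/// Config map holding only the longer key used in the issue repro.
/// The account (`myacct`) and the token path are hypothetical values.
fn issue_repro_configs() -> HashMap<String, String> {
    let mut configs = HashMap::new();
    configs.insert(
        "fs.azure.account.oauth2.client.federated.token.file.myacct.dfs.core.windows.net"
            .to_string(),
        "/path/to/federated-token".to_string(),
    );
    configs
}
```

   Looking up `fs.azure.account.oauth2.token.file` against `issue_repro_configs()` returns `None` with no error or warning, so `FederatedTokenFile` is never set. Whichever spelling turns out to be canonical, a unit test along these lines would pin it down.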



##########
native/core/src/parquet/objectstore/azure.rs:
##########
@@ -0,0 +1,442 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Construct a `MicrosoftAzure` object store from a Hadoop ABFS/WASB 
configuration.
+//!
+//! Comet's native scans run outside the JVM, so they bypass Hadoop's
+//! `AzureBlobFileSystem` driver entirely. This module bridges the gap by 
translating the
+//! Hadoop `fs.azure.*` configuration namespace (the same keys users already 
put in
+//! `core-site.xml` or `spark.hadoop.*`) into the `object_store` crate's 
`AzureConfigKey`
+//! options, then layering them on top of any AZURE_* environment variables 
that AKS's
+//! Workload Identity webhook injects.
+//!
+//! Supported authentication, in the priority order applied to the builder:
+//!
+//! 1. `MicrosoftAzureBuilder::from_env()` — picks up `AZURE_CLIENT_ID`, 
`AZURE_TENANT_ID`,
+//!    `AZURE_FEDERATED_TOKEN_FILE`, `AZURE_AUTHORITY_HOST`, 
`AZURE_STORAGE_*`, etc.
+//!    This is what makes Workload Identity work out of the box in AKS pods.
+//! 2. Account-scoped Hadoop keys 
(`fs.azure.account.X.<account>.dfs.core.windows.net`).
+//! 3. Global Hadoop keys (`fs.azure.account.X`).
+//!
+//! Items 2 and 3 are forwarded via `MicrosoftAzureBuilder::with_config`, 
which overrides
+//! whatever `from_env()` produced. The account-scoped variant wins over the 
global one,
+//! mirroring Hadoop ABFS's own `AbfsConfiguration` precedence.
+//!
+//! The translated keys cover the auth schemes that ABFS users actually 
configure:
+//!
+//! | Hadoop key (account-scoped suffix omitted)             | 
`AzureConfigKey`       |
+//! | ------------------------------------------------------- | 
---------------------- |
+//! | `fs.azure.account.key`                                   | `AccessKey`   
         |
+//! | `fs.azure.account.oauth2.client.id`                      | `ClientId`    
         |
+//! | `fs.azure.account.oauth2.client.secret`                  | 
`ClientSecret`         |
+//! | `fs.azure.account.oauth2.client.endpoint`                | `AuthorityId` 
(from URL) |
+//! | `fs.azure.account.oauth2.msi.tenant`                     | `AuthorityId` 
         |
+//! | `fs.azure.account.oauth2.msi.endpoint`                   | `MsiEndpoint` 
         |
+//! | `fs.azure.account.oauth2.msi.authority`                  | 
`AuthorityHost`        |
+//! | `fs.azure.account.oauth2.token.file`                     | 
`FederatedTokenFile`   |
+//! | `fs.azure.sas.<container>.<account>`                     | `SasKey`      
         |
+//!
+//! Anything beyond these falls through to whatever `from_env()` or the URL 
itself provided.
+
+use log::debug;
+use std::collections::HashMap;
+use url::Url;
+
+use object_store::{
+    azure::{AzureConfigKey, MicrosoftAzureBuilder},
+    path::Path,
+    ObjectStore, ObjectStoreScheme,
+};
+
+const HADOOP_KEY: &str = "fs.azure.account.key";
+const HADOOP_OAUTH_CLIENT_ID: &str = "fs.azure.account.oauth2.client.id";
+const HADOOP_OAUTH_CLIENT_SECRET: &str = 
"fs.azure.account.oauth2.client.secret";
+const HADOOP_OAUTH_CLIENT_ENDPOINT: &str = 
"fs.azure.account.oauth2.client.endpoint";
+const HADOOP_MSI_TENANT: &str = "fs.azure.account.oauth2.msi.tenant";
+const HADOOP_MSI_ENDPOINT: &str = "fs.azure.account.oauth2.msi.endpoint";
+const HADOOP_MSI_AUTHORITY: &str = "fs.azure.account.oauth2.msi.authority";
+const HADOOP_WI_TOKEN_FILE: &str = "fs.azure.account.oauth2.token.file";
+const HADOOP_SAS_PREFIX: &str = "fs.azure.sas.";
+
+const ENDPOINT_SUFFIXES: &[&str] = &[
+    "dfs.core.windows.net",
+    "blob.core.windows.net",
+];
+
+/// Build a `MicrosoftAzure` `ObjectStore` for `url` using `configs`.
+///
+/// The returned `Path` is the URL's resource path (container-relative for 
ABFS / WASB,
+/// container+key for `az://`), suitable for direct use with 
`ObjectStore::get`.
+pub fn create_store(
+    url: &Url,
+    configs: &HashMap<String, String>,
+) -> Result<(Box<dyn ObjectStore>, Path), object_store::Error> {
+    let (scheme, path) = ObjectStoreScheme::parse(url)?;
+    if scheme != ObjectStoreScheme::MicrosoftAzure {
+        return Err(object_store::Error::Generic {
+            store: "MicrosoftAzure",
+            source: format!("Scheme of URL is not Azure: {url}").into(),
+        });
+    }
+    let path = Path::parse(path)?;
+
+    let account = extract_account(url);
+    let container = extract_container(url);
+
+    // Start from the environment so AKS Workload Identity (AZURE_CLIENT_ID,
+    // AZURE_TENANT_ID, AZURE_FEDERATED_TOKEN_FILE, AZURE_AUTHORITY_HOST) and 
any
+    // explicit AZURE_STORAGE_* variables are honoured without further 
configuration.
+    // `with_url` then fills in account/container from the URL itself.
+    let mut builder = 
MicrosoftAzureBuilder::from_env().with_url(url.to_string());
+
+    let translated = translate_hadoop_configs(configs, account.as_deref(), 
container.as_deref());
+    debug!(
+        "Azure configs for account={:?}, container={:?}: keys={:?}",
+        account,
+        container,
+        translated
+            .iter()
+            .map(|(k, _)| k.as_ref())
+            .collect::<Vec<_>>()
+    );
+    for (key, value) in translated {
+        builder = builder.with_config(key, value);
+    }
+
+    let store = builder.build()?;
+    Ok((Box::new(store), path))
+}
+
+/// Translate a Hadoop ABFS/WASB configuration map into `(AzureConfigKey, 
value)` pairs.
+///
+/// `account` and `container` are extracted from the URL and used to resolve 
account-scoped
+/// keys (`fs.azure.X.<account>.<endpoint-suffix>`) and the SAS namespace
+/// (`fs.azure.sas.<container>.<account>`). Account-scoped keys win over 
global ones.
+fn translate_hadoop_configs(
+    configs: &HashMap<String, String>,
+    account: Option<&str>,
+    container: Option<&str>,
+) -> Vec<(AzureConfigKey, String)> {
+    let mut out: Vec<(AzureConfigKey, String)> = Vec::new();
+
+    let mappings: &[(&str, AzureConfigKey)] = &[
+        (HADOOP_KEY, AzureConfigKey::AccessKey),
+        (HADOOP_OAUTH_CLIENT_ID, AzureConfigKey::ClientId),
+        (HADOOP_OAUTH_CLIENT_SECRET, AzureConfigKey::ClientSecret),
+        (HADOOP_MSI_TENANT, AzureConfigKey::AuthorityId),
+        (HADOOP_MSI_ENDPOINT, AzureConfigKey::MsiEndpoint),
+        (HADOOP_MSI_AUTHORITY, AzureConfigKey::AuthorityHost),
+        (HADOOP_WI_TOKEN_FILE, AzureConfigKey::FederatedTokenFile),
+    ];
+
+    for (hadoop_base, azure_key) in mappings {
+        if let Some(value) = account_scoped_value(configs, hadoop_base, 
account) {
+            out.push((*azure_key, value));
+        }
+    }
+
+    // `fs.azure.account.oauth2.client.endpoint` is a full token URL of the 
form
+    // `https://login.microsoftonline.com/<tenant>/oauth2/token`. object_store 
wants the
+    // tenant id directly (`AuthorityId`), so extract it if AuthorityId hasn't 
already
+    // been set from `fs.azure.account.oauth2.msi.tenant`.
+    let has_authority_id = out
+        .iter()
+        .any(|(k, _)| matches!(k, AzureConfigKey::AuthorityId));
+    if !has_authority_id {
+        if let Some(endpoint) = account_scoped_value(configs, 
HADOOP_OAUTH_CLIENT_ENDPOINT, account)
+        {
+            if let Some(tenant) = tenant_from_oauth_endpoint(&endpoint) {
+                out.push((AzureConfigKey::AuthorityId, tenant));
+            }
+        }
+    }
+
+    // SAS tokens are scoped to 
`fs.azure.sas.<container>.<account>[.<endpoint-suffix>]`.
+    if let (Some(container), Some(account)) = (container, account) {
+        if let Some(sas) = sas_value(configs, container, account) {
+            out.push((AzureConfigKey::SasKey, sas));
+        }
+    }
+
+    out
+}
+
+/// Look up `base_key`, preferring account-scoped variants.
+///
+/// Probes (in order): `<base>.<account>.<endpoint-suffix>`, 
`<base>.<account>`,
+/// then the unscoped `<base>`. Returns the first hit.
+fn account_scoped_value(
+    configs: &HashMap<String, String>,
+    base_key: &str,
+    account: Option<&str>,
+) -> Option<String> {
+    if let Some(acc) = account {
+        for suffix in ENDPOINT_SUFFIXES {
+            let scoped = format!("{base_key}.{acc}.{suffix}");
+            if let Some(v) = configs.get(&scoped) {
+                return Some(v.clone());
+            }
+        }
+        let bare = format!("{base_key}.{acc}");
+        if let Some(v) = configs.get(&bare) {
+            return Some(v.clone());
+        }
+    }
+    configs.get(base_key).cloned()
+}
+
+/// Resolve the SAS token for `(container, account)`, accepting any of the
+/// `fs.azure.sas.<container>.<account>[.<endpoint-suffix>]` variants.
+fn sas_value(configs: &HashMap<String, String>, container: &str, account: 
&str) -> Option<String> {
+    for suffix in ENDPOINT_SUFFIXES {
+        let key = format!("{HADOOP_SAS_PREFIX}{container}.{account}.{suffix}");
+        if let Some(v) = configs.get(&key) {
+            return Some(v.clone());
+        }
+    }
+    let bare = format!("{HADOOP_SAS_PREFIX}{container}.{account}");
+    configs.get(&bare).cloned()
+}
+
+/// Extract the storage account name from an Azure URL.
+///
+/// Handles ABFS/WASB hostnames of the form `<account>.<endpoint-suffix>` and 
the
+/// shorter `az://<account>/...` form.
+fn extract_account(url: &Url) -> Option<String> {
+    let host = url.host_str()?;
+    match url.scheme() {
+        "az" | "azure" | "adl" => Some(host.to_string()),
+        _ => host.split('.').next().map(str::to_string),
+    }
+}
+
+/// Extract the container name from an Azure URL.
+///
+/// ABFS/WASB encode the container as the URL user-info 
(`container@account...`);
+/// `az://account/container/...` encodes it as the first path segment.
+fn extract_container(url: &Url) -> Option<String> {
+    let user = url.username();
+    if !user.is_empty() {
+        return Some(user.to_string());
+    }
+    match url.scheme() {
+        "az" | "azure" | "adl" => url
+            .path_segments()
+            .and_then(|mut segs| segs.next())
+            .filter(|s| !s.is_empty())
+            .map(str::to_string),
+        _ => None,
+    }
+}
+
+/// Pull the tenant id out of an OAuth token endpoint like
+/// `https://login.microsoftonline.com/<tenant>/oauth2/token`.
+fn tenant_from_oauth_endpoint(endpoint: &str) -> Option<String> {
+    let parsed = Url::parse(endpoint).ok()?;
+    let mut segments = parsed.path_segments()?;
+    let tenant = segments.next()?;
+    if tenant.is_empty() {
+        return None;
+    }
+    Some(tenant.to_string())
+}
+
+#[cfg(test)]
+mod tests {

Review Comment:
   The unit coverage for the translation logic is good. A couple of additions 
would help lock in the behavior above: a build-time test for the `az://` form, 
and one for `wasbs://` if it stays in scope, to document what actually happens. 
The `from_env()` fallback path is not exercised yet either, only the 
explicit-config path.
   
   For end-to-end coverage, note this is the native Parquet scan path that goes 
through `object_store` directly, not the Iceberg path, which reads through 
iceberg-rust and opendal. So the analog to aim for is `ParquetReadFromS3Suite` 
on `CometS3TestBase`, which spins up a MinIO Testcontainer, rather than 
`IcebergReadFromS3Suite`. The Azure counterpart to MinIO is 
[Azurite](https://learn.microsoft.com/en-us/azure/storage/common/storage-use-azurite),
 the official Azure Storage emulator, which `object_store` supports through its 
`UseEmulator` config key. A Testcontainers-based `ParquetReadFromAzureSuite` on 
an equivalent `CometAzureTestBase` would give us real coverage for the 
shared-key and SAS paths. Azurite does not emulate Entra ID, so it would not 
cover the Workload Identity token exchange, but it would validate the store 
wiring and key translation, which is most of the surface area here. No need to 
land that in this PR, but it would be a great follow-up.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

