This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new 5dc1c5a  Supporting CPK (Customer Provided Keys) in Azure Blob Storage requests (#742)
5dc1c5a is described below

commit 5dc1c5a1c327553bcaca58f17cbe0db6cd908410
Author: Braedon Wooding <[email protected]>
AuthorDate: Thu Jun 18 03:13:33 2026 +1000

    Supporting CPK (Customer Provided Keys) in Azure Blob Storage requests (#742)
    
    * Supporting CPK (Customer Provided Keys) in Azure Blob Storage requests
    
    * Fix Azure CPK copy source authorization
    
    * Apply #[cfg(feature = reqwest)] to tests
    
    ---------
    
    Co-authored-by: Kevin Liu <[email protected]>
---
 src/azure/builder.rs    | 105 ++++++++-
 src/azure/client.rs     | 579 +++++++++++++++++++++++++++++++++++++++++++++---
 src/azure/credential.rs |  30 ++-
 src/azure/mod.rs        | 103 +++++++++
 src/client/builder.rs   |  37 +++-
 5 files changed, 813 insertions(+), 41 deletions(-)

diff --git a/src/azure/builder.rs b/src/azure/builder.rs
index 4ef95cb..64a44b8 100644
--- a/src/azure/builder.rs
+++ b/src/azure/builder.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::azure::client::{AzureClient, AzureConfig};
+use crate::azure::client::{AzureClient, AzureConfig, AzureEncryptionHeaders};
 use crate::azure::credential::{
     AzureAccessKey, AzureCliCredential, ClientSecretOAuthProvider, 
FabricTokenOAuthProvider,
     ImdsManagedIdentityProvider, WorkloadIdentityOAuthProvider,
@@ -85,6 +85,11 @@ enum Error {
     #[error("Missing component in SAS query pair")]
     MissingSasComponent {},
 
+    #[error("Invalid encryption key: {source}")]
+    InvalidEncryptionKey {
+        source: Box<dyn std::error::Error + Send + Sync + 'static>,
+    },
+
     #[error("Configuration key: '{}' is not known.", key)]
     UnknownConfigurationKey { key: String },
 }
@@ -178,6 +183,8 @@ pub struct MicrosoftAzureBuilder {
     fabric_session_token: Option<String>,
     /// Fabric cluster identifier
     fabric_cluster_identifier: Option<String>,
+    /// Base64-encoded 256-bit customer-provided encryption key
+    encryption_key: Option<String>,
     /// The [`HttpConnector`] to use
     http_connector: Option<Arc<dyn HttpConnector>>,
 }
@@ -380,6 +387,13 @@ pub enum AzureConfigKey {
     /// - `fabric_cluster_identifier`
     FabricClusterIdentifier,
 
+    /// Base64-encoded customer-provided encryption key
+    ///
+    /// Supported keys:
+    /// - `azure_storage_encryption_key`
+    /// - `encryption_key`
+    EncryptionKey,
+
     /// Client options
     Client(ClientConfigKey),
 }
@@ -410,6 +424,7 @@ impl AsRef<str> for AzureConfigKey {
             Self::FabricWorkloadHost => "azure_fabric_workload_host",
             Self::FabricSessionToken => "azure_fabric_session_token",
             Self::FabricClusterIdentifier => "azure_fabric_cluster_identifier",
+            Self::EncryptionKey => "azure_storage_encryption_key",
             Self::Client(key) => key.as_ref(),
         }
     }
@@ -466,6 +481,7 @@ impl FromStr for AzureConfigKey {
             "azure_fabric_cluster_identifier" | "fabric_cluster_identifier" => 
{
                 Ok(Self::FabricClusterIdentifier)
             }
+            "azure_storage_encryption_key" | "encryption_key" => 
Ok(Self::EncryptionKey),
             // Backwards compatibility
             "azure_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             _ => match s.strip_prefix("azure_").unwrap_or(s).parse() {
@@ -594,6 +610,7 @@ impl MicrosoftAzureBuilder {
             AzureConfigKey::FabricClusterIdentifier => {
                 self.fabric_cluster_identifier = Some(value.into())
             }
+            AzureConfigKey::EncryptionKey => self.encryption_key = 
Some(value.into()),
         };
         self
     }
@@ -635,6 +652,7 @@ impl MicrosoftAzureBuilder {
             AzureConfigKey::FabricWorkloadHost => 
self.fabric_workload_host.clone(),
             AzureConfigKey::FabricSessionToken => 
self.fabric_session_token.clone(),
             AzureConfigKey::FabricClusterIdentifier => 
self.fabric_cluster_identifier.clone(),
+            AzureConfigKey::EncryptionKey => self.encryption_key.clone(),
         }
     }
 
@@ -938,6 +956,25 @@ impl MicrosoftAzureBuilder {
         self
     }
 
+    /// Set the customer-provided encryption key (CPK) used to encrypt blob 
content.
+    ///
+    /// `key` must be a base64-encoded 256-bit AES key (the decoded value must 
be
+    /// exactly 32 bytes). The same key must be supplied on every subsequent 
read,
+    /// write, or copy of any blob created with it; if the key is lost or 
omitted
+    /// the data is unrecoverable. CPK material is sent to Azure on every 
request,
+    /// so the configured endpoint must use HTTPS.
+    ///
+    /// Only a subset of Blob storage operations support CPK
+    /// (see the [Azure documentation][cpk-ops]). When CPK is enabled, `copy`
+    /// switches from the asynchronous `Copy Blob` API to `Put Blob From URL`,
+    /// which is synchronous and limits the source blob to 5,000 MiB.
+    ///
+    /// [cpk-ops]: https://learn.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys#blob-storage-operations-supporting-customer-provided-keys
+    pub fn with_encryption_key(mut self, key: impl Into<String>) -> Self {
+        self.encryption_key = Some(key.into());
+        self
+    }
+
     /// The [`HttpConnector`] to use
     ///
     /// On non-WASM32 platforms uses [`reqwest`] by default, on WASM32 
platforms must be provided
@@ -1079,6 +1116,16 @@ impl MicrosoftAzureBuilder {
             (false, url, credential, account_name)
         };
 
+        let encryption_headers =
+            
AzureEncryptionHeaders::try_new(self.encryption_key).map_err(|source| {
+                Error::InvalidEncryptionKey {
+                    source: match source {
+                        crate::Error::Generic { source, .. } => source,
+                        other => Box::new(other),
+                    },
+                }
+            })?;
+
         let config = AzureConfig {
             account,
             is_emulator,
@@ -1089,6 +1136,7 @@ impl MicrosoftAzureBuilder {
             client_options: self.client_options,
             service: storage_url,
             credentials: auth,
+            encryption_headers,
         };
 
         let http_client = http.connect(&config.client_options)?;
@@ -1137,6 +1185,8 @@ pub fn split_sas(sas: &str) -> Result<Vec<(String, 
String)>> {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use base64::Engine;
+    use base64::prelude::BASE64_STANDARD;
     use std::collections::HashMap;
 
     #[test]
@@ -1357,6 +1407,59 @@ mod tests {
         }
     }
 
+    #[test]
+    fn azure_encryption_key_roundtrip() {
+        let key = BASE64_STANDARD.encode([7_u8; 32]);
+        let builder = MicrosoftAzureBuilder::new().with_encryption_key(&key);
+
+        assert_eq!(
+            builder
+                .get_config_value(&AzureConfigKey::EncryptionKey)
+                .as_deref(),
+            Some(key.as_str())
+        );
+    }
+
+    #[test]
+    fn azure_encryption_key_rejects_malformed_base64() {
+        let err = MicrosoftAzureBuilder::new()
+            .with_account("account")
+            .with_container_name("container")
+            .with_access_key(EMULATOR_ACCOUNT_KEY)
+            .with_encryption_key("not-base64!!!")
+            .build()
+            .unwrap_err();
+
+        let msg = err.to_string();
+        assert!(msg.contains("Invalid encryption key") || 
msg.contains("Invalid byte"));
+    }
+
+    #[test]
+    fn azure_encryption_key_rejects_short_decoded_key() {
+        let err = MicrosoftAzureBuilder::new()
+            .with_account("account")
+            .with_container_name("container")
+            .with_access_key(EMULATOR_ACCOUNT_KEY)
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 31]))
+            .build()
+            .unwrap_err();
+
+        assert!(err.to_string().contains("must decode to 32 bytes, got 31"));
+    }
+
+    #[test]
+    fn azure_encryption_key_rejects_long_decoded_key() {
+        let err = MicrosoftAzureBuilder::new()
+            .with_account("account")
+            .with_container_name("container")
+            .with_access_key(EMULATOR_ACCOUNT_KEY)
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 33]))
+            .build()
+            .unwrap_err();
+
+        assert!(err.to_string().contains("must decode to 32 bytes, got 33"));
+    }
+
     #[test]
     fn azure_test_config_from_map() {
         let azure_client_id = "object_store:fake_access_key_id";
diff --git a/src/azure/client.rs b/src/azure/client.rs
index 5a3fcbc..3c76b1c 100644
--- a/src/azure/client.rs
+++ b/src/azure/client.rs
@@ -41,6 +41,7 @@ use http::{
     header::{CONTENT_LENGTH, CONTENT_TYPE, HeaderMap, HeaderValue, IF_MATCH, 
IF_NONE_MATCH},
 };
 use rand::RngExt;
+use ring::digest;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::sync::Arc;
@@ -58,6 +59,25 @@ static MS_CONTENT_ENCODING: HeaderName = 
HeaderName::from_static("x-ms-blob-cont
 static MS_CONTENT_LANGUAGE: HeaderName = 
HeaderName::from_static("x-ms-blob-content-language");
 
 static TAGS_HEADER: HeaderName = HeaderName::from_static("x-ms-tags");
+static ENCRYPTION_KEY_HEADER: HeaderName = 
HeaderName::from_static("x-ms-encryption-key");
+static ENCRYPTION_KEY_SHA256_HEADER: HeaderName =
+    HeaderName::from_static("x-ms-encryption-key-sha256");
+static ENCRYPTION_ALGORITHM_HEADER: HeaderName =
+    HeaderName::from_static("x-ms-encryption-algorithm");
+static SOURCE_ENCRYPTION_KEY_HEADER: HeaderName =
+    HeaderName::from_static("x-ms-source-encryption-key");
+static SOURCE_ENCRYPTION_KEY_SHA256_HEADER: HeaderName =
+    HeaderName::from_static("x-ms-source-encryption-key-sha256");
+static SOURCE_ENCRYPTION_ALGORITHM_HEADER: HeaderName =
+    HeaderName::from_static("x-ms-source-encryption-algorithm");
+static COPY_SOURCE_AUTHORIZATION: HeaderName =
+    HeaderName::from_static("x-ms-copy-source-authorization");
+// Put Blob From URL added source CPK headers in 2026-02-06.
+// 
https://learn.microsoft.com/en-us/rest/api/storageservices/version-2026-02-06
+// before this version you could only specify CPK headers for the destination.
+// we only upgrade to this version if you are applying CPK headers to a copy 
request.
+const PUT_BLOB_FROM_URL_SOURCE_CPK_VERSION: &str = "2026-02-06";
+const COPY_SOURCE_SAS_EXPIRES_IN: Duration = Duration::from_secs(3600);
 
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, thiserror::Error)]
@@ -167,6 +187,7 @@ pub(crate) struct AzureConfig {
     pub skip_signature: bool,
     pub disable_tagging: bool,
     pub client_options: ClientOptions,
+    pub encryption_headers: AzureEncryptionHeaders,
 }
 
 impl AzureConfig {
@@ -181,6 +202,28 @@ impl AzureConfig {
         }
         url
     }
+
+    /// Whether a request built with this config must be treated as sensitive.
+    ///
+    /// The retry layer's `sensitive` flag suppresses the request URL from
+    /// error messages (see [`RetryableRequestBuilder::sensitive`]). For SAS
+    /// credentials this is load-bearing because the token is carried as URL
+    /// query parameters.
+    ///
+    /// CPK material lives in request *headers* (`x-ms-encryption-key` etc.),
+    /// not in the URL. Those header values are marked sensitive when added to
+    /// the request, while this flag ensures retry/error formatting also treats
+    /// the whole request as sensitive.
+    ///
+    /// [`RetryableRequestBuilder::sensitive`]: 
crate::client::retry::RetryableRequestBuilder
+    fn is_sensitive(&self, credential: &Option<Arc<AzureCredential>>) -> bool {
+        let credential_sensitive = credential
+            .as_deref()
+            .map(|c| c.sensitive_request())
+            .unwrap_or_default();
+        credential_sensitive || self.encryption_headers.is_enabled()
+    }
+
     async fn get_credential(&self) -> Result<Option<Arc<AzureCredential>>> {
         if self.skip_signature {
             Ok(None)
@@ -190,6 +233,118 @@ impl AzureConfig {
     }
 }
 
+/// Encryption headers for Azure requests.
+/// Azure only supports AES256 encryption with customer-provided keys.
+#[derive(Default, Clone)]
+pub(crate) struct AzureEncryptionHeaders {
+    pub encryption_key: Option<String>,
+    pub encryption_key_sha256: Option<String>,
+}
+
+impl std::fmt::Debug for AzureEncryptionHeaders {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("AzureEncryptionHeaders")
+            .field("key_configured", &self.encryption_key.is_some())
+            .finish()
+    }
+}
+
+impl AzureEncryptionHeaders {
+    pub(crate) fn try_new(encryption_key: Option<String>) -> Result<Self> {
+        let Some(encryption_key) = encryption_key else {
+            return Ok(Self::default());
+        };
+
+        let decoded_key = BASE64_STANDARD
+            .decode(encryption_key.as_bytes())
+            .map_err(|source| crate::Error::Generic {
+                store: STORE,
+                source: Box::new(source),
+            })?;
+
+        // As above encryption keys must be 256-bit AES keys,
+        // which means the base64-encoded value must decode to 32 bytes.
+        if decoded_key.len() != 32 {
+            return Err(crate::Error::Generic {
+                store: STORE,
+                source: format!(
+                    "Azure customer-provided encryption key must decode to 32 bytes, got {}",
+                    decoded_key.len()
+                )
+                .into(),
+            });
+        }
+
+        let encryption_key_sha256 =
+            BASE64_STANDARD.encode(digest::digest(&digest::SHA256, 
&decoded_key));
+
+        Ok(Self {
+            encryption_key: Some(encryption_key),
+            encryption_key_sha256: Some(encryption_key_sha256),
+        })
+    }
+
+    pub(crate) fn is_enabled(&self) -> bool {
+        self.encryption_key.is_some()
+    }
+}
+
+/// Override the Azure Blob service version used for a single request.
+///
+/// [`with_azure_authorization`](CredentialExt::with_azure_authorization) 
preserves
+/// an explicit version header instead of replacing it with the backend 
default.
+pub(crate) trait RequestVersionExt {
+    fn with_azure_version(self, version: &'static str) -> Self;
+}
+
+impl RequestVersionExt for HttpRequestBuilder {
+    fn with_azure_version(self, version: &'static str) -> Self {
+        self.header(&VERSION, version)
+    }
+}
+
+/// Request-builder extension for customer-provided encryption keys (CPK).
+pub(crate) trait EncryptionHeadersExt {
+    /// The only encryption algorithm supported by Azure when using 
customer-provided keys.
+    const AES256: &'static str = "AES256";
+
+    /// Apply the customer-provided encryption headers for the request target.
+    /// 
<https://learn.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys>
+    fn with_azure_encryption_headers(self, headers: &AzureEncryptionHeaders) 
-> Self;
+
+    /// Apply the customer-provided encryption headers for a copy *source*.
+    ///
+    /// When performing a copy operation with a customer-provided key, the 
standard x-ms-encryption-*
+    /// headers apply to the destination, with separate 
x-ms-source-encryption-* headers for the source.
+    ///
+    /// 
<https://learn.microsoft.com/en-us/rest/api/storageservices/put-block-from-url?tabs=microsoft-entra-id#request-headers-source-customer-provided-encryption-keys>
+    fn with_azure_source_encryption_headers(self, headers: 
&AzureEncryptionHeaders) -> Self;
+}
+
+impl EncryptionHeadersExt for HttpRequestBuilder {
+    fn with_azure_encryption_headers(self, headers: &AzureEncryptionHeaders) 
-> Self {
+        match (&headers.encryption_key, &headers.encryption_key_sha256) {
+            (Some(encryption_key), Some(encryption_key_sha256)) => self
+                // The key is secret material, so mark it sensitive to keep it
+                // out of any `Debug`/diagnostic output.
+                .sensitive_header(&ENCRYPTION_KEY_HEADER, encryption_key)
+                .sensitive_header(&ENCRYPTION_KEY_SHA256_HEADER, 
encryption_key_sha256)
+                .header(&ENCRYPTION_ALGORITHM_HEADER, Self::AES256),
+            _ => self,
+        }
+    }
+
+    fn with_azure_source_encryption_headers(self, headers: 
&AzureEncryptionHeaders) -> Self {
+        match (&headers.encryption_key, &headers.encryption_key_sha256) {
+            (Some(encryption_key), Some(encryption_key_sha256)) => self
+                .sensitive_header(&SOURCE_ENCRYPTION_KEY_HEADER, 
encryption_key)
+                .sensitive_header(&SOURCE_ENCRYPTION_KEY_SHA256_HEADER, 
encryption_key_sha256)
+                .header(&SOURCE_ENCRYPTION_ALGORITHM_HEADER, Self::AES256),
+            _ => self,
+        }
+    }
+}
+
 /// A builder for a put request allowing customisation of the headers and 
query string
 struct PutRequest<'a> {
     path: &'a Path,
@@ -260,12 +415,10 @@ impl PutRequest<'_> {
 
     async fn send(self) -> Result<HttpResponse> {
         let credential = self.config.get_credential().await?;
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
+        let sensitive = self.config.is_sensitive(&credential);
         let response = self
             .builder
+            .with_azure_encryption_headers(&self.config.encryption_headers)
             .header(CONTENT_LENGTH, self.payload.content_length())
             .with_azure_authorization(&credential, &self.config.account)
             .retryable(&self.config.retry_config)
@@ -637,6 +790,8 @@ impl AzureClient {
             let url = self.config.path_url(path);
 
             // Build subrequest with proper authorization
+            // Note: Delete operations don't require us to pass customer 
provided keys
+            // 
https://learn.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys#blob-storage-operations-supporting-customer-provided-keys
             let request = self
                 .client
                 .delete(url.as_str())
@@ -680,6 +835,7 @@ impl AzureClient {
 
         // Send multipart request
         let url = self.config.path_url(&Path::from("/"));
+        let sensitive = self.config.is_sensitive(&credential);
         let batch_response = self
             .client
             .post(url.as_str())
@@ -692,7 +848,9 @@ impl AzureClient {
             .header(CONTENT_LENGTH, HeaderValue::from(body_bytes.len()))
             .body(body_bytes)
             .with_azure_authorization(&credential, &self.config.account)
-            .send_retry(&self.config.retry_config)
+            .retryable(&self.config.retry_config)
+            .sensitive(sensitive)
+            .send()
             .await
             .map_err(|source| Error::BulkDeleteRequest { source })?;
 
@@ -709,11 +867,21 @@ impl AzureClient {
         Ok(results)
     }
 
-    /// Make an Azure Copy request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob>
+    /// Make an Azure copy request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob>.
+    ///
+    /// The classic `Copy Blob` API does not accept CPK headers, so when
+    /// customer-provided keys are enabled this falls back to
+    /// [Put Blob From URL][put-blob-from-url] and opts into the service 
version
+    /// that added source CPK headers. That changes the semantics: the 
operation
+    /// is synchronous, the source must be a block blob no larger than 5,000 
MiB,
+    /// and uncommitted blocks / the source block list are not preserved.
+    ///
+    /// [put-blob-from-url]: 
https://learn.microsoft.com/en-us/rest/api/storageservices/put-blob-from-url
     pub(crate) async fn copy_request(&self, from: &Path, to: &Path, overwrite: 
bool) -> Result<()> {
         let credential = self.get_credential().await?;
         let url = self.config.path_url(to);
         let mut source = self.config.path_url(from);
+        let mut source_authorization = None;
 
         // If using SAS authorization must include the headers in the URL
         // 
<https://docs.microsoft.com/en-us/rest/api/storageservices/copy-blob#request-headers>
@@ -721,20 +889,51 @@ impl AzureClient {
             source.query_pairs_mut().extend_pairs(pairs);
         }
 
+        if self.config.encryption_headers.is_enabled() {
+            match credential.as_deref() {
+                Some(AzureCredential::AccessKey(key)) => {
+                    let signed_start = Utc::now();
+                    let signed_expiry = signed_start + 
COPY_SOURCE_SAS_EXPIRES_IN;
+                    AzureSigner::new(
+                        key.clone(),
+                        self.config.account.clone(),
+                        signed_start,
+                        signed_expiry,
+                        None,
+                    )
+                    .sign(&Method::GET, &mut source)?;
+                }
+                Some(AzureCredential::BearerToken(token)) => {
+                    source_authorization = Some(format!("Bearer {token}"));
+                }
+                _ => {}
+            }
+        }
+
         let mut builder = self
             .client
             .request(Method::PUT, url.as_str())
-            .header(&COPY_SOURCE, source.to_string())
             .header(CONTENT_LENGTH, HeaderValue::from_static("0"));
 
+        builder = builder.sensitive_header(&COPY_SOURCE, source.to_string());
+
+        if self.config.encryption_headers.is_enabled() {
+            builder = builder
+                .header(&BLOB_TYPE, "BlockBlob")
+                .with_azure_encryption_headers(&self.config.encryption_headers)
+                
.with_azure_source_encryption_headers(&self.config.encryption_headers)
+                .with_azure_version(PUT_BLOB_FROM_URL_SOURCE_CPK_VERSION);
+        }
+
+        if let Some(source_authorization) = source_authorization {
+            builder = builder.sensitive_header(&COPY_SOURCE_AUTHORIZATION, 
source_authorization);
+        }
+
         if !overwrite {
             builder = builder.header(IF_NONE_MATCH, "*");
         }
 
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
+        let sensitive = self.config.is_sensitive(&credential);
         builder
             .with_azure_authorization(&credential, &self.config.account)
             .retryable(&self.config.retry_config)
@@ -767,10 +966,7 @@ impl AzureClient {
         ));
         body.push_str("</KeyInfo>");
 
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
+        let sensitive = self.config.is_sensitive(&credential);
 
         let response = self
             .client
@@ -833,10 +1029,9 @@ impl AzureClient {
     pub(crate) async fn get_blob_tagging(&self, path: &Path) -> 
Result<HttpResponse> {
         let credential = self.get_credential().await?;
         let url = self.config.path_url(path);
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
+        let sensitive = self.config.is_sensitive(&credential);
+        // Note: Get blob tags doesn't require us to pass customer provided 
keys
+        // 
https://learn.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys#blob-storage-operations-supporting-customer-provided-keys
         let response = self
             .client
             .get(url.as_str())
@@ -900,14 +1095,13 @@ impl GetClient for AzureClient {
             .header(CONTENT_LENGTH, HeaderValue::from_static("0"))
             .body(Bytes::new());
 
+        builder = 
builder.with_azure_encryption_headers(&self.config.encryption_headers);
+
         if let Some(v) = &options.version {
             builder = builder.query(&[("versionid", v)])
         }
 
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
+        let sensitive = self.config.is_sensitive(&credential);
 
         let response = builder
             .with_get_options(options)
@@ -972,10 +1166,7 @@ impl ListClient for Arc<AzureClient> {
             query.push(("maxresults", max_keys_str.as_ref()))
         }
 
-        let sensitive = credential
-            .as_deref()
-            .map(|c| c.sensitive_request())
-            .unwrap_or_default();
+        let sensitive = self.config.is_sensitive(&credential);
 
         let response = self
             .client
@@ -1191,6 +1382,7 @@ pub(crate) struct UserDelegationKey {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use crate::ObjectStoreExt;
     use crate::StaticCredentialProvider;
     use bytes::Bytes;
     use regex::bytes::Regex;
@@ -1402,6 +1594,10 @@ mod tests {
             skip_signature: false,
             disable_tagging: false,
             client_options: Default::default(),
+            encryption_headers: AzureEncryptionHeaders::try_new(Some(
+                BASE64_STANDARD.encode([7_u8; 32]),
+            ))
+            .unwrap(),
         };
 
         let client = AzureClient::new(config, HttpClient::new(Client::new()));
@@ -1461,6 +1657,335 @@ Authorization: Bearer static-token\r
         assert_eq!(expected_body, body_bytes);
     }
 
+    #[test]
+    fn test_azure_encryption_headers_debug_redacts_key() {
+        let encryption_key = BASE64_STANDARD.encode([7_u8; 32]);
+        let headers = AzureEncryptionHeaders::try_new(Some(encryption_key.clone())).unwrap();
+        let encryption_key_sha256 = headers.encryption_key_sha256.clone().unwrap();
+
+        let debug = format!("{headers:?}");
+
+        assert!(!debug.contains(&encryption_key));
+        assert!(!debug.contains(&encryption_key_sha256));
+        assert!(debug.contains("key_configured: true"));
+    }
+
+    #[cfg(feature = "reqwest")]
+    #[test]
+    fn test_azure_sensitive_headers_redact_client_request_debug() {
+        let encryption_key = BASE64_STANDARD.encode([7_u8; 32]);
+        let headers = AzureEncryptionHeaders::try_new(Some(encryption_key.clone())).unwrap();
+        let encryption_key_sha256 = headers.encryption_key_sha256.clone().unwrap();
+        let copy_source = "http://example.com/source.txt?sig=secret-source-sas";
+        let source_authorization = "Bearer static-token";
+
+        let request = HttpClient::new(Client::new())
+            .request(Method::PUT, "http://example.com/dest.txt")
+            .with_azure_encryption_headers(&headers)
+            .with_azure_source_encryption_headers(&headers)
+            .sensitive_header(&COPY_SOURCE, copy_source)
+            .sensitive_header(&COPY_SOURCE_AUTHORIZATION, source_authorization)
+            .into_parts()
+            .1
+            .unwrap();
+
+        assert_eq!(
+            request
+                .headers()
+                .get("x-ms-encryption-key")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            encryption_key
+        );
+        assert_eq!(
+            request
+                .headers()
+                .get("x-ms-source-encryption-key")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            encryption_key
+        );
+        assert_eq!(
+            request
+                .headers()
+                .get("x-ms-copy-source")
+                .unwrap()
+                .to_str()
+                .unwrap(),
+            copy_source
+        );
+
+        let debug = format!("{:?}", request.headers());
+        assert!(!debug.contains(&encryption_key));
+        assert!(!debug.contains(&encryption_key_sha256));
+        assert!(!debug.contains(copy_source));
+        assert!(!debug.contains(source_authorization));
+        assert!(debug.contains("Sensitive"));
+    }
+
+    #[tokio::test]
+    async fn test_get_request_includes_encryption_headers() {
+        let server = crate::client::mock_server::MockServer::new().await;
+
+        let store = crate::azure::MicrosoftAzureBuilder::new()
+            .with_account("testaccount")
+            .with_container_name("testcontainer")
+            .with_access_key("Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")
+            .with_allow_http(true)
+            .with_endpoint(server.url().to_string())
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 32]))
+            .build()
+            .unwrap();
+
+        server.push_fn(|req| {
+            assert_eq!(req.method(), Method::GET);
+            assert_eq!(
+                req.headers().get("range").unwrap().to_str().unwrap(),
+                "bytes=1-3"
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-key")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-key-sha256")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "S7Bvjk46dxXSAdVz0KpCN2LlXavWGiwCJ4+lbMbSlOA="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-algorithm")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "AES256"
+            );
+
+            http::Response::builder()
+                .status(206)
+                .header("content-length", "3")
+                .header("content-range", "bytes 1-3/5")
+                .header("etag", "test-etag")
+                .header("last-modified", "Tue, 05 Nov 2024 15:01:15 GMT")
+                .body("ell".to_string())
+                .unwrap()
+        });
+
+        let bytes = store
+            .get_range(&Path::from("file.txt"), 1..4)
+            .await
+            .unwrap();
+
+        assert_eq!(bytes, Bytes::from_static(b"ell"));
+    }
+
+    #[tokio::test]
+    async fn test_copy_request_includes_encryption_headers() {
+        let server = crate::client::mock_server::MockServer::new().await;
+        let endpoint = server.url().to_string();
+        let expected_source = format!("{endpoint}/testcontainer/source.txt");
+
+        let store = crate::azure::MicrosoftAzureBuilder::new()
+            .with_account("testaccount")
+            .with_container_name("testcontainer")
+            .with_access_key("Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")
+            .with_allow_http(true)
+            .with_endpoint(endpoint)
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 32]))
+            .build()
+            .unwrap();
+
+        server.push_fn(move |req| {
+            assert_eq!(req.method(), Method::PUT);
+            let copy_source = req
+                .headers()
+                .get("x-ms-copy-source")
+                .unwrap()
+                .to_str()
+                .unwrap();
+            let mut parsed_source = Url::parse(copy_source).unwrap();
+            let query: HashMap<_, _> = 
parsed_source.query_pairs().into_owned().collect();
+            parsed_source.set_query(None);
+            assert_eq!(parsed_source.to_string(), expected_source);
+            assert_eq!(query.get("sp").map(String::as_str), Some("r"));
+            assert_eq!(query.get("sr").map(String::as_str), Some("b"));
+            assert!(query.contains_key("sig"));
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-source-encryption-key")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-source-encryption-key-sha256")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "S7Bvjk46dxXSAdVz0KpCN2LlXavWGiwCJ4+lbMbSlOA="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-source-encryption-algorithm")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "AES256"
+            );
+            assert_eq!(
+                req.headers().get("x-ms-version").unwrap().to_str().unwrap(),
+                "2026-02-06"
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-blob-type")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "BlockBlob"
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-key")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-key-sha256")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "S7Bvjk46dxXSAdVz0KpCN2LlXavWGiwCJ4+lbMbSlOA="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-algorithm")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "AES256"
+            );
+
+            http::Response::builder()
+                .status(201)
+                .body(String::new())
+                .unwrap()
+        });
+
+        store
+            .copy(&Path::from("source.txt"), &Path::from("dest.txt"))
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_copy_request_uses_source_authorization_for_bearer_cpk() {
+        let server = crate::client::mock_server::MockServer::new().await;
+        let endpoint = server.url().to_string();
+        let expected_source = format!("{endpoint}/testcontainer/source.txt");
+
+        let store = crate::azure::MicrosoftAzureBuilder::new()
+            .with_account("testaccount")
+            .with_container_name("testcontainer")
+            .with_bearer_token_authorization("static-token")
+            .with_allow_http(true)
+            .with_endpoint(endpoint)
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 32]))
+            .build()
+            .unwrap();
+
+        server.push_fn(move |req| {
+            assert_eq!(req.method(), Method::PUT);
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-copy-source")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                expected_source
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-copy-source-authorization")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "Bearer static-token"
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-source-encryption-key")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc="
+            );
+            assert_eq!(
+                req.headers()
+                    .get("x-ms-encryption-key")
+                    .unwrap()
+                    .to_str()
+                    .unwrap(),
+                "BwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwcHBwc="
+            );
+
+            http::Response::builder()
+                .status(201)
+                .body(String::new())
+                .unwrap()
+        });
+
+        store
+            .copy(&Path::from("source.txt"), &Path::from("dest.txt"))
+            .await
+            .unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_cpk_errors_redact_request_url() {
+        let server = crate::client::mock_server::MockServer::new().await;
+        let endpoint = server.url().to_string();
+
+        let store = crate::azure::MicrosoftAzureBuilder::new()
+            .with_account("testaccount")
+            .with_container_name("testcontainer")
+            .with_access_key("Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")
+            .with_allow_http(true)
+            .with_endpoint(endpoint.clone())
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 32]))
+            .build()
+            .unwrap();
+
+        server.push_fn(|_req| {
+            http::Response::builder()
+                .status(409)
+                .body(String::from("conflict"))
+                .unwrap()
+        });
+
+        let err = store
+            .get_range(&Path::from("file.txt"), 0..1)
+            .await
+            .unwrap_err();
+        let msg = err.to_string();
+        assert!(msg.contains("REDACTED"), "{msg}");
+        assert!(!msg.contains(&endpoint), "{msg}");
+    }
+
     #[tokio::test]
     async fn test_parse_blob_batch_delete_body() {
         let response_body = b"--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
diff --git a/src/azure/credential.rs b/src/azure/credential.rs
index 4e84fb8..111c908 100644
--- a/src/azure/credential.rs
+++ b/src/azure/credential.rs
@@ -45,7 +45,7 @@ use std::time::{Duration, Instant, SystemTime};
 use url::Url;
 
 static AZURE_VERSION: HeaderValue = HeaderValue::from_static("2023-11-03");
-static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
+pub(crate) static VERSION: HeaderName = HeaderName::from_static("x-ms-version");
 pub(crate) static BLOB_TYPE: HeaderName = HeaderName::from_static("x-ms-blob-type");
 pub(crate) static COPY_SOURCE: HeaderName = HeaderName::from_static("x-ms-copy-source");
 static CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
@@ -214,9 +214,13 @@ fn add_date_and_version_headers(request: &mut HttpRequest) {
     // we formatted the data string ourselves, so unwrapping should be fine
     let date_val = HeaderValue::from_str(&date_str).unwrap();
     request.headers_mut().insert(DATE, date_val);
-    request
-        .headers_mut()
-        .insert(&VERSION, AZURE_VERSION.clone());
+    // Preserve an explicit per-request override when a call site needs a newer
+    // service version than the backend default.
+    if !request.headers().contains_key(&VERSION) {
+        request
+            .headers_mut()
+            .insert(&VERSION, AZURE_VERSION.clone());
+    }
 }
 
 /// Authorize a [`HttpRequest`] with an [`AzureAuthorizer`]
@@ -1072,6 +1076,7 @@ mod tests {
 
     use super::*;
     use crate::azure::MicrosoftAzureBuilder;
+    use crate::azure::client::RequestVersionExt;
     use crate::client::mock_server::MockServer;
     use crate::{ObjectStoreExt, Path};
 
@@ -1280,4 +1285,21 @@ mod tests {
             );
         }
     }
+
+    #[cfg(feature = "reqwest")]
+    #[test]
+    fn test_azure_authorization_preserves_explicit_version() {
+        let credential = Some(Arc::new(AzureCredential::BearerToken("token".to_string())));
+        let client = HttpClient::new(Client::new());
+
+        let request = client
+            .request(Method::GET, "http://example.com/container/blob")
+            .with_azure_version("2026-02-06")
+            .with_azure_authorization(&credential, "account")
+            .into_parts()
+            .1
+            .unwrap();
+
+        assert_eq!(request.headers().get(&VERSION).unwrap(), "2026-02-06");
+    }
 }
diff --git a/src/azure/mod.rs b/src/azure/mod.rs
index 1156743..2e802e3 100644
--- a/src/azure/mod.rs
+++ b/src/azure/mod.rs
@@ -218,6 +218,12 @@ impl Signer for MicrosoftAzure {
     /// # }
     /// ```
     async fn signed_url(&self, method: Method, path: &Path, expires_in: Duration) -> Result<Url> {
+        if self.client.config().encryption_headers.is_enabled() {
+            return Err(crate::Error::NotSupported {
+                source: "Azure signed URLs cannot be used with customer-provided keys because CPK values must be supplied as request headers".into(),
+            });
+        }
+
         let mut url = self.path_url(path);
         let signer = self.client.signer(expires_in).await?;
         signer.sign(&method, &mut url)?;
@@ -230,6 +236,12 @@ impl Signer for MicrosoftAzure {
         paths: &[Path],
         expires_in: Duration,
     ) -> Result<Vec<Url>> {
+        if self.client.config().encryption_headers.is_enabled() {
+            return Err(crate::Error::NotSupported {
+                source: "Azure signed URLs cannot be used with customer-provided keys because CPK values must be supplied as request headers".into(),
+            });
+        }
+
         let mut urls = Vec::with_capacity(paths.len());
         let signer = self.client.signer(expires_in).await?;
         for path in paths {
@@ -339,8 +351,12 @@ mod tests {
     use crate::ObjectStoreExt;
     use crate::integration::*;
     use crate::tests::*;
+    use base64::Engine;
+    use base64::prelude::BASE64_STANDARD;
     use bytes::Bytes;
+    use std::time::{SystemTime, UNIX_EPOCH};
 
+    #[cfg(feature = "reqwest")]
     #[tokio::test]
     async fn azure_blob_test() {
         maybe_skip_integration!();
@@ -428,6 +444,93 @@ mod tests {
         store.delete(&path).await.unwrap();
     }
 
+    // Azurite doesn't support CPK (just ignores it)
+    #[ignore = "Used for manual testing against a real storage account."]
+    #[tokio::test]
+    async fn azure_blob_cpk_test() {
+        let base = MicrosoftAzureBuilder::from_env();
+        let key = BASE64_STANDARD.encode([7_u8; 32]);
+        let wrong_key = BASE64_STANDARD.encode([9_u8; 32]);
+
+        let encrypted = base.clone().with_encryption_key(&key).build().unwrap();
+        let unencrypted = base.clone().build().unwrap();
+        let wrong = base.with_encryption_key(&wrong_key).build().unwrap();
+
+        let suffix = SystemTime::now()
+            .duration_since(UNIX_EPOCH)
+            .unwrap()
+            .as_nanos();
+        let path = Path::from(format!("cpk-test-{suffix}.txt"));
+        let copy_path = Path::from(format!("cpk-test-copy-{suffix}.txt"));
+        let payload = Bytes::from("customer-provided-key");
+
+        encrypted.put(&path, payload.clone().into()).await.unwrap();
+
+        let loaded = encrypted.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(loaded, payload);
+
+        let range = encrypted.get_range(&path, 9..17).await.unwrap();
+        assert_eq!(range, Bytes::from("provided"));
+
+        let meta = encrypted.head(&path).await.unwrap();
+        assert_eq!(meta.size, payload.len() as u64);
+
+        encrypted.copy(&path, &copy_path).await.unwrap();
+        let copied = encrypted
+            .get(&copy_path)
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        assert_eq!(copied, payload);
+
+        assert!(unencrypted.get(&path).await.is_err());
+        assert!(wrong.get(&path).await.is_err());
+        assert!(unencrypted.head(&path).await.is_err());
+        assert!(wrong.get_range(&path, 0..8).await.is_err());
+        assert!(unencrypted.get(&copy_path).await.is_err());
+        assert!(wrong.get(&copy_path).await.is_err());
+        let meta = encrypted.head(&copy_path).await.unwrap();
+        assert_eq!(meta.size, payload.len() as u64);
+
+        encrypted.delete(&copy_path).await.unwrap();
+        encrypted.delete(&path).await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn azure_signed_url_rejects_cpk_configuration() {
+        let store = MicrosoftAzureBuilder::new()
+            .with_account("testaccount")
+            .with_container_name("testcontainer")
+            .with_access_key("Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==")
+            .with_encryption_key(BASE64_STANDARD.encode([7_u8; 32]))
+            .build()
+            .unwrap();
+
+        let err = store
+            .signed_url(
+                Method::GET,
+                &Path::from("file.txt"),
+                Duration::from_secs(60),
+            )
+            .await
+            .unwrap_err();
+        assert!(matches!(err, crate::Error::NotSupported { .. }));
+        assert!(err.to_string().contains("customer-provided keys"));
+
+        let err = store
+            .signed_urls(
+                Method::GET,
+                &[Path::from("file.txt")],
+                Duration::from_secs(60),
+            )
+            .await
+            .unwrap_err();
+        assert!(matches!(err, crate::Error::NotSupported { .. }));
+        assert!(err.to_string().contains("customer-provided keys"));
+    }
+
     #[ignore = "Used for manual testing against a real storage account."]
     #[tokio::test]
     async fn test_user_delegation_key() {
diff --git a/src/client/builder.rs b/src/client/builder.rs
index fd0a719..cca6f93 100644
--- a/src/client/builder.rs
+++ b/src/client/builder.rs
@@ -151,20 +151,39 @@ impl HttpRequestBuilder {
         self
     }
 
-    #[cfg(feature = "gcp-base")]
-    pub(crate) fn bearer_auth(mut self, token: &str) -> Self {
-        let value = HeaderValue::try_from(format!("Bearer {token}"));
-        match (value, &mut self.request) {
-            (Ok(mut v), Ok(r)) => {
-                v.set_sensitive(true);
-                r.headers_mut().insert(http::header::AUTHORIZATION, v);
+    /// Insert a header whose value is marked [sensitive], so it is redacted from
+    /// the `Debug` representation of the request and any `HeaderValue`.
+    ///
+    /// Used for secret material carried in headers (e.g. Azure customer-provided
+    /// encryption keys) so it cannot leak through diagnostics as well as bearer
+    /// auth tokens by GCP.
+    ///
+    /// [sensitive]: HeaderValue::set_sensitive
+    #[cfg(any(feature = "gcp-base", feature = "azure-base"))]
+    pub(crate) fn sensitive_header<K, V>(mut self, name: K, value: V) -> Self
+    where
+        K: TryInto<HeaderName>,
+        K::Error: Into<RequestBuilderError>,
+        V: TryInto<HeaderValue>,
+        V::Error: Into<RequestBuilderError>,
+    {
+        match (name.try_into(), value.try_into(), &mut self.request) {
+            (Ok(name), Ok(mut value), Ok(r)) => {
+                value.set_sensitive(true);
+                r.headers_mut().insert(name, value);
             }
-            (Err(e), Ok(_)) => self.request = Err(e.into()),
-            (_, Err(_)) => {}
+            (Err(e), _, Ok(_)) => self.request = Err(e.into()),
+            (_, Err(e), Ok(_)) => self.request = Err(e.into()),
+            (_, _, Err(_)) => {}
         }
         self
     }
 
+    #[cfg(feature = "gcp-base")]
+    pub(crate) fn bearer_auth(self, token: &str) -> Self {
+        self.sensitive_header(http::header::AUTHORIZATION, format!("Bearer {token}"))
+    }
+
     #[cfg(feature = "gcp-base")]
     pub(crate) fn json<S: serde::Serialize>(mut self, s: S) -> Self {
         match (serde_json::to_vec(&s), &mut self.request) {


Reply via email to