This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new f2765283b67 Add Attributes API (#5329) (#5650)
f2765283b67 is described below
commit f2765283b67e64a6923352ed23398ca9ca194ef2
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Tue Apr 16 12:48:56 2024 +0100
Add Attributes API (#5329) (#5650)
* Add Attributes API (#5329)
* Clippy
* Emulator test tweaks
---
object_store/src/attributes.rs | 211 +++++++++++++++++++++++++++++++++++++++
object_store/src/aws/client.rs | 30 ++++--
object_store/src/aws/mod.rs | 10 +-
object_store/src/azure/client.rs | 43 +++++---
object_store/src/azure/mod.rs | 9 +-
object_store/src/client/get.rs | 21 +++-
object_store/src/client/mod.rs | 2 +-
object_store/src/gcp/client.rs | 47 +++++----
object_store/src/gcp/mod.rs | 2 +
object_store/src/http/client.rs | 29 +++++-
object_store/src/http/mod.rs | 4 +-
object_store/src/lib.rs | 36 ++++++-
object_store/src/local.rs | 9 +-
object_store/src/memory.rs | 36 +++++--
14 files changed, 419 insertions(+), 70 deletions(-)
diff --git a/object_store/src/attributes.rs b/object_store/src/attributes.rs
new file mode 100644
index 00000000000..9b90b532585
--- /dev/null
+++ b/object_store/src/attributes.rs
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::ops::Deref;
+
+/// Additional object attribute types
+#[non_exhaustive]
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub enum Attribute {
+    /// Specifies the MIME type of the object
+    ///
+    /// This takes precedence over any [ClientOptions](crate::ClientOptions) configuration
+    ///
+    /// See [Content-Type](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Type)
+    ContentType,
+    /// Overrides cache control policy of the object
+    ///
+    /// See [Cache-Control](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Cache-Control)
+    CacheControl,
+}
+
+/// The value of an [`Attribute`]
+///
+/// Provides efficient conversion from both static and owned strings
+///
+/// ```
+/// # use object_store::AttributeValue;
+/// // Can use static strings without needing an allocation
+/// let value = AttributeValue::from("bar");
+/// // Can also store owned strings
+/// let value = AttributeValue::from("foo".to_string());
+/// ```
+#[derive(Debug, Hash, Eq, PartialEq, Clone)]
+pub struct AttributeValue(Cow<'static, str>);
+
+impl AsRef<str> for AttributeValue {
+    fn as_ref(&self) -> &str {
+        &self.0
+    }
+}
+
+impl From<&'static str> for AttributeValue {
+    fn from(value: &'static str) -> Self {
+        Self(Cow::Borrowed(value))
+    }
+}
+
+impl From<String> for AttributeValue {
+    fn from(value: String) -> Self {
+        Self(Cow::Owned(value))
+    }
+}
+
+impl Deref for AttributeValue {
+    type Target = str;
+
+    fn deref(&self) -> &Self::Target {
+        self.0.as_ref()
+    }
+}
+
+/// Additional attributes of an object
+///
+/// Attributes can be specified in [PutOptions](crate::PutOptions) and retrieved
+/// from APIs returning [GetResult](crate::GetResult).
+///
+/// Unlike [`ObjectMeta`](crate::ObjectMeta), [`Attributes`] are not returned by
+/// listing APIs
+#[derive(Debug, Default, Eq, PartialEq, Clone)]
+pub struct Attributes(HashMap<Attribute, AttributeValue>);
+
+impl Attributes {
+    /// Create a new empty [`Attributes`]
+    pub fn new() -> Self {
+        Self::default()
+    }
+
+    /// Create a new [`Attributes`] with space for `capacity` [`Attribute`]
+    pub fn with_capacity(capacity: usize) -> Self {
+        Self(HashMap::with_capacity(capacity))
+    }
+
+    /// Insert a new [`Attribute`], [`AttributeValue`] pair
+    ///
+    /// Returns the previous value for `key` if any
+    pub fn insert(&mut self, key: Attribute, value: AttributeValue) -> Option<AttributeValue> {
+        self.0.insert(key, value)
+    }
+
+    /// Returns the [`AttributeValue`] for `key` if any
+    pub fn get(&self, key: &Attribute) -> Option<&AttributeValue> {
+        self.0.get(key)
+    }
+
+    /// Removes the [`AttributeValue`] for `key` if any
+    pub fn remove(&mut self, key: &Attribute) -> Option<AttributeValue> {
+        self.0.remove(key)
+    }
+
+    /// Returns an [`AttributesIter`] over this
+    pub fn iter(&self) -> AttributesIter<'_> {
+        self.into_iter()
+    }
+
+    /// Returns the number of [`Attribute`] in this collection
+    #[inline]
+    pub fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    /// Returns true if this contains no [`Attribute`]
+    #[inline]
+    pub fn is_empty(&self) -> bool {
+        self.0.is_empty()
+    }
+}
+
+impl<K, V> FromIterator<(K, V)> for Attributes
+where
+    K: Into<Attribute>,
+    V: Into<AttributeValue>,
+{
+    fn from_iter<T: IntoIterator<Item = (K, V)>>(iter: T) -> Self {
+        Self(
+            iter.into_iter()
+                .map(|(k, v)| (k.into(), v.into()))
+                .collect(),
+        )
+    }
+}
+
+impl<'a> IntoIterator for &'a Attributes {
+    type Item = (&'a Attribute, &'a AttributeValue);
+    type IntoIter = AttributesIter<'a>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        AttributesIter(self.0.iter())
+    }
+}
+
+/// Iterator over [`Attributes`]
+#[derive(Debug)]
+pub struct AttributesIter<'a>(std::collections::hash_map::Iter<'a, Attribute, AttributeValue>);
+
+impl<'a> Iterator for AttributesIter<'a> {
+    type Item = (&'a Attribute, &'a AttributeValue);
+
+    fn next(&mut self) -> Option<Self::Item> {
+        self.0.next()
+    }
+
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        self.0.size_hint()
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_attributes_basic() {
+        let mut attributes = Attributes::from_iter([
+            (Attribute::ContentType, "test"),
+            (Attribute::CacheControl, "control"),
+        ]);
+
+        assert!(!attributes.is_empty());
+        assert_eq!(attributes.len(), 2);
+
+        assert_eq!(
+            attributes.get(&Attribute::ContentType),
+            Some(&"test".into())
+        );
+
+        let metav = "control".into();
+        assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
+        assert_eq!(
+            attributes.insert(Attribute::CacheControl, "v1".into()),
+            Some(metav)
+        );
+        assert_eq!(attributes.len(), 2);
+
+        assert_eq!(
+            attributes.remove(&Attribute::CacheControl).unwrap(),
+            "v1".into()
+        );
+        assert_eq!(attributes.len(), 1);
+
+        let metav: AttributeValue = "v2".into();
+        attributes.insert(Attribute::CacheControl, metav.clone());
+        assert_eq!(attributes.get(&Attribute::CacheControl), Some(&metav));
+        assert_eq!(attributes.len(), 2);
+    }
+}
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index c1789ed143e..e81ef6aa220 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -35,23 +35,21 @@ use crate::client::GetOptionsExt;
use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::{
- ClientOptions, GetOptions, ListResult, MultipartId, Path, PutPayload,
PutResult, Result,
- RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId,
Path, PutPayload,
+ PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
use hyper::http;
use hyper::http::HeaderName;
use itertools::Itertools;
use md5::{Digest, Md5};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use quick_xml::events::{self as xml_events};
-use reqwest::{
- header::{CONTENT_LENGTH, CONTENT_TYPE},
- Client as ReqwestClient, Method, RequestBuilder, Response,
-};
+use reqwest::{header::CONTENT_TYPE, Client as ReqwestClient, Method,
RequestBuilder, Response};
use ring::digest;
use ring::digest::Context;
use serde::{Deserialize, Serialize};
@@ -344,6 +342,7 @@ impl S3Client {
&'a self,
path: &'a Path,
payload: PutPayload,
+ attributes: Attributes,
with_encryption_headers: bool,
) -> Request<'a> {
let url = self.config.path_url(path);
@@ -363,8 +362,21 @@ impl S3Client {
)
}
- if let Some(value) = self.config.client_options.get_content_type(path)
{
- builder = builder.header(CONTENT_TYPE, value);
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) =
self.config.client_options.get_content_type(path) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
}
Request {
@@ -556,7 +568,7 @@ impl S3Client {
let part = (part_idx + 1).to_string();
let response = self
- .put_request(path, data, false)
+ .put_request(path, data, Attributes::default(), false)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true)
.send()
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 9e741c9142d..43bd38a6de2 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -156,7 +156,8 @@ impl ObjectStore for AmazonS3 {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let mut request = self.client.put_request(location, payload, true);
+ let attrs = opts.attributes;
+ let mut request = self.client.put_request(location, payload, attrs,
true);
let tags = opts.tags.encoded();
if !tags.is_empty() && !self.client.config.disable_tagging {
request = request.header(&TAGS_HEADER, tags);
@@ -403,7 +404,7 @@ mod tests {
let test_not_exists = config.copy_if_not_exists.is_some();
let test_conditional_put = config.conditional_put.is_some();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
@@ -412,6 +413,7 @@ mod tests {
multipart(&integration, &integration).await;
signing(&integration).await;
s3_encryption(&integration).await;
+ put_get_attributes(&integration).await;
// Object tagging is not supported by S3 Express One Zone
if config.session_provider.is_none() {
@@ -432,12 +434,12 @@ mod tests {
// run integration test with unsigned payload enabled
let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
let integration = builder.build().unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
// run integration test with checksum set to sha256
let builder =
AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
let integration = builder.build().unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
match &integration.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Dynamo(d)) =>
dynamo::integration_test(&integration, d).await,
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index d5972d0a8c1..134609eb262 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -27,14 +27,15 @@ use crate::multipart::PartId;
use crate::path::DELIMITER;
use crate::util::{deserialize_rfc1123, GetRange};
use crate::{
- ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode,
PutOptions, PutPayload,
- PutResult, Result, RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, ObjectMeta,
Path, PutMode,
+ PutOptions, PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
+use hyper::header::CACHE_CONTROL;
use hyper::http::HeaderName;
use reqwest::header::CONTENT_TYPE;
use reqwest::{
@@ -187,9 +188,8 @@ impl<'a> PutRequest<'a> {
Self { builder, ..self }
}
- fn set_idempotent(mut self, idempotent: bool) -> Self {
- self.idempotent = idempotent;
- self
+ fn set_idempotent(self, idempotent: bool) -> Self {
+ Self { idempotent, ..self }
}
async fn send(self) -> Result<Response> {
@@ -199,7 +199,7 @@ impl<'a> PutRequest<'a> {
.header(CONTENT_LENGTH, self.payload.content_length())
.with_azure_authorization(&credential, &self.config.account)
.retryable(&self.config.retry_config)
- .idempotent(true)
+ .idempotent(self.idempotent)
.payload(Some(self.payload))
.send()
.await
@@ -233,13 +233,31 @@ impl AzureClient {
self.config.get_credential().await
}
- fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) ->
PutRequest<'a> {
+ fn put_request<'a>(
+ &'a self,
+ path: &'a Path,
+ payload: PutPayload,
+ attributes: Attributes,
+ ) -> PutRequest<'a> {
let url = self.config.path_url(path);
let mut builder = self.client.request(Method::PUT, url);
- if let Some(value) =
self.config().client_options.get_content_type(path) {
- builder = builder.header(CONTENT_TYPE, value);
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) =
self.config.client_options.get_content_type(path) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
}
PutRequest {
@@ -258,7 +276,7 @@ impl AzureClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let builder = self.put_request(path, payload);
+ let builder = self.put_request(path, payload, opts.attributes);
let builder = match &opts.mode {
PutMode::Overwrite => builder.set_idempotent(true),
@@ -288,7 +306,7 @@ impl AzureClient {
let content_id = format!("{part_idx:20}");
let block_id = BASE64_STANDARD.encode(&content_id);
- self.put_request(path, payload)
+ self.put_request(path, payload, Attributes::default())
.query(&[("comp", "block"), ("blockid", &block_id)])
.set_idempotent(true)
.send()
@@ -304,8 +322,9 @@ impl AzureClient {
.map(|part| BlockId::from(part.content_id))
.collect();
+ let payload = BlockList { blocks }.to_xml().into();
let response = self
- .put_request(path, BlockList { blocks }.to_xml().into())
+ .put_request(path, payload, Attributes::default())
.query(&[("comp", "blocklist")])
.set_idempotent(true)
.send()
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 8dc52422b7d..3bb57c45aa6 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -276,7 +276,7 @@ mod tests {
crate::test_util::maybe_skip_integration!();
let integration = MicrosoftAzureBuilder::from_env().build().unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
get_opts(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
@@ -292,7 +292,12 @@ mod tests {
let client = Arc::clone(&integration.client);
async move { client.get_blob_tagging(&p).await }
})
- .await
+ .await;
+
+ // Azurite doesn't support attributes properly
+ if !integration.client.config().is_emulator {
+ put_get_attributes(&integration).await;
+ }
}
#[ignore = "Used for manual testing against a real storage account."]
diff --git a/object_store/src/client/get.rs b/object_store/src/client/get.rs
index 2e399e523ed..f700457611f 100644
--- a/object_store/src/client/get.rs
+++ b/object_store/src/client/get.rs
@@ -19,10 +19,10 @@ use std::ops::Range;
use crate::client::header::{header_meta, HeaderConfig};
use crate::path::Path;
-use crate::{GetOptions, GetRange, GetResult, GetResultPayload, Result};
+use crate::{Attribute, Attributes, GetOptions, GetRange, GetResult,
GetResultPayload, Result};
use async_trait::async_trait;
use futures::{StreamExt, TryStreamExt};
-use hyper::header::CONTENT_RANGE;
+use hyper::header::{CACHE_CONTROL, CONTENT_RANGE, CONTENT_TYPE};
use hyper::StatusCode;
use reqwest::header::ToStrError;
use reqwest::Response;
@@ -117,6 +117,12 @@ enum GetResultError {
#[snafu(display("Content-Range header contained non UTF-8 characters"))]
InvalidContentRange { source: ToStrError },
+ #[snafu(display("Cache-Control header contained non UTF-8 characters"))]
+ InvalidCacheControl { source: ToStrError },
+
+ #[snafu(display("Content-Type header contained non UTF-8 characters"))]
+ InvalidContentType { source: ToStrError },
+
#[snafu(display("Requested {expected:?}, got {actual:?}"))]
UnexpectedRange {
expected: Range<usize>,
@@ -161,6 +167,16 @@ fn get_result<T: GetClient>(
0..meta.size
};
+ let mut attributes = Attributes::new();
+ if let Some(x) = response.headers().get(CACHE_CONTROL) {
+ let x = x.to_str().context(InvalidCacheControlSnafu)?;
+ attributes.insert(Attribute::CacheControl, x.to_string().into());
+ }
+ if let Some(x) = response.headers().get(CONTENT_TYPE) {
+ let x = x.to_str().context(InvalidContentTypeSnafu)?;
+ attributes.insert(Attribute::ContentType, x.to_string().into());
+ }
+
let stream = response
.bytes_stream()
.map_err(|source| crate::Error::Generic {
@@ -172,6 +188,7 @@ fn get_result<T: GetClient>(
Ok(GetResult {
range,
meta,
+ attributes,
payload: GetResultPayload::Stream(stream),
})
}
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 7728f38954f..3fefbb56834 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -485,7 +485,7 @@ impl ClientOptions {
/// mime type if it was defined initially through
/// `ClientOptions::with_content_type_for_suffix`
///
- /// Otherwise returns the default mime type if it was defined
+ /// Otherwise, returns the default mime type if it was defined
/// earlier through `ClientOptions::with_default_content_type`
pub fn get_content_type(&self, path: &Path) -> Option<&str> {
match path.extension() {
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index f91217f6f9a..4ee03eaad62 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -29,14 +29,14 @@ use crate::multipart::PartId;
use crate::path::{Path, DELIMITER};
use crate::util::hex_encode;
use crate::{
- ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions,
PutPayload, PutResult,
- Result, RetryConfig,
+ Attribute, Attributes, ClientOptions, GetOptions, ListResult, MultipartId,
PutMode, PutOptions,
+ PutPayload, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
use base64::Engine;
use bytes::Buf;
-use hyper::header::CONTENT_LENGTH;
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH, CONTENT_TYPE};
use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
use reqwest::header::HeaderName;
use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
@@ -45,6 +45,7 @@ use snafu::{OptionExt, ResultExt, Snafu};
use std::sync::Arc;
const VERSION_HEADER: &str = "x-goog-generation";
+const DEFAULT_CONTENT_TYPE: &str = "application/octet-stream";
static VERSION_MATCH: HeaderName =
HeaderName::from_static("x-goog-if-generation-match");
@@ -323,19 +324,31 @@ impl GoogleCloudStorageClient {
/// Perform a put request
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
///
/// Returns the new ETag
- pub fn put_request<'a>(&'a self, path: &'a Path, payload: PutPayload) ->
PutRequest<'a> {
+ pub fn put_request<'a>(
+ &'a self,
+ path: &'a Path,
+ payload: PutPayload,
+ attributes: Attributes,
+ ) -> PutRequest<'a> {
let url = self.object_url(path);
+ let mut builder = self.client.request(Method::PUT, url);
+
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
- let content_type = self
- .config
- .client_options
- .get_content_type(path)
- .unwrap_or("application/octet-stream");
-
- let builder = self
- .client
- .request(Method::PUT, url)
- .header(header::CONTENT_TYPE, content_type);
+ if !has_content_type {
+ let opts = &self.config.client_options;
+ let value =
opts.get_content_type(path).unwrap_or(DEFAULT_CONTENT_TYPE);
+ builder = builder.header(CONTENT_TYPE, value)
+ }
PutRequest {
path,
@@ -352,7 +365,7 @@ impl GoogleCloudStorageClient {
payload: PutPayload,
opts: PutOptions,
) -> Result<PutResult> {
- let builder = self.put_request(path, payload);
+ let builder = self.put_request(path, payload, opts.attributes);
let builder = match &opts.mode {
PutMode::Overwrite => builder.set_idempotent(true),
@@ -386,7 +399,7 @@ impl GoogleCloudStorageClient {
("uploadId", upload_id),
];
let result = self
- .put_request(path, data)
+ .put_request(path, data, Attributes::new())
.query(query)
.set_idempotent(true)
.send()
@@ -459,7 +472,7 @@ impl GoogleCloudStorageClient {
if completed_parts.is_empty() {
// GCS doesn't allow empty multipart uploads
let result = self
- .put_request(path, Default::default())
+ .put_request(path, Default::default(), Attributes::new())
.set_idempotent(true)
.send()
.await?;
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 149da76f559..af6e671cbc3 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -292,6 +292,8 @@ mod test {
// Fake GCS server doesn't currently honor preconditions
get_opts(&integration).await;
put_opts(&integration, true).await;
+ // Fake GCS server doesn't currently support attributes
+ put_get_attributes(&integration).await;
}
}
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index 39f68ece65a..cf259196ba4 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -21,11 +21,11 @@ use crate::client::retry::{self, RetryConfig, RetryExt};
use crate::client::GetOptionsExt;
use crate::path::{Path, DELIMITER};
use crate::util::deserialize_rfc1123;
-use crate::{ClientOptions, GetOptions, ObjectMeta, PutPayload, Result};
+use crate::{Attribute, Attributes, ClientOptions, GetOptions, ObjectMeta,
PutPayload, Result};
use async_trait::async_trait;
use bytes::Buf;
use chrono::{DateTime, Utc};
-use hyper::header::CONTENT_LENGTH;
+use hyper::header::{CACHE_CONTROL, CONTENT_LENGTH};
use percent_encoding::percent_decode_str;
use reqwest::header::CONTENT_TYPE;
use reqwest::{Method, Response, StatusCode};
@@ -157,13 +157,32 @@ impl Client {
Ok(())
}
- pub async fn put(&self, location: &Path, payload: PutPayload) ->
Result<Response> {
+ pub async fn put(
+ &self,
+ location: &Path,
+ payload: PutPayload,
+ attributes: Attributes,
+ ) -> Result<Response> {
let mut retry = false;
loop {
let url = self.path_url(location);
let mut builder = self.client.put(url);
- if let Some(value) =
self.client_options.get_content_type(location) {
- builder = builder.header(CONTENT_TYPE, value);
+
+ let mut has_content_type = false;
+ for (k, v) in &attributes {
+ builder = match k {
+ Attribute::CacheControl => builder.header(CACHE_CONTROL,
v.as_ref()),
+ Attribute::ContentType => {
+ has_content_type = true;
+ builder.header(CONTENT_TYPE, v.as_ref())
+ }
+ };
+ }
+
+ if !has_content_type {
+ if let Some(value) =
self.client_options.get_content_type(location) {
+ builder = builder.header(CONTENT_TYPE, value);
+ }
}
let resp = builder
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index a838a0f479d..d6ba4f4d913 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -105,7 +105,7 @@ impl ObjectStore for HttpStore {
return Err(crate::Error::NotImplemented);
}
- let response = self.client.put(location, payload).await?;
+ let response = self.client.put(location, payload,
opts.attributes).await?;
let e_tag = match get_etag(response.headers()) {
Ok(e_tag) => Some(e_tag),
Err(crate::client::header::Error::MissingEtag) => None,
@@ -260,7 +260,7 @@ mod tests {
.build()
.unwrap();
- put_get_delete_list_opts(&integration).await;
+ put_get_delete_list(&integration).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 157852ff9a6..b492d93894a 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -543,6 +543,10 @@ mod payload;
mod upload;
mod util;
+mod attributes;
+
+pub use attributes::*;
+
pub use parse::{parse_url, parse_url_opts};
pub use payload::*;
pub use upload::*;
@@ -989,6 +993,8 @@ pub struct GetResult {
pub meta: ObjectMeta,
/// The range of bytes returned by this request
pub range: Range<usize>,
+ /// Additional object attributes
+ pub attributes: Attributes,
}
/// The kind of a [`GetResult`]
@@ -1114,6 +1120,10 @@ pub struct PutOptions {
///
/// Implementations that don't support object tagging should ignore this
pub tags: TagSet,
+ /// Provide a set of [`Attributes`]
+ ///
+ /// Implementations that don't support an attribute should return an error
+ pub attributes: Attributes,
}
impl From<PutMode> for PutOptions {
@@ -1251,10 +1261,6 @@ mod tests {
use rand::{thread_rng, Rng};
pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
- put_get_delete_list_opts(storage).await
- }
-
- pub(crate) async fn put_get_delete_list_opts(storage: &DynObjectStore) {
delete_fixtures(storage).await;
let content_list = flatten_list_stream(storage, None).await.unwrap();
@@ -1674,6 +1680,28 @@ mod tests {
storage.delete(&path).await.unwrap();
}
+    pub(crate) async fn put_get_attributes(integration: &dyn ObjectStore) {
+        // Test handling of attributes
+        let attributes = Attributes::from_iter([
+            (Attribute::ContentType, "text/html; charset=utf-8"),
+            (Attribute::CacheControl, "max-age=604800"),
+        ]);
+
+        let path = Path::from("attributes");
+        let opts = PutOptions {
+            attributes: attributes.clone(),
+            ..Default::default()
+        };
+        match integration.put_opts(&path, "foo".into(), opts).await {
+            Ok(_) => {
+                let r = integration.get(&path).await.unwrap();
+                assert_eq!(r.attributes, attributes);
+            }
+            Err(Error::NotImplemented) => {}
+            Err(e) => panic!("{e}"),
+        }
+    }
+
pub(crate) async fn get_opts(storage: &dyn ObjectStore) {
let path = Path::from("test");
storage.put(&path, "foo".into()).await.unwrap();
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index d5581cdc8f5..a3695ad9174 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -38,8 +38,8 @@ use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
util::InvalidGetRange,
- GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload,
ObjectMeta, ObjectStore,
- PutMode, PutOptions, PutPayload, PutResult, Result, UploadPart,
+ Attributes, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartUpload, ObjectMeta,
+ ObjectStore, PutMode, PutOptions, PutPayload, PutResult, Result,
UploadPart,
};
/// A specialized `Error` for filesystem object store-related errors
@@ -346,6 +346,10 @@ impl ObjectStore for LocalFileSystem {
return Err(crate::Error::NotImplemented);
}
+ if !opts.attributes.is_empty() {
+ return Err(crate::Error::NotImplemented);
+ }
+
let path = self.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, staging_path) = new_staged_upload(&path)?;
@@ -421,6 +425,7 @@ impl ObjectStore for LocalFileSystem {
Ok(GetResult {
payload: GetResultPayload::File(file, path),
+ attributes: Attributes::default(),
range,
meta,
})
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index d42e6f231c0..e34b28fd27c 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -30,8 +30,9 @@ use snafu::{OptionExt, ResultExt, Snafu};
use crate::multipart::{MultipartStore, PartId};
use crate::util::InvalidGetRange;
use crate::{
- path::Path, GetRange, GetResult, GetResultPayload, ListResult,
MultipartId, MultipartUpload,
- ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult, Result,
UpdateVersion, UploadPart,
+ path::Path, Attributes, GetRange, GetResult, GetResultPayload, ListResult,
MultipartId,
+ MultipartUpload, ObjectMeta, ObjectStore, PutMode, PutOptions, PutResult,
Result,
+ UpdateVersion, UploadPart,
};
use crate::{GetOptions, PutPayload};
@@ -88,15 +89,22 @@ pub struct InMemory {
struct Entry {
data: Bytes,
last_modified: DateTime<Utc>,
+ attributes: Attributes,
e_tag: usize,
}
impl Entry {
- fn new(data: Bytes, last_modified: DateTime<Utc>, e_tag: usize) -> Self {
+ fn new(
+ data: Bytes,
+ last_modified: DateTime<Utc>,
+ e_tag: usize,
+ attributes: Attributes,
+ ) -> Self {
Self {
data,
last_modified,
e_tag,
+ attributes,
}
}
}
@@ -116,10 +124,10 @@ struct PartStorage {
type SharedStorage = Arc<RwLock<Storage>>;
impl Storage {
- fn insert(&mut self, location: &Path, bytes: Bytes) -> usize {
+ fn insert(&mut self, location: &Path, bytes: Bytes, attributes:
Attributes) -> usize {
let etag = self.next_etag;
self.next_etag += 1;
- let entry = Entry::new(bytes, Utc::now(), etag);
+ let entry = Entry::new(bytes, Utc::now(), etag, attributes);
self.overwrite(location, entry);
etag
}
@@ -200,7 +208,7 @@ impl ObjectStore for InMemory {
) -> Result<PutResult> {
let mut storage = self.storage.write();
let etag = storage.next_etag;
- let entry = Entry::new(payload.into(), Utc::now(), etag);
+ let entry = Entry::new(payload.into(), Utc::now(), etag,
opts.attributes);
match opts.mode {
PutMode::Overwrite => storage.overwrite(location, entry),
@@ -247,6 +255,7 @@ impl ObjectStore for InMemory {
Ok(GetResult {
payload: GetResultPayload::Stream(stream.boxed()),
+ attributes: entry.attributes,
meta,
range,
})
@@ -363,7 +372,9 @@ impl ObjectStore for InMemory {
async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
let entry = self.entry(from).await?;
- self.storage.write().insert(to, entry.data);
+ self.storage
+ .write()
+ .insert(to, entry.data, entry.attributes);
Ok(())
}
@@ -376,7 +387,7 @@ impl ObjectStore for InMemory {
}
.into());
}
- storage.insert(to, entry.data);
+ storage.insert(to, entry.data, entry.attributes);
Ok(())
}
}
@@ -426,7 +437,7 @@ impl MultipartStore for InMemory {
for x in &upload.parts {
buf.extend_from_slice(x.as_ref().unwrap())
}
- let etag = storage.insert(path, buf.into());
+ let etag = storage.insert(path, buf.into(), Default::default());
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
@@ -492,7 +503,11 @@ impl MultipartUpload for InMemoryUpload {
let mut buf = Vec::with_capacity(cap);
let parts = self.parts.iter().flatten();
parts.for_each(|x| buf.extend_from_slice(x));
- let etag = self.storage.write().insert(&self.location, buf.into());
+ let etag = self
+ .storage
+ .write()
+ .insert(&self.location, buf.into(), Attributes::new());
+
Ok(PutResult {
e_tag: Some(etag.to_string()),
version: None,
@@ -523,6 +538,7 @@ mod tests {
stream_get(&integration).await;
put_opts(&integration, true).await;
multipart(&integration, &integration).await;
+ put_get_attributes(&integration).await;
}
#[tokio::test]