This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 4cca029144 Return `PutResult` with an ETag from ObjectStore::put (#4934) (#4944)
4cca029144 is described below
commit 4cca0291441fe622f13db6724f8bc3efb1a31b5b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Oct 19 09:44:46 2023 +0100
Return `PutResult` with an ETag from ObjectStore::put (#4934) (#4944)
* Return ETag from ObjectStore::put (#4934)
* Further tests
* Clippy
* Review feedback
---
object_store/src/aws/client.rs | 12 ++++-
object_store/src/aws/mod.rs | 25 +++--------
object_store/src/azure/mod.rs | 20 +++++----
object_store/src/chunked.rs | 3 +-
object_store/src/client/header.rs | 17 ++++---
object_store/src/gcp/mod.rs | 87 +++++++++++++++---------------------
object_store/src/http/client.rs | 4 +-
object_store/src/http/mod.rs | 13 ++++--
object_store/src/lib.rs | 35 ++++++++++++++-
object_store/src/limit.rs | 4 +-
object_store/src/local.rs | 43 +++++++++++++-----
object_store/src/memory.rs | 14 +++---
object_store/src/prefix.rs | 5 ++-
object_store/src/throttle.rs | 5 ++-
object_store/tests/get_range_file.rs | 4 +-
15 files changed, 169 insertions(+), 122 deletions(-)
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 8a45a9f3ac..eb81e92fb9 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -21,6 +21,7 @@ use crate::aws::{
AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
};
use crate::client::get::GetClient;
+use crate::client::header::get_etag;
use crate::client::list::ListClient;
use crate::client::list_response::ListResponse;
use crate::client::retry::RetryExt;
@@ -122,6 +123,11 @@ pub(crate) enum Error {
#[snafu(display("Got invalid multipart response: {}", source))]
InvalidMultipartResponse { source: quick_xml::de::DeError },
+
+ #[snafu(display("Unable to extract metadata from headers: {}", source))]
+ Metadata {
+ source: crate::client::header::Error,
+ },
}
impl From<Error> for crate::Error {
@@ -243,12 +249,14 @@ impl S3Client {
}
/// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
+ ///
+ /// Returns the ETag
pub async fn put_request<T: Serialize + ?Sized + Sync>(
&self,
path: &Path,
bytes: Bytes,
query: &T,
- ) -> Result<Response> {
+ ) -> Result<String> {
let credential = self.get_credential().await?;
let url = self.config.path_url(path);
let mut builder = self.client.request(Method::PUT, url);
@@ -287,7 +295,7 @@ impl S3Client {
path: path.as_ref(),
})?;
- Ok(response)
+ Ok(get_etag(response.headers()).context(MetadataSnafu)?)
}
/// Make an S3 Delete request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index d3c50861c1..6d5aecea2d 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -59,7 +59,7 @@ use crate::multipart::{PartId, PutPart, WriteMultiPart};
use crate::signer::Signer;
use crate::{
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Path, Result, RetryConfig,
+ ObjectStore, Path, PutResult, Result, RetryConfig,
};
mod checksum;
@@ -109,12 +109,6 @@ enum Error {
#[snafu(display("Missing SecretAccessKey"))]
MissingSecretAccessKey,
- #[snafu(display("ETag Header missing from response"))]
- MissingEtag,
-
- #[snafu(display("Received header containing non-ASCII data"))]
- BadHeader { source: reqwest::header::ToStrError },
-
#[snafu(display("Unable parse source url. Url: {}, Error: {}", url,
source))]
UnableToParseUrl {
source: url::ParseError,
@@ -273,9 +267,9 @@ impl Signer for AmazonS3 {
#[async_trait]
impl ObjectStore for AmazonS3 {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.client.put_request(location, bytes, &()).await?;
- Ok(())
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+ let e_tag = self.client.put_request(location, bytes, &()).await?;
+ Ok(PutResult { e_tag: Some(e_tag) })
}
async fn put_multipart(
@@ -365,10 +359,9 @@ struct S3MultiPartUpload {
#[async_trait]
impl PutPart for S3MultiPartUpload {
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
- use reqwest::header::ETAG;
let part = (part_idx + 1).to_string();
- let response = self
+ let content_id = self
.client
.put_request(
&self.location,
@@ -377,13 +370,7 @@ impl PutPart for S3MultiPartUpload {
)
.await?;
- let etag = response.headers().get(ETAG).context(MissingEtagSnafu)?;
-
- let etag = etag.to_str().context(BadHeaderSnafu)?;
-
- Ok(PartId {
- content_id: etag.to_string(),
- })
+ Ok(PartId { content_id })
}
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 2a08c67758..0e638efc39 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -31,7 +31,7 @@ use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::Path,
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Result, RetryConfig,
+ ObjectStore, PutResult, Result, RetryConfig,
};
use async_trait::async_trait;
use base64::prelude::BASE64_STANDARD;
@@ -62,6 +62,7 @@ mod credential;
/// [`CredentialProvider`] for [`MicrosoftAzure`]
pub type AzureCredentialProvider =
Arc<dyn CredentialProvider<Credential = AzureCredential>>;
+use crate::client::header::get_etag;
pub use credential::AzureCredential;
const STORE: &str = "MicrosoftAzure";
@@ -81,9 +82,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
enum Error {
- #[snafu(display("Received header containing non-ASCII data"))]
- BadHeader { source: reqwest::header::ToStrError },
-
#[snafu(display("Unable parse source url. Url: {}, Error: {}", url,
source))]
UnableToParseUrl {
source: url::ParseError,
@@ -126,8 +124,10 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
- #[snafu(display("ETag Header missing from response"))]
- MissingEtag,
+ #[snafu(display("Unable to extract metadata from headers: {}", source))]
+ Metadata {
+ source: crate::client::header::Error,
+ },
}
impl From<Error> for super::Error {
@@ -170,11 +170,13 @@ impl std::fmt::Display for MicrosoftAzure {
#[async_trait]
impl ObjectStore for MicrosoftAzure {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.client
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+ let response = self
+ .client
.put_request(location, Some(bytes), false, &())
.await?;
- Ok(())
+ let e_tag = Some(get_etag(response.headers()).context(MetadataSnafu)?);
+ Ok(PutResult { e_tag })
}
async fn put_multipart(
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index d3e02b4127..5694c55d78 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -30,6 +30,7 @@ use tokio::io::AsyncWrite;
use crate::path::Path;
use crate::{
GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore,
+ PutResult,
};
use crate::{MultipartId, Result};
@@ -62,7 +63,7 @@ impl Display for ChunkedStore {
#[async_trait]
impl ObjectStore for ChunkedStore {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
self.inner.put(location, bytes).await
}
diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs
index 6499eff5ae..17f83a2ba8 100644
--- a/object_store/src/client/header.rs
+++ b/object_store/src/client/header.rs
@@ -64,6 +64,12 @@ pub enum Error {
},
}
+/// Extracts an etag from the provided [`HeaderMap`]
+pub fn get_etag(headers: &HeaderMap) -> Result<String, Error> {
+ let e_tag = headers.get(ETAG).ok_or(Error::MissingEtag)?;
+ Ok(e_tag.to_str().context(BadHeaderSnafu)?.to_string())
+}
+
/// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
pub fn header_meta(
location: &Path,
@@ -81,13 +87,10 @@ pub fn header_meta(
None => Utc.timestamp_nanos(0),
};
- let e_tag = match headers.get(ETAG) {
- Some(e_tag) => {
- let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
- Some(e_tag.to_string())
- }
- None if cfg.etag_required => return Err(Error::MissingEtag),
- None => None,
+ let e_tag = match get_etag(headers) {
+ Ok(e_tag) => Some(e_tag),
+ Err(Error::MissingEtag) if !cfg.etag_required => None,
+ Err(e) => return Err(e),
};
let content_length = headers
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 513e396cba..97755c07c6 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -54,7 +54,7 @@ use crate::{
multipart::{PartId, PutPart, WriteMultiPart},
path::{Path, DELIMITER},
ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
- ObjectStore, Result, RetryConfig,
+ ObjectStore, PutResult, Result, RetryConfig,
};
use credential::{InstanceCredentialProvider, ServiceAccountCredentials};
@@ -65,6 +65,7 @@ const STORE: &str = "GCS";
/// [`CredentialProvider`] for [`GoogleCloudStorage`]
pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential =
GcpCredential>>;
+use crate::client::header::get_etag;
use crate::gcp::credential::{ApplicationDefaultCredentials,
DEFAULT_GCS_BASE_URL};
pub use credential::GcpCredential;
@@ -155,11 +156,10 @@ enum Error {
#[snafu(display("Configuration key: '{}' is not known.", key))]
UnknownConfigurationKey { key: String },
- #[snafu(display("ETag Header missing from response"))]
- MissingEtag,
-
- #[snafu(display("Received header containing non-ASCII data"))]
- BadHeader { source: header::ToStrError },
+ #[snafu(display("Unable to extract metadata from headers: {}", source))]
+ Metadata {
+ source: crate::client::header::Error,
+ },
}
impl From<Error> for super::Error {
@@ -247,7 +247,14 @@ impl GoogleCloudStorageClient {
}
/// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
- async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
+ ///
+ /// Returns the new ETag
+ async fn put_request<T: Serialize + ?Sized + Sync>(
+ &self,
+ path: &Path,
+ payload: Bytes,
+ query: &T,
+ ) -> Result<String> {
let credential = self.get_credential().await?;
let url = self.object_url(path);
@@ -256,8 +263,10 @@ impl GoogleCloudStorageClient {
.get_content_type(path)
.unwrap_or("application/octet-stream");
- self.client
+ let response = self
+ .client
.request(Method::PUT, url)
+ .query(query)
.bearer_auth(&credential.bearer)
.header(header::CONTENT_TYPE, content_type)
.header(header::CONTENT_LENGTH, payload.len())
@@ -268,7 +277,7 @@ impl GoogleCloudStorageClient {
path: path.as_ref(),
})?;
- Ok(())
+ Ok(get_etag(response.headers()).context(MetadataSnafu)?)
}
/// Initiate a multi-part upload <https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
@@ -469,7 +478,7 @@ impl ListClient for GoogleCloudStorageClient {
struct GCSMultipartUpload {
client: Arc<GoogleCloudStorageClient>,
- encoded_path: String,
+ path: Path,
multipart_id: MultipartId,
}
@@ -478,38 +487,17 @@ impl PutPart for GCSMultipartUpload {
/// Upload an object part <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
let upload_id = self.multipart_id.clone();
- let url = format!(
- "{}/{}/{}",
- self.client.base_url, self.client.bucket_name_encoded,
self.encoded_path
- );
-
- let credential = self.client.get_credential().await?;
-
- let response = self
+ let content_id = self
.client
- .client
- .request(Method::PUT, &url)
- .bearer_auth(&credential.bearer)
- .query(&[
- ("partNumber", format!("{}", part_idx + 1)),
- ("uploadId", upload_id),
- ])
- .header(header::CONTENT_TYPE, "application/octet-stream")
- .header(header::CONTENT_LENGTH, format!("{}", buf.len()))
- .body(buf)
- .send_retry(&self.client.retry_config)
- .await
- .context(PutRequestSnafu {
- path: &self.encoded_path,
- })?;
-
- let content_id = response
- .headers()
- .get("ETag")
- .context(MissingEtagSnafu)?
- .to_str()
- .context(BadHeaderSnafu)?
- .to_string();
+ .put_request(
+ &self.path,
+ buf.into(),
+ &[
+ ("partNumber", format!("{}", part_idx + 1)),
+ ("uploadId", upload_id),
+ ],
+ )
+ .await?;
Ok(PartId { content_id })
}
@@ -517,10 +505,7 @@ impl PutPart for GCSMultipartUpload {
/// Complete a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-complete>
async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
let upload_id = self.multipart_id.clone();
- let url = format!(
- "{}/{}/{}",
- self.client.base_url, self.client.bucket_name_encoded,
self.encoded_path
- );
+ let url = self.client.object_url(&self.path);
let parts = completed_parts
.into_iter()
@@ -550,7 +535,7 @@ impl PutPart for GCSMultipartUpload {
.send_retry(&self.client.retry_config)
.await
.context(PostRequestSnafu {
- path: &self.encoded_path,
+ path: self.path.as_ref(),
})?;
Ok(())
@@ -559,8 +544,9 @@ impl PutPart for GCSMultipartUpload {
#[async_trait]
impl ObjectStore for GoogleCloudStorage {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.client.put_request(location, bytes).await
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+ let e_tag = self.client.put_request(location, bytes, &()).await?;
+ Ok(PutResult { e_tag: Some(e_tag) })
}
async fn put_multipart(
@@ -569,12 +555,9 @@ impl ObjectStore for GoogleCloudStorage {
) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
let upload_id = self.client.multipart_initiate(location).await?;
- let encoded_path =
- percent_encode(location.to_string().as_bytes(),
NON_ALPHANUMERIC).to_string();
-
let inner = GCSMultipartUpload {
client: Arc::clone(&self.client),
- encoded_path,
+ path: location.clone(),
multipart_id: upload_id.clone(),
};
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index b2a6ac0aa3..4c2a7fcf8d 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -160,7 +160,7 @@ impl Client {
Ok(())
}
- pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<Response>
{
let mut retry = false;
loop {
let url = self.path_url(location);
@@ -170,7 +170,7 @@ impl Client {
}
match builder.send_retry(&self.retry_config).await {
- Ok(_) => return Ok(()),
+ Ok(response) => return Ok(response),
Err(source) => match source.status() {
// Some implementations return 404 instead of 409
Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if
!retry => {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 2fd7850b6b..e41e4f9901 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -41,11 +41,12 @@ use tokio::io::AsyncWrite;
use url::Url;
use crate::client::get::GetClientExt;
+use crate::client::header::get_etag;
use crate::http::client::Client;
use crate::path::Path;
use crate::{
ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult,
MultipartId,
- ObjectMeta, ObjectStore, Result, RetryConfig,
+ ObjectMeta, ObjectStore, PutResult, Result, RetryConfig,
};
mod client;
@@ -95,8 +96,14 @@ impl std::fmt::Display for HttpStore {
#[async_trait]
impl ObjectStore for HttpStore {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.client.put(location, bytes).await
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+ let response = self.client.put(location, bytes).await?;
+ let e_tag = match get_etag(response.headers()) {
+ Ok(e_tag) => Some(e_tag),
+ Err(crate::client::header::Error::MissingEtag) => None,
+ Err(source) => return Err(Error::Metadata { source }.into()),
+ };
+ Ok(PutResult { e_tag })
}
async fn put_multipart(
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 9b396444fa..018f0f5e8d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -300,7 +300,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
/// The operation is guaranteed to be atomic, it will either successfully
/// write the entirety of `bytes` to `location`, or fail. No clients
/// should be able to observe a partially written object
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult>;
/// Get a multi-part upload that allows writing data in chunks
///
@@ -528,7 +528,7 @@ macro_rules! as_ref_impl {
($type:ty) => {
#[async_trait]
impl ObjectStore for $type {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) ->
Result<PutResult> {
self.as_ref().put(location, bytes).await
}
@@ -659,6 +659,8 @@ pub struct ObjectMeta {
/// The size in bytes of the object
pub size: usize,
/// The unique identifier for the object
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
pub e_tag: Option<String>,
}
@@ -850,6 +852,15 @@ impl GetResult {
}
}
+/// Result for a put request
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PutResult {
+ /// The unique identifier for the object
+ ///
+ /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
+ pub e_tag: Option<String>,
+}
+
/// A specialized `Result` for object store-related errors
pub type Result<T, E = Error> = std::result::Result<T, E>;
@@ -1383,6 +1394,26 @@ mod tests {
..GetOptions::default()
};
storage.get_opts(&path, options).await.unwrap();
+
+ let result = storage.put(&path, "test".into()).await.unwrap();
+ let new_tag = result.e_tag.unwrap();
+ assert_ne!(tag, new_tag);
+
+ let meta = storage.head(&path).await.unwrap();
+ assert_eq!(meta.e_tag.unwrap(), new_tag);
+
+ let options = GetOptions {
+ if_match: Some(new_tag),
+ ..GetOptions::default()
+ };
+ storage.get_opts(&path, options).await.unwrap();
+
+ let options = GetOptions {
+ if_match: Some(tag),
+ ..GetOptions::default()
+ };
+ let err = storage.get_opts(&path, options).await.unwrap_err();
+ assert!(matches!(err, Error::Precondition { .. }), "{err}");
}
/// Returns a chunk of length `chunk_length`
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index 00cbce023c..8a453813c2 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -19,7 +19,7 @@
use crate::{
BoxStream, GetOptions, GetResult, GetResultPayload, ListResult,
MultipartId,
- ObjectMeta, ObjectStore, Path, Result, StreamExt,
+ ObjectMeta, ObjectStore, Path, PutResult, Result, StreamExt,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -72,7 +72,7 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
#[async_trait]
impl<T: ObjectStore> ObjectStore for LimitStore<T> {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
let _permit = self.semaphore.acquire().await.unwrap();
self.inner.put(location, bytes).await
}
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 38467c3a9e..4b7c96346e 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -20,7 +20,7 @@ use crate::{
maybe_spawn_blocking,
path::{absolute_path_to_url, Path},
GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
ObjectMeta,
- ObjectStore, Result,
+ ObjectStore, PutResult, Result,
};
use async_trait::async_trait;
use bytes::Bytes;
@@ -36,6 +36,7 @@ use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
+use std::time::SystemTime;
use std::{collections::BTreeSet, convert::TryFrom, io};
use std::{collections::VecDeque, path::PathBuf};
use tokio::io::AsyncWrite;
@@ -270,7 +271,7 @@ impl Config {
#[async_trait]
impl ObjectStore for LocalFileSystem {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
let path = self.config.path_to_filesystem(location)?;
maybe_spawn_blocking(move || {
let (mut file, suffix) = new_staged_upload(&path)?;
@@ -282,8 +283,17 @@ impl ObjectStore for LocalFileSystem {
})
.map_err(|e| {
let _ = std::fs::remove_file(&staging_path); // Attempt to
cleanup
- e.into()
- })
+ e
+ })?;
+
+ let metadata = file.metadata().map_err(|e| Error::Metadata {
+ source: e.into(),
+ path: path.to_string_lossy().to_string(),
+ })?;
+
+ Ok(PutResult {
+ e_tag: Some(get_etag(&metadata)),
+ })
})
.await
}
@@ -959,24 +969,33 @@ fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
.into()
}
+fn get_etag(metadata: &Metadata) -> String {
+ let inode = get_inode(metadata);
+ let size = metadata.len();
+ let mtime = metadata
+ .modified()
+ .ok()
+ .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
+ .unwrap_or_default()
+ .as_micros();
+
+ // Use an ETag scheme based on that used by many popular HTTP servers
+ // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
+ // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
+ format!("{inode:x}-{mtime:x}-{size:x}")
+}
+
fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
let last_modified = last_modified(&metadata);
let size =
usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
path: location.as_ref(),
})?;
- let inode = get_inode(&metadata);
- let mtime = last_modified.timestamp_micros();
-
- // Use an ETag scheme based on that used by many popular HTTP servers
- // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
- // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
- let etag = format!("{inode:x}-{mtime:x}-{size:x}");
Ok(ObjectMeta {
location,
last_modified,
size,
- e_tag: Some(etag),
+ e_tag: Some(get_etag(&metadata)),
})
}
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 00b330b5eb..952b457397 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -17,7 +17,8 @@
//! An in-memory object store implementation
use crate::{
- path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore, Result,
+ path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore,
+ PutResult, Result,
};
use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
@@ -106,11 +107,12 @@ struct Storage {
type SharedStorage = Arc<RwLock<Storage>>;
impl Storage {
- fn insert(&mut self, location: &Path, bytes: Bytes) {
+ fn insert(&mut self, location: &Path, bytes: Bytes) -> usize {
let etag = self.next_etag;
self.next_etag += 1;
let entry = Entry::new(bytes, Utc::now(), etag);
self.map.insert(location.clone(), entry);
+ etag
}
}
@@ -122,9 +124,11 @@ impl std::fmt::Display for InMemory {
#[async_trait]
impl ObjectStore for InMemory {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
- self.storage.write().insert(location, bytes);
- Ok(())
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+ let etag = self.storage.write().insert(location, bytes);
+ Ok(PutResult {
+ e_tag: Some(etag.to_string()),
+ })
}
async fn put_multipart(
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 3776dec2e8..21f6c1d99d 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -23,7 +23,8 @@ use tokio::io::AsyncWrite;
use crate::path::Path;
use crate::{
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
Result,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutResult,
+ Result,
};
#[doc(hidden)]
@@ -79,7 +80,7 @@ impl<T: ObjectStore> PrefixStore<T> {
#[async_trait::async_trait]
impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
let full_path = self.full_path(location);
self.inner.put(&full_path, bytes).await
}
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index f716a11f8a..d6f191baf8 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -21,7 +21,8 @@ use std::ops::Range;
use std::{convert::TryInto, sync::Arc};
use crate::{
- path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore, Result,
+ path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta,
ObjectStore,
+ PutResult, Result,
};
use crate::{GetOptions, MultipartId};
use async_trait::async_trait;
@@ -147,7 +148,7 @@ impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
#[async_trait]
impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
- async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+ async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
sleep(self.config().wait_put_per_call).await;
self.inner.put(location, bytes).await
diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs
index 25c4692606..5703d7f248 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -23,7 +23,7 @@ use futures::stream::BoxStream;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
use object_store::{
- GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+ GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
PutResult,
};
use std::fmt::Formatter;
use tempfile::tempdir;
@@ -40,7 +40,7 @@ impl std::fmt::Display for MyStore {
#[async_trait]
impl ObjectStore for MyStore {
- async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> {
+ async fn put(&self, path: &Path, data: Bytes) ->
object_store::Result<PutResult> {
self.0.put(path, data).await
}