This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cca029144 Return `PutResult` with an ETag from ObjectStore::put (#4934) (#4944)
4cca029144 is described below

commit 4cca0291441fe622f13db6724f8bc3efb1a31b5b
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Thu Oct 19 09:44:46 2023 +0100

    Return `PutResult` with an ETag from ObjectStore::put (#4934) (#4944)
    
    * Return ETag from ObjectStore::put (#4934)
    
    * Further tests
    
    * Clippy
    
    * Review feedback
---
 object_store/src/aws/client.rs       | 12 ++++-
 object_store/src/aws/mod.rs          | 25 +++--------
 object_store/src/azure/mod.rs        | 20 +++++----
 object_store/src/chunked.rs          |  3 +-
 object_store/src/client/header.rs    | 17 ++++---
 object_store/src/gcp/mod.rs          | 87 +++++++++++++++---------------------
 object_store/src/http/client.rs      |  4 +-
 object_store/src/http/mod.rs         | 13 ++++--
 object_store/src/lib.rs              | 35 ++++++++++++++-
 object_store/src/limit.rs            |  4 +-
 object_store/src/local.rs            | 43 +++++++++++++-----
 object_store/src/memory.rs           | 14 +++---
 object_store/src/prefix.rs           |  5 ++-
 object_store/src/throttle.rs         |  5 ++-
 object_store/tests/get_range_file.rs |  4 +-
 15 files changed, 169 insertions(+), 122 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 8a45a9f3ac..eb81e92fb9 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -21,6 +21,7 @@ use crate::aws::{
     AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
 };
 use crate::client::get::GetClient;
+use crate::client::header::get_etag;
 use crate::client::list::ListClient;
 use crate::client::list_response::ListResponse;
 use crate::client::retry::RetryExt;
@@ -122,6 +123,11 @@ pub(crate) enum Error {
 
     #[snafu(display("Got invalid multipart response: {}", source))]
     InvalidMultipartResponse { source: quick_xml::de::DeError },
+
+    #[snafu(display("Unable to extract metadata from headers: {}", source))]
+    Metadata {
+        source: crate::client::header::Error,
+    },
 }
 
 impl From<Error> for crate::Error {
@@ -243,12 +249,14 @@ impl S3Client {
     }
 
     /// Make an S3 PUT request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
+    ///
+    /// Returns the ETag
     pub async fn put_request<T: Serialize + ?Sized + Sync>(
         &self,
         path: &Path,
         bytes: Bytes,
         query: &T,
-    ) -> Result<Response> {
+    ) -> Result<String> {
         let credential = self.get_credential().await?;
         let url = self.config.path_url(path);
         let mut builder = self.client.request(Method::PUT, url);
@@ -287,7 +295,7 @@ impl S3Client {
                 path: path.as_ref(),
             })?;
 
-        Ok(response)
+        Ok(get_etag(response.headers()).context(MetadataSnafu)?)
     }
 
     /// Make an S3 Delete request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index d3c50861c1..6d5aecea2d 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -59,7 +59,7 @@ use crate::multipart::{PartId, PutPart, WriteMultiPart};
 use crate::signer::Signer;
 use crate::{
     ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
-    ObjectStore, Path, Result, RetryConfig,
+    ObjectStore, Path, PutResult, Result, RetryConfig,
 };
 
 mod checksum;
@@ -109,12 +109,6 @@ enum Error {
     #[snafu(display("Missing SecretAccessKey"))]
     MissingSecretAccessKey,
 
-    #[snafu(display("ETag Header missing from response"))]
-    MissingEtag,
-
-    #[snafu(display("Received header containing non-ASCII data"))]
-    BadHeader { source: reqwest::header::ToStrError },
-
     #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
     UnableToParseUrl {
         source: url::ParseError,
@@ -273,9 +267,9 @@ impl Signer for AmazonS3 {
 
 #[async_trait]
 impl ObjectStore for AmazonS3 {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.client.put_request(location, bytes, &()).await?;
-        Ok(())
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+        let e_tag = self.client.put_request(location, bytes, &()).await?;
+        Ok(PutResult { e_tag: Some(e_tag) })
     }
 
     async fn put_multipart(
@@ -365,10 +359,9 @@ struct S3MultiPartUpload {
 #[async_trait]
 impl PutPart for S3MultiPartUpload {
     async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
-        use reqwest::header::ETAG;
         let part = (part_idx + 1).to_string();
 
-        let response = self
+        let content_id = self
             .client
             .put_request(
                 &self.location,
@@ -377,13 +370,7 @@ impl PutPart for S3MultiPartUpload {
             )
             .await?;
 
-        let etag = response.headers().get(ETAG).context(MissingEtagSnafu)?;
-
-        let etag = etag.to_str().context(BadHeaderSnafu)?;
-
-        Ok(PartId {
-            content_id: etag.to_string(),
-        })
+        Ok(PartId { content_id })
     }
 
     async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 2a08c67758..0e638efc39 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -31,7 +31,7 @@ use crate::{
     multipart::{PartId, PutPart, WriteMultiPart},
     path::Path,
     ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
-    ObjectStore, Result, RetryConfig,
+    ObjectStore, PutResult, Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
@@ -62,6 +62,7 @@ mod credential;
 /// [`CredentialProvider`] for [`MicrosoftAzure`]
 pub type AzureCredentialProvider =
     Arc<dyn CredentialProvider<Credential = AzureCredential>>;
+use crate::client::header::get_etag;
 pub use credential::AzureCredential;
 
 const STORE: &str = "MicrosoftAzure";
@@ -81,9 +82,6 @@ const MSI_ENDPOINT_ENV_KEY: &str = "IDENTITY_ENDPOINT";
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
 enum Error {
-    #[snafu(display("Received header containing non-ASCII data"))]
-    BadHeader { source: reqwest::header::ToStrError },
-
     #[snafu(display("Unable parse source url. Url: {}, Error: {}", url, source))]
     UnableToParseUrl {
         source: url::ParseError,
@@ -126,8 +124,10 @@ enum Error {
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
 
-    #[snafu(display("ETag Header missing from response"))]
-    MissingEtag,
+    #[snafu(display("Unable to extract metadata from headers: {}", source))]
+    Metadata {
+        source: crate::client::header::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -170,11 +170,13 @@ impl std::fmt::Display for MicrosoftAzure {
 
 #[async_trait]
 impl ObjectStore for MicrosoftAzure {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.client
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+        let response = self
+            .client
             .put_request(location, Some(bytes), false, &())
             .await?;
-        Ok(())
+        let e_tag = Some(get_etag(response.headers()).context(MetadataSnafu)?);
+        Ok(PutResult { e_tag })
     }
 
     async fn put_multipart(
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index d3e02b4127..5694c55d78 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -30,6 +30,7 @@ use tokio::io::AsyncWrite;
 use crate::path::Path;
 use crate::{
     GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore,
+    PutResult,
 };
 use crate::{MultipartId, Result};
 
@@ -62,7 +63,7 @@ impl Display for ChunkedStore {
 
 #[async_trait]
 impl ObjectStore for ChunkedStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
         self.inner.put(location, bytes).await
     }
 
diff --git a/object_store/src/client/header.rs b/object_store/src/client/header.rs
index 6499eff5ae..17f83a2ba8 100644
--- a/object_store/src/client/header.rs
+++ b/object_store/src/client/header.rs
@@ -64,6 +64,12 @@ pub enum Error {
     },
 }
 
+/// Extracts an etag from the provided [`HeaderMap`]
+pub fn get_etag(headers: &HeaderMap) -> Result<String, Error> {
+    let e_tag = headers.get(ETAG).ok_or(Error::MissingEtag)?;
+    Ok(e_tag.to_str().context(BadHeaderSnafu)?.to_string())
+}
+
 /// Extracts [`ObjectMeta`] from the provided [`HeaderMap`]
 pub fn header_meta(
     location: &Path,
@@ -81,13 +87,10 @@ pub fn header_meta(
         None => Utc.timestamp_nanos(0),
     };
 
-    let e_tag = match headers.get(ETAG) {
-        Some(e_tag) => {
-            let e_tag = e_tag.to_str().context(BadHeaderSnafu)?;
-            Some(e_tag.to_string())
-        }
-        None if cfg.etag_required => return Err(Error::MissingEtag),
-        None => None,
+    let e_tag = match get_etag(headers) {
+        Ok(e_tag) => Some(e_tag),
+        Err(Error::MissingEtag) if !cfg.etag_required => None,
+        Err(e) => return Err(e),
     };
 
     let content_length = headers
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 513e396cba..97755c07c6 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -54,7 +54,7 @@ use crate::{
     multipart::{PartId, PutPart, WriteMultiPart},
     path::{Path, DELIMITER},
     ClientOptions, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta,
-    ObjectStore, Result, RetryConfig,
+    ObjectStore, PutResult, Result, RetryConfig,
 };
 
 use credential::{InstanceCredentialProvider, ServiceAccountCredentials};
@@ -65,6 +65,7 @@ const STORE: &str = "GCS";
 
 /// [`CredentialProvider`] for [`GoogleCloudStorage`]
 pub type GcpCredentialProvider = Arc<dyn CredentialProvider<Credential = GcpCredential>>;
+use crate::client::header::get_etag;
 use crate::gcp::credential::{ApplicationDefaultCredentials, DEFAULT_GCS_BASE_URL};
 pub use credential::GcpCredential;
 
@@ -155,11 +156,10 @@ enum Error {
     #[snafu(display("Configuration key: '{}' is not known.", key))]
     UnknownConfigurationKey { key: String },
 
-    #[snafu(display("ETag Header missing from response"))]
-    MissingEtag,
-
-    #[snafu(display("Received header containing non-ASCII data"))]
-    BadHeader { source: header::ToStrError },
+    #[snafu(display("Unable to extract metadata from headers: {}", source))]
+    Metadata {
+        source: crate::client::header::Error,
+    },
 }
 
 impl From<Error> for super::Error {
@@ -247,7 +247,14 @@ impl GoogleCloudStorageClient {
     }
 
     /// Perform a put request <https://cloud.google.com/storage/docs/xml-api/put-object-upload>
-    async fn put_request(&self, path: &Path, payload: Bytes) -> Result<()> {
+    ///
+    /// Returns the new ETag
+    async fn put_request<T: Serialize + ?Sized + Sync>(
+        &self,
+        path: &Path,
+        payload: Bytes,
+        query: &T,
+    ) -> Result<String> {
         let credential = self.get_credential().await?;
         let url = self.object_url(path);
 
@@ -256,8 +263,10 @@ impl GoogleCloudStorageClient {
             .get_content_type(path)
             .unwrap_or("application/octet-stream");
 
-        self.client
+        let response = self
+            .client
             .request(Method::PUT, url)
+            .query(query)
             .bearer_auth(&credential.bearer)
             .header(header::CONTENT_TYPE, content_type)
             .header(header::CONTENT_LENGTH, payload.len())
@@ -268,7 +277,7 @@ impl GoogleCloudStorageClient {
                 path: path.as_ref(),
             })?;
 
-        Ok(())
+        Ok(get_etag(response.headers()).context(MetadataSnafu)?)
     }
 
     /// Initiate a multi-part upload <https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
@@ -469,7 +478,7 @@ impl ListClient for GoogleCloudStorageClient {
 
 struct GCSMultipartUpload {
     client: Arc<GoogleCloudStorageClient>,
-    encoded_path: String,
+    path: Path,
     multipart_id: MultipartId,
 }
 
@@ -478,38 +487,17 @@ impl PutPart for GCSMultipartUpload {
     /// Upload an object part <https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
     async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
         let upload_id = self.multipart_id.clone();
-        let url = format!(
-            "{}/{}/{}",
-            self.client.base_url, self.client.bucket_name_encoded, self.encoded_path
-        );
-
-        let credential = self.client.get_credential().await?;
-
-        let response = self
+        let content_id = self
             .client
-            .client
-            .request(Method::PUT, &url)
-            .bearer_auth(&credential.bearer)
-            .query(&[
-                ("partNumber", format!("{}", part_idx + 1)),
-                ("uploadId", upload_id),
-            ])
-            .header(header::CONTENT_TYPE, "application/octet-stream")
-            .header(header::CONTENT_LENGTH, format!("{}", buf.len()))
-            .body(buf)
-            .send_retry(&self.client.retry_config)
-            .await
-            .context(PutRequestSnafu {
-                path: &self.encoded_path,
-            })?;
-
-        let content_id = response
-            .headers()
-            .get("ETag")
-            .context(MissingEtagSnafu)?
-            .to_str()
-            .context(BadHeaderSnafu)?
-            .to_string();
+            .put_request(
+                &self.path,
+                buf.into(),
+                &[
+                    ("partNumber", format!("{}", part_idx + 1)),
+                    ("uploadId", upload_id),
+                ],
+            )
+            .await?;
 
         Ok(PartId { content_id })
     }
@@ -517,10 +505,7 @@ impl PutPart for GCSMultipartUpload {
     /// Complete a multipart upload <https://cloud.google.com/storage/docs/xml-api/post-object-complete>
     async fn complete(&self, completed_parts: Vec<PartId>) -> Result<()> {
         let upload_id = self.multipart_id.clone();
-        let url = format!(
-            "{}/{}/{}",
-            self.client.base_url, self.client.bucket_name_encoded, self.encoded_path
-        );
+        let url = self.client.object_url(&self.path);
 
         let parts = completed_parts
             .into_iter()
@@ -550,7 +535,7 @@ impl PutPart for GCSMultipartUpload {
             .send_retry(&self.client.retry_config)
             .await
             .context(PostRequestSnafu {
-                path: &self.encoded_path,
+                path: self.path.as_ref(),
             })?;
 
         Ok(())
@@ -559,8 +544,9 @@ impl PutPart for GCSMultipartUpload {
 
 #[async_trait]
 impl ObjectStore for GoogleCloudStorage {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.client.put_request(location, bytes).await
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+        let e_tag = self.client.put_request(location, bytes, &()).await?;
+        Ok(PutResult { e_tag: Some(e_tag) })
     }
 
     async fn put_multipart(
@@ -569,12 +555,9 @@ impl ObjectStore for GoogleCloudStorage {
     ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         let upload_id = self.client.multipart_initiate(location).await?;
 
-        let encoded_path =
-            percent_encode(location.to_string().as_bytes(), NON_ALPHANUMERIC).to_string();
-
         let inner = GCSMultipartUpload {
             client: Arc::clone(&self.client),
-            encoded_path,
+            path: location.clone(),
             multipart_id: upload_id.clone(),
         };
 
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index b2a6ac0aa3..4c2a7fcf8d 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -160,7 +160,7 @@ impl Client {
         Ok(())
     }
 
-    pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    pub async fn put(&self, location: &Path, bytes: Bytes) -> Result<Response> {
         let mut retry = false;
         loop {
             let url = self.path_url(location);
@@ -170,7 +170,7 @@ impl Client {
             }
 
             match builder.send_retry(&self.retry_config).await {
-                Ok(_) => return Ok(()),
+                Ok(response) => return Ok(response),
                 Err(source) => match source.status() {
                     // Some implementations return 404 instead of 409
                     Some(StatusCode::CONFLICT | StatusCode::NOT_FOUND) if !retry => {
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 2fd7850b6b..e41e4f9901 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -41,11 +41,12 @@ use tokio::io::AsyncWrite;
 use url::Url;
 
 use crate::client::get::GetClientExt;
+use crate::client::header::get_etag;
 use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
     ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, MultipartId,
-    ObjectMeta, ObjectStore, Result, RetryConfig,
+    ObjectMeta, ObjectStore, PutResult, Result, RetryConfig,
 };
 
 mod client;
@@ -95,8 +96,14 @@ impl std::fmt::Display for HttpStore {
 
 #[async_trait]
 impl ObjectStore for HttpStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.client.put(location, bytes).await
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+        let response = self.client.put(location, bytes).await?;
+        let e_tag = match get_etag(response.headers()) {
+            Ok(e_tag) => Some(e_tag),
+            Err(crate::client::header::Error::MissingEtag) => None,
+            Err(source) => return Err(Error::Metadata { source }.into()),
+        };
+        Ok(PutResult { e_tag })
     }
 
     async fn put_multipart(
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 9b396444fa..018f0f5e8d 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -300,7 +300,7 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + Debug + 'static {
     /// The operation is guaranteed to be atomic, it will either successfully
     /// write the entirety of `bytes` to `location`, or fail. No clients
     /// should be able to observe a partially written object
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()>;
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult>;
 
     /// Get a multi-part upload that allows writing data in chunks
     ///
@@ -528,7 +528,7 @@ macro_rules! as_ref_impl {
     ($type:ty) => {
         #[async_trait]
         impl ObjectStore for $type {
-            async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+            async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
                 self.as_ref().put(location, bytes).await
             }
 
@@ -659,6 +659,8 @@ pub struct ObjectMeta {
     /// The size in bytes of the object
     pub size: usize,
     /// The unique identifier for the object
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
     pub e_tag: Option<String>,
 }
 
@@ -850,6 +852,15 @@ impl GetResult {
     }
 }
 
+/// Result for a put request
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct PutResult {
+    /// The unique identifier for the object
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
+    pub e_tag: Option<String>,
+}
+
 /// A specialized `Result` for object store-related errors
 pub type Result<T, E = Error> = std::result::Result<T, E>;
 
@@ -1383,6 +1394,26 @@ mod tests {
             ..GetOptions::default()
         };
         storage.get_opts(&path, options).await.unwrap();
+
+        let result = storage.put(&path, "test".into()).await.unwrap();
+        let new_tag = result.e_tag.unwrap();
+        assert_ne!(tag, new_tag);
+
+        let meta = storage.head(&path).await.unwrap();
+        assert_eq!(meta.e_tag.unwrap(), new_tag);
+
+        let options = GetOptions {
+            if_match: Some(new_tag),
+            ..GetOptions::default()
+        };
+        storage.get_opts(&path, options).await.unwrap();
+
+        let options = GetOptions {
+            if_match: Some(tag),
+            ..GetOptions::default()
+        };
+        let err = storage.get_opts(&path, options).await.unwrap_err();
+        assert!(matches!(err, Error::Precondition { .. }), "{err}");
     }
 
     /// Returns a chunk of length `chunk_length`
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index 00cbce023c..8a453813c2 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -19,7 +19,7 @@
 
 use crate::{
     BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, MultipartId,
-    ObjectMeta, ObjectStore, Path, Result, StreamExt,
+    ObjectMeta, ObjectStore, Path, PutResult, Result, StreamExt,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -72,7 +72,7 @@ impl<T: ObjectStore> std::fmt::Display for LimitStore<T> {
 
 #[async_trait]
 impl<T: ObjectStore> ObjectStore for LimitStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
         let _permit = self.semaphore.acquire().await.unwrap();
         self.inner.put(location, bytes).await
     }
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index 38467c3a9e..4b7c96346e 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -20,7 +20,7 @@ use crate::{
     maybe_spawn_blocking,
     path::{absolute_path_to_url, Path},
     GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta,
-    ObjectStore, Result,
+    ObjectStore, PutResult, Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -36,6 +36,7 @@ use std::ops::Range;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::Poll;
+use std::time::SystemTime;
 use std::{collections::BTreeSet, convert::TryFrom, io};
 use std::{collections::VecDeque, path::PathBuf};
 use tokio::io::AsyncWrite;
@@ -270,7 +271,7 @@ impl Config {
 
 #[async_trait]
 impl ObjectStore for LocalFileSystem {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
         let path = self.config.path_to_filesystem(location)?;
         maybe_spawn_blocking(move || {
             let (mut file, suffix) = new_staged_upload(&path)?;
@@ -282,8 +283,17 @@ impl ObjectStore for LocalFileSystem {
                 })
                 .map_err(|e| {
                     let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
-                    e.into()
-                })
+                    e
+                })?;
+
+            let metadata = file.metadata().map_err(|e| Error::Metadata {
+                source: e.into(),
+                path: path.to_string_lossy().to_string(),
+            })?;
+
+            Ok(PutResult {
+                e_tag: Some(get_etag(&metadata)),
+            })
         })
         .await
     }
@@ -959,24 +969,33 @@ fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
         .into()
 }
 
+fn get_etag(metadata: &Metadata) -> String {
+    let inode = get_inode(metadata);
+    let size = metadata.len();
+    let mtime = metadata
+        .modified()
+        .ok()
+        .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
+        .unwrap_or_default()
+        .as_micros();
+
+    // Use an ETag scheme based on that used by many popular HTTP servers
+    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
+    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
+    format!("{inode:x}-{mtime:x}-{size:x}")
+}
+
 fn convert_metadata(metadata: Metadata, location: Path) -> Result<ObjectMeta> {
     let last_modified = last_modified(&metadata);
     let size = usize::try_from(metadata.len()).context(FileSizeOverflowedUsizeSnafu {
         path: location.as_ref(),
     })?;
-    let inode = get_inode(&metadata);
-    let mtime = last_modified.timestamp_micros();
-
-    // Use an ETag scheme based on that used by many popular HTTP servers
-    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
-    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
-    let etag = format!("{inode:x}-{mtime:x}-{size:x}");
 
     Ok(ObjectMeta {
         location,
         last_modified,
         size,
-        e_tag: Some(etag),
+        e_tag: Some(get_etag(&metadata)),
     })
 }
 
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 00b330b5eb..952b457397 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -17,7 +17,8 @@
 
 //! An in-memory object store implementation
 use crate::{
-    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Result,
+    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore,
+    PutResult, Result,
 };
 use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
@@ -106,11 +107,12 @@ struct Storage {
 type SharedStorage = Arc<RwLock<Storage>>;
 
 impl Storage {
-    fn insert(&mut self, location: &Path, bytes: Bytes) {
+    fn insert(&mut self, location: &Path, bytes: Bytes) -> usize {
         let etag = self.next_etag;
         self.next_etag += 1;
         let entry = Entry::new(bytes, Utc::now(), etag);
         self.map.insert(location.clone(), entry);
+        etag
     }
 }
 
@@ -122,9 +124,11 @@ impl std::fmt::Display for InMemory {
 
 #[async_trait]
 impl ObjectStore for InMemory {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
-        self.storage.write().insert(location, bytes);
-        Ok(())
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+        let etag = self.storage.write().insert(location, bytes);
+        Ok(PutResult {
+            e_tag: Some(etag.to_string()),
+        })
     }
 
     async fn put_multipart(
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index 3776dec2e8..21f6c1d99d 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -23,7 +23,8 @@ use tokio::io::AsyncWrite;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult,
+    Result,
 };
 
 #[doc(hidden)]
@@ -79,7 +80,7 @@ impl<T: ObjectStore> PrefixStore<T> {
 
 #[async_trait::async_trait]
 impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
         let full_path = self.full_path(location);
         self.inner.put(&full_path, bytes).await
     }
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index f716a11f8a..d6f191baf8 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -21,7 +21,8 @@ use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
 use crate::{
-    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore, Result,
+    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, ObjectStore,
+    PutResult, Result,
 };
 use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
@@ -147,7 +148,7 @@ impl<T: ObjectStore> std::fmt::Display for ThrottledStore<T> {
 
 #[async_trait]
 impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
         sleep(self.config().wait_put_per_call).await;
 
         self.inner.put(location, bytes).await
diff --git a/object_store/tests/get_range_file.rs b/object_store/tests/get_range_file.rs
index 25c4692606..5703d7f248 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -23,7 +23,7 @@ use futures::stream::BoxStream;
 use object_store::local::LocalFileSystem;
 use object_store::path::Path;
 use object_store::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, PutResult,
 };
 use std::fmt::Formatter;
 use tempfile::tempdir;
@@ -40,7 +40,7 @@ impl std::fmt::Display for MyStore {
 
 #[async_trait]
 impl ObjectStore for MyStore {
-    async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<()> {
+    async fn put(&self, path: &Path, data: Bytes) -> object_store::Result<PutResult> {
         self.0.put(path, data).await
     }
 

Reply via email to