This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new e3cce56979 Conditional Put (#4879)  (#4984)
e3cce56979 is described below

commit e3cce569798d89b67b15d4d2579e592ea1c88b02
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Oct 27 11:21:03 2023 +0100

    Conditional Put (#4879)  (#4984)
    
    * Add version to PutResult
    
    * Conditional Put (#4879)
    
    * Don't support HttpStore
    
    * Add R2 Support
    
    * Update Azure StatusCode
    
    * Fixes
    
    * Clippy
    
    * Clippy
    
    * PutRequestBuilder
    
    * Clippy
    
    * Add stress test
    
    * Clippy
---
 object_store/src/aws/builder.rs                    |  30 +++-
 object_store/src/aws/client.rs                     | 177 +++++++++---------
 object_store/src/aws/mod.rs                        |  43 ++++-
 object_store/src/aws/{copy.rs => precondition.rs}  |  45 ++++-
 object_store/src/azure/client.rs                   | 139 +++++++++------
 object_store/src/azure/mod.rs                      |  17 +-
 object_store/src/chunked.rs                        |   7 +-
 object_store/src/client/header.rs                  |  17 ++
 object_store/src/client/mod.rs                     |   2 +-
 object_store/src/client/retry.rs                   |   4 +
 .../src/client/{list_response.rs => s3.rs}         |  46 ++++-
 object_store/src/gcp/client.rs                     | 197 +++++++++++++--------
 object_store/src/gcp/mod.rs                        |   9 +-
 object_store/src/http/client.rs                    |   4 +
 object_store/src/http/mod.rs                       |  15 +-
 object_store/src/lib.rs                            | 171 +++++++++++++++++-
 object_store/src/limit.rs                          |   6 +-
 object_store/src/local.rs                          |  48 +++--
 object_store/src/memory.rs                         |  67 ++++++-
 object_store/src/prefix.rs                         |   8 +-
 object_store/src/throttle.rs                       |   9 +-
 object_store/tests/get_range_file.rs               |  32 ++--
 22 files changed, 791 insertions(+), 302 deletions(-)

diff --git a/object_store/src/aws/builder.rs b/object_store/src/aws/builder.rs
index 75a5299a08..79ea75b5ab 100644
--- a/object_store/src/aws/builder.rs
+++ b/object_store/src/aws/builder.rs
@@ -20,7 +20,8 @@ use crate::aws::credential::{
     InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider,
 };
 use crate::aws::{
-    AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, 
S3CopyIfNotExists, STORE,
+    AmazonS3, AwsCredential, AwsCredentialProvider, Checksum, 
S3ConditionalPut, S3CopyIfNotExists,
+    STORE,
 };
 use crate::client::TokenCredentialProvider;
 use crate::config::ConfigValue;
@@ -152,6 +153,8 @@ pub struct AmazonS3Builder {
     skip_signature: ConfigValue<bool>,
     /// Copy if not exists
     copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
+    /// Put precondition
+    conditional_put: Option<ConfigValue<S3ConditionalPut>>,
 }
 
 /// Configuration keys for [`AmazonS3Builder`]
@@ -288,6 +291,11 @@ pub enum AmazonS3ConfigKey {
     /// See [`S3CopyIfNotExists`]
     CopyIfNotExists,
 
+    /// Configure how to provide conditional put operations
+    ///
+    /// See [`S3ConditionalPut`]
+    ConditionalPut,
+
     /// Skip signing request
     SkipSignature,
 
@@ -312,7 +320,8 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::Checksum => "aws_checksum_algorithm",
             Self::ContainerCredentialsRelativeUri => 
"aws_container_credentials_relative_uri",
             Self::SkipSignature => "aws_skip_signature",
-            Self::CopyIfNotExists => "copy_if_not_exists",
+            Self::CopyIfNotExists => "aws_copy_if_not_exists",
+            Self::ConditionalPut => "aws_conditional_put",
             Self::Client(opt) => opt.as_ref(),
         }
     }
@@ -339,7 +348,8 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_checksum_algorithm" | "checksum_algorithm" => 
Ok(Self::Checksum),
             "aws_container_credentials_relative_uri" => 
Ok(Self::ContainerCredentialsRelativeUri),
             "aws_skip_signature" | "skip_signature" => Ok(Self::SkipSignature),
-            "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
+            "aws_copy_if_not_exists" | "copy_if_not_exists" => 
Ok(Self::CopyIfNotExists),
+            "aws_conditional_put" | "conditional_put" => 
Ok(Self::ConditionalPut),
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             _ => match s.parse() {
@@ -446,6 +456,9 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::CopyIfNotExists => {
                 self.copy_if_not_exists = 
Some(ConfigValue::Deferred(value.into()))
             }
+            AmazonS3ConfigKey::ConditionalPut => {
+                self.conditional_put = 
Some(ConfigValue::Deferred(value.into()))
+            }
         };
         self
     }
@@ -509,6 +522,9 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::CopyIfNotExists => {
                 self.copy_if_not_exists.as_ref().map(ToString::to_string)
             }
+            AmazonS3ConfigKey::ConditionalPut => {
+                self.conditional_put.as_ref().map(ToString::to_string)
+            }
         }
     }
 
@@ -713,6 +729,12 @@ impl AmazonS3Builder {
         self
     }
 
+    /// Configure how to provide conditional put operations
+    pub fn with_conditional_put(mut self, config: S3ConditionalPut) -> Self {
+        self.conditional_put = Some(config.into());
+        self
+    }
+
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(mut self) -> Result<AmazonS3> {
@@ -724,6 +746,7 @@ impl AmazonS3Builder {
         let region = self.region.context(MissingRegionSnafu)?;
         let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
         let copy_if_not_exists = self.copy_if_not_exists.map(|x| 
x.get()).transpose()?;
+        let put_precondition = self.conditional_put.map(|x| 
x.get()).transpose()?;
 
         let credentials = if let Some(credentials) = self.credentials {
             credentials
@@ -830,6 +853,7 @@ impl AmazonS3Builder {
             skip_signature: self.skip_signature.get()?,
             checksum,
             copy_if_not_exists,
+            conditional_put: put_precondition,
         };
 
         let client = Arc::new(S3Client::new(config)?);
diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 4e98f259f8..20c2a96b57 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -17,13 +17,18 @@
 
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt};
-use crate::aws::{AwsCredentialProvider, S3CopyIfNotExists, STORE, 
STRICT_PATH_ENCODE_SET};
+use crate::aws::{
+    AwsCredentialProvider, S3ConditionalPut, S3CopyIfNotExists, STORE, 
STRICT_PATH_ENCODE_SET,
+};
 use crate::client::get::GetClient;
-use crate::client::header::get_etag;
 use crate::client::header::HeaderConfig;
+use crate::client::header::{get_put_result, get_version};
 use crate::client::list::ListClient;
-use crate::client::list_response::ListResponse;
 use crate::client::retry::RetryExt;
+use crate::client::s3::{
+    CompleteMultipartUpload, CompleteMultipartUploadResult, 
InitiateMultipartUploadResult,
+    ListResponse,
+};
 use crate::client::GetOptionsExt;
 use crate::multipart::PartId;
 use crate::path::DELIMITER;
@@ -34,17 +39,20 @@ use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
+use hyper::http::HeaderName;
 use itertools::Itertools;
 use percent_encoding::{utf8_percent_encode, PercentEncode};
 use quick_xml::events::{self as xml_events};
 use reqwest::{
     header::{CONTENT_LENGTH, CONTENT_TYPE},
-    Client as ReqwestClient, Method, Response, StatusCode,
+    Client as ReqwestClient, Method, RequestBuilder, Response, StatusCode,
 };
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
 use std::sync::Arc;
 
+const VERSION_HEADER: &str = "x-amz-version-id";
+
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -147,33 +155,6 @@ impl From<Error> for crate::Error {
     }
 }
 
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase")]
-struct InitiateMultipart {
-    upload_id: String,
-}
-
-#[derive(Debug, Serialize)]
-#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUpload")]
-struct CompleteMultipart {
-    part: Vec<MultipartPart>,
-}
-
-#[derive(Debug, Serialize)]
-struct MultipartPart {
-    #[serde(rename = "ETag")]
-    e_tag: String,
-    #[serde(rename = "PartNumber")]
-    part_number: usize,
-}
-
-#[derive(Debug, Deserialize)]
-#[serde(rename_all = "PascalCase", rename = "CompleteMultipartUploadResult")]
-struct CompleteMultipartResult {
-    #[serde(rename = "ETag")]
-    e_tag: String,
-}
-
 #[derive(Deserialize)]
 #[serde(rename_all = "PascalCase", rename = "DeleteResult")]
 struct BatchDeleteResponse {
@@ -225,12 +206,61 @@ pub struct S3Config {
     pub skip_signature: bool,
     pub checksum: Option<Checksum>,
     pub copy_if_not_exists: Option<S3CopyIfNotExists>,
+    pub conditional_put: Option<S3ConditionalPut>,
 }
 
 impl S3Config {
     pub(crate) fn path_url(&self, path: &Path) -> String {
         format!("{}/{}", self.bucket_endpoint, encode_path(path))
     }
+
+    async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
+        Ok(match self.skip_signature {
+            false => Some(self.credentials.get_credential().await?),
+            true => None,
+        })
+    }
+}
+
+/// A builder for a put request allowing customisation of the headers and 
query string
+pub(crate) struct PutRequest<'a> {
+    path: &'a Path,
+    config: &'a S3Config,
+    builder: RequestBuilder,
+    payload_sha256: Option<Vec<u8>>,
+}
+
+impl<'a> PutRequest<'a> {
+    pub fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
+        let builder = self.builder.query(query);
+        Self { builder, ..self }
+    }
+
+    pub fn header(self, k: &HeaderName, v: &str) -> Self {
+        let builder = self.builder.header(k, v);
+        Self { builder, ..self }
+    }
+
+    pub async fn send(self) -> Result<PutResult> {
+        let credential = self.config.get_credential().await?;
+
+        let response = self
+            .builder
+            .with_aws_sigv4(
+                credential.as_deref(),
+                &self.config.region,
+                "s3",
+                self.config.sign_payload,
+                self.payload_sha256.as_deref(),
+            )
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(PutRequestSnafu {
+                path: self.path.as_ref(),
+            })?;
+
+        Ok(get_put_result(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?)
+    }
 }
 
 #[derive(Debug)]
@@ -250,23 +280,10 @@ impl S3Client {
         &self.config
     }
 
-    async fn get_credential(&self) -> Result<Option<Arc<AwsCredential>>> {
-        Ok(match self.config.skip_signature {
-            false => Some(self.config.credentials.get_credential().await?),
-            true => None,
-        })
-    }
-
     /// Make an S3 PUT request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html>
     ///
     /// Returns the ETag
-    pub async fn put_request<T: Serialize + ?Sized + Sync>(
-        &self,
-        path: &Path,
-        bytes: Bytes,
-        query: &T,
-    ) -> Result<String> {
-        let credential = self.get_credential().await?;
+    pub fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> 
PutRequest<'a> {
         let url = self.config.path_url(path);
         let mut builder = self.client.request(Method::PUT, url);
         let mut payload_sha256 = None;
@@ -288,22 +305,12 @@ impl S3Client {
             builder = builder.header(CONTENT_TYPE, value);
         }
 
-        let response = builder
-            .query(query)
-            .with_aws_sigv4(
-                credential.as_deref(),
-                &self.config.region,
-                "s3",
-                self.config.sign_payload,
-                payload_sha256.as_deref(),
-            )
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(PutRequestSnafu {
-                path: path.as_ref(),
-            })?;
-
-        Ok(get_etag(response.headers()).context(MetadataSnafu)?)
+        PutRequest {
+            path,
+            builder,
+            payload_sha256,
+            config: &self.config,
+        }
     }
 
     /// Make an S3 Delete request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObject.html>
@@ -312,7 +319,7 @@ impl S3Client {
         path: &Path,
         query: &T,
     ) -> Result<()> {
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = self.config.path_url(path);
 
         self.client
@@ -346,7 +353,7 @@ impl S3Client {
             return Ok(Vec::new());
         }
 
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = format!("{}?delete", self.config.bucket_endpoint);
 
         let mut buffer = Vec::new();
@@ -444,7 +451,7 @@ impl S3Client {
 
     /// Make an S3 Copy request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
     pub async fn copy_request(&self, from: &Path, to: &Path, overwrite: bool) 
-> Result<()> {
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = self.config.path_url(to);
         let source = format!("{}/{}", self.config.bucket, encode_path(from));
 
@@ -492,7 +499,7 @@ impl S3Client {
     }
 
     pub async fn create_multipart(&self, location: &Path) -> 
Result<MultipartId> {
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = format!("{}?uploads=", self.config.path_url(location),);
 
         let response = self
@@ -512,7 +519,7 @@ impl S3Client {
             .await
             .context(CreateMultipartResponseBodySnafu)?;
 
-        let response: InitiateMultipart =
+        let response: InitiateMultipartUploadResult =
             
quick_xml::de::from_reader(response.reader()).context(InvalidMultipartResponseSnafu)?;
 
         Ok(response.upload_id)
@@ -527,15 +534,15 @@ impl S3Client {
     ) -> Result<PartId> {
         let part = (part_idx + 1).to_string();
 
-        let content_id = self
-            .put_request(
-                path,
-                data,
-                &[("partNumber", &part), ("uploadId", upload_id)],
-            )
+        let result = self
+            .put_request(path, data)
+            .query(&[("partNumber", &part), ("uploadId", upload_id)])
+            .send()
             .await?;
 
-        Ok(PartId { content_id })
+        Ok(PartId {
+            content_id: result.e_tag.unwrap(),
+        })
     }
 
     pub async fn complete_multipart(
@@ -544,19 +551,10 @@ impl S3Client {
         upload_id: &str,
         parts: Vec<PartId>,
     ) -> Result<PutResult> {
-        let parts = parts
-            .into_iter()
-            .enumerate()
-            .map(|(part_idx, part)| MultipartPart {
-                e_tag: part.content_id,
-                part_number: part_idx + 1,
-            })
-            .collect();
-
-        let request = CompleteMultipart { part: parts };
+        let request = CompleteMultipartUpload::from(parts);
         let body = quick_xml::se::to_string(&request).unwrap();
 
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = self.config.path_url(location);
 
         let response = self
@@ -575,16 +573,19 @@ impl S3Client {
             .await
             .context(CompleteMultipartRequestSnafu)?;
 
+        let version = get_version(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?;
+
         let data = response
             .bytes()
             .await
             .context(CompleteMultipartResponseBodySnafu)?;
 
-        let response: CompleteMultipartResult =
+        let response: CompleteMultipartUploadResult =
             
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
 
         Ok(PutResult {
             e_tag: Some(response.e_tag),
+            version,
         })
     }
 }
@@ -596,12 +597,12 @@ impl GetClient for S3Client {
     const HEADER_CONFIG: HeaderConfig = HeaderConfig {
         etag_required: false,
         last_modified_required: false,
-        version_header: Some("x-amz-version-id"),
+        version_header: Some(VERSION_HEADER),
     };
 
     /// Make an S3 GET request 
<https://docs.aws.amazon.com/AmazonS3/latest/API/API_GetObject.html>
     async fn get_request(&self, path: &Path, options: GetOptions) -> 
Result<Response> {
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = self.config.path_url(path);
         let method = match options.head {
             true => Method::HEAD,
@@ -643,7 +644,7 @@ impl ListClient for S3Client {
         token: Option<&str>,
         offset: Option<&str>,
     ) -> Result<(ListResult, Option<String>)> {
-        let credential = self.get_credential().await?;
+        let credential = self.config.get_credential().await?;
         let url = self.config.bucket_endpoint.clone();
 
         let mut query = Vec::with_capacity(4);
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index 57254c7cf4..99e6376950 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -35,6 +35,7 @@ use async_trait::async_trait;
 use bytes::Bytes;
 use futures::stream::BoxStream;
 use futures::{StreamExt, TryStreamExt};
+use reqwest::header::{IF_MATCH, IF_NONE_MATCH};
 use reqwest::Method;
 use std::{sync::Arc, time::Duration};
 use tokio::io::AsyncWrite;
@@ -47,20 +48,20 @@ use crate::client::CredentialProvider;
 use crate::multipart::{MultiPartStore, PartId, PutPart, WriteMultiPart};
 use crate::signer::Signer;
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
Path, PutResult,
-    Result,
+    Error, GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, 
ObjectStore, Path, PutMode,
+    PutOptions, PutResult, Result,
 };
 
 mod builder;
 mod checksum;
 mod client;
-mod copy;
 mod credential;
+mod precondition;
 mod resolve;
 
 pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
 pub use checksum::Checksum;
-pub use copy::S3CopyIfNotExists;
+pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
 pub use resolve::resolve_bucket_region;
 
 // 
http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
@@ -158,9 +159,33 @@ impl Signer for AmazonS3 {
 
 #[async_trait]
 impl ObjectStore for AmazonS3 {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
-        let e_tag = self.client.put_request(location, bytes, &()).await?;
-        Ok(PutResult { e_tag: Some(e_tag) })
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        let request = self.client.put_request(location, bytes);
+        match (opts.mode, &self.client.config().conditional_put) {
+            (PutMode::Overwrite, _) => request.send().await,
+            (PutMode::Create | PutMode::Update(_), None) => 
Err(Error::NotImplemented),
+            (PutMode::Create, Some(S3ConditionalPut::ETagMatch)) => {
+                match request.header(&IF_NONE_MATCH, "*").send().await {
+                    // Technically If-None-Match should return NotModified but 
some stores,
+                    // such as R2, instead return PreconditionFailed
+                    // 
https://developers.cloudflare.com/r2/api/s3/extensions/#conditional-operations-in-putobject
+                    Err(e @ Error::NotModified { .. } | e @ 
Error::Precondition { .. }) => {
+                        Err(Error::AlreadyExists {
+                            path: location.to_string(),
+                            source: Box::new(e),
+                        })
+                    }
+                    r => r,
+                }
+            }
+            (PutMode::Update(v), Some(S3ConditionalPut::ETagMatch)) => {
+                let etag = v.e_tag.ok_or_else(|| Error::Generic {
+                    store: STORE,
+                    source: "ETag required for conditional 
put".to_string().into(),
+                })?;
+                request.header(&IF_MATCH, etag.as_str()).send().await
+            }
+        }
     }
 
     async fn put_multipart(
@@ -306,6 +331,7 @@ mod tests {
         let config = integration.client.config();
         let is_local = config.endpoint.starts_with("http://";);
         let test_not_exists = config.copy_if_not_exists.is_some();
+        let test_conditional_put = config.conditional_put.is_some();
 
         // Localstack doesn't support listing with spaces 
https://github.com/localstack/localstack/issues/6328
         put_get_delete_list_opts(&integration, is_local).await;
@@ -319,6 +345,9 @@ mod tests {
         if test_not_exists {
             copy_if_not_exists(&integration).await;
         }
+        if test_conditional_put {
+            put_opts(&integration, true).await;
+        }
 
         // run integration test with unsigned payload enabled
         let builder = AmazonS3Builder::from_env().with_unsigned_payload(true);
diff --git a/object_store/src/aws/copy.rs b/object_store/src/aws/precondition.rs
similarity index 68%
rename from object_store/src/aws/copy.rs
rename to object_store/src/aws/precondition.rs
index da4e2809be..a50b57fe23 100644
--- a/object_store/src/aws/copy.rs
+++ b/object_store/src/aws/precondition.rs
@@ -17,8 +17,7 @@
 
 use crate::config::Parse;
 
-/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for
-/// [`AmazonS3`].
+/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for 
[`AmazonS3`].
 ///
 /// [`ObjectStore::copy_if_not_exists`]: crate::ObjectStore::copy_if_not_exists
 /// [`AmazonS3`]: super::AmazonS3
@@ -70,3 +69,45 @@ impl Parse for S3CopyIfNotExists {
         })
     }
 }
+
+/// Configure how to provide conditional put support for [`AmazonS3`].
+///
+/// [`AmazonS3`]: super::AmazonS3
+#[derive(Debug, Clone)]
+#[allow(missing_copy_implementations)]
+#[non_exhaustive]
+pub enum S3ConditionalPut {
+    /// Some S3-compatible stores, such as Cloudflare R2 and minio support 
conditional
+    /// put using the standard [HTTP precondition] headers If-Match and 
If-None-Match
+    ///
+    /// Encoded as `etag` ignoring whitespace
+    ///
+    /// [HTTP precondition]: 
https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
+    ETagMatch,
+}
+
+impl std::fmt::Display for S3ConditionalPut {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::ETagMatch => write!(f, "etag"),
+        }
+    }
+}
+
+impl S3ConditionalPut {
+    fn from_str(s: &str) -> Option<Self> {
+        match s.trim() {
+            "etag" => Some(Self::ETagMatch),
+            _ => None,
+        }
+    }
+}
+
+impl Parse for S3ConditionalPut {
+    fn parse(v: &str) -> crate::Result<Self> {
+        Self::from_str(v).ok_or_else(|| crate::Error::Generic {
+            store: "Config",
+            source: format!("Failed to parse \"{v}\" as 
S3PutConditional").into(),
+        })
+    }
+}
diff --git a/object_store/src/azure/client.rs b/object_store/src/azure/client.rs
index 9f47b9a815..c7bd791498 100644
--- a/object_store/src/azure/client.rs
+++ b/object_store/src/azure/client.rs
@@ -19,7 +19,7 @@ use super::credential::AzureCredential;
 use crate::azure::credential::*;
 use crate::azure::{AzureCredentialProvider, STORE};
 use crate::client::get::GetClient;
-use crate::client::header::{get_etag, HeaderConfig};
+use crate::client::header::{get_put_result, HeaderConfig};
 use crate::client::list::ListClient;
 use crate::client::retry::RetryExt;
 use crate::client::GetOptionsExt;
@@ -27,25 +27,29 @@ use crate::multipart::PartId;
 use crate::path::DELIMITER;
 use crate::util::deserialize_rfc1123;
 use crate::{
-    ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutResult, 
Result, RetryConfig,
+    ClientOptions, GetOptions, ListResult, ObjectMeta, Path, PutMode, 
PutOptions, PutResult,
+    Result, RetryConfig,
 };
 use async_trait::async_trait;
 use base64::prelude::BASE64_STANDARD;
 use base64::Engine;
 use bytes::{Buf, Bytes};
 use chrono::{DateTime, Utc};
+use hyper::http::HeaderName;
 use itertools::Itertools;
 use reqwest::header::CONTENT_TYPE;
 use reqwest::{
-    header::{HeaderValue, CONTENT_LENGTH, IF_NONE_MATCH},
-    Client as ReqwestClient, Method, Response, StatusCode,
+    header::{HeaderValue, CONTENT_LENGTH, IF_MATCH, IF_NONE_MATCH},
+    Client as ReqwestClient, Method, RequestBuilder, Response,
 };
 use serde::{Deserialize, Serialize};
-use snafu::{ResultExt, Snafu};
+use snafu::{OptionExt, ResultExt, Snafu};
 use std::collections::HashMap;
 use std::sync::Arc;
 use url::Url;
 
+const VERSION_HEADER: &str = "x-ms-version-id";
+
 /// A specialized `Error` for object store-related errors
 #[derive(Debug, Snafu)]
 #[allow(missing_docs)]
@@ -92,6 +96,9 @@ pub(crate) enum Error {
     Metadata {
         source: crate::client::header::Error,
     },
+
+    #[snafu(display("ETag required for conditional update"))]
+    MissingETag,
 }
 
 impl From<Error> for crate::Error {
@@ -134,6 +141,39 @@ impl AzureConfig {
     }
 }
 
+/// A builder for a put request allowing customisation of the headers and 
query string
+struct PutRequest<'a> {
+    path: &'a Path,
+    config: &'a AzureConfig,
+    builder: RequestBuilder,
+}
+
+impl<'a> PutRequest<'a> {
+    fn header(self, k: &HeaderName, v: &str) -> Self {
+        let builder = self.builder.header(k, v);
+        Self { builder, ..self }
+    }
+
+    fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
+        let builder = self.builder.query(query);
+        Self { builder, ..self }
+    }
+
+    async fn send(self) -> Result<Response> {
+        let credential = self.config.credentials.get_credential().await?;
+        let response = self
+            .builder
+            .with_azure_authorization(&credential, &self.config.account)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(PutRequestSnafu {
+                path: self.path.as_ref(),
+            })?;
+
+        Ok(response)
+    }
+}
+
 #[derive(Debug)]
 pub(crate) struct AzureClient {
     config: AzureConfig,
@@ -156,63 +196,52 @@ impl AzureClient {
         self.config.credentials.get_credential().await
     }
 
-    /// Make an Azure PUT request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>
-    pub async fn put_request<T: Serialize + crate::Debug + ?Sized + Sync>(
-        &self,
-        path: &Path,
-        bytes: Option<Bytes>,
-        is_block_op: bool,
-        query: &T,
-    ) -> Result<Response> {
-        let credential = self.get_credential().await?;
+    fn put_request<'a>(&'a self, path: &'a Path, bytes: Bytes) -> 
PutRequest<'a> {
         let url = self.config.path_url(path);
 
         let mut builder = self.client.request(Method::PUT, url);
 
-        if !is_block_op {
-            builder = builder.header(&BLOB_TYPE, "BlockBlob").query(query);
-        } else {
-            builder = builder.query(query);
-        }
-
         if let Some(value) = 
self.config().client_options.get_content_type(path) {
             builder = builder.header(CONTENT_TYPE, value);
         }
 
-        if let Some(bytes) = bytes {
-            builder = builder
-                .header(CONTENT_LENGTH, HeaderValue::from(bytes.len()))
-                .body(bytes)
-        } else {
-            builder = builder.header(CONTENT_LENGTH, 
HeaderValue::from_static("0"));
+        builder = builder
+            .header(CONTENT_LENGTH, HeaderValue::from(bytes.len()))
+            .body(bytes);
+
+        PutRequest {
+            path,
+            builder,
+            config: &self.config,
         }
+    }
 
-        let response = builder
-            .with_azure_authorization(&credential, &self.config.account)
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(PutRequestSnafu {
-                path: path.as_ref(),
-            })?;
+    /// Make an Azure PUT request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/put-blob>
+    pub async fn put_blob(&self, path: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        let builder = self.put_request(path, bytes);
+
+        let builder = match &opts.mode {
+            PutMode::Overwrite => builder,
+            PutMode::Create => builder.header(&IF_NONE_MATCH, "*"),
+            PutMode::Update(v) => {
+                let etag = v.e_tag.as_ref().context(MissingETagSnafu)?;
+                builder.header(&IF_MATCH, etag)
+            }
+        };
 
-        Ok(response)
+        let response = builder.header(&BLOB_TYPE, "BlockBlob").send().await?;
+        Ok(get_put_result(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?)
     }
 
     /// PUT a block 
<https://learn.microsoft.com/en-us/rest/api/storageservices/put-block>
     pub async fn put_block(&self, path: &Path, part_idx: usize, data: Bytes) 
-> Result<PartId> {
         let content_id = format!("{part_idx:20}");
-        let block_id: BlockId = content_id.clone().into();
+        let block_id = BASE64_STANDARD.encode(&content_id);
 
-        self.put_request(
-            path,
-            Some(data),
-            true,
-            &[
-                ("comp", "block"),
-                ("blockid", &BASE64_STANDARD.encode(block_id)),
-            ],
-        )
-        .await?;
+        self.put_request(path, data)
+            .query(&[("comp", "block"), ("blockid", &block_id)])
+            .send()
+            .await?;
 
         Ok(PartId { content_id })
     }
@@ -224,15 +253,13 @@ impl AzureClient {
             .map(|part| BlockId::from(part.content_id))
             .collect();
 
-        let block_list = BlockList { blocks };
-        let block_xml = block_list.to_xml();
-
         let response = self
-            .put_request(path, Some(block_xml.into()), true, &[("comp", 
"blocklist")])
+            .put_request(path, BlockList { blocks }.to_xml().into())
+            .query(&[("comp", "blocklist")])
+            .send()
             .await?;
 
-        let e_tag = get_etag(response.headers()).context(MetadataSnafu)?;
-        Ok(PutResult { e_tag: Some(e_tag) })
+        Ok(get_put_result(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?)
     }
 
     /// Make an Azure Delete request 
<https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob>
@@ -284,13 +311,7 @@ impl AzureClient {
             .with_azure_authorization(&credential, &self.config.account)
             .send_retry(&self.config.retry_config)
             .await
-            .map_err(|err| match err.status() {
-                Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
-                    source: Box::new(err),
-                    path: to.to_string(),
-                },
-                _ => err.error(STORE, from.to_string()),
-            })?;
+            .map_err(|err| err.error(STORE, from.to_string()))?;
 
         Ok(())
     }
@@ -303,7 +324,7 @@ impl GetClient for AzureClient {
     const HEADER_CONFIG: HeaderConfig = HeaderConfig {
         etag_required: true,
         last_modified_required: true,
-        version_header: Some("x-ms-version-id"),
+        version_header: Some(VERSION_HEADER),
     };
 
     /// Make an Azure GET request
diff --git a/object_store/src/azure/mod.rs b/object_store/src/azure/mod.rs
index 779ac2f71f..762a51dd9d 100644
--- a/object_store/src/azure/mod.rs
+++ b/object_store/src/azure/mod.rs
@@ -29,7 +29,8 @@
 use crate::{
     multipart::{PartId, PutPart, WriteMultiPart},
     path::Path,
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutResult, Result,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
+    Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -49,7 +50,6 @@ mod credential;
 
 /// [`CredentialProvider`] for [`MicrosoftAzure`]
 pub type AzureCredentialProvider = Arc<dyn CredentialProvider<Credential = 
AzureCredential>>;
-use crate::client::header::get_etag;
 use crate::multipart::MultiPartStore;
 pub use builder::{AzureConfigKey, MicrosoftAzureBuilder};
 pub use credential::AzureCredential;
@@ -82,16 +82,8 @@ impl std::fmt::Display for MicrosoftAzure {
 
 #[async_trait]
 impl ObjectStore for MicrosoftAzure {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
-        let response = self
-            .client
-            .put_request(location, Some(bytes), false, &())
-            .await?;
-        let e_tag = get_etag(response.headers()).map_err(|e| 
crate::Error::Generic {
-            store: STORE,
-            source: Box::new(e),
-        })?;
-        Ok(PutResult { e_tag: Some(e_tag) })
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        self.client.put_blob(location, bytes, opts).await
     }
 
     async fn put_multipart(
@@ -208,6 +200,7 @@ mod tests {
         rename_and_copy(&integration).await;
         copy_if_not_exists(&integration).await;
         stream_get(&integration).await;
+        put_opts(&integration, true).await;
         multipart(&integration, &integration).await;
     }
 
diff --git a/object_store/src/chunked.rs b/object_store/src/chunked.rs
index 021f9f5015..d33556f4b1 100644
--- a/object_store/src/chunked.rs
+++ b/object_store/src/chunked.rs
@@ -29,7 +29,8 @@ use tokio::io::AsyncWrite;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutResult,
+    GetOptions, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutOptions,
+    PutResult,
 };
 use crate::{MultipartId, Result};
 
@@ -62,8 +63,8 @@ impl Display for ChunkedStore {
 
 #[async_trait]
 impl ObjectStore for ChunkedStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
-        self.inner.put(location, bytes).await
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        self.inner.put_opts(location, bytes, opts).await
     }
 
     async fn put_multipart(
diff --git a/object_store/src/client/header.rs 
b/object_store/src/client/header.rs
index e67496833b..e85bf6ba52 100644
--- a/object_store/src/client/header.rs
+++ b/object_store/src/client/header.rs
@@ -67,6 +67,23 @@ pub enum Error {
     },
 }
 
+/// Extracts a PutResult from the provided [`HeaderMap`]
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub fn get_put_result(headers: &HeaderMap, version: &str) -> 
Result<crate::PutResult, Error> {
+    let e_tag = Some(get_etag(headers)?);
+    let version = get_version(headers, version)?;
+    Ok(crate::PutResult { e_tag, version })
+}
+
+/// Extracts a optional version from the provided [`HeaderMap`]
+#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
+pub fn get_version(headers: &HeaderMap, version: &str) -> 
Result<Option<String>, Error> {
+    Ok(match headers.get(version) {
+        Some(x) => Some(x.to_str().context(BadHeaderSnafu)?.to_string()),
+        None => None,
+    })
+}
+
 /// Extracts an etag from the provided [`HeaderMap`]
 pub fn get_etag(headers: &HeaderMap) -> Result<String, Error> {
     let e_tag = headers.get(ETAG).ok_or(Error::MissingEtag)?;
diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs
index 77eee7fc92..ae092edac0 100644
--- a/object_store/src/client/mod.rs
+++ b/object_store/src/client/mod.rs
@@ -38,7 +38,7 @@ pub mod token;
 pub mod header;
 
 #[cfg(any(feature = "aws", feature = "gcp"))]
-pub mod list_response;
+pub mod s3;
 
 use async_trait::async_trait;
 use std::collections::HashMap;
diff --git a/object_store/src/client/retry.rs b/object_store/src/client/retry.rs
index d70d6d88de..789103c0f7 100644
--- a/object_store/src/client/retry.rs
+++ b/object_store/src/client/retry.rs
@@ -79,6 +79,10 @@ impl Error {
                 path,
                 source: Box::new(self),
             },
+            Some(StatusCode::CONFLICT) => crate::Error::AlreadyExists {
+                path,
+                source: Box::new(self),
+            },
             _ => crate::Error::Generic {
                 store,
                 source: Box::new(self),
diff --git a/object_store/src/client/list_response.rs 
b/object_store/src/client/s3.rs
similarity index 68%
rename from object_store/src/client/list_response.rs
rename to object_store/src/client/s3.rs
index 7a170c5841..61237dc4be 100644
--- a/object_store/src/client/list_response.rs
+++ b/object_store/src/client/s3.rs
@@ -14,12 +14,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! The list response format used by GCP and AWS
+//! The list and multipart API used by both GCS and S3
 
+use crate::multipart::PartId;
 use crate::path::Path;
 use crate::{ListResult, ObjectMeta, Result};
 use chrono::{DateTime, Utc};
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Deserialize)]
 #[serde(rename_all = "PascalCase")]
@@ -84,3 +85,44 @@ impl TryFrom<ListContents> for ObjectMeta {
         })
     }
 }
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct InitiateMultipartUploadResult {
+    pub upload_id: String,
+}
+
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct CompleteMultipartUpload {
+    pub part: Vec<MultipartPart>,
+}
+
+impl From<Vec<PartId>> for CompleteMultipartUpload {
+    fn from(value: Vec<PartId>) -> Self {
+        let part = value
+            .into_iter()
+            .enumerate()
+            .map(|(part_number, part)| MultipartPart {
+                e_tag: part.content_id,
+                part_number: part_number + 1,
+            })
+            .collect();
+        Self { part }
+    }
+}
+
+#[derive(Debug, Serialize)]
+pub struct MultipartPart {
+    #[serde(rename = "ETag")]
+    pub e_tag: String,
+    #[serde(rename = "PartNumber")]
+    pub part_number: usize,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "PascalCase")]
+pub struct CompleteMultipartUploadResult {
+    #[serde(rename = "ETag")]
+    pub e_tag: String,
+}
diff --git a/object_store/src/gcp/client.rs b/object_store/src/gcp/client.rs
index 8c44f90164..78964077e2 100644
--- a/object_store/src/gcp/client.rs
+++ b/object_store/src/gcp/client.rs
@@ -16,23 +16,34 @@
 // under the License.
 
 use crate::client::get::GetClient;
-use crate::client::header::{get_etag, HeaderConfig};
+use crate::client::header::{get_put_result, get_version, HeaderConfig};
 use crate::client::list::ListClient;
-use crate::client::list_response::ListResponse;
 use crate::client::retry::RetryExt;
+use crate::client::s3::{
+    CompleteMultipartUpload, CompleteMultipartUploadResult, 
InitiateMultipartUploadResult,
+    ListResponse,
+};
 use crate::client::GetOptionsExt;
 use crate::gcp::{GcpCredential, GcpCredentialProvider, STORE};
 use crate::multipart::PartId;
 use crate::path::{Path, DELIMITER};
-use crate::{ClientOptions, GetOptions, ListResult, MultipartId, PutResult, 
Result, RetryConfig};
+use crate::{
+    ClientOptions, GetOptions, ListResult, MultipartId, PutMode, PutOptions, 
PutResult, Result,
+    RetryConfig,
+};
 use async_trait::async_trait;
 use bytes::{Buf, Bytes};
 use percent_encoding::{percent_encode, utf8_percent_encode, NON_ALPHANUMERIC};
-use reqwest::{header, Client, Method, Response, StatusCode};
+use reqwest::header::HeaderName;
+use reqwest::{header, Client, Method, RequestBuilder, Response, StatusCode};
 use serde::Serialize;
-use snafu::{ResultExt, Snafu};
+use snafu::{OptionExt, ResultExt, Snafu};
 use std::sync::Arc;
 
+const VERSION_HEADER: &str = "x-goog-generation";
+
+static VERSION_MATCH: HeaderName = 
HeaderName::from_static("x-goog-if-generation-match");
+
 #[derive(Debug, Snafu)]
 enum Error {
     #[snafu(display("Error performing list request: {}", source))]
@@ -78,6 +89,18 @@ enum Error {
     Metadata {
         source: crate::client::header::Error,
     },
+
+    #[snafu(display("Version required for conditional update"))]
+    MissingVersion,
+
+    #[snafu(display("Error performing complete multipart request: {}", 
source))]
+    CompleteMultipartRequest { source: crate::client::retry::Error },
+
+    #[snafu(display("Error getting complete multipart response body: {}", 
source))]
+    CompleteMultipartResponseBody { source: reqwest::Error },
+
+    #[snafu(display("Got invalid multipart response: {}", source))]
+    InvalidMultipartResponse { source: quick_xml::de::DeError },
 }
 
 impl From<Error> for crate::Error {
@@ -107,6 +130,39 @@ pub struct GoogleCloudStorageConfig {
     pub client_options: ClientOptions,
 }
 
+/// A builder for a put request allowing customisation of the headers and 
query string
+pub struct PutRequest<'a> {
+    path: &'a Path,
+    config: &'a GoogleCloudStorageConfig,
+    builder: RequestBuilder,
+}
+
+impl<'a> PutRequest<'a> {
+    fn header(self, k: &HeaderName, v: &str) -> Self {
+        let builder = self.builder.header(k, v);
+        Self { builder, ..self }
+    }
+
+    fn query<T: Serialize + ?Sized + Sync>(self, query: &T) -> Self {
+        let builder = self.builder.query(query);
+        Self { builder, ..self }
+    }
+
+    async fn send(self) -> Result<PutResult> {
+        let credential = self.config.credentials.get_credential().await?;
+        let response = self
+            .builder
+            .bearer_auth(&credential.bearer)
+            .send_retry(&self.config.retry_config)
+            .await
+            .context(PutRequestSnafu {
+                path: self.path.as_ref(),
+            })?;
+
+        Ok(get_put_result(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?)
+    }
+}
+
 #[derive(Debug)]
 pub struct GoogleCloudStorageClient {
     config: GoogleCloudStorageConfig,
@@ -152,13 +208,7 @@ impl GoogleCloudStorageClient {
     /// Perform a put request 
<https://cloud.google.com/storage/docs/xml-api/put-object-upload>
     ///
     /// Returns the new ETag
-    pub async fn put_request<T: Serialize + ?Sized + Sync>(
-        &self,
-        path: &Path,
-        payload: Bytes,
-        query: &T,
-    ) -> Result<String> {
-        let credential = self.get_credential().await?;
+    pub fn put_request<'a>(&'a self, path: &'a Path, payload: Bytes) -> 
PutRequest<'a> {
         let url = self.object_url(path);
 
         let content_type = self
@@ -167,21 +217,38 @@ impl GoogleCloudStorageClient {
             .get_content_type(path)
             .unwrap_or("application/octet-stream");
 
-        let response = self
+        let builder = self
             .client
             .request(Method::PUT, url)
-            .query(query)
-            .bearer_auth(&credential.bearer)
             .header(header::CONTENT_TYPE, content_type)
             .header(header::CONTENT_LENGTH, payload.len())
-            .body(payload)
-            .send_retry(&self.config.retry_config)
-            .await
-            .context(PutRequestSnafu {
-                path: path.as_ref(),
-            })?;
+            .body(payload);
 
-        Ok(get_etag(response.headers()).context(MetadataSnafu)?)
+        PutRequest {
+            path,
+            builder,
+            config: &self.config,
+        }
+    }
+
+    pub async fn put(&self, path: &Path, data: Bytes, opts: PutOptions) -> 
Result<PutResult> {
+        let builder = self.put_request(path, data);
+
+        let builder = match &opts.mode {
+            PutMode::Overwrite => builder,
+            PutMode::Create => builder.header(&VERSION_MATCH, "0"),
+            PutMode::Update(v) => {
+                let etag = v.version.as_ref().context(MissingVersionSnafu)?;
+                builder.header(&VERSION_MATCH, etag)
+            }
+        };
+
+        match (opts.mode, builder.send().await) {
+            (PutMode::Create, Err(crate::Error::Precondition { path, source 
})) => {
+                Err(crate::Error::AlreadyExists { path, source })
+            }
+            (_, r) => r,
+        }
     }
 
     /// Perform a put part request 
<https://cloud.google.com/storage/docs/xml-api/put-object-multipart>
@@ -194,18 +261,15 @@ impl GoogleCloudStorageClient {
         part_idx: usize,
         data: Bytes,
     ) -> Result<PartId> {
-        let content_id = self
-            .put_request(
-                path,
-                data,
-                &[
-                    ("partNumber", &format!("{}", part_idx + 1)),
-                    ("uploadId", upload_id),
-                ],
-            )
-            .await?;
-
-        Ok(PartId { content_id })
+        let query = &[
+            ("partNumber", &format!("{}", part_idx + 1)),
+            ("uploadId", upload_id),
+        ];
+        let result = self.put_request(path, data).query(query).send().await?;
+
+        Ok(PartId {
+            content_id: result.e_tag.unwrap(),
+        })
     }
 
     /// Initiate a multi-part upload 
<https://cloud.google.com/storage/docs/xml-api/post-object-multipart>
@@ -268,17 +332,8 @@ impl GoogleCloudStorageClient {
         let upload_id = multipart_id.clone();
         let url = self.object_url(path);
 
-        let parts = completed_parts
-            .into_iter()
-            .enumerate()
-            .map(|(part_number, part)| MultipartPart {
-                e_tag: part.content_id,
-                part_number: part_number + 1,
-            })
-            .collect();
-
+        let upload_info = CompleteMultipartUpload::from(completed_parts);
         let credential = self.get_credential().await?;
-        let upload_info = CompleteMultipartUpload { parts };
 
         let data = quick_xml::se::to_string(&upload_info)
             .context(InvalidPutResponseSnafu)?
@@ -287,7 +342,7 @@ impl GoogleCloudStorageClient {
             // https://github.com/tafia/quick-xml/issues/350
             .replace("&quot;", "\"");
 
-        let result = self
+        let response = self
             .client
             .request(Method::POST, &url)
             .bearer_auth(&credential.bearer)
@@ -295,12 +350,22 @@ impl GoogleCloudStorageClient {
             .body(data)
             .send_retry(&self.config.retry_config)
             .await
-            .context(PostRequestSnafu {
-                path: path.as_ref(),
-            })?;
+            .context(CompleteMultipartRequestSnafu)?;
 
-        let etag = get_etag(result.headers()).context(MetadataSnafu)?;
-        Ok(PutResult { e_tag: Some(etag) })
+        let version = get_version(response.headers(), 
VERSION_HEADER).context(MetadataSnafu)?;
+
+        let data = response
+            .bytes()
+            .await
+            .context(CompleteMultipartResponseBodySnafu)?;
+
+        let response: CompleteMultipartUploadResult =
+            
quick_xml::de::from_reader(data.reader()).context(InvalidMultipartResponseSnafu)?;
+
+        Ok(PutResult {
+            e_tag: Some(response.e_tag),
+            version,
+        })
     }
 
     /// Perform a delete request 
<https://cloud.google.com/storage/docs/xml-api/delete-object>
@@ -334,7 +399,7 @@ impl GoogleCloudStorageClient {
             .header("x-goog-copy-source", source);
 
         if if_not_exists {
-            builder = builder.header("x-goog-if-generation-match", 0);
+            builder = builder.header(&VERSION_MATCH, 0);
         }
 
         builder
@@ -362,7 +427,7 @@ impl GetClient for GoogleCloudStorageClient {
     const HEADER_CONFIG: HeaderConfig = HeaderConfig {
         etag_required: true,
         last_modified_required: true,
-        version_header: Some("x-goog-generation"),
+        version_header: Some(VERSION_HEADER),
     };
 
     /// Perform a get request 
<https://cloud.google.com/storage/docs/xml-api/get-object-download>
@@ -375,13 +440,18 @@ impl GetClient for GoogleCloudStorageClient {
             false => Method::GET,
         };
 
-        let mut request = self.client.request(method, 
url).with_get_options(options);
+        let mut request = self.client.request(method, url);
+
+        if let Some(version) = &options.version {
+            request = request.query(&[("generation", version)]);
+        }
 
         if !credential.bearer.is_empty() {
             request = request.bearer_auth(&credential.bearer);
         }
 
         let response = request
+            .with_get_options(options)
             .send_retry(&self.config.retry_config)
             .await
             .context(GetRequestSnafu {
@@ -444,24 +514,3 @@ impl ListClient for GoogleCloudStorageClient {
         Ok((response.try_into()?, token))
     }
 }
-
-#[derive(serde::Deserialize, Debug)]
-#[serde(rename_all = "PascalCase")]
-struct InitiateMultipartUploadResult {
-    upload_id: String,
-}
-
-#[derive(serde::Serialize, Debug)]
-#[serde(rename_all = "PascalCase", rename(serialize = "Part"))]
-struct MultipartPart {
-    #[serde(rename = "PartNumber")]
-    part_number: usize,
-    e_tag: String,
-}
-
-#[derive(serde::Serialize, Debug)]
-#[serde(rename_all = "PascalCase")]
-struct CompleteMultipartUpload {
-    #[serde(rename = "Part", default)]
-    parts: Vec<MultipartPart>,
-}
diff --git a/object_store/src/gcp/mod.rs b/object_store/src/gcp/mod.rs
index 0eb3e9c23c..7721b1278a 100644
--- a/object_store/src/gcp/mod.rs
+++ b/object_store/src/gcp/mod.rs
@@ -35,7 +35,8 @@ use crate::client::CredentialProvider;
 use crate::{
     multipart::{PartId, PutPart, WriteMultiPart},
     path::Path,
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutResult, Result,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
+    Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -107,9 +108,8 @@ impl PutPart for GCSMultipartUpload {
 
 #[async_trait]
 impl ObjectStore for GoogleCloudStorage {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
-        let e_tag = self.client.put_request(location, bytes, &()).await?;
-        Ok(PutResult { e_tag: Some(e_tag) })
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        self.client.put(location, bytes, opts).await
     }
 
     async fn put_multipart(
@@ -221,6 +221,7 @@ mod test {
             multipart(&integration, &integration).await;
             // Fake GCS server doesn't currently honor preconditions
             get_opts(&integration).await;
+            put_opts(&integration, true).await;
         }
     }
 
diff --git a/object_store/src/http/client.rs b/object_store/src/http/client.rs
index a7dbdfcbe8..8700775fb2 100644
--- a/object_store/src/http/client.rs
+++ b/object_store/src/http/client.rs
@@ -243,6 +243,10 @@ impl Client {
                 .header("Destination", self.path_url(to).as_str());
 
             if !overwrite {
+                // While the Overwrite header appears to duplicate
+                // the functionality of the If-Match: * header of HTTP/1.1, 
If-Match
+                // applies only to the Request-URI, and not to the Destination 
of a COPY
+                // or MOVE.
                 builder = builder.header("Overwrite", "F");
             }
 
diff --git a/object_store/src/http/mod.rs b/object_store/src/http/mod.rs
index 8f61011cca..cfcde27fd7 100644
--- a/object_store/src/http/mod.rs
+++ b/object_store/src/http/mod.rs
@@ -46,7 +46,7 @@ use crate::http::client::Client;
 use crate::path::Path;
 use crate::{
     ClientConfigKey, ClientOptions, GetOptions, GetResult, ListResult, 
MultipartId, ObjectMeta,
-    ObjectStore, PutResult, Result, RetryConfig,
+    ObjectStore, PutMode, PutOptions, PutResult, Result, RetryConfig,
 };
 
 mod client;
@@ -96,14 +96,23 @@ impl std::fmt::Display for HttpStore {
 
 #[async_trait]
 impl ObjectStore for HttpStore {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        if opts.mode != PutMode::Overwrite {
+            // TODO: Add support for If header - 
https://datatracker.ietf.org/doc/html/rfc2518#section-9.4
+            return Err(crate::Error::NotImplemented);
+        }
+
         let response = self.client.put(location, bytes).await?;
         let e_tag = match get_etag(response.headers()) {
             Ok(e_tag) => Some(e_tag),
             Err(crate::client::header::Error::MissingEtag) => None,
             Err(source) => return Err(Error::Metadata { source }.into()),
         };
-        Ok(PutResult { e_tag })
+
+        Ok(PutResult {
+            e_tag,
+            version: None,
+        })
     }
 
     async fn put_multipart(
diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs
index 9a06672298..66964304e8 100644
--- a/object_store/src/lib.rs
+++ b/object_store/src/lib.rs
@@ -299,7 +299,12 @@ pub trait ObjectStore: std::fmt::Display + Send + Sync + 
Debug + 'static {
     /// The operation is guaranteed to be atomic, it will either successfully
     /// write the entirety of `bytes` to `location`, or fail. No clients
     /// should be able to observe a partially written object
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult>;
+    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+        self.put_opts(location, bytes, PutOptions::default()).await
+    }
+
+    /// Save the provided bytes to the specified location with the given 
options
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult>;
 
     /// Get a multi-part upload that allows writing data in chunks.
     ///
@@ -531,6 +536,15 @@ macro_rules! as_ref_impl {
                 self.as_ref().put(location, bytes).await
             }
 
+            async fn put_opts(
+                &self,
+                location: &Path,
+                bytes: Bytes,
+                opts: PutOptions,
+            ) -> Result<PutResult> {
+                self.as_ref().put_opts(location, bytes, opts).await
+            }
+
             async fn put_multipart(
                 &self,
                 location: &Path,
@@ -837,13 +851,65 @@ impl GetResult {
     }
 }
 
+/// Configure preconditions for the put operation
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub enum PutMode {
+    /// Perform an atomic write operation, overwriting any object present at 
the provided path
+    #[default]
+    Overwrite,
+    /// Perform an atomic write operation, returning [`Error::AlreadyExists`] 
if an
+    /// object already exists at the provided path
+    Create,
+    /// Perform an atomic write operation if the current version of the object 
matches the
+    /// provided [`UpdateVersion`], returning [`Error::Precondition`] otherwise
+    Update(UpdateVersion),
+}
+
+/// Uniquely identifies a version of an object to update
+///
+/// Stores will use differing combinations of `e_tag` and `version` to provide 
conditional
+/// updates, and it is therefore recommended applications preserve both
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct UpdateVersion {
+    /// The unique identifier for the newly created object
+    ///
+    /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
+    pub e_tag: Option<String>,
+    /// A version indicator for the newly created object
+    pub version: Option<String>,
+}
+
+impl From<PutResult> for UpdateVersion {
+    fn from(value: PutResult) -> Self {
+        Self {
+            e_tag: value.e_tag,
+            version: value.version,
+        }
+    }
+}
+
+/// Options for a put request
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct PutOptions {
+    /// Configure the [`PutMode`] for this operation
+    pub mode: PutMode,
+}
+
+impl From<PutMode> for PutOptions {
+    fn from(mode: PutMode) -> Self {
+        Self { mode }
+    }
+}
+
 /// Result for a put request
 #[derive(Debug, Clone, PartialEq, Eq)]
 pub struct PutResult {
-    /// The unique identifier for the object
+    /// The unique identifier for the newly created object
     ///
     /// <https://datatracker.ietf.org/doc/html/rfc9110#name-etag>
     pub e_tag: Option<String>,
+    /// A version indicator for the newly created object
+    pub version: Option<String>,
 }
 
 /// A specialized `Result` for object store-related errors
@@ -947,6 +1013,7 @@ mod tests {
     use crate::multipart::MultiPartStore;
     use crate::test_util::flatten_list_stream;
     use chrono::TimeZone;
+    use futures::stream::FuturesUnordered;
     use rand::{thread_rng, Rng};
     use tokio::io::AsyncWriteExt;
 
@@ -1406,7 +1473,7 @@ mod tests {
             // Can retrieve previous version
             let get_opts = storage.get_opts(&path, options).await.unwrap();
             let old = get_opts.bytes().await.unwrap();
-            assert_eq!(old, b"foo".as_slice());
+            assert_eq!(old, b"test".as_slice());
 
             // Current version contains the updated data
             let current = 
storage.get(&path).await.unwrap().bytes().await.unwrap();
@@ -1414,6 +1481,104 @@ mod tests {
         }
     }
 
+    pub(crate) async fn put_opts(storage: &dyn ObjectStore, supports_update: 
bool) {
+        delete_fixtures(storage).await;
+        let path = Path::from("put_opts");
+        let v1 = storage
+            .put_opts(&path, "a".into(), PutMode::Create.into())
+            .await
+            .unwrap();
+
+        let err = storage
+            .put_opts(&path, "b".into(), PutMode::Create.into())
+            .await
+            .unwrap_err();
+        assert!(matches!(err, Error::AlreadyExists { .. }), "{err}");
+
+        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(b.as_ref(), b"a");
+
+        if !supports_update {
+            return;
+        }
+
+        let v2 = storage
+            .put_opts(&path, "c".into(), 
PutMode::Update(v1.clone().into()).into())
+            .await
+            .unwrap();
+
+        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(b.as_ref(), b"c");
+
+        let err = storage
+            .put_opts(&path, "d".into(), PutMode::Update(v1.into()).into())
+            .await
+            .unwrap_err();
+        assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+        storage
+            .put_opts(&path, "e".into(), 
PutMode::Update(v2.clone().into()).into())
+            .await
+            .unwrap();
+
+        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+        assert_eq!(b.as_ref(), b"e");
+
+        // Update not exists
+        let path = Path::from("I don't exist");
+        let err = storage
+            .put_opts(&path, "e".into(), PutMode::Update(v2.into()).into())
+            .await
+            .unwrap_err();
+        assert!(matches!(err, Error::Precondition { .. }), "{err}");
+
+        const NUM_WORKERS: usize = 5;
+        const NUM_INCREMENTS: usize = 10;
+
+        let path = Path::from("RACE");
+        let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
+            .map(|_| async {
+                for _ in 0..NUM_INCREMENTS {
+                    loop {
+                        match storage.get(&path).await {
+                            Ok(r) => {
+                                let mode = PutMode::Update(UpdateVersion {
+                                    e_tag: r.meta.e_tag.clone(),
+                                    version: r.meta.version.clone(),
+                                });
+
+                                let b = r.bytes().await.unwrap();
+                                let v: usize = 
std::str::from_utf8(&b).unwrap().parse().unwrap();
+                                let new = (v + 1).to_string();
+
+                                match storage.put_opts(&path, new.into(), 
mode.into()).await {
+                                    Ok(_) => break,
+                                    Err(Error::Precondition { .. }) => 
continue,
+                                    Err(e) => return Err(e),
+                                }
+                            }
+                            Err(Error::NotFound { .. }) => {
+                                let mode = PutMode::Create;
+                                match storage.put_opts(&path, "1".into(), 
mode.into()).await {
+                                    Ok(_) => break,
+                                    Err(Error::AlreadyExists { .. }) => 
continue,
+                                    Err(e) => return Err(e),
+                                }
+                            }
+                            Err(e) => return Err(e),
+                        }
+                    }
+                }
+                Ok(())
+            })
+            .collect();
+
+        while futures.next().await.transpose().unwrap().is_some() {}
+        let b = storage.get(&path).await.unwrap().bytes().await.unwrap();
+        let v = std::str::from_utf8(&b).unwrap().parse::<usize>().unwrap();
+        assert_eq!(v, NUM_WORKERS * NUM_INCREMENTS);
+    }
+
     /// Returns a chunk of length `chunk_length`
     fn get_chunk(chunk_length: usize) -> Bytes {
         let mut data = vec![0_u8; chunk_length];
diff --git a/object_store/src/limit.rs b/object_store/src/limit.rs
index cd01a964dc..39cc605c47 100644
--- a/object_store/src/limit.rs
+++ b/object_store/src/limit.rs
@@ -19,7 +19,7 @@
 
 use crate::{
     BoxStream, GetOptions, GetResult, GetResultPayload, ListResult, 
MultipartId, ObjectMeta,
-    ObjectStore, Path, PutResult, Result, StreamExt,
+    ObjectStore, Path, PutOptions, PutResult, Result, StreamExt,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -77,6 +77,10 @@ impl<T: ObjectStore> ObjectStore for LimitStore<T> {
         self.inner.put(location, bytes).await
     }
 
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        let _permit = self.semaphore.acquire().await.unwrap();
+        self.inner.put_opts(location, bytes, opts).await
+    }
     async fn put_multipart(
         &self,
         location: &Path,
diff --git a/object_store/src/local.rs b/object_store/src/local.rs
index ce9aa46834..919baf71b0 100644
--- a/object_store/src/local.rs
+++ b/object_store/src/local.rs
@@ -20,7 +20,7 @@ use crate::{
     maybe_spawn_blocking,
     path::{absolute_path_to_url, Path},
     GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, 
ObjectMeta, ObjectStore,
-    PutResult, Result,
+    PutMode, PutOptions, PutResult, Result,
 };
 use async_trait::async_trait;
 use bytes::Bytes;
@@ -271,20 +271,44 @@ impl Config {
 
 #[async_trait]
 impl ObjectStore for LocalFileSystem {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        if matches!(opts.mode, PutMode::Update(_)) {
+            return Err(crate::Error::NotImplemented);
+        }
+
         let path = self.config.path_to_filesystem(location)?;
         maybe_spawn_blocking(move || {
             let (mut file, suffix) = new_staged_upload(&path)?;
             let staging_path = staged_upload_path(&path, &suffix);
-            file.write_all(&bytes)
-                .context(UnableToCopyDataToFileSnafu)
-                .and_then(|_| {
-                    std::fs::rename(&staging_path, 
&path).context(UnableToRenameFileSnafu)
-                })
-                .map_err(|e| {
-                    let _ = std::fs::remove_file(&staging_path); // Attempt to 
cleanup
-                    e
-                })?;
+
+            let err = match file.write_all(&bytes) {
+                Ok(_) => match opts.mode {
+                    PutMode::Overwrite => match std::fs::rename(&staging_path, 
&path) {
+                        Ok(_) => None,
+                        Err(source) => Some(Error::UnableToRenameFile { source 
}),
+                    },
+                    PutMode::Create => match std::fs::hard_link(&staging_path, 
&path) {
+                        Ok(_) => {
+                            let _ = std::fs::remove_file(&staging_path); // 
Attempt to cleanup
+                            None
+                        }
+                        Err(source) => match source.kind() {
+                            ErrorKind::AlreadyExists => 
Some(Error::AlreadyExists {
+                                path: path.to_str().unwrap().to_string(),
+                                source,
+                            }),
+                            _ => Some(Error::UnableToRenameFile { source }),
+                        },
+                    },
+                    PutMode::Update(_) => unreachable!(),
+                },
+                Err(source) => Some(Error::UnableToCopyDataToFile { source }),
+            };
+
+            if let Some(err) = err {
+                let _ = std::fs::remove_file(&staging_path); // Attempt to 
cleanup
+                return Err(err.into());
+            }
 
             let metadata = file.metadata().map_err(|e| Error::Metadata {
                 source: e.into(),
@@ -293,6 +317,7 @@ impl ObjectStore for LocalFileSystem {
 
             Ok(PutResult {
                 e_tag: Some(get_etag(&metadata)),
+                version: None,
             })
         })
         .await
@@ -1054,6 +1079,7 @@ mod tests {
         rename_and_copy(&integration).await;
         copy_if_not_exists(&integration).await;
         stream_get(&integration).await;
+        put_opts(&integration, false).await;
     }
 
     #[test]
diff --git a/object_store/src/memory.rs b/object_store/src/memory.rs
index 8b9522e48d..9d79a798ad 100644
--- a/object_store/src/memory.rs
+++ b/object_store/src/memory.rs
@@ -17,7 +17,8 @@
 
 //! An in-memory object store implementation
 use crate::{
-    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutResult, Result,
+    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutMode,
+    PutOptions, PutResult, Result, UpdateVersion,
 };
 use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
@@ -52,6 +53,9 @@ enum Error {
 
     #[snafu(display("Object already exists at that location: {path}"))]
     AlreadyExists { path: String },
+
+    #[snafu(display("ETag required for conditional update"))]
+    MissingETag,
 }
 
 impl From<Error> for super::Error {
@@ -110,9 +114,50 @@ impl Storage {
         let etag = self.next_etag;
         self.next_etag += 1;
         let entry = Entry::new(bytes, Utc::now(), etag);
-        self.map.insert(location.clone(), entry);
+        self.overwrite(location, entry);
         etag
     }
+
+    fn overwrite(&mut self, location: &Path, entry: Entry) {
+        self.map.insert(location.clone(), entry);
+    }
+
+    fn create(&mut self, location: &Path, entry: Entry) -> Result<()> {
+        use std::collections::btree_map;
+        match self.map.entry(location.clone()) {
+            btree_map::Entry::Occupied(_) => Err(Error::AlreadyExists {
+                path: location.to_string(),
+            }
+            .into()),
+            btree_map::Entry::Vacant(v) => {
+                v.insert(entry);
+                Ok(())
+            }
+        }
+    }
+
+    fn update(&mut self, location: &Path, v: UpdateVersion, entry: Entry) -> 
Result<()> {
+        match self.map.get_mut(location) {
+            // Return Precondition instead of NotFound for consistency with 
stores
+            None => Err(crate::Error::Precondition {
+                path: location.to_string(),
+                source: format!("Object at location {location} not 
found").into(),
+            }),
+            Some(e) => {
+                let existing = e.e_tag.to_string();
+                let expected = v.e_tag.context(MissingETagSnafu)?;
+                if existing == expected {
+                    *e = entry;
+                    Ok(())
+                } else {
+                    Err(crate::Error::Precondition {
+                        path: location.to_string(),
+                        source: format!("{existing} does not match 
{expected}").into(),
+                    })
+                }
+            }
+        }
+    }
 }
 
 impl std::fmt::Display for InMemory {
@@ -123,10 +168,21 @@ impl std::fmt::Display for InMemory {
 
 #[async_trait]
 impl ObjectStore for InMemory {
-    async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
-        let etag = self.storage.write().insert(location, bytes);
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        let mut storage = self.storage.write();
+        let etag = storage.next_etag;
+        let entry = Entry::new(bytes, Utc::now(), etag);
+
+        match opts.mode {
+            PutMode::Overwrite => storage.overwrite(location, entry),
+            PutMode::Create => storage.create(location, entry)?,
+            PutMode::Update(v) => storage.update(location, v, entry)?,
+        }
+        storage.next_etag += 1;
+
         Ok(PutResult {
             e_tag: Some(etag.to_string()),
+            version: None,
         })
     }
 
@@ -425,7 +481,7 @@ impl AsyncWrite for InMemoryAppend {
     fn poll_shutdown(
         self: Pin<&mut Self>,
         cx: &mut std::task::Context<'_>,
-    ) -> std::task::Poll<Result<(), io::Error>> {
+    ) -> Poll<Result<(), io::Error>> {
         self.poll_flush(cx)
     }
 }
@@ -449,6 +505,7 @@ mod tests {
         rename_and_copy(&integration).await;
         copy_if_not_exists(&integration).await;
         stream_get(&integration).await;
+        put_opts(&integration, true).await;
     }
 
     #[tokio::test]
diff --git a/object_store/src/prefix.rs b/object_store/src/prefix.rs
index b5bff8b12d..68101307fb 100644
--- a/object_store/src/prefix.rs
+++ b/object_store/src/prefix.rs
@@ -23,7 +23,8 @@ use tokio::io::AsyncWrite;
 
 use crate::path::Path;
 use crate::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutResult, Result,
+    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutOptions, PutResult,
+    Result,
 };
 
 #[doc(hidden)]
@@ -85,6 +86,11 @@ impl<T: ObjectStore> ObjectStore for PrefixStore<T> {
         self.inner.put(&full_path, bytes).await
     }
 
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        let full_path = self.full_path(location);
+        self.inner.put_opts(&full_path, bytes, opts).await
+    }
+
     async fn put_multipart(
         &self,
         location: &Path,
diff --git a/object_store/src/throttle.rs b/object_store/src/throttle.rs
index c5521256b8..dcd2c04bcf 100644
--- a/object_store/src/throttle.rs
+++ b/object_store/src/throttle.rs
@@ -21,7 +21,8 @@ use std::ops::Range;
 use std::{convert::TryInto, sync::Arc};
 
 use crate::{
-    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutResult, Result,
+    path::Path, GetResult, GetResultPayload, ListResult, ObjectMeta, 
ObjectStore, PutOptions,
+    PutResult, Result,
 };
 use crate::{GetOptions, MultipartId};
 use async_trait::async_trait;
@@ -149,10 +150,14 @@ impl<T: ObjectStore> std::fmt::Display for 
ThrottledStore<T> {
 impl<T: ObjectStore> ObjectStore for ThrottledStore<T> {
     async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
         sleep(self.config().wait_put_per_call).await;
-
         self.inner.put(location, bytes).await
     }
 
+    async fn put_opts(&self, location: &Path, bytes: Bytes, opts: PutOptions) 
-> Result<PutResult> {
+        sleep(self.config().wait_put_per_call).await;
+        self.inner.put_opts(location, bytes, opts).await
+    }
+
     async fn put_multipart(
         &self,
         _location: &Path,
diff --git a/object_store/tests/get_range_file.rs 
b/object_store/tests/get_range_file.rs
index 3fa1cc7104..85231a5a5b 100644
--- a/object_store/tests/get_range_file.rs
+++ b/object_store/tests/get_range_file.rs
@@ -22,9 +22,7 @@ use bytes::Bytes;
 use futures::stream::BoxStream;
 use object_store::local::LocalFileSystem;
 use object_store::path::Path;
-use object_store::{
-    GetOptions, GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, 
PutResult,
-};
+use object_store::*;
 use std::fmt::Formatter;
 use tempfile::tempdir;
 use tokio::io::AsyncWrite;
@@ -40,50 +38,42 @@ impl std::fmt::Display for MyStore {
 
 #[async_trait]
 impl ObjectStore for MyStore {
-    async fn put(&self, path: &Path, data: Bytes) -> 
object_store::Result<PutResult> {
-        self.0.put(path, data).await
+    async fn put_opts(&self, path: &Path, data: Bytes, opts: PutOptions) -> 
Result<PutResult> {
+        self.0.put_opts(path, data, opts).await
     }
 
     async fn put_multipart(
         &self,
         _: &Path,
-    ) -> object_store::Result<(MultipartId, Box<dyn AsyncWrite + Unpin + 
Send>)> {
+    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
         todo!()
     }
 
-    async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> 
object_store::Result<()> {
+    async fn abort_multipart(&self, _: &Path, _: &MultipartId) -> Result<()> {
         todo!()
     }
 
-    async fn get_opts(
-        &self,
-        location: &Path,
-        options: GetOptions,
-    ) -> object_store::Result<GetResult> {
+    async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
         self.0.get_opts(location, options).await
     }
 
-    async fn head(&self, _: &Path) -> object_store::Result<ObjectMeta> {
-        todo!()
-    }
-
-    async fn delete(&self, _: &Path) -> object_store::Result<()> {
+    async fn delete(&self, _: &Path) -> Result<()> {
         todo!()
     }
 
-    fn list(&self, _: Option<&Path>) -> BoxStream<'_, 
object_store::Result<ObjectMeta>> {
+    fn list(&self, _: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
         todo!()
     }
 
-    async fn list_with_delimiter(&self, _: Option<&Path>) -> 
object_store::Result<ListResult> {
+    async fn list_with_delimiter(&self, _: Option<&Path>) -> 
Result<ListResult> {
         todo!()
     }
 
-    async fn copy(&self, _: &Path, _: &Path) -> object_store::Result<()> {
+    async fn copy(&self, _: &Path, _: &Path) -> Result<()> {
         todo!()
     }
 
-    async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> 
object_store::Result<()> {
+    async fn copy_if_not_exists(&self, _: &Path, _: &Path) -> Result<()> {
         todo!()
     }
 }

Reply via email to