This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 1f466dc62c Support copy_if_not_exists for Cloudflare R2 (#4190) (#4239)
1f466dc62c is described below

commit 1f466dc62c9ad2fbea206b2bfdec40ca783a9c33
Author: Raphael Taylor-Davies <1781103+tustv...@users.noreply.github.com>
AuthorDate: Mon Aug 7 15:44:40 2023 +0100

    Support copy_if_not_exists for Cloudflare R2 (#4190) (#4239)
    
    * Support copy_if_not_exists for Cloudflare R2 (#4190)
    
    * Add tests
---
 object_store/src/aws/client.rs | 48 +++++++++++++++++++++++++-----
 object_store/src/aws/copy.rs   | 66 ++++++++++++++++++++++++++++++++++++++++++
 object_store/src/aws/mod.rs    | 44 +++++++++++++++++++++++-----
 3 files changed, 144 insertions(+), 14 deletions(-)

diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs
index 188897620b..1c35586f8b 100644
--- a/object_store/src/aws/client.rs
+++ b/object_store/src/aws/client.rs
@@ -17,7 +17,9 @@
 
 use crate::aws::checksum::Checksum;
 use crate::aws::credential::{AwsCredential, CredentialExt};
-use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET};
+use crate::aws::{
+    AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET,
+};
 use crate::client::get::GetClient;
 use crate::client::list::ListClient;
 use crate::client::list_response::ListResponse;
@@ -37,7 +39,7 @@ use percent_encoding::{utf8_percent_encode, PercentEncode};
 use quick_xml::events::{self as xml_events};
 use reqwest::{
     header::{CONTENT_LENGTH, CONTENT_TYPE},
-    Client as ReqwestClient, Method, Response,
+    Client as ReqwestClient, Method, Response, StatusCode,
 };
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
@@ -206,6 +208,7 @@ pub struct S3Config {
     pub client_options: ClientOptions,
     pub sign_payload: bool,
     pub checksum: Option<Checksum>,
+    pub copy_if_not_exists: Option<S3CopyIfNotExists>,
 }
 
 impl S3Config {
@@ -424,14 +427,37 @@ impl S3Client {
     }
 
     /// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html>
-    pub async fn copy_request(&self, from: &Path, to: &Path) -> Result<()> {
+    pub async fn copy_request(
+        &self,
+        from: &Path,
+        to: &Path,
+        overwrite: bool,
+    ) -> Result<()> {
         let credential = self.get_credential().await?;
         let url = self.config.path_url(to);
         let source = format!("{}/{}", self.config.bucket, encode_path(from));
 
-        self.client
+        let mut builder = self
+            .client
             .request(Method::PUT, url)
-            .header("x-amz-copy-source", source)
+            .header("x-amz-copy-source", source);
+
+        if !overwrite {
+            match &self.config.copy_if_not_exists {
+                Some(S3CopyIfNotExists::Header(k, v)) => {
+                    builder = builder.header(k, v);
+                }
+                None => {
+                    return Err(crate::Error::NotSupported {
+                        source: "S3 does not support copy-if-not-exists"
+                            .to_string()
+                            .into(),
+                    })
+                }
+            }
+        }
+
+        builder
             .with_aws_sigv4(
                 credential.as_ref(),
                 &self.config.region,
@@ -441,8 +467,16 @@ impl S3Client {
             )
             .send_retry(&self.config.retry_config)
             .await
-            .context(CopyRequestSnafu {
-                path: from.as_ref(),
+            .map_err(|source| match source.status() {
+                Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists {
+                    source: Box::new(source),
+                    path: to.to_string(),
+                },
+                _ => Error::CopyRequest {
+                    source,
+                    path: from.to_string(),
+                }
+                .into(),
             })?;
 
         Ok(())
diff --git a/object_store/src/aws/copy.rs b/object_store/src/aws/copy.rs
new file mode 100644
index 0000000000..6b96f992ce
--- /dev/null
+++ b/object_store/src/aws/copy.rs
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::config::Parse;
+
+/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`]
+#[derive(Debug, Clone)]
+#[non_exhaustive]
+pub enum S3CopyIfNotExists {
+    /// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists
+    /// semantics through custom headers.
+    ///
+    /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation
+    /// with the provided header pair, and expect the store to fail with `412 Precondition Failed`
+    /// if the destination file already exists
+    ///
+    /// Encoded as `header:<HEADER_NAME>:<HEADER_VALUE>` ignoring whitespace
+    ///
+    /// For example `header: cf-copy-destination-if-none-match: *`, would set
+    /// the header `cf-copy-destination-if-none-match` to `*`
+    Header(String, String),
+}
+
+impl std::fmt::Display for S3CopyIfNotExists {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Header(k, v) => write!(f, "header: {}: {}", k, v),
+        }
+    }
+}
+
+impl S3CopyIfNotExists {
+    fn from_str(s: &str) -> Option<Self> {
+        let (variant, value) = s.split_once(':')?;
+        match variant.trim() {
+            "header" => {
+                let (k, v) = value.split_once(':')?;
+                Some(Self::Header(k.trim().to_string(), v.trim().to_string()))
+            }
+            _ => None,
+        }
+    }
+}
+
+impl Parse for S3CopyIfNotExists {
+    fn parse(v: &str) -> crate::Result<Self> {
+        Self::from_str(v).ok_or_else(|| crate::Error::Generic {
+            store: "Config",
+            source: format!("Failed to parse \"{v}\" as S3CopyIfNotExists").into(),
+        })
+    }
+}
diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs
index f6066d45a7..7e16b5a1ba 100644
--- a/object_store/src/aws/mod.rs
+++ b/object_store/src/aws/mod.rs
@@ -44,7 +44,6 @@ use tokio::io::AsyncWrite;
 use tracing::info;
 use url::Url;
 
-pub use crate::aws::checksum::Checksum;
 use crate::aws::client::{S3Client, S3Config};
 use crate::aws::credential::{
     InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider,
@@ -64,8 +63,12 @@ use crate::{
 
 mod checksum;
 mod client;
+mod copy;
 mod credential;
 
+pub use checksum::Checksum;
+pub use copy::S3CopyIfNotExists;
+
 // http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
 //
 // Do not URI-encode any of the unreserved characters that RFC 3986 defines:
@@ -292,12 +295,11 @@ impl ObjectStore for AmazonS3 {
     }
 
     async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
-        self.client.copy_request(from, to).await
+        self.client.copy_request(from, to, true).await
     }
 
-    async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> {
-        // Will need dynamodb_lock
-        Err(crate::Error::NotImplemented)
+    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
+        self.client.copy_request(from, to, false).await
     }
 }
 
@@ -390,6 +392,8 @@ pub struct AmazonS3Builder {
     client_options: ClientOptions,
     /// Credentials
     credentials: Option<AwsCredentialProvider>,
+    /// Copy if not exists
+    copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>,
 }
 
 /// Configuration keys for [`AmazonS3Builder`]
@@ -521,6 +525,11 @@ pub enum AmazonS3ConfigKey {
     /// <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html>
     ContainerCredentialsRelativeUri,
 
+    /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
+    ///
+    /// See [`S3CopyIfNotExists`]
+    CopyIfNotExists,
+
     /// Client options
     Client(ClientConfigKey),
 }
@@ -543,6 +552,7 @@ impl AsRef<str> for AmazonS3ConfigKey {
             Self::ContainerCredentialsRelativeUri => {
                 "aws_container_credentials_relative_uri"
             }
+            Self::CopyIfNotExists => "copy_if_not_exists",
             Self::Client(opt) => opt.as_ref(),
         }
     }
@@ -576,6 +586,7 @@ impl FromStr for AmazonS3ConfigKey {
             "aws_container_credentials_relative_uri" => {
                 Ok(Self::ContainerCredentialsRelativeUri)
             }
+            "copy_if_not_exists" => Ok(Self::CopyIfNotExists),
             // Backwards compatibility
             "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)),
             _ => match s.parse() {
@@ -686,6 +697,9 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::Client(key) => {
                 self.client_options = self.client_options.with_config(key, value)
             }
+            AmazonS3ConfigKey::CopyIfNotExists => {
+                self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into()))
+            }
         };
         self
     }
@@ -753,6 +767,9 @@ impl AmazonS3Builder {
             AmazonS3ConfigKey::ContainerCredentialsRelativeUri => {
                 self.container_credentials_relative_uri.clone()
             }
+            AmazonS3ConfigKey::CopyIfNotExists => {
+                self.copy_if_not_exists.as_ref().map(ToString::to_string)
+            }
         }
     }
 
@@ -935,6 +952,12 @@ impl AmazonS3Builder {
         self
     }
 
+    /// Configure how to provide [`ObjectStore::copy_if_not_exists`]
+    pub fn with_copy_if_not_exists(mut self, config: S3CopyIfNotExists) -> Self {
+        self.copy_if_not_exists = Some(config.into());
+        self
+    }
+
     /// Create a [`AmazonS3`] instance from the provided values,
     /// consuming `self`.
     pub fn build(mut self) -> Result<AmazonS3> {
@@ -945,6 +968,7 @@ impl AmazonS3Builder {
         let bucket = self.bucket_name.context(MissingBucketNameSnafu)?;
         let region = self.region.context(MissingRegionSnafu)?;
         let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?;
+        let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?;
 
         let credentials = if let Some(credentials) = self.credentials {
             credentials
@@ -1050,6 +1074,7 @@ impl AmazonS3Builder {
             client_options: self.client_options,
             sign_payload: !self.unsigned_payload.get()?,
             checksum,
+            copy_if_not_exists,
         };
 
         let client = Arc::new(S3Client::new(config)?);
@@ -1062,8 +1087,9 @@ impl AmazonS3Builder {
 mod tests {
     use super::*;
     use crate::tests::{
-        get_nonexistent_object, get_opts, list_uses_directories_correctly,
-        list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get,
+        copy_if_not_exists, get_nonexistent_object, get_opts,
+        list_uses_directories_correctly, list_with_delimiter, put_get_delete_list_opts,
+        rename_and_copy, stream_get,
     };
     use bytes::Bytes;
     use std::collections::HashMap;
@@ -1164,6 +1190,7 @@ mod tests {
         let config = AmazonS3Builder::from_env();
 
         let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
+        let test_not_exists = config.copy_if_not_exists.is_some();
         let integration = config.build().unwrap();
 
         // Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
@@ -1173,6 +1200,9 @@ mod tests {
         list_with_delimiter(&integration).await;
         rename_and_copy(&integration).await;
         stream_get(&integration).await;
+        if test_not_exists {
+            copy_if_not_exists(&integration).await;
+        }
 
         // run integration test with unsigned payload enabled
         let config = AmazonS3Builder::from_env().with_unsigned_payload(true);

Reply via email to