This is an automated email from the ASF dual-hosted git repository. tustvold pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push: new 1f466dc62c Support copy_if_not_exists for Cloudflare R2 (#4190) (#4239) 1f466dc62c is described below commit 1f466dc62c9ad2fbea206b2bfdec40ca783a9c33 Author: Raphael Taylor-Davies <1781103+tustv...@users.noreply.github.com> AuthorDate: Mon Aug 7 15:44:40 2023 +0100 Support copy_if_not_exists for Cloudflare R2 (#4190) (#4239) * Support copy_if_not_exists for Cloudflare R2 (#4190) * Add tests --- object_store/src/aws/client.rs | 48 +++++++++++++++++++++++++----- object_store/src/aws/copy.rs | 66 ++++++++++++++++++++++++++++++++++++++++++ object_store/src/aws/mod.rs | 44 +++++++++++++++++++++++----- 3 files changed, 144 insertions(+), 14 deletions(-) diff --git a/object_store/src/aws/client.rs b/object_store/src/aws/client.rs index 188897620b..1c35586f8b 100644 --- a/object_store/src/aws/client.rs +++ b/object_store/src/aws/client.rs @@ -17,7 +17,9 @@ use crate::aws::checksum::Checksum; use crate::aws::credential::{AwsCredential, CredentialExt}; -use crate::aws::{AwsCredentialProvider, STORE, STRICT_PATH_ENCODE_SET}; +use crate::aws::{ + AwsCredentialProvider, S3CopyIfNotExists, STORE, STRICT_PATH_ENCODE_SET, +}; use crate::client::get::GetClient; use crate::client::list::ListClient; use crate::client::list_response::ListResponse; @@ -37,7 +39,7 @@ use percent_encoding::{utf8_percent_encode, PercentEncode}; use quick_xml::events::{self as xml_events}; use reqwest::{ header::{CONTENT_LENGTH, CONTENT_TYPE}, - Client as ReqwestClient, Method, Response, + Client as ReqwestClient, Method, Response, StatusCode, }; use serde::{Deserialize, Serialize}; use snafu::{ResultExt, Snafu}; @@ -206,6 +208,7 @@ pub struct S3Config { pub client_options: ClientOptions, pub sign_payload: bool, pub checksum: Option<Checksum>, + pub copy_if_not_exists: Option<S3CopyIfNotExists>, } impl S3Config { @@ -424,14 +427,37 @@ impl S3Client { } /// Make an S3 Copy request <https://docs.aws.amazon.com/AmazonS3/latest/API/API_CopyObject.html> - pub async fn copy_request(&self, from: &Path, to: &Path) -> Result<()> { + pub async fn copy_request( + &self, + from: &Path, + to: &Path, + overwrite: bool, + ) -> Result<()> { let credential = self.get_credential().await?; let url = self.config.path_url(to); let source = format!("{}/{}", self.config.bucket, encode_path(from)); - self.client + let mut builder = self + .client .request(Method::PUT, url) - .header("x-amz-copy-source", source) + .header("x-amz-copy-source", source); + + if !overwrite { + match &self.config.copy_if_not_exists { + Some(S3CopyIfNotExists::Header(k, v)) => { + builder = builder.header(k, v); + } + None => { + return Err(crate::Error::NotSupported { + source: "S3 does not support copy-if-not-exists" + .to_string() + .into(), + }) + } + } + } + + builder .with_aws_sigv4( credential.as_ref(), &self.config.region, @@ -441,8 +467,16 @@ impl S3Client { ) .send_retry(&self.config.retry_config) .await - .context(CopyRequestSnafu { - path: from.as_ref(), + .map_err(|source| match source.status() { + Some(StatusCode::PRECONDITION_FAILED) => crate::Error::AlreadyExists { + source: Box::new(source), + path: to.to_string(), + }, + _ => Error::CopyRequest { + source, + path: from.to_string(), + } + .into(), })?; Ok(()) diff --git a/object_store/src/aws/copy.rs b/object_store/src/aws/copy.rs new file mode 100644 index 0000000000..6b96f992ce --- /dev/null +++ b/object_store/src/aws/copy.rs @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::config::Parse; + +/// Configure how to provide [`ObjectStore::copy_if_not_exists`] for [`AmazonS3`] +#[derive(Debug, Clone)] +#[non_exhaustive] +pub enum S3CopyIfNotExists { + /// Some S3-compatible stores, such as Cloudflare R2, support copy if not exists + /// semantics through custom headers. + /// + /// If set, [`ObjectStore::copy_if_not_exists`] will perform a normal copy operation + /// with the provided header pair, and expect the store to fail with `412 Precondition Failed` + /// if the destination file already exists + /// + /// Encoded as `header:<HEADER_NAME>:<HEADER_VALUE>` ignoring whitespace + /// + /// For example `header: cf-copy-destination-if-none-match: *`, would set + /// the header `cf-copy-destination-if-none-match` to `*` + Header(String, String), +} + +impl std::fmt::Display for S3CopyIfNotExists { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Header(k, v) => write!(f, "header: {}: {}", k, v), + } + } +} + +impl S3CopyIfNotExists { + fn from_str(s: &str) -> Option<Self> { + let (variant, value) = s.split_once(':')?; + match variant.trim() { + "header" => { + let (k, v) = value.split_once(':')?; + Some(Self::Header(k.trim().to_string(), v.trim().to_string())) + } + _ => None, + } + } +} + +impl Parse for S3CopyIfNotExists { + fn parse(v: &str) -> crate::Result<Self> { + Self::from_str(v).ok_or_else(|| crate::Error::Generic { + store: "Config", + source: format!("Failed to parse \"{v}\" as S3CopyIfNotExists").into(), + }) + } +} diff --git a/object_store/src/aws/mod.rs b/object_store/src/aws/mod.rs index f6066d45a7..7e16b5a1ba 100644 --- a/object_store/src/aws/mod.rs +++ b/object_store/src/aws/mod.rs @@ -44,7 +44,6 @@ use tokio::io::AsyncWrite; use tracing::info; use url::Url; -pub use crate::aws::checksum::Checksum; use crate::aws::client::{S3Client, S3Config}; use crate::aws::credential::{ InstanceCredentialProvider, TaskCredentialProvider, WebIdentityProvider, @@ -64,8 +63,12 @@ use crate::{ mod checksum; mod client; +mod copy; mod credential; +pub use checksum::Checksum; +pub use copy::S3CopyIfNotExists; + // http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html // // Do not URI-encode any of the unreserved characters that RFC 3986 defines: @@ -292,12 +295,11 @@ impl ObjectStore for AmazonS3 { } async fn copy(&self, from: &Path, to: &Path) -> Result<()> { - self.client.copy_request(from, to).await + self.client.copy_request(from, to, true).await } - async fn copy_if_not_exists(&self, _source: &Path, _dest: &Path) -> Result<()> { - // Will need dynamodb_lock - Err(crate::Error::NotImplemented) + async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> { + self.client.copy_request(from, to, false).await } } @@ -390,6 +392,8 @@ pub struct AmazonS3Builder { client_options: ClientOptions, /// Credentials credentials: Option<AwsCredentialProvider>, + /// Copy if not exists + copy_if_not_exists: Option<ConfigValue<S3CopyIfNotExists>>, } /// Configuration keys for [`AmazonS3Builder`] @@ -521,6 +525,11 @@ pub enum AmazonS3ConfigKey { /// <https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-iam-roles.html> ContainerCredentialsRelativeUri, + /// Configure how to provide [`ObjectStore::copy_if_not_exists`] + /// + /// See [`S3CopyIfNotExists`] + CopyIfNotExists, + /// Client options Client(ClientConfigKey), } @@ -543,6 +552,7 @@ impl AsRef<str> for AmazonS3ConfigKey { Self::ContainerCredentialsRelativeUri => { "aws_container_credentials_relative_uri" } + Self::CopyIfNotExists => "copy_if_not_exists", Self::Client(opt) => opt.as_ref(), } } @@ -576,6 +586,7 @@ impl FromStr for AmazonS3ConfigKey { "aws_container_credentials_relative_uri" => { Ok(Self::ContainerCredentialsRelativeUri) } + "copy_if_not_exists" => Ok(Self::CopyIfNotExists), // Backwards compatibility "aws_allow_http" => Ok(Self::Client(ClientConfigKey::AllowHttp)), _ => match s.parse() { @@ -686,6 +697,9 @@ impl AmazonS3Builder { AmazonS3ConfigKey::Client(key) => { self.client_options = self.client_options.with_config(key, value) } + AmazonS3ConfigKey::CopyIfNotExists => { + self.copy_if_not_exists = Some(ConfigValue::Deferred(value.into())) + } }; self } @@ -753,6 +767,9 @@ impl AmazonS3Builder { AmazonS3ConfigKey::ContainerCredentialsRelativeUri => { self.container_credentials_relative_uri.clone() } + AmazonS3ConfigKey::CopyIfNotExists => { + self.copy_if_not_exists.as_ref().map(ToString::to_string) + } } } @@ -935,6 +952,12 @@ impl AmazonS3Builder { self } + /// Configure how to provide [`ObjectStore::copy_if_not_exists`] + pub fn with_copy_if_not_exists(mut self, config: S3CopyIfNotExists) -> Self { + self.copy_if_not_exists = Some(config.into()); + self + } + /// Create a [`AmazonS3`] instance from the provided values, /// consuming `self`. pub fn build(mut self) -> Result<AmazonS3> { @@ -945,6 +968,7 @@ impl AmazonS3Builder { let bucket = self.bucket_name.context(MissingBucketNameSnafu)?; let region = self.region.context(MissingRegionSnafu)?; let checksum = self.checksum_algorithm.map(|x| x.get()).transpose()?; + let copy_if_not_exists = self.copy_if_not_exists.map(|x| x.get()).transpose()?; let credentials = if let Some(credentials) = self.credentials { credentials @@ -1050,6 +1074,7 @@ impl AmazonS3Builder { client_options: self.client_options, sign_payload: !self.unsigned_payload.get()?, checksum, + copy_if_not_exists, }; let client = Arc::new(S3Client::new(config)?); @@ -1062,8 +1087,9 @@ impl AmazonS3Builder { mod tests { use super::*; use crate::tests::{ - get_nonexistent_object, get_opts, list_uses_directories_correctly, - list_with_delimiter, put_get_delete_list_opts, rename_and_copy, stream_get, + copy_if_not_exists, get_nonexistent_object, get_opts, + list_uses_directories_correctly, list_with_delimiter, put_get_delete_list_opts, + rename_and_copy, stream_get, }; use bytes::Bytes; use std::collections::HashMap; @@ -1164,6 +1190,7 @@ mod tests { let config = AmazonS3Builder::from_env(); let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://")); + let test_not_exists = config.copy_if_not_exists.is_some(); let integration = config.build().unwrap(); // Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328 @@ -1173,6 +1200,9 @@ mod tests { list_with_delimiter(&integration).await; rename_and_copy(&integration).await; stream_get(&integration).await; + if test_not_exists { + copy_if_not_exists(&integration).await; + } // run integration test with unsigned payload enabled let config = AmazonS3Builder::from_env().with_unsigned_payload(true);