tustvold commented on code in PR #6682:
URL: https://github.com/apache/arrow-rs/pull/6682#discussion_r1829588248
##########
object_store/src/integration.rs:
##########
@@ -651,6 +651,12 @@ pub async fn put_opts(storage: &dyn ObjectStore,
supports_update: bool) {
assert_eq!(b.as_ref(), b"a");
if !supports_update {
+ let err = storage
Review Comment:
:+1:
##########
object_store/src/aws/precondition.rs:
##########
@@ -46,6 +46,15 @@ pub enum S3CopyIfNotExists {
///
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>`
ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
+ /// Native Amazon S3 supports copy if not exists through a multipart upload
+ /// where the upload copies an existing object and is completed only if
+ /// the new object does not already exist.
+ ///
+ /// WARNING: When using this mode, `copy_if_not_exists` does not copy
+ /// tags or attributes from the source object.
Review Comment:
:+1:
I had a brief scan and couldn't see a way around this.
##########
object_store/src/aws/client.rs:
##########
@@ -625,7 +656,18 @@ impl S3Client {
}
let response = request.send().await?;
- let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
+ let content_id = match is_copy {
+ false => get_etag(response.headers()).context(MetadataSnafu)?,
+ true => {
+ let response = response
Review Comment:
The docs suggest the headers are populated -
https://docs.aws.amazon.com/AmazonS3/latest/API/API_UploadPartCopy.html
##########
object_store/src/aws/precondition.rs:
##########
@@ -118,6 +133,17 @@ pub enum S3ConditionalPut {
/// [HTTP precondition]:
https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
ETagMatch,
+ /// Like `ETagMatch`, but with support for `PutMode::Create` and not
+ /// `PutMode::Option`.
+ ///
+ /// This is the limited form of conditional put supported by Amazon S3
+ /// as of August 2024 ([announcement]).
+ ///
+ /// Encoded as `etag-create-only` ignoring whitespace.
+ ///
+ /// [announcement]:
https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
+ ETagCreateOnly,
Review Comment:
```suggestion
ETagPutIfNotExists,
```
Maybe? I don't feel strongly, though.
##########
object_store/src/aws/client.rs:
##########
@@ -118,13 +121,32 @@ pub(crate) enum Error {
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
- Self::Generic {
- store: STORE,
- source: Box::new(err),
+ match err {
+ Error::CompleteMultipartRequest { source, path } =>
source.error(STORE, path),
+ _ => Self::Generic {
+ store: STORE,
+ source: Box::new(err),
+ },
}
}
}
+pub(crate) enum PutPartPayload<'a> {
+ Inline(PutPayload),
Review Comment:
```suggestion
Part(PutPayload),
```
Maybe?
##########
object_store/src/aws/precondition.rs:
##########
@@ -46,6 +46,15 @@ pub enum S3CopyIfNotExists {
///
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>`
ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
+ /// Native Amazon S3 supports copy if not exists through a multipart upload
+ /// where the upload copies an existing object and is completed only if
+ /// the new object does not already exist.
+ ///
+ /// WARNING: When using this mode, `copy_if_not_exists` does not copy
+ /// tags or attributes from the source object.
+ ///
Review Comment:
We could also link to the module docs on lifecycle rules for
cleaning up incomplete multipart uploads:
https://docs.rs/object_store/latest/object_store/aws/index.html#multipart-uploads
##########
object_store/src/aws/mod.rs:
##########
@@ -293,6 +297,34 @@ impl ObjectStore for AmazonS3 {
let (k, v, status) = match &self.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => (k, v,
StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v,
*status),
+ Some(S3CopyIfNotExists::Multipart) => {
+ let upload_id = self
+ .client
+ .create_multipart(to, PutMultipartOpts::default())
+ .await?;
+ let part_id = self
+ .client
+ .put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
+ .await?;
+ let res = match self
+ .client
+ .complete_multipart(
+ to,
+ &upload_id,
+ vec![part_id],
+ CompleteMultipartMode::Create,
+ )
+ .await
+ {
+ Err(e @ Error::Precondition { .. }) =>
Err(Error::AlreadyExists {
Review Comment:
Do we need to abort the multipart upload in this case? Does this happen
automatically?
##########
object_store/src/aws/client.rs:
##########
@@ -605,15 +627,24 @@ impl S3Client {
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
- data: PutPayload,
+ data: PutPartPayload<'_>,
) -> Result<PartId> {
+ let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();
let mut request = self
.request(Method::PUT, path)
- .with_payload(data)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);
+
+ request = match data {
+ PutPartPayload::Inline(payload) => request.with_payload(payload),
+ PutPartPayload::Copy(path) => request.header(
+ "x-amz-copy-source",
+ &format!("{}/{}", self.config.bucket, path),
Review Comment:
Do we need to use encode_path here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]