benesch commented on code in PR #6682:
URL: https://github.com/apache/arrow-rs/pull/6682#discussion_r1830322603
##########
object_store/src/aws/precondition.rs:
##########
@@ -118,6 +133,17 @@ pub enum S3ConditionalPut {
/// [HTTP precondition]:
https://datatracker.ietf.org/doc/html/rfc9110#name-preconditions
ETagMatch,
+ /// Like `ETagMatch`, but with support for `PutMode::Create` and not
+ /// `PutMode::Option`.
+ ///
+ /// This is the limited form of conditional put supported by Amazon S3
+ /// as of August 2024 ([announcement]).
+ ///
+ /// Encoded as `etag-create-only` ignoring whitespace.
+ ///
+ /// [announcement]:
https://aws.amazon.com/about-aws/whats-new/2024/08/amazon-s3-conditional-writes/
+ ETagCreateOnly,
Review Comment:
Sure, works for me.
##########
object_store/src/aws/precondition.rs:
##########
@@ -46,6 +46,15 @@ pub enum S3CopyIfNotExists {
///
/// Encoded as `header-with-status:<HEADER_NAME>:<HEADER_VALUE>:<STATUS>`
ignoring whitespace
HeaderWithStatus(String, String, reqwest::StatusCode),
+ /// Native Amazon S3 supports copy if not exists through a multipart upload
+ /// where the upload copies an existing object and is completed only if
+ /// the new object does not already exist.
+ ///
+ /// WARNING: When using this mode, `copy_if_not_exists` does not copy
+ /// tags or attributes from the source object.
+ ///
Review Comment:
Good call, done!
##########
object_store/src/aws/mod.rs:
##########
@@ -293,6 +297,34 @@ impl ObjectStore for AmazonS3 {
let (k, v, status) = match &self.client.config.copy_if_not_exists {
Some(S3CopyIfNotExists::Header(k, v)) => (k, v,
StatusCode::PRECONDITION_FAILED),
Some(S3CopyIfNotExists::HeaderWithStatus(k, v, status)) => (k, v,
*status),
+ Some(S3CopyIfNotExists::Multipart) => {
+ let upload_id = self
+ .client
+ .create_multipart(to, PutMultipartOpts::default())
+ .await?;
+ let part_id = self
+ .client
+ .put_part(to, &upload_id, 0, PutPartPayload::Copy(from))
+ .await?;
+ let res = match self
+ .client
+ .complete_multipart(
+ to,
+ &upload_id,
+ vec![part_id],
+ CompleteMultipartMode::Create,
+ )
+ .await
+ {
+ Err(e @ Error::Precondition { .. }) =>
Err(Error::AlreadyExists {
Review Comment:
Oof, good call. Cleanup of the incomplete multipart upload does not happen
automatically. I added code to do a best-effort abort of the multipart upload.
I also added a docs note that this cleanup isn't guaranteed and that users
should configure automatic expiration of incomplete multipart uploads via a
lifecycle rule.
##########
object_store/src/aws/client.rs:
##########
@@ -605,15 +627,24 @@ impl S3Client {
path: &Path,
upload_id: &MultipartId,
part_idx: usize,
- data: PutPayload,
+ data: PutPartPayload<'_>,
) -> Result<PartId> {
+ let is_copy = matches!(data, PutPartPayload::Copy(_));
let part = (part_idx + 1).to_string();
let mut request = self
.request(Method::PUT, path)
- .with_payload(data)
.query(&[("partNumber", &part), ("uploadId", upload_id)])
.idempotent(true);
+
+ request = match data {
+ PutPartPayload::Inline(payload) => request.with_payload(payload),
+ PutPartPayload::Copy(path) => request.header(
+ "x-amz-copy-source",
+ &format!("{}/{}", self.config.bucket, path),
Review Comment:
Ah yeah, seems like it, thanks!
##########
object_store/src/aws/client.rs:
##########
@@ -99,7 +99,10 @@ pub(crate) enum Error {
CreateMultipartResponseBody { source: reqwest::Error },
#[snafu(display("Error performing complete multipart request: {}",
source))]
- CompleteMultipartRequest { source: crate::client::retry::Error },
+ CompleteMultipartRequest {
+ source: crate::client::retry::Error,
+ path: String,
Review Comment:
Good call, done.
##########
object_store/src/aws/client.rs:
##########
@@ -118,13 +121,32 @@ pub(crate) enum Error {
impl From<Error> for crate::Error {
fn from(err: Error) -> Self {
- Self::Generic {
- store: STORE,
- source: Box::new(err),
+ match err {
+ Error::CompleteMultipartRequest { source, path } =>
source.error(STORE, path),
+ _ => Self::Generic {
+ store: STORE,
+ source: Box::new(err),
+ },
}
}
}
+pub(crate) enum PutPartPayload<'a> {
+ Inline(PutPayload),
Review Comment:
Sure, no strong feelings from me on the name here!
##########
object_store/src/aws/client.rs:
##########
@@ -625,7 +656,18 @@ impl S3Client {
}
let response = request.send().await?;
- let content_id = get_etag(response.headers()).context(MetadataSnafu)?;
+ let content_id = match is_copy {
+ false => get_etag(response.headers()).context(MetadataSnafu)?,
+ true => {
+ let response = response
Review Comment:
I think you must be seeing something I'm not! AFAICT the UploadPartCopy docs
indicate that the ETag is returned as an XML element in the response body rather
than as a header, which differs from UploadPart. And AFAICT LocalStack's
implementation matches that too.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org