This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new cac4bac  Revert "refactor: remove AWS dynamo integration (#407)" (#493)
cac4bac is described below

commit cac4bacf89133e12a8ff0f30055a1bc53cdca96c
Author: Andrew Lamb <[email protected]>
AuthorDate: Fri Sep 19 07:38:05 2025 -0700

    Revert "refactor: remove AWS dynamo integration (#407)" (#493)
    
    This reverts commit 034733f5aa760529a647041468681328d2f222c7.
---
 .github/workflows/ci.yml |   6 +-
 src/aws/builder.rs       |   1 +
 src/aws/client.rs        |   1 +
 src/aws/dynamo.rs        | 595 +++++++++++++++++++++++++++++++++++++++++++++++
 src/aws/mod.rs           |  24 ++
 src/aws/precondition.rs  |  55 ++++-
 src/client/builder.rs    |   2 +-
 src/integration.rs       |  12 +-
 8 files changed, 690 insertions(+), 6 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index da2b8c4..ab59e6f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,6 +16,7 @@
 # under the License.
 
 ---
+
 name: CI
 
 concurrency:
@@ -99,8 +100,8 @@ jobs:
       AWS_SECRET_ACCESS_KEY: test
       AWS_ENDPOINT: http://localhost:4566
       AWS_ALLOW_HTTP: true
-      AWS_COPY_IF_NOT_EXISTS: multipart
-      AWS_CONDITIONAL_PUT: etag
+      AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
+      AWS_CONDITIONAL_PUT: dynamo:test-table:2000
       AWS_SERVER_SIDE_ENCRYPTION: aws:kms
       HTTP_URL: "http://localhost:8080";
       GOOGLE_BUCKET: test-bucket
@@ -131,6 +132,7 @@ jobs:
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-spawn
           aws --endpoint-url=http://localhost:4566 s3 mb s3://test-bucket-for-checksum
           aws --endpoint-url=http://localhost:4566 s3api create-bucket --bucket test-object-lock --object-lock-enabled-for-bucket
+          aws --endpoint-url=http://localhost:4566 dynamodb create-table --table-name test-table --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
 
           KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key --description "test key")
           echo "AWS_SSE_KMS_KEY_ID=$(echo $KMS_KEY | jq -r .KeyMetadata.KeyId)" >> $GITHUB_ENV
diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 06503ca..6e6f8e2 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -1149,6 +1149,7 @@ impl AmazonS3Builder {
 
         let config = S3Config {
             region,
+            endpoint: self.endpoint,
             bucket,
             bucket_endpoint,
             credentials,
diff --git a/src/aws/client.rs b/src/aws/client.rs
index 913859d..4edb977 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -193,6 +193,7 @@ impl From<DeleteError> for Error {
 #[derive(Debug)]
 pub(crate) struct S3Config {
     pub region: String,
+    pub endpoint: Option<String>,
     pub bucket: String,
     pub bucket_endpoint: String,
     pub credentials: AwsCredentialProvider,
diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs
new file mode 100644
index 0000000..a6775ef
--- /dev/null
+++ b/src/aws/dynamo.rs
@@ -0,0 +1,595 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A DynamoDB based lock system
+
+use std::borrow::Cow;
+use std::collections::HashMap;
+use std::future::Future;
+use std::sync::Arc;
+use std::time::{Duration, Instant};
+
+use chrono::Utc;
+use http::{Method, StatusCode};
+use serde::ser::SerializeMap;
+use serde::{Deserialize, Serialize, Serializer};
+
+use crate::aws::client::S3Client;
+use crate::aws::credential::CredentialExt;
+use crate::aws::{AwsAuthorizer, AwsCredential};
+use crate::client::get::GetClientExt;
+use crate::client::retry::RetryExt;
+use crate::client::retry::{RequestError, RetryError};
+use crate::path::Path;
+use crate::{Error, GetOptions, Result};
+
+/// The exception returned by DynamoDB on conflict
+const CONFLICT: &str = "ConditionalCheckFailedException";
+
+const STORE: &str = "DynamoDB";
+
+/// A DynamoDB-based commit protocol, used to provide conditional write support for S3
+///
+/// ## Limitations
+///
+/// Only conditional operations, e.g. `copy_if_not_exists`, will be synchronized, and can
+/// therefore race with non-conditional operations, e.g. `put`, `copy`, `delete`, or
+/// conditional operations performed by writers not configured to synchronize with DynamoDB.
+///
+/// Workloads making use of this mechanism **must** ensure:
+///
+/// * Conditional and non-conditional operations are not performed on the same paths
+/// * Conditional operations are only performed via similarly configured clients
+///
+/// Additionally, as the locking mechanism relies on timeouts to detect stale locks,
+/// performance will be poor for systems that frequently delete and then create
+/// objects at the same path, instead being optimised for systems that primarily create
+/// files with paths never used before, or perform conditional updates to existing files.
+///
+/// ## Commit Protocol
+///
+/// The DynamoDB schema is as follows:
+///
+/// * A string partition key named `"path"`
+/// * A string sort key named `"etag"`
+/// * A numeric [TTL] attribute named `"ttl"`
+/// * A numeric attribute named `"generation"`
+/// * A numeric attribute named `"timeout"`
+///
+/// An appropriate DynamoDB table can be created with the CLI as follows:
+///
+/// ```bash
+/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE --attribute-definitions AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
+/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> --time-to-live-specification Enabled=true,AttributeName=ttl
+/// ```
+///
+/// To perform a conditional operation on an object with a given `path` and `etag` (`*` if creating),
+/// the commit protocol is as follows:
+///
+/// 1. Perform HEAD request on `path` and error on precondition mismatch
+/// 2. Create record in DynamoDB with given `path` and `etag` with the configured timeout
+///     1. On Success: Perform operation with the configured timeout
+///     2. On Conflict:
+///         1. Periodically re-perform HEAD request on `path` and error on precondition mismatch
+///         2. If `timeout * max_skew_rate` passed, replace the record incrementing the `"generation"`
+///             1. On Success: GOTO 2.1
+///             2. On Conflict: GOTO 2.2
+///
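+/// For example, with the default 20 second timeout and `max_clock_skew_rate` of 3,
+/// a writer that crashes after creating its record blocks competitors for at most
+/// 60 seconds, after which a competing writer replaces the record with an
+/// incremented `"generation"` and proceeds with its own operation.
+///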
+/// Provided no writer modifies an object with a given `path` and `etag` without first adding a
+/// corresponding record to DynamoDB, we are guaranteed that only one writer will ever commit.
+///
+/// This is inspired by the [DynamoDB Lock Client] but simplified for the more limited
+/// requirements of synchronizing object storage. The major changes are:
+///
+/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
+///     * Cheaper to generate, serialize and compare
+///     * Cannot collide
+///     * More human readable / interpretable
+/// * Relies on [TTL] to eventually clean up old locks
+///
+/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit protocol, but
+/// generalised to not make assumptions about the workload and not rely on first writing
+/// to a temporary path.
+///
+/// [TTL]: https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
+/// [DynamoDB Lock Client]: https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
+/// [S3 Multi-Cluster]: https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
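+///
+/// ## Example
+///
+/// A minimal configuration sketch; the bucket and table names below are
+/// hypothetical placeholders:
+///
+/// ```no_run
+/// # use object_store::aws::{AmazonS3Builder, DynamoCommit, S3ConditionalPut};
+/// #[allow(deprecated)]
+/// let s3 = AmazonS3Builder::from_env()
+///     .with_bucket_name("my-bucket")
+///     .with_conditional_put(S3ConditionalPut::Dynamo(DynamoCommit::new(
+///         "my-lock-table".to_string(),
+///     )))
+///     .build()
+///     .unwrap();
+/// ```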
+#[derive(Debug, Clone, Eq, PartialEq)]
+pub struct DynamoCommit {
+    table_name: String,
+    /// The number of milliseconds a lease is valid for
+    timeout: u64,
+    /// The maximum clock skew rate tolerated by the system
+    max_clock_skew_rate: u32,
+    /// The length of time a record will be retained in DynamoDB before being cleaned up
+    ///
+    /// This is purely an optimisation to avoid indefinite growth of the DynamoDB table
+    /// and does not impact how long clients may wait to acquire a lock
+    ttl: Duration,
+    /// The backoff duration before retesting a condition
+    test_interval: Duration,
+}
+
+impl DynamoCommit {
+    /// Create a new [`DynamoCommit`] with a given table name
+    pub fn new(table_name: String) -> Self {
+        Self {
+            table_name,
+            timeout: 20_000,
+            max_clock_skew_rate: 3,
+            ttl: Duration::from_secs(60 * 60),
+            test_interval: Duration::from_millis(100),
+        }
+    }
+
+    /// Overrides the lock timeout.
+    ///
+    /// A longer lock timeout reduces the probability of spurious commit failures and multi-writer
+    /// races, but will increase the time that writers must wait to reclaim a lost lock. The
+    /// default value of 20 seconds should be appropriate for most use-cases.
+    pub fn with_timeout(mut self, millis: u64) -> Self {
+        self.timeout = millis;
+        self
+    }
+
+    /// The maximum clock skew rate tolerated by the system.
+    ///
+    /// An environment in which the clock on the fastest node ticks twice as fast as the slowest
+    /// node would have a clock skew rate of 2. The default value of 3 should be appropriate
+    /// for most environments.
+    pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
+        self.max_clock_skew_rate = rate;
+        self
+    }
+
+    /// The length of time a record should be retained in DynamoDB before being cleaned up
+    ///
+    /// This should be significantly larger than the configured lock timeout, with the default
+    /// value of 1 hour appropriate for most use-cases.
+    pub fn with_ttl(mut self, ttl: Duration) -> Self {
+        self.ttl = ttl;
+        self
+    }
+
+    /// Parse [`DynamoCommit`] from a string
+    pub(crate) fn from_str(value: &str) -> Option<Self> {
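+        // Accepts either "<TABLE_NAME>" or "<TABLE_NAME>:<TIMEOUT_MILLIS>",
+        // e.g. "test-table" or "test-table:2000" (the "dynamo:" prefix is
+        // stripped by the caller)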
+        Some(match value.split_once(':') {
+            Some((table_name, timeout)) => {
+                Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
+            }
+            None => Self::new(value.trim().to_string()),
+        })
+    }
+
+    /// Returns the name of the DynamoDB table.
+    pub(crate) fn table_name(&self) -> &str {
+        &self.table_name
+    }
+
+    pub(crate) async fn copy_if_not_exists(
+        &self,
+        client: &Arc<S3Client>,
+        from: &Path,
+        to: &Path,
+    ) -> Result<()> {
+        self.conditional_op(client, to, None, || async {
+            client.copy_request(from, to).send().await?;
+            Ok(())
+        })
+        .await
+    }
+
+    #[allow(clippy::future_not_send)] // Generics confound this lint
+    pub(crate) async fn conditional_op<F, Fut, T>(
+        &self,
+        client: &Arc<S3Client>,
+        to: &Path,
+        etag: Option<&str>,
+        op: F,
+    ) -> Result<T>
+    where
+        F: FnOnce() -> Fut,
+        Fut: Future<Output = Result<T, Error>>,
+    {
+        check_precondition(client, to, etag).await?;
+
+        let mut previous_lease = None;
+
+        loop {
+            let existing = previous_lease.as_ref();
+            match self.try_lock(client, to.as_ref(), etag, existing).await? {
+                TryLockResult::Ok(lease) => {
+                    let expiry = lease.acquire + lease.timeout;
+                    return match tokio::time::timeout_at(expiry.into(), op()).await {
+                        Ok(Ok(v)) => Ok(v),
+                        Ok(Err(e)) => Err(e),
+                        Err(_) => Err(Error::Generic {
+                            store: "DynamoDB",
+                            source: format!(
+                                "Failed to perform conditional operation in {} 
milliseconds",
+                                self.timeout
+                            )
+                            .into(),
+                        }),
+                    };
+                }
+                TryLockResult::Conflict(conflict) => {
+                    let mut interval = tokio::time::interval(self.test_interval);
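+                    // Per the commit protocol above, only reclaim the lease once
+                    // timeout * max_clock_skew_rate has elapsed, tolerating
+                    // lease holders whose clocks tick slower than ours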
+                    let expiry = conflict.timeout * self.max_clock_skew_rate;
+                    loop {
+                        interval.tick().await;
+                        check_precondition(client, to, etag).await?;
+                        if conflict.acquire.elapsed() > expiry {
+                            previous_lease = Some(conflict);
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /// Attempt to acquire a lock, reclaiming an existing lease if provided
+    async fn try_lock(
+        &self,
+        s3: &S3Client,
+        path: &str,
+        etag: Option<&str>,
+        existing: Option<&Lease>,
+    ) -> Result<TryLockResult> {
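+        // With no existing lease the put must create the record; when reclaiming,
+        // it must replace the exact generation previously observed, so a concurrent
+        // reclaim by another writer fails the conditional check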
+        let attributes;
+        let (next_gen, condition_expression, expression_attribute_values) = match existing {
+            None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
+            Some(existing) => {
+                attributes = [(":g", AttributeValue::Number(existing.generation))];
+                (
+                    existing.generation.checked_add(1).unwrap(),
+                    "attribute_exists(#pk) AND generation = :g",
+                    Map(attributes.as_slice()),
+                )
+            }
+        };
+
+        let ttl = (Utc::now() + self.ttl).timestamp();
+        let items = [
+            ("path", AttributeValue::from(path)),
+            ("etag", AttributeValue::from(etag.unwrap_or("*"))),
+            ("generation", AttributeValue::Number(next_gen)),
+            ("timeout", AttributeValue::Number(self.timeout)),
+            ("ttl", AttributeValue::Number(ttl as _)),
+        ];
+        let names = [("#pk", "path")];
+
+        let req = PutItem {
+            table_name: &self.table_name,
+            condition_expression,
+            expression_attribute_values,
+            expression_attribute_names: Map(&names),
+            item: Map(&items),
+            return_values: None,
+            return_values_on_condition_check_failure: Some(ReturnValues::AllOld),
+        };
+
+        let credential = s3.config.get_credential().await?;
+
+        let acquire = Instant::now();
+        match self
+            .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", req)
+            .await
+        {
+            Ok(_) => Ok(TryLockResult::Ok(Lease {
+                acquire,
+                generation: next_gen,
+                timeout: Duration::from_millis(self.timeout),
+            })),
+            Err(e) => match parse_error_response(&e) {
+                Some(e) if e.error.ends_with(CONFLICT) => match extract_lease(&e.item) {
+                    Some(lease) => Ok(TryLockResult::Conflict(lease)),
+                    None => Err(Error::Generic {
+                        store: STORE,
+                        source: "Failed to extract lease from conflict 
ReturnValuesOnConditionCheckFailure response".into()
+                    }),
+                },
+                _ => Err(Error::Generic {
+                    store: STORE,
+                    source: Box::new(e),
+                }),
+            },
+        }
+    }
+
+    async fn request<R: Serialize + Send + Sync>(
+        &self,
+        s3: &S3Client,
+        cred: Option<&AwsCredential>,
+        target: &str,
+        req: R,
+    ) -> Result<HttpResponse, RetryError> {
+        let region = &s3.config.region;
+        let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb", region));
+
+        let builder = match &s3.config.endpoint {
+            Some(e) => s3.client.request(Method::POST, e),
+            None => {
+                let url = format!("https://dynamodb.{region}.amazonaws.com");
+                s3.client.request(Method::POST, url)
+            }
+        };
+
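+        // DynamoDB's JSON wire protocol dispatches operations via the
+        // X-Amz-Target header (here "DynamoDB_20120810.PutItem") on a POST
+        // to the service endpoint, rather than via the URL path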
+        // TODO: Timeout
+        builder
+            .json(&req)
+            .header("X-Amz-Target", target)
+            .with_aws_sigv4(authorizer, None)
+            .send_retry(&s3.config.retry_config)
+            .await
+    }
+}
+
+#[derive(Debug)]
+enum TryLockResult {
+    /// Successfully acquired a lease
+    Ok(Lease),
+    /// An existing lease was found
+    Conflict(Lease),
+}
+
+/// Validates that `path` has the given `etag` or doesn't exist if `None`
+async fn check_precondition(client: &Arc<S3Client>, path: &Path, etag: Option<&str>) -> Result<()> {
+    let options = GetOptions {
+        head: true,
+        ..Default::default()
+    };
+
+    match etag {
+        Some(expected) => match client.get_opts(path, options).await {
+            Ok(r) => match r.meta.e_tag {
+                Some(actual) if expected == actual => Ok(()),
+                actual => Err(Error::Precondition {
+                    path: path.to_string(),
+                    source: format!("{} does not match {expected}", 
actual.unwrap_or_default())
+                        .into(),
+                }),
+            },
+            Err(Error::NotFound { .. }) => Err(Error::Precondition {
+                path: path.to_string(),
+                source: format!("Object at location {path} not found").into(),
+            }),
+            Err(e) => Err(e),
+        },
+        None => match client.get_opts(path, options).await {
+            Ok(_) => Err(Error::AlreadyExists {
+                path: path.to_string(),
+                source: "Already Exists".to_string().into(),
+            }),
+            Err(Error::NotFound { .. }) => Ok(()),
+            Err(e) => Err(e),
+        },
+    }
+}
+
+/// Parses the error response if any
+fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
+    match e.inner() {
+        RequestError::Status {
+            status: StatusCode::BAD_REQUEST,
+            body: Some(b),
+        } => serde_json::from_str(b).ok(),
+        _ => None,
+    }
+}
+
+/// Extracts a lease from `item`, returning `None` on error
+fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
+    let generation = match item.get("generation") {
+        Some(AttributeValue::Number(generation)) => generation,
+        _ => return None,
+    };
+
+    let timeout = match item.get("timeout") {
+        Some(AttributeValue::Number(timeout)) => *timeout,
+        _ => return None,
+    };
+
+    Some(Lease {
+        acquire: Instant::now(),
+        generation: *generation,
+        timeout: Duration::from_millis(timeout),
+    })
+}
+
+/// A lock lease
+#[derive(Debug, Clone)]
+struct Lease {
+    acquire: Instant,
+    generation: u64,
+    timeout: Duration,
+}
+
+/// A DynamoDB [PutItem] payload
+///
+/// [PutItem]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
+#[derive(Serialize)]
+#[serde(rename_all = "PascalCase")]
+struct PutItem<'a> {
+    /// The table name
+    table_name: &'a str,
+
+    /// A condition that must be satisfied in order for a conditional PutItem operation to succeed.
+    condition_expression: &'a str,
+
+    /// One or more substitution tokens for attribute names in an expression
+    expression_attribute_names: Map<'a, &'a str, &'a str>,
+
+    /// One or more values that can be substituted in an expression
+    expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// A map of attribute name/value pairs, one for each attribute
+    item: Map<'a, &'a str, AttributeValue<'a>>,
+
+    /// Use ReturnValues if you want to get the item attributes as they appeared
+    /// before they were updated with the PutItem request.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values: Option<ReturnValues>,
+
+    /// An optional parameter that returns the item attributes for a PutItem operation
+    /// that failed a condition check.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    return_values_on_condition_check_failure: Option<ReturnValues>,
+}
+
+#[derive(Deserialize)]
+struct ErrorResponse<'a> {
+    #[serde(rename = "__type")]
+    error: &'a str,
+
+    #[serde(borrow, default, rename = "Item")]
+    item: HashMap<&'a str, AttributeValue<'a>>,
+}
+
+#[derive(Serialize)]
+#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
+enum ReturnValues {
+    AllOld,
+}
+
+/// A collection of key value pairs
+///
+/// This provides cheap, ordered serialization of maps
+struct Map<'a, K, V>(&'a [(K, V)]);
+
+impl<K: Serialize, V: Serialize> Serialize for Map<'_, K, V> {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: Serializer,
+    {
+        if self.0.is_empty() {
+            return serializer.serialize_none();
+        }
+        let mut map = serializer.serialize_map(Some(self.0.len()))?;
+        for (k, v) in self.0 {
+            map.serialize_entry(k, v)?
+        }
+        map.end()
+    }
+}
+
+/// A DynamoDB [AttributeValue]
+///
+/// [AttributeValue]: https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
+#[derive(Debug, Serialize, Deserialize)]
+enum AttributeValue<'a> {
+    #[serde(rename = "S")]
+    String(Cow<'a, str>),
+    #[serde(rename = "N", with = "number")]
+    Number(u64),
+}
+
+impl<'a> From<&'a str> for AttributeValue<'a> {
+    fn from(value: &'a str) -> Self {
+        Self::String(Cow::Borrowed(value))
+    }
+}
+
+/// Numbers are serialized as strings
+mod number {
+    use serde::{Deserialize, Deserializer, Serializer};
+
+    pub(crate) fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok, S::Error> {
+        s.serialize_str(&v.to_string())
+    }
+
+    pub(crate) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64, D::Error> {
+        let v: &str = Deserialize::deserialize(d)?;
+        v.parse().map_err(serde::de::Error::custom)
+    }
+}
+
+use crate::client::HttpResponse;
+/// Re-export integration_test to be called by s3_test
+#[cfg(test)]
+pub(crate) use tests::integration_test;
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::aws::AmazonS3;
+    use crate::ObjectStore;
+    use rand::distr::Alphanumeric;
+    use rand::{rng, Rng};
+
+    #[test]
+    fn test_attribute_serde() {
+        let serde = serde_json::to_string(&AttributeValue::Number(23)).unwrap();
+        assert_eq!(serde, "{\"N\":\"23\"}");
+        let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
+        assert!(matches!(back, AttributeValue::Number(23)));
+    }
+
+    /// An integration test for DynamoDB
+    ///
+    /// This is a function called by s3_test to avoid test concurrency issues
+    pub(crate) async fn integration_test(integration: &AmazonS3, d: &DynamoCommit) {
+        let client = &integration.client;
+
+        let src = Path::from("dynamo_path_src");
+        integration.put(&src, "asd".into()).await.unwrap();
+
+        let dst = Path::from("dynamo_path");
+        let _ = integration.delete(&dst).await; // Delete if present
+
+        // Create a lock if not already exists
+        let existing = match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
+            TryLockResult::Conflict(l) => l,
+            TryLockResult::Ok(l) => l,
+        };
+
+        // Should not be able to acquire a lock again
+        let r = d.try_lock(client, dst.as_ref(), None, None).await;
+        assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
+
+        // But should still be able to reclaim lock and perform copy
+        d.copy_if_not_exists(client, &src, &dst).await.unwrap();
+
+        match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
+            TryLockResult::Conflict(new) => {
+                // Should have incremented generation to do so
+                assert_eq!(new.generation, existing.generation + 1);
+            }
+            _ => panic!("Should conflict"),
+        }
+
+        let rng = rng();
+        let etag = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+        let t = Some(etag.as_str());
+
+        let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+            TryLockResult::Ok(l) => l,
+            _ => panic!("should not conflict"),
+        };
+
+        match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
+            TryLockResult::Conflict(c) => assert_eq!(l.generation, c.generation),
+            _ => panic!("should conflict"),
+        }
+
+        match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
+            TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation + 1),
+            _ => panic!("should not conflict"),
+        }
+    }
+}
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 8dac2bd..4abf374 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -56,6 +56,7 @@ mod builder;
 mod checksum;
 mod client;
 mod credential;
+mod dynamo;
 mod precondition;
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -63,6 +64,7 @@ mod resolve;
 
 pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
 pub use checksum::Checksum;
+pub use dynamo::DynamoCommit;
 pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -195,6 +197,11 @@ impl ObjectStore for AmazonS3 {
                     r => r,
                 }
             }
+            #[allow(deprecated)]
+            (PutMode::Create, S3ConditionalPut::Dynamo(d)) => {
+                d.conditional_op(&self.client, location, None, move || request.do_put())
+                    .await
+            }
             (PutMode::Update(v), put) => {
                 let etag = v.e_tag.ok_or_else(|| Error::Generic {
                     store: STORE,
@@ -222,6 +229,13 @@ impl ObjectStore for AmazonS3 {
                             r => r,
                         }
                     }
+                    #[allow(deprecated)]
+                    S3ConditionalPut::Dynamo(d) => {
+                        d.conditional_op(&self.client, location, Some(&etag), move || {
+                            request.do_put()
+                        })
+                        .await
+                    }
                     S3ConditionalPut::Disabled => Err(Error::NotImplemented),
                 }
             }
@@ -355,6 +369,10 @@ impl ObjectStore for AmazonS3 {
 
                 return res;
             }
+            #[allow(deprecated)]
+            Some(S3CopyIfNotExists::Dynamo(lock)) => {
+                return lock.copy_if_not_exists(&self.client, from, to).await
+            }
             None => {
                 return Err(Error::NotSupported {
                     source: "S3 does not support 
copy-if-not-exists".to_string().into(),
@@ -622,6 +640,12 @@ mod tests {
         let builder = AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
         let integration = builder.build().unwrap();
         put_get_delete_list(&integration).await;
+
+        match &integration.client.config.copy_if_not_exists {
+            #[allow(deprecated)]
+            Some(S3CopyIfNotExists::Dynamo(d)) => dynamo::integration_test(&integration, d).await,
+            _ => eprintln!("Skipping dynamo integration test - dynamo not configured"),
+        };
     }
 
     #[tokio::test]
diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs
index 52ecb9f..2f11e4f 100644
--- a/src/aws/precondition.rs
+++ b/src/aws/precondition.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use crate::aws::dynamo::DynamoCommit;
 use crate::config::Parse;
 
 use itertools::Itertools;
@@ -60,6 +61,16 @@ pub enum S3CopyIfNotExists {
     ///
     /// Encoded as `multipart` ignoring whitespace.
     Multipart,
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+    /// ignoring whitespace. The default timeout is used if not specified
+    ///
+    /// See [`DynamoCommit`] for more information
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    #[deprecated(note = "Use S3CopyIfNotExists::Multipart")]
+    Dynamo(DynamoCommit),
 }
 
 impl std::fmt::Display for S3CopyIfNotExists {
@@ -70,6 +81,8 @@ impl std::fmt::Display for S3CopyIfNotExists {
                 write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
             }
             Self::Multipart => f.write_str("multipart"),
+            #[allow(deprecated)]
+            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
         }
     }
 }
@@ -97,6 +110,8 @@ impl S3CopyIfNotExists {
                     code,
                 ))
             }
+            #[allow(deprecated)]
+            "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
             _ => None,
         }
     }
@@ -127,6 +142,17 @@ pub enum S3ConditionalPut {
     #[default]
     ETagMatch,
 
+    /// The name of a DynamoDB table to use for coordination
+    ///
+    /// Encoded as either `dynamo:<TABLE_NAME>` or `dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
+    /// ignoring whitespace. The default timeout is used if not specified
+    ///
+    /// See [`DynamoCommit`] for more information
+    ///
+    /// This will use the same region, credentials and endpoint as configured for S3
+    #[deprecated(note = "Use S3ConditionalPut::ETagMatch")]
+    Dynamo(DynamoCommit),
+
     /// Disable `conditional put`
     Disabled,
 }
@@ -135,6 +161,8 @@ impl std::fmt::Display for S3ConditionalPut {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::ETagMatch => write!(f, "etag"),
+            #[allow(deprecated)]
+            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
             Self::Disabled => write!(f, "disabled"),
         }
     }
@@ -145,7 +173,11 @@ impl S3ConditionalPut {
         match s.trim() {
             "etag" => Some(Self::ETagMatch),
             "disabled" => Some(Self::Disabled),
-            _ => None,
+            trimmed => match trimmed.split_once(':')? {
+                #[allow(deprecated)]
+                ("dynamo", s) => 
Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
+                _ => None,
+            },
         }
     }
 }
@@ -162,6 +194,7 @@ impl Parse for S3ConditionalPut {
 #[cfg(test)]
 mod tests {
     use super::S3CopyIfNotExists;
+    use crate::aws::{DynamoCommit, S3ConditionalPut};
 
     #[test]
     fn parse_s3_copy_if_not_exists_header() {
@@ -186,6 +219,26 @@ mod tests {
         assert_eq!(expected, S3CopyIfNotExists::from_str(input));
     }
 
+    #[test]
+    #[allow(deprecated)]
+    fn parse_s3_copy_if_not_exists_dynamo() {
+        let input = "dynamo: table:100";
+        let expected = Some(S3CopyIfNotExists::Dynamo(
+            DynamoCommit::new("table".into()).with_timeout(100),
+        ));
+        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
+    }
+
+    #[test]
+    #[allow(deprecated)]
+    fn parse_s3_condition_put_dynamo() {
+        let input = "dynamo: table:1300";
+        let expected = Some(S3ConditionalPut::Dynamo(
+            DynamoCommit::new("table".into()).with_timeout(1300),
+        ));
+        assert_eq!(expected, S3ConditionalPut::from_str(input));
+    }
+
     #[test]
     fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
         let expected = Some(S3CopyIfNotExists::Header(
diff --git a/src/client/builder.rs b/src/client/builder.rs
index f74c5ec..257cb57 100644
--- a/src/client/builder.rs
+++ b/src/client/builder.rs
@@ -165,7 +165,7 @@ impl HttpRequestBuilder {
         self
     }
 
-    #[cfg(feature = "gcp")]
+    #[cfg(any(feature = "aws", feature = "gcp"))]
     pub(crate) fn json<S: serde::Serialize>(mut self, s: S) -> Self {
         match (serde_json::to_vec(&s), &mut self.request) {
             (Ok(json), Ok(request)) => {
diff --git a/src/integration.rs b/src/integration.rs
index 988d8d4..99ee86d 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -34,6 +34,7 @@ use crate::{
 use bytes::Bytes;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
+use rand::distr::Alphanumeric;
 use rand::{rng, Rng};
 use std::collections::HashSet;
 use std::slice;
@@ -629,8 +630,15 @@ pub async fn get_opts(storage: &dyn ObjectStore) {
 
 /// Tests conditional writes
 pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
+    // When using DynamoCommit, repeated runs of this test will produce the same sequence of records in DynamoDB
+    // As a result each conditional operation will need to wait for the lease to time out before proceeding
+    // One solution would be to clear DynamoDB before each test, but this would require non-trivial additional code
+    // so we instead just generate a random suffix for the filenames
+    let rng = rng();
+    let suffix = String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
+
     delete_fixtures(storage).await;
-    let path = Path::from("put_opts");
+    let path = Path::from(format!("put_opts_{suffix}"));
     let v1 = storage
         .put_opts(&path, "a".into(), PutMode::Create.into())
         .await
@@ -688,7 +696,7 @@ pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
     const NUM_WORKERS: usize = 5;
     const NUM_INCREMENTS: usize = 10;
 
-    let path = Path::from("RACE");
+    let path = Path::from(format!("RACE-{suffix}"));
     let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
         .map(|_| async {
             for _ in 0..NUM_INCREMENTS {
