This is an automated email from the ASF dual-hosted git repository.

mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new b82979d  Reapply "refactor: remove AWS dynamo integration (#407)" 
(#494)
b82979d is described below

commit b82979d44a916cf1615e719c8e80800766fd6efe
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Sep 23 05:26:01 2025 -0700

    Reapply "refactor: remove AWS dynamo integration (#407)" (#494)
    
    This reverts commit bebd53b8e3ee4edeb9cc11cb272af7442f536fdb.
---
 .github/workflows/ci.yml |   6 +-
 src/aws/builder.rs       |   1 -
 src/aws/client.rs        |   1 -
 src/aws/dynamo.rs        | 595 -----------------------------------------------
 src/aws/mod.rs           |  24 --
 src/aws/precondition.rs  |  55 +----
 src/client/builder.rs    |   2 +-
 src/integration.rs       |  12 +-
 8 files changed, 6 insertions(+), 690 deletions(-)

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ab59e6f..da2b8c4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,7 +16,6 @@
 # under the License.
 
 ---
-
 name: CI
 
 concurrency:
@@ -100,8 +99,8 @@ jobs:
       AWS_SECRET_ACCESS_KEY: test
       AWS_ENDPOINT: http://localhost:4566
       AWS_ALLOW_HTTP: true
-      AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
-      AWS_CONDITIONAL_PUT: dynamo:test-table:2000
+      AWS_COPY_IF_NOT_EXISTS: multipart
+      AWS_CONDITIONAL_PUT: etag
       AWS_SERVER_SIDE_ENCRYPTION: aws:kms
       HTTP_URL: "http://localhost:8080";
       GOOGLE_BUCKET: test-bucket
@@ -132,7 +131,6 @@ jobs:
           aws --endpoint-url=http://localhost:4566 s3 mb 
s3://test-bucket-for-spawn
           aws --endpoint-url=http://localhost:4566 s3 mb 
s3://test-bucket-for-checksum
           aws --endpoint-url=http://localhost:4566 s3api create-bucket 
--bucket test-object-lock --object-lock-enabled-for-bucket
-          aws --endpoint-url=http://localhost:4566 dynamodb create-table 
--table-name test-table --key-schema AttributeName=path,KeyType=HASH 
AttributeName=etag,KeyType=RANGE --attribute-definitions 
AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S 
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
 
           KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key 
--description "test key")
           echo "AWS_SSE_KMS_KEY_ID=$(echo $KMS_KEY | jq -r 
.KeyMetadata.KeyId)" >> $GITHUB_ENV
diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 6e6f8e2..06503ca 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -1149,7 +1149,6 @@ impl AmazonS3Builder {
 
         let config = S3Config {
             region,
-            endpoint: self.endpoint,
             bucket,
             bucket_endpoint,
             credentials,
diff --git a/src/aws/client.rs b/src/aws/client.rs
index 4edb977..913859d 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -193,7 +193,6 @@ impl From<DeleteError> for Error {
 #[derive(Debug)]
 pub(crate) struct S3Config {
     pub region: String,
-    pub endpoint: Option<String>,
     pub bucket: String,
     pub bucket_endpoint: String,
     pub credentials: AwsCredentialProvider,
diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs
deleted file mode 100644
index a6775ef..0000000
--- a/src/aws/dynamo.rs
+++ /dev/null
@@ -1,595 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! A DynamoDB based lock system
-
-use std::borrow::Cow;
-use std::collections::HashMap;
-use std::future::Future;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
-
-use chrono::Utc;
-use http::{Method, StatusCode};
-use serde::ser::SerializeMap;
-use serde::{Deserialize, Serialize, Serializer};
-
-use crate::aws::client::S3Client;
-use crate::aws::credential::CredentialExt;
-use crate::aws::{AwsAuthorizer, AwsCredential};
-use crate::client::get::GetClientExt;
-use crate::client::retry::RetryExt;
-use crate::client::retry::{RequestError, RetryError};
-use crate::path::Path;
-use crate::{Error, GetOptions, Result};
-
-/// The exception returned by DynamoDB on conflict
-const CONFLICT: &str = "ConditionalCheckFailedException";
-
-const STORE: &str = "DynamoDB";
-
-/// A DynamoDB-based commit protocol, used to provide conditional write 
support for S3
-///
-/// ## Limitations
-///
-/// Only conditional operations, e.g. `copy_if_not_exists` will be 
synchronized, and can
-/// therefore race with non-conditional operations, e.g. `put`, `copy`, 
`delete`, or
-/// conditional operations performed by writers not configured to synchronize 
with DynamoDB.
-///
-/// Workloads making use of this mechanism **must** ensure:
-///
-/// * Conditional and non-conditional operations are not performed on the same 
paths
-/// * Conditional operations are only performed via similarly configured 
clients
-///
-/// Additionally as the locking mechanism relies on timeouts to detect stale 
locks,
-/// performance will be poor for systems that frequently delete and then create
-/// objects at the same path, instead being optimised for systems that 
primarily create
-/// files with paths never used before, or perform conditional updates to 
existing files
-///
-/// ## Commit Protocol
-///
-/// The DynamoDB schema is as follows:
-///
-/// * A string partition key named `"path"`
-/// * A string sort key named `"etag"`
-/// * A numeric [TTL] attribute named `"ttl"`
-/// * A numeric attribute named `"generation"`
-/// * A numeric attribute named `"timeout"`
-///
-/// An appropriate DynamoDB table can be created with the CLI as follows:
-///
-/// ```bash
-/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema 
AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE 
--attribute-definitions AttributeName=path,AttributeType=S 
AttributeName=etag,AttributeType=S
-/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME> 
--time-to-live-specification Enabled=true,AttributeName=ttl
-/// ```
-///
-/// To perform a conditional operation on an object with a given `path` and 
`etag` (`*` if creating),
-/// the commit protocol is as follows:
-///
-/// 1. Perform HEAD request on `path` and error on precondition mismatch
-/// 2. Create record in DynamoDB with given `path` and `etag` with the 
configured timeout
-///     1. On Success: Perform operation with the configured timeout
-///     2. On Conflict:
-///         1. Periodically re-perform HEAD request on `path` and error on 
precondition mismatch
-///         2. If `timeout * max_skew_rate` passed, replace the record 
incrementing the `"generation"`
-///             1. On Success: GOTO 2.1
-///             2. On Conflict: GOTO 2.2
-///
-/// Provided no writer modifies an object with a given `path` and `etag` 
without first adding a
-/// corresponding record to DynamoDB, we are guaranteed that only one writer 
will ever commit.
-///
-/// This is inspired by the [DynamoDB Lock Client] but simplified for the more 
limited
-/// requirements of synchronizing object storage. The major changes are:
-///
-/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
-///     * Cheaper to generate, serialize and compare
-///     * Cannot collide
-///     * More human readable / interpretable
-/// * Relies on [TTL] to eventually clean up old locks
-///
-/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit 
protocol, but
-/// generalised to not make assumptions about the workload and not rely on 
first writing
-/// to a temporary path.
-///
-/// [TTL]: 
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
-/// [DynamoDB Lock Client]: 
https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
-/// [S3 Multi-Cluster]: 
https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub struct DynamoCommit {
-    table_name: String,
-    /// The number of milliseconds a lease is valid for
-    timeout: u64,
-    /// The maximum clock skew rate tolerated by the system
-    max_clock_skew_rate: u32,
-    /// The length of time a record will be retained in DynamoDB before being 
cleaned up
-    ///
-    /// This is purely an optimisation to avoid indefinite growth of the 
DynamoDB table
-    /// and does not impact how long clients may wait to acquire a lock
-    ttl: Duration,
-    /// The backoff duration before retesting a condition
-    test_interval: Duration,
-}
-
-impl DynamoCommit {
-    /// Create a new [`DynamoCommit`] with a given table name
-    pub fn new(table_name: String) -> Self {
-        Self {
-            table_name,
-            timeout: 20_000,
-            max_clock_skew_rate: 3,
-            ttl: Duration::from_secs(60 * 60),
-            test_interval: Duration::from_millis(100),
-        }
-    }
-
-    /// Overrides the lock timeout.
-    ///
-    /// A longer lock timeout reduces the probability of spurious commit 
failures and multi-writer
-    /// races, but will increase the time that writers must wait to reclaim a 
lock lost. The
-    /// default value of 20 seconds should be appropriate for must use-cases.
-    pub fn with_timeout(mut self, millis: u64) -> Self {
-        self.timeout = millis;
-        self
-    }
-
-    /// The maximum clock skew rate tolerated by the system.
-    ///
-    /// An environment in which the clock on the fastest node ticks twice as 
fast as the slowest
-    /// node, would have a clock skew rate of 2. The default value of 3 should 
be appropriate
-    /// for most environments.
-    pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
-        self.max_clock_skew_rate = rate;
-        self
-    }
-
-    /// The length of time a record should be retained in DynamoDB before 
being cleaned up
-    ///
-    /// This should be significantly larger than the configured lock timeout, 
with the default
-    /// value of 1 hour appropriate for most use-cases.
-    pub fn with_ttl(mut self, ttl: Duration) -> Self {
-        self.ttl = ttl;
-        self
-    }
-
-    /// Parse [`DynamoCommit`] from a string
-    pub(crate) fn from_str(value: &str) -> Option<Self> {
-        Some(match value.split_once(':') {
-            Some((table_name, timeout)) => {
-                
Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
-            }
-            None => Self::new(value.trim().to_string()),
-        })
-    }
-
-    /// Returns the name of the DynamoDB table.
-    pub(crate) fn table_name(&self) -> &str {
-        &self.table_name
-    }
-
-    pub(crate) async fn copy_if_not_exists(
-        &self,
-        client: &Arc<S3Client>,
-        from: &Path,
-        to: &Path,
-    ) -> Result<()> {
-        self.conditional_op(client, to, None, || async {
-            client.copy_request(from, to).send().await?;
-            Ok(())
-        })
-        .await
-    }
-
-    #[allow(clippy::future_not_send)] // Generics confound this lint
-    pub(crate) async fn conditional_op<F, Fut, T>(
-        &self,
-        client: &Arc<S3Client>,
-        to: &Path,
-        etag: Option<&str>,
-        op: F,
-    ) -> Result<T>
-    where
-        F: FnOnce() -> Fut,
-        Fut: Future<Output = Result<T, Error>>,
-    {
-        check_precondition(client, to, etag).await?;
-
-        let mut previous_lease = None;
-
-        loop {
-            let existing = previous_lease.as_ref();
-            match self.try_lock(client, to.as_ref(), etag, existing).await? {
-                TryLockResult::Ok(lease) => {
-                    let expiry = lease.acquire + lease.timeout;
-                    return match tokio::time::timeout_at(expiry.into(), 
op()).await {
-                        Ok(Ok(v)) => Ok(v),
-                        Ok(Err(e)) => Err(e),
-                        Err(_) => Err(Error::Generic {
-                            store: "DynamoDB",
-                            source: format!(
-                                "Failed to perform conditional operation in {} 
milliseconds",
-                                self.timeout
-                            )
-                            .into(),
-                        }),
-                    };
-                }
-                TryLockResult::Conflict(conflict) => {
-                    let mut interval = 
tokio::time::interval(self.test_interval);
-                    let expiry = conflict.timeout * self.max_clock_skew_rate;
-                    loop {
-                        interval.tick().await;
-                        check_precondition(client, to, etag).await?;
-                        if conflict.acquire.elapsed() > expiry {
-                            previous_lease = Some(conflict);
-                            break;
-                        }
-                    }
-                }
-            }
-        }
-    }
-
-    /// Attempt to acquire a lock, reclaiming an existing lease if provided
-    async fn try_lock(
-        &self,
-        s3: &S3Client,
-        path: &str,
-        etag: Option<&str>,
-        existing: Option<&Lease>,
-    ) -> Result<TryLockResult> {
-        let attributes;
-        let (next_gen, condition_expression, expression_attribute_values) = 
match existing {
-            None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
-            Some(existing) => {
-                attributes = [(":g", 
AttributeValue::Number(existing.generation))];
-                (
-                    existing.generation.checked_add(1).unwrap(),
-                    "attribute_exists(#pk) AND generation = :g",
-                    Map(attributes.as_slice()),
-                )
-            }
-        };
-
-        let ttl = (Utc::now() + self.ttl).timestamp();
-        let items = [
-            ("path", AttributeValue::from(path)),
-            ("etag", AttributeValue::from(etag.unwrap_or("*"))),
-            ("generation", AttributeValue::Number(next_gen)),
-            ("timeout", AttributeValue::Number(self.timeout)),
-            ("ttl", AttributeValue::Number(ttl as _)),
-        ];
-        let names = [("#pk", "path")];
-
-        let req = PutItem {
-            table_name: &self.table_name,
-            condition_expression,
-            expression_attribute_values,
-            expression_attribute_names: Map(&names),
-            item: Map(&items),
-            return_values: None,
-            return_values_on_condition_check_failure: 
Some(ReturnValues::AllOld),
-        };
-
-        let credential = s3.config.get_credential().await?;
-
-        let acquire = Instant::now();
-        match self
-            .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem", 
req)
-            .await
-        {
-            Ok(_) => Ok(TryLockResult::Ok(Lease {
-                acquire,
-                generation: next_gen,
-                timeout: Duration::from_millis(self.timeout),
-            })),
-            Err(e) => match parse_error_response(&e) {
-                Some(e) if e.error.ends_with(CONFLICT) => match 
extract_lease(&e.item) {
-                    Some(lease) => Ok(TryLockResult::Conflict(lease)),
-                    None => Err(Error::Generic {
-                        store: STORE,
-                        source: "Failed to extract lease from conflict 
ReturnValuesOnConditionCheckFailure response".into()
-                    }),
-                },
-                _ => Err(Error::Generic {
-                    store: STORE,
-                    source: Box::new(e),
-                }),
-            },
-        }
-    }
-
-    async fn request<R: Serialize + Send + Sync>(
-        &self,
-        s3: &S3Client,
-        cred: Option<&AwsCredential>,
-        target: &str,
-        req: R,
-    ) -> Result<HttpResponse, RetryError> {
-        let region = &s3.config.region;
-        let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb", 
region));
-
-        let builder = match &s3.config.endpoint {
-            Some(e) => s3.client.request(Method::POST, e),
-            None => {
-                let url = format!("https://dynamodb.{region}.amazonaws.com";);
-                s3.client.request(Method::POST, url)
-            }
-        };
-
-        // TODO: Timeout
-        builder
-            .json(&req)
-            .header("X-Amz-Target", target)
-            .with_aws_sigv4(authorizer, None)
-            .send_retry(&s3.config.retry_config)
-            .await
-    }
-}
-
-#[derive(Debug)]
-enum TryLockResult {
-    /// Successfully acquired a lease
-    Ok(Lease),
-    /// An existing lease was found
-    Conflict(Lease),
-}
-
-/// Validates that `path` has the given `etag` or doesn't exist if `None`
-async fn check_precondition(client: &Arc<S3Client>, path: &Path, etag: 
Option<&str>) -> Result<()> {
-    let options = GetOptions {
-        head: true,
-        ..Default::default()
-    };
-
-    match etag {
-        Some(expected) => match client.get_opts(path, options).await {
-            Ok(r) => match r.meta.e_tag {
-                Some(actual) if expected == actual => Ok(()),
-                actual => Err(Error::Precondition {
-                    path: path.to_string(),
-                    source: format!("{} does not match {expected}", 
actual.unwrap_or_default())
-                        .into(),
-                }),
-            },
-            Err(Error::NotFound { .. }) => Err(Error::Precondition {
-                path: path.to_string(),
-                source: format!("Object at location {path} not found").into(),
-            }),
-            Err(e) => Err(e),
-        },
-        None => match client.get_opts(path, options).await {
-            Ok(_) => Err(Error::AlreadyExists {
-                path: path.to_string(),
-                source: "Already Exists".to_string().into(),
-            }),
-            Err(Error::NotFound { .. }) => Ok(()),
-            Err(e) => Err(e),
-        },
-    }
-}
-
-/// Parses the error response if any
-fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
-    match e.inner() {
-        RequestError::Status {
-            status: StatusCode::BAD_REQUEST,
-            body: Some(b),
-        } => serde_json::from_str(b).ok(),
-        _ => None,
-    }
-}
-
-/// Extracts a lease from `item`, returning `None` on error
-fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
-    let generation = match item.get("generation") {
-        Some(AttributeValue::Number(generation)) => generation,
-        _ => return None,
-    };
-
-    let timeout = match item.get("timeout") {
-        Some(AttributeValue::Number(timeout)) => *timeout,
-        _ => return None,
-    };
-
-    Some(Lease {
-        acquire: Instant::now(),
-        generation: *generation,
-        timeout: Duration::from_millis(timeout),
-    })
-}
-
-/// A lock lease
-#[derive(Debug, Clone)]
-struct Lease {
-    acquire: Instant,
-    generation: u64,
-    timeout: Duration,
-}
-
-/// A DynamoDB [PutItem] payload
-///
-/// [PutItem]: 
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
-#[derive(Serialize)]
-#[serde(rename_all = "PascalCase")]
-struct PutItem<'a> {
-    /// The table name
-    table_name: &'a str,
-
-    /// A condition that must be satisfied in order for a conditional PutItem 
operation to succeed.
-    condition_expression: &'a str,
-
-    /// One or more substitution tokens for attribute names in an expression
-    expression_attribute_names: Map<'a, &'a str, &'a str>,
-
-    /// One or more values that can be substituted in an expression
-    expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
-
-    /// A map of attribute name/value pairs, one for each attribute
-    item: Map<'a, &'a str, AttributeValue<'a>>,
-
-    /// Use ReturnValues if you want to get the item attributes as they 
appeared
-    /// before they were updated with the PutItem request.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    return_values: Option<ReturnValues>,
-
-    /// An optional parameter that returns the item attributes for a PutItem 
operation
-    /// that failed a condition check.
-    #[serde(skip_serializing_if = "Option::is_none")]
-    return_values_on_condition_check_failure: Option<ReturnValues>,
-}
-
-#[derive(Deserialize)]
-struct ErrorResponse<'a> {
-    #[serde(rename = "__type")]
-    error: &'a str,
-
-    #[serde(borrow, default, rename = "Item")]
-    item: HashMap<&'a str, AttributeValue<'a>>,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
-enum ReturnValues {
-    AllOld,
-}
-
-/// A collection of key value pairs
-///
-/// This provides cheap, ordered serialization of maps
-struct Map<'a, K, V>(&'a [(K, V)]);
-
-impl<K: Serialize, V: Serialize> Serialize for Map<'_, K, V> {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: Serializer,
-    {
-        if self.0.is_empty() {
-            return serializer.serialize_none();
-        }
-        let mut map = serializer.serialize_map(Some(self.0.len()))?;
-        for (k, v) in self.0 {
-            map.serialize_entry(k, v)?
-        }
-        map.end()
-    }
-}
-
-/// A DynamoDB [AttributeValue]
-///
-/// [AttributeValue]: 
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
-#[derive(Debug, Serialize, Deserialize)]
-enum AttributeValue<'a> {
-    #[serde(rename = "S")]
-    String(Cow<'a, str>),
-    #[serde(rename = "N", with = "number")]
-    Number(u64),
-}
-
-impl<'a> From<&'a str> for AttributeValue<'a> {
-    fn from(value: &'a str) -> Self {
-        Self::String(Cow::Borrowed(value))
-    }
-}
-
-/// Numbers are serialized as strings
-mod number {
-    use serde::{Deserialize, Deserializer, Serializer};
-
-    pub(crate) fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok, 
S::Error> {
-        s.serialize_str(&v.to_string())
-    }
-
-    pub(crate) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64, 
D::Error> {
-        let v: &str = Deserialize::deserialize(d)?;
-        v.parse().map_err(serde::de::Error::custom)
-    }
-}
-
-use crate::client::HttpResponse;
-/// Re-export integration_test to be called by s3_test
-#[cfg(test)]
-pub(crate) use tests::integration_test;
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-    use crate::aws::AmazonS3;
-    use crate::ObjectStore;
-    use rand::distr::Alphanumeric;
-    use rand::{rng, Rng};
-
-    #[test]
-    fn test_attribute_serde() {
-        let serde = 
serde_json::to_string(&AttributeValue::Number(23)).unwrap();
-        assert_eq!(serde, "{\"N\":\"23\"}");
-        let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
-        assert!(matches!(back, AttributeValue::Number(23)));
-    }
-
-    /// An integration test for DynamoDB
-    ///
-    /// This is a function called by s3_test to avoid test concurrency issues
-    pub(crate) async fn integration_test(integration: &AmazonS3, d: 
&DynamoCommit) {
-        let client = &integration.client;
-
-        let src = Path::from("dynamo_path_src");
-        integration.put(&src, "asd".into()).await.unwrap();
-
-        let dst = Path::from("dynamo_path");
-        let _ = integration.delete(&dst).await; // Delete if present
-
-        // Create a lock if not already exists
-        let existing = match d.try_lock(client, dst.as_ref(), None, 
None).await.unwrap() {
-            TryLockResult::Conflict(l) => l,
-            TryLockResult::Ok(l) => l,
-        };
-
-        // Should not be able to acquire a lock again
-        let r = d.try_lock(client, dst.as_ref(), None, None).await;
-        assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
-
-        // But should still be able to reclaim lock and perform copy
-        d.copy_if_not_exists(client, &src, &dst).await.unwrap();
-
-        match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
-            TryLockResult::Conflict(new) => {
-                // Should have incremented generation to do so
-                assert_eq!(new.generation, existing.generation + 1);
-            }
-            _ => panic!("Should conflict"),
-        }
-
-        let rng = rng();
-        let etag = 
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
-        let t = Some(etag.as_str());
-
-        let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() 
{
-            TryLockResult::Ok(l) => l,
-            _ => panic!("should not conflict"),
-        };
-
-        match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
-            TryLockResult::Conflict(c) => assert_eq!(l.generation, 
c.generation),
-            _ => panic!("should conflict"),
-        }
-
-        match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
-            TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation 
+ 1),
-            _ => panic!("should not conflict"),
-        }
-    }
-}
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 4abf374..8dac2bd 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -56,7 +56,6 @@ mod builder;
 mod checksum;
 mod client;
 mod credential;
-mod dynamo;
 mod precondition;
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -64,7 +63,6 @@ mod resolve;
 
 pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
 pub use checksum::Checksum;
-pub use dynamo::DynamoCommit;
 pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
 
 #[cfg(not(target_arch = "wasm32"))]
@@ -197,11 +195,6 @@ impl ObjectStore for AmazonS3 {
                     r => r,
                 }
             }
-            #[allow(deprecated)]
-            (PutMode::Create, S3ConditionalPut::Dynamo(d)) => {
-                d.conditional_op(&self.client, location, None, move || 
request.do_put())
-                    .await
-            }
             (PutMode::Update(v), put) => {
                 let etag = v.e_tag.ok_or_else(|| Error::Generic {
                     store: STORE,
@@ -229,13 +222,6 @@ impl ObjectStore for AmazonS3 {
                             r => r,
                         }
                     }
-                    #[allow(deprecated)]
-                    S3ConditionalPut::Dynamo(d) => {
-                        d.conditional_op(&self.client, location, Some(&etag), 
move || {
-                            request.do_put()
-                        })
-                        .await
-                    }
                     S3ConditionalPut::Disabled => Err(Error::NotImplemented),
                 }
             }
@@ -369,10 +355,6 @@ impl ObjectStore for AmazonS3 {
 
                 return res;
             }
-            #[allow(deprecated)]
-            Some(S3CopyIfNotExists::Dynamo(lock)) => {
-                return lock.copy_if_not_exists(&self.client, from, to).await
-            }
             None => {
                 return Err(Error::NotSupported {
                     source: "S3 does not support 
copy-if-not-exists".to_string().into(),
@@ -640,12 +622,6 @@ mod tests {
         let builder = 
AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
         let integration = builder.build().unwrap();
         put_get_delete_list(&integration).await;
-
-        match &integration.client.config.copy_if_not_exists {
-            #[allow(deprecated)]
-            Some(S3CopyIfNotExists::Dynamo(d)) => 
dynamo::integration_test(&integration, d).await,
-            _ => eprintln!("Skipping dynamo integration test - dynamo not 
configured"),
-        };
     }
 
     #[tokio::test]
diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs
index 2f11e4f..52ecb9f 100644
--- a/src/aws/precondition.rs
+++ b/src/aws/precondition.rs
@@ -15,7 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::aws::dynamo::DynamoCommit;
 use crate::config::Parse;
 
 use itertools::Itertools;
@@ -61,16 +60,6 @@ pub enum S3CopyIfNotExists {
     ///
     /// Encoded as `multipart` ignoring whitespace.
     Multipart,
-    /// The name of a DynamoDB table to use for coordination
-    ///
-    /// Encoded as either `dynamo:<TABLE_NAME>` or 
`dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
-    /// ignoring whitespace. The default timeout is used if not specified
-    ///
-    /// See [`DynamoCommit`] for more information
-    ///
-    /// This will use the same region, credentials and endpoint as configured 
for S3
-    #[deprecated(note = "Use S3CopyIfNotExists::Multipart")]
-    Dynamo(DynamoCommit),
 }
 
 impl std::fmt::Display for S3CopyIfNotExists {
@@ -81,8 +70,6 @@ impl std::fmt::Display for S3CopyIfNotExists {
                 write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
             }
             Self::Multipart => f.write_str("multipart"),
-            #[allow(deprecated)]
-            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
         }
     }
 }
@@ -110,8 +97,6 @@ impl S3CopyIfNotExists {
                     code,
                 ))
             }
-            #[allow(deprecated)]
-            "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
             _ => None,
         }
     }
@@ -142,17 +127,6 @@ pub enum S3ConditionalPut {
     #[default]
     ETagMatch,
 
-    /// The name of a DynamoDB table to use for coordination
-    ///
-    /// Encoded as either `dynamo:<TABLE_NAME>` or 
`dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
-    /// ignoring whitespace. The default timeout is used if not specified
-    ///
-    /// See [`DynamoCommit`] for more information
-    ///
-    /// This will use the same region, credentials and endpoint as configured 
for S3
-    #[deprecated(note = "Use S3ConditionalPut::ETagMatch")]
-    Dynamo(DynamoCommit),
-
     /// Disable `conditional put`
     Disabled,
 }
@@ -161,8 +135,6 @@ impl std::fmt::Display for S3ConditionalPut {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         match self {
             Self::ETagMatch => write!(f, "etag"),
-            #[allow(deprecated)]
-            Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
             Self::Disabled => write!(f, "disabled"),
         }
     }
@@ -173,11 +145,7 @@ impl S3ConditionalPut {
         match s.trim() {
             "etag" => Some(Self::ETagMatch),
             "disabled" => Some(Self::Disabled),
-            trimmed => match trimmed.split_once(':')? {
-                #[allow(deprecated)]
-                ("dynamo", s) => 
Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
-                _ => None,
-            },
+            _ => None,
         }
     }
 }
@@ -194,7 +162,6 @@ impl Parse for S3ConditionalPut {
 #[cfg(test)]
 mod tests {
     use super::S3CopyIfNotExists;
-    use crate::aws::{DynamoCommit, S3ConditionalPut};
 
     #[test]
     fn parse_s3_copy_if_not_exists_header() {
@@ -219,26 +186,6 @@ mod tests {
         assert_eq!(expected, S3CopyIfNotExists::from_str(input));
     }
 
-    #[test]
-    #[allow(deprecated)]
-    fn parse_s3_copy_if_not_exists_dynamo() {
-        let input = "dynamo: table:100";
-        let expected = Some(S3CopyIfNotExists::Dynamo(
-            DynamoCommit::new("table".into()).with_timeout(100),
-        ));
-        assert_eq!(expected, S3CopyIfNotExists::from_str(input));
-    }
-
-    #[test]
-    #[allow(deprecated)]
-    fn parse_s3_condition_put_dynamo() {
-        let input = "dynamo: table:1300";
-        let expected = Some(S3ConditionalPut::Dynamo(
-            DynamoCommit::new("table".into()).with_timeout(1300),
-        ));
-        assert_eq!(expected, S3ConditionalPut::from_str(input));
-    }
-
     #[test]
     fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
         let expected = Some(S3CopyIfNotExists::Header(
diff --git a/src/client/builder.rs b/src/client/builder.rs
index 257cb57..f74c5ec 100644
--- a/src/client/builder.rs
+++ b/src/client/builder.rs
@@ -165,7 +165,7 @@ impl HttpRequestBuilder {
         self
     }
 
-    #[cfg(any(feature = "aws", feature = "gcp"))]
+    #[cfg(feature = "gcp")]
     pub(crate) fn json<S: serde::Serialize>(mut self, s: S) -> Self {
         match (serde_json::to_vec(&s), &mut self.request) {
             (Ok(json), Ok(request)) => {
diff --git a/src/integration.rs b/src/integration.rs
index 99ee86d..988d8d4 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -34,7 +34,6 @@ use crate::{
 use bytes::Bytes;
 use futures::stream::FuturesUnordered;
 use futures::{StreamExt, TryStreamExt};
-use rand::distr::Alphanumeric;
 use rand::{rng, Rng};
 use std::collections::HashSet;
 use std::slice;
@@ -630,15 +629,8 @@ pub async fn get_opts(storage: &dyn ObjectStore) {
 
 /// Tests conditional writes
 pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
-    // When using DynamoCommit repeated runs of this test will produce the 
same sequence of records in DynamoDB
-    // As a result each conditional operation will need to wait for the lease 
to timeout before proceeding
-    // One solution would be to clear DynamoDB before each test, but this 
would require non-trivial additional code
-    // so we instead just generate a random suffix for the filenames
-    let rng = rng();
-    let suffix = 
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
-
     delete_fixtures(storage).await;
-    let path = Path::from(format!("put_opts_{suffix}"));
+    let path = Path::from("put_opts");
     let v1 = storage
         .put_opts(&path, "a".into(), PutMode::Create.into())
         .await
@@ -696,7 +688,7 @@ pub async fn put_opts(storage: &dyn ObjectStore, 
supports_update: bool) {
     const NUM_WORKERS: usize = 5;
     const NUM_INCREMENTS: usize = 10;
 
-    let path = Path::from(format!("RACE-{suffix}"));
+    let path = Path::from("RACE");
     let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
         .map(|_| async {
             for _ in 0..NUM_INCREMENTS {

Reply via email to