This is an automated email from the ASF dual-hosted git repository.
mbrobbel pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git
The following commit(s) were added to refs/heads/main by this push:
new b82979d Reapply "refactor: remove AWS dynamo integration (#407)"
(#494)
b82979d is described below
commit b82979d44a916cf1615e719c8e80800766fd6efe
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Sep 23 05:26:01 2025 -0700
Reapply "refactor: remove AWS dynamo integration (#407)" (#494)
This reverts commit bebd53b8e3ee4edeb9cc11cb272af7442f536fdb.
---
.github/workflows/ci.yml | 6 +-
src/aws/builder.rs | 1 -
src/aws/client.rs | 1 -
src/aws/dynamo.rs | 595 -----------------------------------------------
src/aws/mod.rs | 24 --
src/aws/precondition.rs | 55 +----
src/client/builder.rs | 2 +-
src/integration.rs | 12 +-
8 files changed, 6 insertions(+), 690 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index ab59e6f..da2b8c4 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,7 +16,6 @@
# under the License.
---
-
name: CI
concurrency:
@@ -100,8 +99,8 @@ jobs:
AWS_SECRET_ACCESS_KEY: test
AWS_ENDPOINT: http://localhost:4566
AWS_ALLOW_HTTP: true
- AWS_COPY_IF_NOT_EXISTS: dynamo:test-table:2000
- AWS_CONDITIONAL_PUT: dynamo:test-table:2000
+ AWS_COPY_IF_NOT_EXISTS: multipart
+ AWS_CONDITIONAL_PUT: etag
AWS_SERVER_SIDE_ENCRYPTION: aws:kms
HTTP_URL: "http://localhost:8080"
GOOGLE_BUCKET: test-bucket
@@ -132,7 +131,6 @@ jobs:
aws --endpoint-url=http://localhost:4566 s3 mb
s3://test-bucket-for-spawn
aws --endpoint-url=http://localhost:4566 s3 mb
s3://test-bucket-for-checksum
aws --endpoint-url=http://localhost:4566 s3api create-bucket
--bucket test-object-lock --object-lock-enabled-for-bucket
- aws --endpoint-url=http://localhost:4566 dynamodb create-table
--table-name test-table --key-schema AttributeName=path,KeyType=HASH
AttributeName=etag,KeyType=RANGE --attribute-definitions
AttributeName=path,AttributeType=S AttributeName=etag,AttributeType=S
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5
KMS_KEY=$(aws --endpoint-url=http://localhost:4566 kms create-key
--description "test key")
echo "AWS_SSE_KMS_KEY_ID=$(echo $KMS_KEY | jq -r
.KeyMetadata.KeyId)" >> $GITHUB_ENV
diff --git a/src/aws/builder.rs b/src/aws/builder.rs
index 6e6f8e2..06503ca 100644
--- a/src/aws/builder.rs
+++ b/src/aws/builder.rs
@@ -1149,7 +1149,6 @@ impl AmazonS3Builder {
let config = S3Config {
region,
- endpoint: self.endpoint,
bucket,
bucket_endpoint,
credentials,
diff --git a/src/aws/client.rs b/src/aws/client.rs
index 4edb977..913859d 100644
--- a/src/aws/client.rs
+++ b/src/aws/client.rs
@@ -193,7 +193,6 @@ impl From<DeleteError> for Error {
#[derive(Debug)]
pub(crate) struct S3Config {
pub region: String,
- pub endpoint: Option<String>,
pub bucket: String,
pub bucket_endpoint: String,
pub credentials: AwsCredentialProvider,
diff --git a/src/aws/dynamo.rs b/src/aws/dynamo.rs
deleted file mode 100644
index a6775ef..0000000
--- a/src/aws/dynamo.rs
+++ /dev/null
@@ -1,595 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! A DynamoDB based lock system
-
-use std::borrow::Cow;
-use std::collections::HashMap;
-use std::future::Future;
-use std::sync::Arc;
-use std::time::{Duration, Instant};
-
-use chrono::Utc;
-use http::{Method, StatusCode};
-use serde::ser::SerializeMap;
-use serde::{Deserialize, Serialize, Serializer};
-
-use crate::aws::client::S3Client;
-use crate::aws::credential::CredentialExt;
-use crate::aws::{AwsAuthorizer, AwsCredential};
-use crate::client::get::GetClientExt;
-use crate::client::retry::RetryExt;
-use crate::client::retry::{RequestError, RetryError};
-use crate::path::Path;
-use crate::{Error, GetOptions, Result};
-
-/// The exception returned by DynamoDB on conflict
-const CONFLICT: &str = "ConditionalCheckFailedException";
-
-const STORE: &str = "DynamoDB";
-
-/// A DynamoDB-based commit protocol, used to provide conditional write
support for S3
-///
-/// ## Limitations
-///
-/// Only conditional operations, e.g. `copy_if_not_exists` will be
synchronized, and can
-/// therefore race with non-conditional operations, e.g. `put`, `copy`,
`delete`, or
-/// conditional operations performed by writers not configured to synchronize
with DynamoDB.
-///
-/// Workloads making use of this mechanism **must** ensure:
-///
-/// * Conditional and non-conditional operations are not performed on the same
paths
-/// * Conditional operations are only performed via similarly configured
clients
-///
-/// Additionally as the locking mechanism relies on timeouts to detect stale
locks,
-/// performance will be poor for systems that frequently delete and then create
-/// objects at the same path, instead being optimised for systems that
primarily create
-/// files with paths never used before, or perform conditional updates to
existing files
-///
-/// ## Commit Protocol
-///
-/// The DynamoDB schema is as follows:
-///
-/// * A string partition key named `"path"`
-/// * A string sort key named `"etag"`
-/// * A numeric [TTL] attribute named `"ttl"`
-/// * A numeric attribute named `"generation"`
-/// * A numeric attribute named `"timeout"`
-///
-/// An appropriate DynamoDB table can be created with the CLI as follows:
-///
-/// ```bash
-/// $ aws dynamodb create-table --table-name <TABLE_NAME> --key-schema
AttributeName=path,KeyType=HASH AttributeName=etag,KeyType=RANGE
--attribute-definitions AttributeName=path,AttributeType=S
AttributeName=etag,AttributeType=S
-/// $ aws dynamodb update-time-to-live --table-name <TABLE_NAME>
--time-to-live-specification Enabled=true,AttributeName=ttl
-/// ```
-///
-/// To perform a conditional operation on an object with a given `path` and
`etag` (`*` if creating),
-/// the commit protocol is as follows:
-///
-/// 1. Perform HEAD request on `path` and error on precondition mismatch
-/// 2. Create record in DynamoDB with given `path` and `etag` with the
configured timeout
-/// 1. On Success: Perform operation with the configured timeout
-/// 2. On Conflict:
-/// 1. Periodically re-perform HEAD request on `path` and error on
precondition mismatch
-/// 2. If `timeout * max_skew_rate` passed, replace the record
incrementing the `"generation"`
-/// 1. On Success: GOTO 2.1
-/// 2. On Conflict: GOTO 2.2
-///
-/// Provided no writer modifies an object with a given `path` and `etag`
without first adding a
-/// corresponding record to DynamoDB, we are guaranteed that only one writer
will ever commit.
-///
-/// This is inspired by the [DynamoDB Lock Client] but simplified for the more
limited
-/// requirements of synchronizing object storage. The major changes are:
-///
-/// * Uses a monotonic generation count instead of a UUID rvn, as this is:
-/// * Cheaper to generate, serialize and compare
-/// * Cannot collide
-/// * More human readable / interpretable
-/// * Relies on [TTL] to eventually clean up old locks
-///
-/// It also draws inspiration from the DeltaLake [S3 Multi-Cluster] commit
protocol, but
-/// generalised to not make assumptions about the workload and not rely on
first writing
-/// to a temporary path.
-///
-/// [TTL]:
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
-/// [DynamoDB Lock Client]:
https://aws.amazon.com/blogs/database/building-distributed-locks-with-the-dynamodb-lock-client/
-/// [S3 Multi-Cluster]:
https://docs.google.com/document/d/1Gs4ZsTH19lMxth4BSdwlWjUNR-XhKHicDvBjd2RqNd8/edit#heading=h.mjjuxw9mcz9h
-#[derive(Debug, Clone, Eq, PartialEq)]
-pub struct DynamoCommit {
- table_name: String,
- /// The number of milliseconds a lease is valid for
- timeout: u64,
- /// The maximum clock skew rate tolerated by the system
- max_clock_skew_rate: u32,
- /// The length of time a record will be retained in DynamoDB before being
cleaned up
- ///
- /// This is purely an optimisation to avoid indefinite growth of the
DynamoDB table
- /// and does not impact how long clients may wait to acquire a lock
- ttl: Duration,
- /// The backoff duration before retesting a condition
- test_interval: Duration,
-}
-
-impl DynamoCommit {
- /// Create a new [`DynamoCommit`] with a given table name
- pub fn new(table_name: String) -> Self {
- Self {
- table_name,
- timeout: 20_000,
- max_clock_skew_rate: 3,
- ttl: Duration::from_secs(60 * 60),
- test_interval: Duration::from_millis(100),
- }
- }
-
- /// Overrides the lock timeout.
- ///
- /// A longer lock timeout reduces the probability of spurious commit
failures and multi-writer
- /// races, but will increase the time that writers must wait to reclaim a
lock lost. The
- /// default value of 20 seconds should be appropriate for most use-cases.
- pub fn with_timeout(mut self, millis: u64) -> Self {
- self.timeout = millis;
- self
- }
-
- /// The maximum clock skew rate tolerated by the system.
- ///
- /// An environment in which the clock on the fastest node ticks twice as
fast as the slowest
- /// node, would have a clock skew rate of 2. The default value of 3 should
be appropriate
- /// for most environments.
- pub fn with_max_clock_skew_rate(mut self, rate: u32) -> Self {
- self.max_clock_skew_rate = rate;
- self
- }
-
- /// The length of time a record should be retained in DynamoDB before
being cleaned up
- ///
- /// This should be significantly larger than the configured lock timeout,
with the default
- /// value of 1 hour appropriate for most use-cases.
- pub fn with_ttl(mut self, ttl: Duration) -> Self {
- self.ttl = ttl;
- self
- }
-
- /// Parse [`DynamoCommit`] from a string
- pub(crate) fn from_str(value: &str) -> Option<Self> {
- Some(match value.split_once(':') {
- Some((table_name, timeout)) => {
-
Self::new(table_name.trim().to_string()).with_timeout(timeout.parse().ok()?)
- }
- None => Self::new(value.trim().to_string()),
- })
- }
-
- /// Returns the name of the DynamoDB table.
- pub(crate) fn table_name(&self) -> &str {
- &self.table_name
- }
-
- pub(crate) async fn copy_if_not_exists(
- &self,
- client: &Arc<S3Client>,
- from: &Path,
- to: &Path,
- ) -> Result<()> {
- self.conditional_op(client, to, None, || async {
- client.copy_request(from, to).send().await?;
- Ok(())
- })
- .await
- }
-
- #[allow(clippy::future_not_send)] // Generics confound this lint
- pub(crate) async fn conditional_op<F, Fut, T>(
- &self,
- client: &Arc<S3Client>,
- to: &Path,
- etag: Option<&str>,
- op: F,
- ) -> Result<T>
- where
- F: FnOnce() -> Fut,
- Fut: Future<Output = Result<T, Error>>,
- {
- check_precondition(client, to, etag).await?;
-
- let mut previous_lease = None;
-
- loop {
- let existing = previous_lease.as_ref();
- match self.try_lock(client, to.as_ref(), etag, existing).await? {
- TryLockResult::Ok(lease) => {
- let expiry = lease.acquire + lease.timeout;
- return match tokio::time::timeout_at(expiry.into(),
op()).await {
- Ok(Ok(v)) => Ok(v),
- Ok(Err(e)) => Err(e),
- Err(_) => Err(Error::Generic {
- store: "DynamoDB",
- source: format!(
- "Failed to perform conditional operation in {}
milliseconds",
- self.timeout
- )
- .into(),
- }),
- };
- }
- TryLockResult::Conflict(conflict) => {
- let mut interval =
tokio::time::interval(self.test_interval);
- let expiry = conflict.timeout * self.max_clock_skew_rate;
- loop {
- interval.tick().await;
- check_precondition(client, to, etag).await?;
- if conflict.acquire.elapsed() > expiry {
- previous_lease = Some(conflict);
- break;
- }
- }
- }
- }
- }
- }
-
- /// Attempt to acquire a lock, reclaiming an existing lease if provided
- async fn try_lock(
- &self,
- s3: &S3Client,
- path: &str,
- etag: Option<&str>,
- existing: Option<&Lease>,
- ) -> Result<TryLockResult> {
- let attributes;
- let (next_gen, condition_expression, expression_attribute_values) =
match existing {
- None => (0_u64, "attribute_not_exists(#pk)", Map(&[])),
- Some(existing) => {
- attributes = [(":g",
AttributeValue::Number(existing.generation))];
- (
- existing.generation.checked_add(1).unwrap(),
- "attribute_exists(#pk) AND generation = :g",
- Map(attributes.as_slice()),
- )
- }
- };
-
- let ttl = (Utc::now() + self.ttl).timestamp();
- let items = [
- ("path", AttributeValue::from(path)),
- ("etag", AttributeValue::from(etag.unwrap_or("*"))),
- ("generation", AttributeValue::Number(next_gen)),
- ("timeout", AttributeValue::Number(self.timeout)),
- ("ttl", AttributeValue::Number(ttl as _)),
- ];
- let names = [("#pk", "path")];
-
- let req = PutItem {
- table_name: &self.table_name,
- condition_expression,
- expression_attribute_values,
- expression_attribute_names: Map(&names),
- item: Map(&items),
- return_values: None,
- return_values_on_condition_check_failure:
Some(ReturnValues::AllOld),
- };
-
- let credential = s3.config.get_credential().await?;
-
- let acquire = Instant::now();
- match self
- .request(s3, credential.as_deref(), "DynamoDB_20120810.PutItem",
req)
- .await
- {
- Ok(_) => Ok(TryLockResult::Ok(Lease {
- acquire,
- generation: next_gen,
- timeout: Duration::from_millis(self.timeout),
- })),
- Err(e) => match parse_error_response(&e) {
- Some(e) if e.error.ends_with(CONFLICT) => match
extract_lease(&e.item) {
- Some(lease) => Ok(TryLockResult::Conflict(lease)),
- None => Err(Error::Generic {
- store: STORE,
- source: "Failed to extract lease from conflict
ReturnValuesOnConditionCheckFailure response".into()
- }),
- },
- _ => Err(Error::Generic {
- store: STORE,
- source: Box::new(e),
- }),
- },
- }
- }
-
- async fn request<R: Serialize + Send + Sync>(
- &self,
- s3: &S3Client,
- cred: Option<&AwsCredential>,
- target: &str,
- req: R,
- ) -> Result<HttpResponse, RetryError> {
- let region = &s3.config.region;
- let authorizer = cred.map(|x| AwsAuthorizer::new(x, "dynamodb",
region));
-
- let builder = match &s3.config.endpoint {
- Some(e) => s3.client.request(Method::POST, e),
- None => {
- let url = format!("https://dynamodb.{region}.amazonaws.com");
- s3.client.request(Method::POST, url)
- }
- };
-
- // TODO: Timeout
- builder
- .json(&req)
- .header("X-Amz-Target", target)
- .with_aws_sigv4(authorizer, None)
- .send_retry(&s3.config.retry_config)
- .await
- }
-}
-
-#[derive(Debug)]
-enum TryLockResult {
- /// Successfully acquired a lease
- Ok(Lease),
- /// An existing lease was found
- Conflict(Lease),
-}
-
-/// Validates that `path` has the given `etag` or doesn't exist if `None`
-async fn check_precondition(client: &Arc<S3Client>, path: &Path, etag:
Option<&str>) -> Result<()> {
- let options = GetOptions {
- head: true,
- ..Default::default()
- };
-
- match etag {
- Some(expected) => match client.get_opts(path, options).await {
- Ok(r) => match r.meta.e_tag {
- Some(actual) if expected == actual => Ok(()),
- actual => Err(Error::Precondition {
- path: path.to_string(),
- source: format!("{} does not match {expected}",
actual.unwrap_or_default())
- .into(),
- }),
- },
- Err(Error::NotFound { .. }) => Err(Error::Precondition {
- path: path.to_string(),
- source: format!("Object at location {path} not found").into(),
- }),
- Err(e) => Err(e),
- },
- None => match client.get_opts(path, options).await {
- Ok(_) => Err(Error::AlreadyExists {
- path: path.to_string(),
- source: "Already Exists".to_string().into(),
- }),
- Err(Error::NotFound { .. }) => Ok(()),
- Err(e) => Err(e),
- },
- }
-}
-
-/// Parses the error response if any
-fn parse_error_response(e: &RetryError) -> Option<ErrorResponse<'_>> {
- match e.inner() {
- RequestError::Status {
- status: StatusCode::BAD_REQUEST,
- body: Some(b),
- } => serde_json::from_str(b).ok(),
- _ => None,
- }
-}
-
-/// Extracts a lease from `item`, returning `None` on error
-fn extract_lease(item: &HashMap<&str, AttributeValue<'_>>) -> Option<Lease> {
- let generation = match item.get("generation") {
- Some(AttributeValue::Number(generation)) => generation,
- _ => return None,
- };
-
- let timeout = match item.get("timeout") {
- Some(AttributeValue::Number(timeout)) => *timeout,
- _ => return None,
- };
-
- Some(Lease {
- acquire: Instant::now(),
- generation: *generation,
- timeout: Duration::from_millis(timeout),
- })
-}
-
-/// A lock lease
-#[derive(Debug, Clone)]
-struct Lease {
- acquire: Instant,
- generation: u64,
- timeout: Duration,
-}
-
-/// A DynamoDB [PutItem] payload
-///
-/// [PutItem]:
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_PutItem.html
-#[derive(Serialize)]
-#[serde(rename_all = "PascalCase")]
-struct PutItem<'a> {
- /// The table name
- table_name: &'a str,
-
- /// A condition that must be satisfied in order for a conditional PutItem
operation to succeed.
- condition_expression: &'a str,
-
- /// One or more substitution tokens for attribute names in an expression
- expression_attribute_names: Map<'a, &'a str, &'a str>,
-
- /// One or more values that can be substituted in an expression
- expression_attribute_values: Map<'a, &'a str, AttributeValue<'a>>,
-
- /// A map of attribute name/value pairs, one for each attribute
- item: Map<'a, &'a str, AttributeValue<'a>>,
-
- /// Use ReturnValues if you want to get the item attributes as they
appeared
- /// before they were updated with the PutItem request.
- #[serde(skip_serializing_if = "Option::is_none")]
- return_values: Option<ReturnValues>,
-
- /// An optional parameter that returns the item attributes for a PutItem
operation
- /// that failed a condition check.
- #[serde(skip_serializing_if = "Option::is_none")]
- return_values_on_condition_check_failure: Option<ReturnValues>,
-}
-
-#[derive(Deserialize)]
-struct ErrorResponse<'a> {
- #[serde(rename = "__type")]
- error: &'a str,
-
- #[serde(borrow, default, rename = "Item")]
- item: HashMap<&'a str, AttributeValue<'a>>,
-}
-
-#[derive(Serialize)]
-#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
-enum ReturnValues {
- AllOld,
-}
-
-/// A collection of key value pairs
-///
-/// This provides cheap, ordered serialization of maps
-struct Map<'a, K, V>(&'a [(K, V)]);
-
-impl<K: Serialize, V: Serialize> Serialize for Map<'_, K, V> {
- fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
- where
- S: Serializer,
- {
- if self.0.is_empty() {
- return serializer.serialize_none();
- }
- let mut map = serializer.serialize_map(Some(self.0.len()))?;
- for (k, v) in self.0 {
- map.serialize_entry(k, v)?
- }
- map.end()
- }
-}
-
-/// A DynamoDB [AttributeValue]
-///
-/// [AttributeValue]:
https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_AttributeValue.html
-#[derive(Debug, Serialize, Deserialize)]
-enum AttributeValue<'a> {
- #[serde(rename = "S")]
- String(Cow<'a, str>),
- #[serde(rename = "N", with = "number")]
- Number(u64),
-}
-
-impl<'a> From<&'a str> for AttributeValue<'a> {
- fn from(value: &'a str) -> Self {
- Self::String(Cow::Borrowed(value))
- }
-}
-
-/// Numbers are serialized as strings
-mod number {
- use serde::{Deserialize, Deserializer, Serializer};
-
- pub(crate) fn serialize<S: Serializer>(v: &u64, s: S) -> Result<S::Ok,
S::Error> {
- s.serialize_str(&v.to_string())
- }
-
- pub(crate) fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<u64,
D::Error> {
- let v: &str = Deserialize::deserialize(d)?;
- v.parse().map_err(serde::de::Error::custom)
- }
-}
-
-use crate::client::HttpResponse;
-/// Re-export integration_test to be called by s3_test
-#[cfg(test)]
-pub(crate) use tests::integration_test;
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::aws::AmazonS3;
- use crate::ObjectStore;
- use rand::distr::Alphanumeric;
- use rand::{rng, Rng};
-
- #[test]
- fn test_attribute_serde() {
- let serde =
serde_json::to_string(&AttributeValue::Number(23)).unwrap();
- assert_eq!(serde, "{\"N\":\"23\"}");
- let back: AttributeValue<'_> = serde_json::from_str(&serde).unwrap();
- assert!(matches!(back, AttributeValue::Number(23)));
- }
-
- /// An integration test for DynamoDB
- ///
- /// This is a function called by s3_test to avoid test concurrency issues
- pub(crate) async fn integration_test(integration: &AmazonS3, d:
&DynamoCommit) {
- let client = &integration.client;
-
- let src = Path::from("dynamo_path_src");
- integration.put(&src, "asd".into()).await.unwrap();
-
- let dst = Path::from("dynamo_path");
- let _ = integration.delete(&dst).await; // Delete if present
-
- // Create a lock if not already exists
- let existing = match d.try_lock(client, dst.as_ref(), None,
None).await.unwrap() {
- TryLockResult::Conflict(l) => l,
- TryLockResult::Ok(l) => l,
- };
-
- // Should not be able to acquire a lock again
- let r = d.try_lock(client, dst.as_ref(), None, None).await;
- assert!(matches!(r, Ok(TryLockResult::Conflict(_))));
-
- // But should still be able to reclaim lock and perform copy
- d.copy_if_not_exists(client, &src, &dst).await.unwrap();
-
- match d.try_lock(client, dst.as_ref(), None, None).await.unwrap() {
- TryLockResult::Conflict(new) => {
- // Should have incremented generation to do so
- assert_eq!(new.generation, existing.generation + 1);
- }
- _ => panic!("Should conflict"),
- }
-
- let rng = rng();
- let etag =
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
- let t = Some(etag.as_str());
-
- let l = match d.try_lock(client, dst.as_ref(), t, None).await.unwrap()
{
- TryLockResult::Ok(l) => l,
- _ => panic!("should not conflict"),
- };
-
- match d.try_lock(client, dst.as_ref(), t, None).await.unwrap() {
- TryLockResult::Conflict(c) => assert_eq!(l.generation,
c.generation),
- _ => panic!("should conflict"),
- }
-
- match d.try_lock(client, dst.as_ref(), t, Some(&l)).await.unwrap() {
- TryLockResult::Ok(new) => assert_eq!(new.generation, l.generation
+ 1),
- _ => panic!("should not conflict"),
- }
- }
-}
diff --git a/src/aws/mod.rs b/src/aws/mod.rs
index 4abf374..8dac2bd 100644
--- a/src/aws/mod.rs
+++ b/src/aws/mod.rs
@@ -56,7 +56,6 @@ mod builder;
mod checksum;
mod client;
mod credential;
-mod dynamo;
mod precondition;
#[cfg(not(target_arch = "wasm32"))]
@@ -64,7 +63,6 @@ mod resolve;
pub use builder::{AmazonS3Builder, AmazonS3ConfigKey};
pub use checksum::Checksum;
-pub use dynamo::DynamoCommit;
pub use precondition::{S3ConditionalPut, S3CopyIfNotExists};
#[cfg(not(target_arch = "wasm32"))]
@@ -197,11 +195,6 @@ impl ObjectStore for AmazonS3 {
r => r,
}
}
- #[allow(deprecated)]
- (PutMode::Create, S3ConditionalPut::Dynamo(d)) => {
- d.conditional_op(&self.client, location, None, move ||
request.do_put())
- .await
- }
(PutMode::Update(v), put) => {
let etag = v.e_tag.ok_or_else(|| Error::Generic {
store: STORE,
@@ -229,13 +222,6 @@ impl ObjectStore for AmazonS3 {
r => r,
}
}
- #[allow(deprecated)]
- S3ConditionalPut::Dynamo(d) => {
- d.conditional_op(&self.client, location, Some(&etag),
move || {
- request.do_put()
- })
- .await
- }
S3ConditionalPut::Disabled => Err(Error::NotImplemented),
}
}
@@ -369,10 +355,6 @@ impl ObjectStore for AmazonS3 {
return res;
}
- #[allow(deprecated)]
- Some(S3CopyIfNotExists::Dynamo(lock)) => {
- return lock.copy_if_not_exists(&self.client, from, to).await
- }
None => {
return Err(Error::NotSupported {
source: "S3 does not support
copy-if-not-exists".to_string().into(),
@@ -640,12 +622,6 @@ mod tests {
let builder =
AmazonS3Builder::from_env().with_checksum_algorithm(Checksum::SHA256);
let integration = builder.build().unwrap();
put_get_delete_list(&integration).await;
-
- match &integration.client.config.copy_if_not_exists {
- #[allow(deprecated)]
- Some(S3CopyIfNotExists::Dynamo(d)) =>
dynamo::integration_test(&integration, d).await,
- _ => eprintln!("Skipping dynamo integration test - dynamo not
configured"),
- };
}
#[tokio::test]
diff --git a/src/aws/precondition.rs b/src/aws/precondition.rs
index 2f11e4f..52ecb9f 100644
--- a/src/aws/precondition.rs
+++ b/src/aws/precondition.rs
@@ -15,7 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-use crate::aws::dynamo::DynamoCommit;
use crate::config::Parse;
use itertools::Itertools;
@@ -61,16 +60,6 @@ pub enum S3CopyIfNotExists {
///
/// Encoded as `multipart` ignoring whitespace.
Multipart,
- /// The name of a DynamoDB table to use for coordination
- ///
- /// Encoded as either `dynamo:<TABLE_NAME>` or
`dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
- /// ignoring whitespace. The default timeout is used if not specified
- ///
- /// See [`DynamoCommit`] for more information
- ///
- /// This will use the same region, credentials and endpoint as configured
for S3
- #[deprecated(note = "Use S3CopyIfNotExists::Multipart")]
- Dynamo(DynamoCommit),
}
impl std::fmt::Display for S3CopyIfNotExists {
@@ -81,8 +70,6 @@ impl std::fmt::Display for S3CopyIfNotExists {
write!(f, "header-with-status: {k}: {v}: {}", code.as_u16())
}
Self::Multipart => f.write_str("multipart"),
- #[allow(deprecated)]
- Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
}
}
}
@@ -110,8 +97,6 @@ impl S3CopyIfNotExists {
code,
))
}
- #[allow(deprecated)]
- "dynamo" => Some(Self::Dynamo(DynamoCommit::from_str(value)?)),
_ => None,
}
}
@@ -142,17 +127,6 @@ pub enum S3ConditionalPut {
#[default]
ETagMatch,
- /// The name of a DynamoDB table to use for coordination
- ///
- /// Encoded as either `dynamo:<TABLE_NAME>` or
`dynamo:<TABLE_NAME>:<TIMEOUT_MILLIS>`
- /// ignoring whitespace. The default timeout is used if not specified
- ///
- /// See [`DynamoCommit`] for more information
- ///
- /// This will use the same region, credentials and endpoint as configured
for S3
- #[deprecated(note = "Use S3ConditionalPut::ETagMatch")]
- Dynamo(DynamoCommit),
-
/// Disable `conditional put`
Disabled,
}
@@ -161,8 +135,6 @@ impl std::fmt::Display for S3ConditionalPut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::ETagMatch => write!(f, "etag"),
- #[allow(deprecated)]
- Self::Dynamo(lock) => write!(f, "dynamo: {}", lock.table_name()),
Self::Disabled => write!(f, "disabled"),
}
}
@@ -173,11 +145,7 @@ impl S3ConditionalPut {
match s.trim() {
"etag" => Some(Self::ETagMatch),
"disabled" => Some(Self::Disabled),
- trimmed => match trimmed.split_once(':')? {
- #[allow(deprecated)]
- ("dynamo", s) =>
Some(Self::Dynamo(DynamoCommit::from_str(s)?)),
- _ => None,
- },
+ _ => None,
}
}
}
@@ -194,7 +162,6 @@ impl Parse for S3ConditionalPut {
#[cfg(test)]
mod tests {
use super::S3CopyIfNotExists;
- use crate::aws::{DynamoCommit, S3ConditionalPut};
#[test]
fn parse_s3_copy_if_not_exists_header() {
@@ -219,26 +186,6 @@ mod tests {
assert_eq!(expected, S3CopyIfNotExists::from_str(input));
}
- #[test]
- #[allow(deprecated)]
- fn parse_s3_copy_if_not_exists_dynamo() {
- let input = "dynamo: table:100";
- let expected = Some(S3CopyIfNotExists::Dynamo(
- DynamoCommit::new("table".into()).with_timeout(100),
- ));
- assert_eq!(expected, S3CopyIfNotExists::from_str(input));
- }
-
- #[test]
- #[allow(deprecated)]
- fn parse_s3_condition_put_dynamo() {
- let input = "dynamo: table:1300";
- let expected = Some(S3ConditionalPut::Dynamo(
- DynamoCommit::new("table".into()).with_timeout(1300),
- ));
- assert_eq!(expected, S3ConditionalPut::from_str(input));
- }
-
#[test]
fn parse_s3_copy_if_not_exists_header_whitespace_invariant() {
let expected = Some(S3CopyIfNotExists::Header(
diff --git a/src/client/builder.rs b/src/client/builder.rs
index 257cb57..f74c5ec 100644
--- a/src/client/builder.rs
+++ b/src/client/builder.rs
@@ -165,7 +165,7 @@ impl HttpRequestBuilder {
self
}
- #[cfg(any(feature = "aws", feature = "gcp"))]
+ #[cfg(feature = "gcp")]
pub(crate) fn json<S: serde::Serialize>(mut self, s: S) -> Self {
match (serde_json::to_vec(&s), &mut self.request) {
(Ok(json), Ok(request)) => {
diff --git a/src/integration.rs b/src/integration.rs
index 99ee86d..988d8d4 100644
--- a/src/integration.rs
+++ b/src/integration.rs
@@ -34,7 +34,6 @@ use crate::{
use bytes::Bytes;
use futures::stream::FuturesUnordered;
use futures::{StreamExt, TryStreamExt};
-use rand::distr::Alphanumeric;
use rand::{rng, Rng};
use std::collections::HashSet;
use std::slice;
@@ -630,15 +629,8 @@ pub async fn get_opts(storage: &dyn ObjectStore) {
/// Tests conditional writes
pub async fn put_opts(storage: &dyn ObjectStore, supports_update: bool) {
- // When using DynamoCommit repeated runs of this test will produce the
same sequence of records in DynamoDB
- // As a result each conditional operation will need to wait for the lease
to timeout before proceeding
- // One solution would be to clear DynamoDB before each test, but this
would require non-trivial additional code
- // so we instead just generate a random suffix for the filenames
- let rng = rng();
- let suffix =
String::from_utf8(rng.sample_iter(Alphanumeric).take(32).collect()).unwrap();
-
delete_fixtures(storage).await;
- let path = Path::from(format!("put_opts_{suffix}"));
+ let path = Path::from("put_opts");
let v1 = storage
.put_opts(&path, "a".into(), PutMode::Create.into())
.await
@@ -696,7 +688,7 @@ pub async fn put_opts(storage: &dyn ObjectStore,
supports_update: bool) {
const NUM_WORKERS: usize = 5;
const NUM_INCREMENTS: usize = 10;
- let path = Path::from(format!("RACE-{suffix}"));
+ let path = Path::from("RACE");
let mut futures: FuturesUnordered<_> = (0..NUM_WORKERS)
.map(|_| async {
for _ in 0..NUM_INCREMENTS {