This is an automated email from the ASF dual-hosted git repository.

liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e47b268a feat(transaction): Add retry logic to transaction (#1484)
e47b268a is described below

commit e47b268a6d58a58b8d79263d13cded55ff93fcff
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Jul 14 06:57:46 2025 -0700

    feat(transaction): Add retry logic to transaction (#1484)
---
 Cargo.lock                                |  76 ++++++++-
 Cargo.toml                                |   2 +
 crates/iceberg/Cargo.toml                 |   2 +
 crates/iceberg/src/catalog/mod.rs         |   2 +
 crates/iceberg/src/error.rs               |   5 +
 crates/iceberg/src/spec/table_metadata.rs |  20 +++
 crates/iceberg/src/transaction/mod.rs     | 255 +++++++++++++++++++++++++++++-
 7 files changed, 356 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 552fcb08..6a56ce37 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1032,9 +1032,9 @@ dependencies = [
 
 [[package]]
 name = "backon"
-version = "1.3.0"
+version = "1.5.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7"
+checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7"
 dependencies = [
  "fastrand",
  "gloo-timers",
@@ -2638,6 +2638,12 @@ version = "0.15.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
 
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
 [[package]]
 name = "dunce"
 version = "1.0.5"
@@ -2896,6 +2902,12 @@ dependencies = [
  "percent-encoding",
 ]
 
+[[package]]
+name = "fragile"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
+
 [[package]]
 name = "fs-err"
 version = "3.1.0"
@@ -3500,6 +3512,7 @@ dependencies = [
  "as-any",
  "async-std",
  "async-trait",
+ "backon",
  "base64 0.22.1",
  "bimap",
  "bytes",
@@ -3511,6 +3524,7 @@ dependencies = [
  "futures",
  "iceberg_test_utils",
  "itertools 0.13.0",
+ "mockall",
  "moka",
  "murmur3",
  "num-bigint",
@@ -4434,6 +4448,32 @@ dependencies = [
  "windows-sys 0.52.0",
 ]
 
+[[package]]
+name = "mockall"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
 [[package]]
 name = "mockito"
 version = "1.6.1"
@@ -5249,6 +5289,32 @@ dependencies = [
  "zerocopy 0.7.35",
 ]
 
+[[package]]
+name = "predicates"
+version = "3.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573"
+dependencies = [
+ "anstyle",
+ "predicates-core",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c"
+dependencies = [
+ "predicates-core",
+ "termtree",
+]
+
 [[package]]
 name = "pretty_assertions"
 version = "1.4.1"
@@ -6883,6 +6949,12 @@ dependencies = [
  "unic-segment",
 ]
 
+[[package]]
+name = "termtree"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
+
 [[package]]
 name = "thiserror"
 version = "1.0.69"
diff --git a/Cargo.toml b/Cargo.toml
index 9c6d22de..3176cfdb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,6 +55,7 @@ async-std = "1.12"
 async-trait = "0.1.88"
 aws-config = "1.6.1"
 aws-sdk-glue = "1.39"
+backon = "1.5.1"
 base64 = "0.22.1"
 bimap = "0.6"
 bytes = "1.10"
@@ -82,6 +83,7 @@ itertools = "0.13"
 linkedbytes = "0.1.8"
 metainfo = "0.7.14"
 mimalloc = "0.1.46"
+mockall = "0.13.1"
 mockito = "1"
 motore-macros = "0.4.3"
 murmur3 = "0.5.2"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index ac56c0b3..6ae17d21 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -57,6 +57,7 @@ arrow-string = { workspace = true }
 as-any = { workspace = true }
 async-std = { workspace = true, optional = true, features = ["attributes"] }
 async-trait = { workspace = true }
+backon = { workspace = true }
 base64 = { workspace = true }
 bimap = { workspace = true }
 bytes = { workspace = true }
@@ -66,6 +67,7 @@ expect-test = { workspace = true }
 fnv = { workspace = true }
 futures = { workspace = true }
 itertools = { workspace = true }
+mockall = { workspace = true }
 moka = { version = "0.12.10", features = ["future"] }
 murmur3 = { workspace = true }
 num-bigint = { workspace = true }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 409a80ee..a9b92d47 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
 use _serde::deserialize_snapshot;
 use async_trait::async_trait;
 pub use memory::MemoryCatalog;
+use mockall::automock;
 use serde_derive::{Deserialize, Serialize};
 use typed_builder::TypedBuilder;
 use uuid::Uuid;
@@ -43,6 +44,7 @@ use crate::{Error, ErrorKind, Result};
 
 /// The catalog API for Iceberg Rust.
 #[async_trait]
+#[automock]
 pub trait Catalog: Debug + Sync + Send {
     /// List namespaces inside the catalog.
     async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 9f299fb6..067329d5 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -320,6 +320,11 @@ impl Error {
         self.kind
     }
 
+    /// Return error's retryable status
+    pub fn retryable(&self) -> bool {
+        self.retryable
+    }
+
     /// Return error's message.
     #[inline]
     pub fn message(&self) -> &str {
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index c3156e56..2604eac0 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -98,6 +98,26 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [
     PROPERTY_DEFAULT_SORT_ORDER,
 ];
 
+/// Property key for number of commit retries.
+pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
+/// Default value for number of commit retries.
+pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
+
+/// Property key for minimum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
+/// Default value for minimum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
+
+/// Property key for maximum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
+/// Default value for maximum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute
+
+/// Property key for total maximum retry time (ms).
+pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
+/// Default value for total maximum retry time (ms).
+pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes
+
 /// Reference to [`TableMetadata`].
 pub type TableMetadataRef = Arc<TableMetadata>;
 
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index f96a15b7..06549a95 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -51,6 +51,9 @@
 /// The `ApplyTransactionAction` trait provides an `apply` method
 /// that allows users to apply a transaction action to a `Transaction`.
 mod action;
+
+use std::collections::HashMap;
+
 pub use action::*;
 mod append;
 mod snapshot;
@@ -61,8 +64,17 @@ mod update_statistics;
 mod upgrade_format_version;
 
 use std::sync::Arc;
+use std::time::Duration;
+
+use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder, RetryableWithContext};
 
 use crate::error::Result;
+use crate::spec::{
+    PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+    PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+    PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
+    PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+};
 use crate::table::Table;
 use crate::transaction::action::BoxedTransactionAction;
 use crate::transaction::append::FastAppendAction;
@@ -71,7 +83,7 @@ use crate::transaction::update_location::UpdateLocationAction;
 use crate::transaction::update_properties::UpdatePropertiesAction;
 use crate::transaction::update_statistics::UpdateStatisticsAction;
 use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
-use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
+use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement, TableUpdate};
 
 /// Table transaction.
 #[derive(Clone)]
@@ -152,13 +164,76 @@ impl Transaction {
     }
 
     /// Commit transaction.
-    pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
+    pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
         if self.actions.is_empty() {
             // nothing to commit
             return Ok(self.table);
         }
 
-        self.do_commit(catalog).await
+        let backoff = Self::build_backoff(self.table.metadata().properties())?;
+        let tx = self;
+
+        (|mut tx: Transaction| async {
+            let result = tx.do_commit(catalog).await;
+            (tx, result)
+        })
+        .retry(backoff)
+        .sleep(tokio::time::sleep)
+        .context(tx)
+        .when(|e| e.retryable())
+        .await
+        .1
+    }
+
+    fn build_backoff(props: &HashMap<String, String>) -> Result<ExponentialBackoff> {
+        let min_delay = match props.get(PROPERTY_COMMIT_MIN_RETRY_WAIT_MS) {
+            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Invalid value for commit.retry.min-wait-ms",
+                )
+                .with_source(e)
+            })?,
+            None => PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+        };
+        let max_delay = match props.get(PROPERTY_COMMIT_MAX_RETRY_WAIT_MS) {
+            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Invalid value for commit.retry.max-wait-ms",
+                )
+                .with_source(e)
+            })?,
+            None => PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+        };
+        let total_delay = match props.get(PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS) {
+            Some(value_str) => value_str.parse::<u64>().map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Invalid value for commit.retry.total-timeout-ms",
+                )
+                .with_source(e)
+            })?,
+            None => PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+        };
+        let max_times = match props.get(PROPERTY_COMMIT_NUM_RETRIES) {
+            Some(value_str) => value_str.parse::<usize>().map_err(|e| {
+                Error::new(
+                    ErrorKind::DataInvalid,
+                    "Invalid value for commit.retry.num-retries",
+                )
+                .with_source(e)
+            })?,
+            None => PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
+        };
+
+        Ok(ExponentialBuilder::new()
+            .with_min_delay(Duration::from_millis(min_delay))
+            .with_max_delay(Duration::from_millis(max_delay))
+            .with_total_delay(Some(Duration::from_millis(total_delay)))
+            .with_max_times(max_times)
+            .with_factor(2.0)
+            .build())
     }
 
     async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
@@ -198,13 +273,18 @@ impl Transaction {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
     use std::fs::File;
     use std::io::BufReader;
+    use std::sync::Arc;
+    use std::sync::atomic::{AtomicU32, Ordering};
 
-    use crate::TableIdent;
+    use crate::catalog::MockCatalog;
     use crate::io::FileIOBuilder;
     use crate::spec::TableMetadata;
     use crate::table::Table;
+    use crate::transaction::{ApplyTransactionAction, Transaction};
+    use crate::{Error, ErrorKind, TableIdent};
 
     pub fn make_v1_table() -> Table {
         let file = File::open(format!(
@@ -262,4 +342,171 @@ mod tests {
             .build()
             .unwrap()
     }
+
+    /// Helper function to create a test table with retry properties
+    fn setup_test_table(num_retries: &str) -> Table {
+        let table = make_v2_table();
+
+        // Set retry properties
+        let mut props = HashMap::new();
+        props.insert("commit.retry.min-wait-ms".to_string(), "10".to_string());
+        props.insert("commit.retry.max-wait-ms".to_string(), "100".to_string());
+        props.insert(
+            "commit.retry.total-timeout-ms".to_string(),
+            "1000".to_string(),
+        );
+        props.insert(
+            "commit.retry.num-retries".to_string(),
+            num_retries.to_string(),
+        );
+
+        // Update table properties
+        let metadata = table
+            .metadata()
+            .clone()
+            .into_builder(None)
+            .set_properties(props)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        table.with_metadata(Arc::new(metadata))
+    }
+
+    /// Helper function to create a transaction with a simple update action
+    fn create_test_transaction(table: &Table) -> Transaction {
+        let tx = Transaction::new(table);
+        tx.update_table_properties()
+            .set("test.key".to_string(), "test.value".to_string())
+            .apply(tx)
+            .unwrap()
+    }
+
+    /// Helper function to set up a mock catalog with retryable errors
+    fn setup_mock_catalog_with_retryable_errors(
+        success_after_attempts: Option<u32>,
+        expected_calls: usize,
+    ) -> MockCatalog {
+        let mut mock_catalog = MockCatalog::new();
+
+        mock_catalog
+            .expect_load_table()
+            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
+
+        let attempts = AtomicU32::new(0);
+        mock_catalog
+            .expect_update_table()
+            .times(expected_calls)
+            .returning_st(move |_| {
+                if let Some(success_after_attempts) = success_after_attempts {
+                    attempts.fetch_add(1, Ordering::SeqCst);
+                    if attempts.load(Ordering::SeqCst) <= success_after_attempts {
+                        Box::pin(async move {
+                            Err(
+                                Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
+                                    .with_retryable(true),
+                            )
+                        })
+                    } else {
+                        Box::pin(async move { Ok(make_v2_table()) })
+                    }
+                } else {
+                    // Always fail with retryable error
+                    Box::pin(async move {
+                        Err(
+                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
+                                .with_retryable(true),
+                        )
+                    })
+                }
+            });
+
+        mock_catalog
+    }
+
+    /// Helper function to set up a mock catalog with non-retryable error
+    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
+        let mut mock_catalog = MockCatalog::new();
+
+        mock_catalog
+            .expect_load_table()
+            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
+
+        mock_catalog
+            .expect_update_table()
+            .times(1) // Should only be called once since error is not retryable
+            .returning_st(move |_| {
+                Box::pin(async move {
+                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
+                        .with_retryable(false))
+                })
+            });
+
+        mock_catalog
+    }
+
+    #[tokio::test]
+    async fn test_commit_retryable_error() {
+        // Create a test table with retry properties
+        let table = setup_test_table("3");
+
+        // Create a transaction with a simple update action
+        let tx = create_test_transaction(&table);
+
+        // Create a mock catalog that fails twice then succeeds
+        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);
+
+        // Commit the transaction
+        let result = tx.commit(&mock_catalog).await;
+
+        // Verify the result
+        assert!(result.is_ok(), "Transaction should eventually succeed");
+    }
+
+    #[tokio::test]
+    async fn test_commit_non_retryable_error() {
+        // Create a test table with retry properties
+        let table = setup_test_table("3");
+
+        // Create a transaction with a simple update action
+        let tx = create_test_transaction(&table);
+
+        // Create a mock catalog that fails with non-retryable error
+        let mock_catalog = setup_mock_catalog_with_non_retryable_error();
+
+        // Commit the transaction
+        let result = tx.commit(&mock_catalog).await;
+
+        // Verify the result
+        assert!(result.is_err(), "Transaction should fail immediately");
+        if let Err(err) = result {
+            assert_eq!(err.kind(), ErrorKind::Unexpected);
+            assert_eq!(err.message(), "Non-retryable error");
+            assert!(!err.retryable(), "Error should not be retryable");
+        }
+    }
+
+    #[tokio::test]
+    async fn test_commit_max_retries_exceeded() {
+        // Create a test table with retry properties (only allow 2 retries)
+        let table = setup_test_table("2");
+
+        // Create a transaction with a simple update action
+        let tx = create_test_transaction(&table);
+
+        // Create a mock catalog that always fails with retryable error
+        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3); // Initial attempt + 2 retries = 3 total attempts
+
+        // Commit the transaction
+        let result = tx.commit(&mock_catalog).await;
+
+        // Verify the result
+        assert!(result.is_err(), "Transaction should fail after max retries");
+        if let Err(err) = result {
+            assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
+            assert_eq!(err.message(), "Commit conflict");
+            assert!(err.retryable(), "Error should be retryable");
+        }
+    }
 }

Reply via email to