This is an automated email from the ASF dual-hosted git repository.
liurenjie1024 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e47b268a feat(transaction): Add retry logic to transaction (#1484)
e47b268a is described below
commit e47b268a6d58a58b8d79263d13cded55ff93fcff
Author: Shawn Chang <[email protected]>
AuthorDate: Mon Jul 14 06:57:46 2025 -0700
feat(transaction): Add retry logic to transaction (#1484)
---
Cargo.lock | 76 ++++++++-
Cargo.toml | 2 +
crates/iceberg/Cargo.toml | 2 +
crates/iceberg/src/catalog/mod.rs | 2 +
crates/iceberg/src/error.rs | 5 +
crates/iceberg/src/spec/table_metadata.rs | 20 +++
crates/iceberg/src/transaction/mod.rs | 255 +++++++++++++++++++++++++++++-
7 files changed, 356 insertions(+), 6 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 552fcb08..6a56ce37 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1032,9 +1032,9 @@ dependencies = [
[[package]]
name = "backon"
-version = "1.3.0"
+version = "1.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ba5289ec98f68f28dd809fd601059e6aa908bb8f6108620930828283d4ee23d7"
+checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7"
dependencies = [
"fastrand",
"gloo-timers",
@@ -2638,6 +2638,12 @@ version = "0.15.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b"
+[[package]]
+name = "downcast"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1"
+
[[package]]
name = "dunce"
version = "1.0.5"
@@ -2896,6 +2902,12 @@ dependencies = [
"percent-encoding",
]
+[[package]]
+name = "fragile"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619"
+
[[package]]
name = "fs-err"
version = "3.1.0"
@@ -3500,6 +3512,7 @@ dependencies = [
"as-any",
"async-std",
"async-trait",
+ "backon",
"base64 0.22.1",
"bimap",
"bytes",
@@ -3511,6 +3524,7 @@ dependencies = [
"futures",
"iceberg_test_utils",
"itertools 0.13.0",
+ "mockall",
"moka",
"murmur3",
"num-bigint",
@@ -4434,6 +4448,32 @@ dependencies = [
"windows-sys 0.52.0",
]
+[[package]]
+name = "mockall"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39a6bfcc6c8c7eed5ee98b9c3e33adc726054389233e201c95dab2d41a3839d2"
+dependencies = [
+ "cfg-if",
+ "downcast",
+ "fragile",
+ "mockall_derive",
+ "predicates",
+ "predicates-tree",
+]
+
+[[package]]
+name = "mockall_derive"
+version = "0.13.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "25ca3004c2efe9011bd4e461bd8256445052b9615405b4f7ea43fc8ca5c20898"
+dependencies = [
+ "cfg-if",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.101",
+]
+
[[package]]
name = "mockito"
version = "1.6.1"
@@ -5249,6 +5289,32 @@ dependencies = [
"zerocopy 0.7.35",
]
+[[package]]
+name = "predicates"
+version = "3.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a5d19ee57562043d37e82899fade9a22ebab7be9cef5026b07fda9cdd4293573"
+dependencies = [
+ "anstyle",
+ "predicates-core",
+]
+
+[[package]]
+name = "predicates-core"
+version = "1.0.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "727e462b119fe9c93fd0eb1429a5f7647394014cf3c04ab2c0350eeb09095ffa"
+
+[[package]]
+name = "predicates-tree"
+version = "1.0.12"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "72dd2d6d381dfb73a193c7fca536518d7caee39fc8503f74e7dc0be0531b425c"
+dependencies = [
+ "predicates-core",
+ "termtree",
+]
+
[[package]]
name = "pretty_assertions"
version = "1.4.1"
@@ -6883,6 +6949,12 @@ dependencies = [
"unic-segment",
]
+[[package]]
+name = "termtree"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
+
[[package]]
name = "thiserror"
version = "1.0.69"
diff --git a/Cargo.toml b/Cargo.toml
index 9c6d22de..3176cfdb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -55,6 +55,7 @@ async-std = "1.12"
async-trait = "0.1.88"
aws-config = "1.6.1"
aws-sdk-glue = "1.39"
+backon = "1.5.1"
base64 = "0.22.1"
bimap = "0.6"
bytes = "1.10"
@@ -82,6 +83,7 @@ itertools = "0.13"
linkedbytes = "0.1.8"
metainfo = "0.7.14"
mimalloc = "0.1.46"
+mockall = "0.13.1"
mockito = "1"
motore-macros = "0.4.3"
murmur3 = "0.5.2"
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index ac56c0b3..6ae17d21 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -57,6 +57,7 @@ arrow-string = { workspace = true }
as-any = { workspace = true }
async-std = { workspace = true, optional = true, features = ["attributes"] }
async-trait = { workspace = true }
+backon = { workspace = true }
base64 = { workspace = true }
bimap = { workspace = true }
bytes = { workspace = true }
@@ -66,6 +67,7 @@ expect-test = { workspace = true }
fnv = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
+mockall = { workspace = true }
moka = { version = "0.12.10", features = ["future"] }
murmur3 = { workspace = true }
num-bigint = { workspace = true }
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 409a80ee..a9b92d47 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
use _serde::deserialize_snapshot;
use async_trait::async_trait;
pub use memory::MemoryCatalog;
+use mockall::automock;
use serde_derive::{Deserialize, Serialize};
use typed_builder::TypedBuilder;
use uuid::Uuid;
@@ -43,6 +44,7 @@ use crate::{Error, ErrorKind, Result};
/// The catalog API for Iceberg Rust.
#[async_trait]
+#[automock]
pub trait Catalog: Debug + Sync + Send {
/// List namespaces inside the catalog.
async fn list_namespaces(&self, parent: Option<&NamespaceIdent>)
diff --git a/crates/iceberg/src/error.rs b/crates/iceberg/src/error.rs
index 9f299fb6..067329d5 100644
--- a/crates/iceberg/src/error.rs
+++ b/crates/iceberg/src/error.rs
@@ -320,6 +320,11 @@ impl Error {
self.kind
}
+    /// Return whether this error is marked retryable.
+    ///
+    /// `Transaction::commit` retries a failed commit only when the error it got back
+    /// returns `true` here.
+    #[inline]
+    pub fn retryable(&self) -> bool {
+        self.retryable
+    }
+
/// Return error's message.
#[inline]
pub fn message(&self) -> &str {
diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs
index c3156e56..2604eac0 100644
--- a/crates/iceberg/src/spec/table_metadata.rs
+++ b/crates/iceberg/src/spec/table_metadata.rs
@@ -98,6 +98,26 @@ pub const RESERVED_PROPERTIES: [&str; 9] = [
PROPERTY_DEFAULT_SORT_ORDER,
];
+/// Property key for the number of times a failed commit is retried
+/// (the initial attempt is not counted).
+pub const PROPERTY_COMMIT_NUM_RETRIES: &str = "commit.retry.num-retries";
+/// Default value for number of commit retries.
+pub const PROPERTY_COMMIT_NUM_RETRIES_DEFAULT: usize = 4;
+
+/// Property key for minimum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS: &str = "commit.retry.min-wait-ms";
+/// Default value for minimum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT: u64 = 100;
+
+/// Property key for maximum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS: &str = "commit.retry.max-wait-ms";
+/// Default value for maximum wait time (ms) between retries.
+pub const PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT: u64 = 60 * 1000; // 1 minute
+
+/// Property key for the total time budget (ms) allowed for commit retries.
+pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS: &str = "commit.retry.total-timeout-ms";
+/// Default value for the total time budget (ms) allowed for commit retries.
+pub const PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT: u64 = 30 * 60 * 1000; // 30 minutes
+
/// Reference to [`TableMetadata`].
pub type TableMetadataRef = Arc<TableMetadata>;
diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs
index f96a15b7..06549a95 100644
--- a/crates/iceberg/src/transaction/mod.rs
+++ b/crates/iceberg/src/transaction/mod.rs
@@ -51,6 +51,9 @@
/// The `ApplyTransactionAction` trait provides an `apply` method
/// that allows users to apply a transaction action to a `Transaction`.
mod action;
+
+use std::collections::HashMap;
+
pub use action::*;
mod append;
mod snapshot;
@@ -61,8 +64,17 @@ mod update_statistics;
mod upgrade_format_version;
use std::sync::Arc;
+use std::time::Duration;
+
+use backon::{BackoffBuilder, ExponentialBackoff, ExponentialBuilder,
RetryableWithContext};
use crate::error::Result;
+use crate::spec::{
+ PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+ PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+ PROPERTY_COMMIT_NUM_RETRIES, PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
+ PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+};
use crate::table::Table;
use crate::transaction::action::BoxedTransactionAction;
use crate::transaction::append::FastAppendAction;
@@ -71,7 +83,7 @@ use crate::transaction::update_location::UpdateLocationAction;
use crate::transaction::update_properties::UpdatePropertiesAction;
use crate::transaction::update_statistics::UpdateStatisticsAction;
use crate::transaction::upgrade_format_version::UpgradeFormatVersionAction;
-use crate::{Catalog, TableCommit, TableRequirement, TableUpdate};
+use crate::{Catalog, Error, ErrorKind, TableCommit, TableRequirement,
TableUpdate};
/// Table transaction.
#[derive(Clone)]
@@ -152,13 +164,76 @@ impl Transaction {
}
/// Commit transaction.
- pub async fn commit(mut self, catalog: &dyn Catalog) -> Result<Table> {
+ pub async fn commit(self, catalog: &dyn Catalog) -> Result<Table> {
if self.actions.is_empty() {
// nothing to commit
return Ok(self.table);
}
- self.do_commit(catalog).await
+ let backoff = Self::build_backoff(self.table.metadata().properties())?;
+ let tx = self;
+
+ (|mut tx: Transaction| async {
+ let result = tx.do_commit(catalog).await;
+ (tx, result)
+ })
+ .retry(backoff)
+ .sleep(tokio::time::sleep)
+ .context(tx)
+ .when(|e| e.retryable())
+ .await
+ .1
+ }
+
+    /// Build the commit retry policy from the table's `commit.retry.*` properties.
+    ///
+    /// Absent properties fall back to the `PROPERTY_COMMIT_*_DEFAULT` constants. The first
+    /// delay is `commit.retry.min-wait-ms`. Each later delay doubles (factor 2.0) up to a
+    /// cap of `commit.retry.max-wait-ms`. At most `commit.retry.num-retries` retries are
+    /// scheduled, and the cumulative back-off delay is bounded by
+    /// `commit.retry.total-timeout-ms`.
+    ///
+    /// NOTE(review): the backoff is a pure sequence of durations, so the total budget counts
+    /// only sleep time, not time spent inside commit attempts. Confirm this matches the
+    /// intended `total-timeout-ms` semantics.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`ErrorKind::DataInvalid`] if any of these properties is present but cannot
+    /// be parsed as an unsigned integer.
+    fn build_backoff(props: &HashMap<String, String>) -> Result<ExponentialBackoff> {
+        let min_delay = Self::parse_retry_property(
+            props,
+            PROPERTY_COMMIT_MIN_RETRY_WAIT_MS,
+            PROPERTY_COMMIT_MIN_RETRY_WAIT_MS_DEFAULT,
+        )?;
+        let max_delay = Self::parse_retry_property(
+            props,
+            PROPERTY_COMMIT_MAX_RETRY_WAIT_MS,
+            PROPERTY_COMMIT_MAX_RETRY_WAIT_MS_DEFAULT,
+        )?;
+        let total_delay = Self::parse_retry_property(
+            props,
+            PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS,
+            PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT,
+        )?;
+        let max_times = Self::parse_retry_property(
+            props,
+            PROPERTY_COMMIT_NUM_RETRIES,
+            PROPERTY_COMMIT_NUM_RETRIES_DEFAULT,
+        )?;
+
+        Ok(ExponentialBuilder::new()
+            .with_min_delay(Duration::from_millis(min_delay))
+            .with_max_delay(Duration::from_millis(max_delay))
+            .with_total_delay(Some(Duration::from_millis(total_delay)))
+            .with_max_times(max_times)
+            .with_factor(2.0)
+            .build())
+    }
+
+    /// Parse the table property `key` as `T`, returning `default` when the property is absent.
+    ///
+    /// The error message is built from `key` itself, so it always names the property
+    /// that was actually read.
+    fn parse_retry_property<T>(props: &HashMap<String, String>, key: &str, default: T) -> Result<T>
+    where
+        T: std::str::FromStr,
+        T::Err: std::error::Error + Send + Sync + 'static,
+    {
+        match props.get(key) {
+            Some(value) => value.parse::<T>().map_err(|e| {
+                Error::new(ErrorKind::DataInvalid, format!("Invalid value for {key}"))
+                    .with_source(e)
+            }),
+            None => Ok(default),
+        }
     }
async fn do_commit(&mut self, catalog: &dyn Catalog) -> Result<Table> {
@@ -198,13 +273,18 @@ impl Transaction {
#[cfg(test)]
mod tests {
+ use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
+ use std::sync::Arc;
+ use std::sync::atomic::{AtomicU32, Ordering};
- use crate::TableIdent;
+ use crate::catalog::MockCatalog;
use crate::io::FileIOBuilder;
use crate::spec::TableMetadata;
use crate::table::Table;
+ use crate::transaction::{ApplyTransactionAction, Transaction};
+ use crate::{Error, ErrorKind, TableIdent};
pub fn make_v1_table() -> Table {
let file = File::open(format!(
@@ -262,4 +342,171 @@ mod tests {
.build()
.unwrap()
}
+
+    /// Build a v2 test table whose `commit.retry.*` properties keep retries fast:
+    /// 10 ms minimum wait, 100 ms maximum wait, a 1000 ms total budget, and
+    /// `num_retries` retries.
+    fn setup_test_table(num_retries: &str) -> Table {
+        let table = make_v2_table();
+
+        // Use the property-key constants so these tests read exactly the keys that
+        // `Transaction::build_backoff` looks up.
+        let props: HashMap<String, String> = [
+            (crate::spec::PROPERTY_COMMIT_MIN_RETRY_WAIT_MS, "10"),
+            (crate::spec::PROPERTY_COMMIT_MAX_RETRY_WAIT_MS, "100"),
+            (crate::spec::PROPERTY_COMMIT_TOTAL_RETRY_TIME_MS, "1000"),
+            (crate::spec::PROPERTY_COMMIT_NUM_RETRIES, num_retries),
+        ]
+        .into_iter()
+        .map(|(key, value)| (key.to_string(), value.to_string()))
+        .collect();
+
+        // Update table properties
+        let metadata = table
+            .metadata()
+            .clone()
+            .into_builder(None)
+            .set_properties(props)
+            .unwrap()
+            .build()
+            .unwrap()
+            .metadata;
+
+        table.with_metadata(Arc::new(metadata))
+    }
+
+    /// Start a transaction on `table` that stages a single `test.key = test.value`
+    /// property update.
+    fn create_test_transaction(table: &Table) -> Transaction {
+        let tx = Transaction::new(table);
+        let update = tx
+            .update_table_properties()
+            .set("test.key".to_string(), "test.value".to_string());
+        update.apply(tx).unwrap()
+    }
+
+    /// Build a mock catalog whose `update_table` fails with a retryable
+    /// `CatalogCommitConflicts` error.
+    ///
+    /// With `Some(n)`, the first `n` calls fail and later calls succeed. With `None`, every
+    /// call fails. `update_table` is expected to be called exactly `expected_calls` times.
+    /// `load_table` always returns a fresh v2 table.
+    fn setup_mock_catalog_with_retryable_errors(
+        success_after_attempts: Option<u32>,
+        expected_calls: usize,
+    ) -> MockCatalog {
+        let mut mock_catalog = MockCatalog::new();
+
+        mock_catalog
+            .expect_load_table()
+            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
+
+        let attempts = AtomicU32::new(0);
+        mock_catalog
+            .expect_update_table()
+            .times(expected_calls)
+            .returning_st(move |_| {
+                // `fetch_add` returns the previous count, so `attempt` is 1-based. Using its
+                // return value avoids a separate `load` that is not atomic with the increment.
+                let attempt = attempts.fetch_add(1, Ordering::SeqCst) + 1;
+                let should_fail = match success_after_attempts {
+                    Some(failures) => attempt <= failures,
+                    // `None` means the catalog never accepts the commit.
+                    None => true,
+                };
+
+                if should_fail {
+                    Box::pin(async move {
+                        Err(
+                            Error::new(ErrorKind::CatalogCommitConflicts, "Commit conflict")
+                                .with_retryable(true),
+                        )
+                    })
+                } else {
+                    Box::pin(async move { Ok(make_v2_table()) })
+                }
+            });
+
+        mock_catalog
+    }
+
+    /// Build a mock catalog whose `update_table` is expected to be called exactly once and
+    /// fails with a non-retryable `ErrorKind::Unexpected` error. `load_table` always returns
+    /// a fresh v2 table.
+    fn setup_mock_catalog_with_non_retryable_error() -> MockCatalog {
+        let mut mock_catalog = MockCatalog::new();
+
+        mock_catalog
+            .expect_load_table()
+            .returning_st(|_| Box::pin(async move { Ok(make_v2_table()) }));
+
+        mock_catalog
+            .expect_update_table()
+            .times(1) // Should only be called once since error is not retryable
+            .returning_st(|_| {
+                Box::pin(async move {
+                    Err(Error::new(ErrorKind::Unexpected, "Non-retryable error")
+                        .with_retryable(false))
+                })
+            });
+
+        mock_catalog
+    }
+
+    #[tokio::test]
+    async fn test_commit_retryable_error() {
+        // Create a test table that allows up to 3 retries
+        let table = setup_test_table("3");
+
+        // Create a transaction with a simple update action
+        let tx = create_test_transaction(&table);
+
+        // Create a mock catalog that fails twice then succeeds (3 `update_table` calls total)
+        let mock_catalog = setup_mock_catalog_with_retryable_errors(Some(2), 3);
+
+        // Commit the transaction
+        let result = tx.commit(&mock_catalog).await;
+
+        // Verify the result, surfacing the actual error if the commit unexpectedly failed
+        assert!(
+            result.is_ok(),
+            "Transaction should eventually succeed, got {:?}",
+            result.err()
+        );
+    }
+
+    #[tokio::test]
+    async fn test_commit_non_retryable_error() {
+        // A table configured for up to 3 retries, with one staged property update.
+        let table = setup_test_table("3");
+        let tx = create_test_transaction(&table);
+
+        // The catalog rejects the commit with an error marked non-retryable.
+        let mock_catalog = setup_mock_catalog_with_non_retryable_error();
+
+        let Err(err) = tx.commit(&mock_catalog).await else {
+            panic!("Transaction should fail immediately");
+        };
+
+        // The catalog's error must surface unchanged.
+        assert_eq!(err.kind(), ErrorKind::Unexpected);
+        assert_eq!(err.message(), "Non-retryable error");
+        assert!(!err.retryable(), "Error should not be retryable");
+    }
+
+    #[tokio::test]
+    async fn test_commit_max_retries_exceeded() {
+        // Create a test table with retry properties (only allow 2 retries)
+        let table = setup_test_table("2");
+
+        // Create a transaction with a simple update action
+        let tx = create_test_transaction(&table);
+
+        // Create a mock catalog that always fails with a retryable error.
+        // Initial attempt + 2 retries = 3 total attempts.
+        let mock_catalog = setup_mock_catalog_with_retryable_errors(None, 3);
+
+        // Commit the transaction; it must give up and surface the last conflict error.
+        let Err(err) = tx.commit(&mock_catalog).await else {
+            panic!("Transaction should fail after max retries");
+        };
+        assert_eq!(err.kind(), ErrorKind::CatalogCommitConflicts);
+        assert_eq!(err.message(), "Commit conflict");
+        assert!(err.retryable(), "Error should be retryable");
+    }
}