This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git
The following commit(s) were added to refs/heads/main by this push:
new e26702e chore: Idempotency bug: OOO loop when response is lost but
subsequent batches succeed (#448)
e26702e is described below
commit e26702e3bf4fe5dd6e273ee942564be98f7a7593
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Mar 21 03:32:26 2026 +0000
chore: Idempotency bug: OOO loop when response is lost but subsequent
batches succeed (#448)
---
crates/fluss/src/client/write/idempotence.rs | 43 +++++++++++++++++++++++++++
crates/fluss/src/client/write/sender.rs | 44 ++++++++++++++++++++--------
2 files changed, 75 insertions(+), 12 deletions(-)
diff --git a/crates/fluss/src/client/write/idempotence.rs
b/crates/fluss/src/client/write/idempotence.rs
index 3c55f6a..eeec876 100644
--- a/crates/fluss/src/client/write/idempotence.rs
+++ b/crates/fluss/src/client/write/idempotence.rs
@@ -314,6 +314,19 @@ impl IdempotenceManager {
}
}
+ /// Returns true if the batch has already been committed on the server.
+ ///
+ /// A batch counts as committed when its sequence lies in `0..=last_acked_sequence`:
+ /// an ack for this sequence or a higher one implies this batch was written too (a
+ /// higher-sequence batch could not have been committed otherwise). A negative
+ /// `batch_sequence` or `last_acked_sequence` never counts as committed.
+ pub fn is_already_committed(&self, bucket: &TableBucket, batch_sequence: i32) -> bool {
+ let entries = self.bucket_entries.lock();
+ entries
+ .get(bucket)
+ .is_some_and(|e| (0..=e.last_acked_sequence).contains(&batch_sequence))
+ }
+
pub fn can_retry_for_error(
&self,
bucket: &TableBucket,
@@ -530,6 +543,36 @@ mod tests {
assert!(mgr.can_send_more_requests(&b0)); // under limit again
}
+ #[test]
+ fn test_is_already_committed() {
+ let mgr = IdempotenceManager::new(true, 5);
+ let b0 = test_bucket(0);
+ mgr.set_writer_id(42);
+
+ assert!(!mgr.is_already_committed(&b0, 0)); // no entry yet → not committed
+
+ // Initialize bucket with seq=0 in flight: nothing acked yet → not committed
+ let _ = mgr.next_sequence_and_increment(&b0); // 0
+ mgr.add_in_flight_batch(&b0, 0, 100);
+ assert!(!mgr.is_already_committed(&b0, 0));
+ mgr.handle_completed_batch(&b0, 100, 42); // last_acked=0
+
+ assert!(mgr.is_already_committed(&b0, 0)); // seq=0 <= last_acked(0) → committed
+ assert!(!mgr.is_already_committed(&b0, 1)); // seq=1 > last_acked(0) → not committed
+ assert!(!mgr.is_already_committed(&b0, -1)); // negative seq → never committed
+ assert!(!mgr.is_already_committed(&test_bucket(1), 0)); // other bucket → untouched
+
+ // Ack up to seq=4, then check seq=0 still committed
+ for seq in 1..=4 {
+ let _ = mgr.next_sequence_and_increment(&b0);
+ mgr.add_in_flight_batch(&b0, seq, 100 + seq as i64);
+ mgr.handle_completed_batch(&b0, 100 + seq as i64, 42);
+ }
+ assert!(mgr.is_already_committed(&b0, 0)); // seq=0 <= last_acked(4)
+ assert!(mgr.is_already_committed(&b0, 4)); // seq=4 <= last_acked(4)
+ assert!(!mgr.is_already_committed(&b0, 5)); // seq=5 > last_acked(4)
+ }
+
#[test]
fn test_reset_batch_ids_cleaned_on_complete() {
let (mgr, b0) = setup_three_in_flight();
diff --git a/crates/fluss/src/client/write/sender.rs
b/crates/fluss/src/client/write/sender.rs
index b526e1a..dd5370d 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -637,6 +637,35 @@ impl Sender {
message: String,
) -> Result<Option<Arc<PhysicalTablePath>>> {
let physical_table_path =
Arc::clone(ready_write_batch.write_batch.physical_table_path());
+
+ if error == FlussError::DuplicateSequenceException {
+ warn!(
+ "Duplicate sequence for {} on bucket {}: {message}",
+ physical_table_path.as_ref(),
+ ready_write_batch.table_bucket.bucket_id()
+ );
+ self.complete_batch(ready_write_batch);
+ return Ok(None);
+ }
+
+ if error == FlussError::OutOfOrderSequenceException
+ && self.idempotence_manager.is_enabled()
+ && self.idempotence_manager.is_already_committed(
+ &ready_write_batch.table_bucket,
+ ready_write_batch.write_batch.batch_sequence(),
+ )
+ {
+ warn!(
+ "Batch for {} on bucket {} with sequence {} received
OutOfOrderSequenceException \
+ but has already been committed. Treating as success due to
lost response.",
+ physical_table_path.as_ref(),
+ ready_write_batch.table_bucket.bucket_id(),
+ ready_write_batch.write_batch.batch_sequence(),
+ );
+ self.complete_batch(ready_write_batch);
+ return Ok(None);
+ }
+
if self.can_retry(&ready_write_batch, error) {
warn!(
"Retrying write batch for {} on bucket {} after error
{error:?}: {message}",
@@ -680,18 +709,9 @@ impl Sender {
return
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
}
- if error == FlussError::DuplicateSequenceException {
- warn!(
- "Duplicate sequence for {} on bucket {}: {message}",
- physical_table_path.as_ref(),
- ready_write_batch.table_bucket.bucket_id()
- );
- self.complete_batch(ready_write_batch);
- return Ok(None);
- }
-
- // Generic error path. handle_failed_batch will detect
OutOfOrderSequence /
- // UnknownWriterId and reset all writer state internally (matching
Java).
+ // Generic error path. handle_failed_batch will detect remaining
+ // OutOfOrderSequence (not already committed) / UnknownWriterId cases
and
+ // reset all writer state internally (matching Java).
// For other errors, only adjust sequences if the batch didn't exhaust
its retries.
let can_adjust = ready_write_batch.write_batch.attempts() <
self.retries;
self.fail_batch(