This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss-rust.git


The following commit(s) were added to refs/heads/main by this push:
     new e26702e  chore: Idempotency bug: OOO loop when response is lost but subsequent batches succeed (#448)
e26702e is described below

commit e26702e3bf4fe5dd6e273ee942564be98f7a7593
Author: Anton Borisov <[email protected]>
AuthorDate: Sat Mar 21 03:32:26 2026 +0000

    chore: Idempotency bug: OOO loop when response is lost but subsequent batches succeed (#448)
---
 crates/fluss/src/client/write/idempotence.rs | 43 +++++++++++++++++++++++++++
 crates/fluss/src/client/write/sender.rs      | 44 ++++++++++++++++++++--------
 2 files changed, 75 insertions(+), 12 deletions(-)

diff --git a/crates/fluss/src/client/write/idempotence.rs b/crates/fluss/src/client/write/idempotence.rs
index 3c55f6a..eeec876 100644
--- a/crates/fluss/src/client/write/idempotence.rs
+++ b/crates/fluss/src/client/write/idempotence.rs
@@ -314,6 +314,19 @@ impl IdempotenceManager {
         }
     }
 
+    /// Returns true if the batch with `batch_sequence` is known to be committed on the server.
+    ///
+    /// The check passes only when `bucket` has an entry whose `last_acked_sequence` is
+    /// non-negative and `batch_sequence <= last_acked_sequence`. In that case, this sequence
+    /// or a higher one has already been acknowledged. If a strictly higher sequence was
+    /// acknowledged, the current batch must also have been written on the server; otherwise
+    /// the higher-sequence batch could not have been committed.
+    ///
+    /// Returns false when the bucket has no entry yet or its `last_acked_sequence` is
+    /// negative.
+    ///
+    /// The sender uses this to treat an `OutOfOrderSequenceException` for such a batch as
+    /// success, because its original response was lost.
+    pub fn is_already_committed(&self, bucket: &TableBucket, batch_sequence: 
i32) -> bool {
+        let entries = self.bucket_entries.lock();
+        // The `>= 0` guard stops a negative last_acked_sequence (presumably the "nothing
+        // acked yet" state) from matching an equally negative batch_sequence.
+        entries
+            .get(bucket)
+            .is_some_and(|e| e.last_acked_sequence >= 0 && batch_sequence <= 
e.last_acked_sequence)
+    }
+
     pub fn can_retry_for_error(
         &self,
         bucket: &TableBucket,
@@ -530,6 +543,36 @@ mod tests {
         assert!(mgr.can_send_more_requests(&b0)); // under limit again
     }
 
+    /// Exercises `is_already_committed` in three situations:
+    /// - no bucket entry yet, which reports false;
+    /// - after acking seq=0, which reports true for 0 and false for 1;
+    /// - after acking through seq=4, which reports true for 0..=4 and false for 5.
+    #[test]
+    fn test_is_already_committed() {
+        let mgr = IdempotenceManager::new(true, 5);
+        let b0 = test_bucket(0);
+        mgr.set_writer_id(42);
+
+        // No entry yet → not committed
+        assert!(!mgr.is_already_committed(&b0, 0));
+
+        // Initialize bucket and ack seq=0
+        let _ = mgr.next_sequence_and_increment(&b0); // 0
+        mgr.add_in_flight_batch(&b0, 0, 100);
+        mgr.handle_completed_batch(&b0, 100, 42); // last_acked=0
+
+        // seq=0 <= last_acked(0) → committed
+        assert!(mgr.is_already_committed(&b0, 0));
+        // seq=1 > last_acked(0) → not committed
+        assert!(!mgr.is_already_committed(&b0, 1));
+
+        // Ack up to seq=4, then check seq=0 still committed
+        // Each iteration mirrors the seq=0 setup above: allocate the next sequence,
+        // register it in flight under a distinct id (100 + seq), then complete it with
+        // the writer id set above (42).
+        for seq in 1..=4 {
+            let _ = mgr.next_sequence_and_increment(&b0);
+            mgr.add_in_flight_batch(&b0, seq, 100 + seq as i64);
+            mgr.handle_completed_batch(&b0, 100 + seq as i64, 42);
+        }
+        assert!(mgr.is_already_committed(&b0, 0)); // seq=0 <= last_acked(4)
+        assert!(mgr.is_already_committed(&b0, 4)); // seq=4 <= last_acked(4)
+        assert!(!mgr.is_already_committed(&b0, 5)); // seq=5 > last_acked(4)
+    }
+
     #[test]
     fn test_reset_batch_ids_cleaned_on_complete() {
         let (mgr, b0) = setup_three_in_flight();
diff --git a/crates/fluss/src/client/write/sender.rs b/crates/fluss/src/client/write/sender.rs
index b526e1a..dd5370d 100644
--- a/crates/fluss/src/client/write/sender.rs
+++ b/crates/fluss/src/client/write/sender.rs
@@ -637,6 +637,35 @@ impl Sender {
         message: String,
     ) -> Result<Option<Arc<PhysicalTablePath>>> {
         let physical_table_path = 
Arc::clone(ready_write_batch.write_batch.physical_table_path());
+
+        if error == FlussError::DuplicateSequenceException {
+            warn!(
+                "Duplicate sequence for {} on bucket {}: {message}",
+                physical_table_path.as_ref(),
+                ready_write_batch.table_bucket.bucket_id()
+            );
+            self.complete_batch(ready_write_batch);
+            return Ok(None);
+        }
+
+        if error == FlussError::OutOfOrderSequenceException
+            && self.idempotence_manager.is_enabled()
+            && self.idempotence_manager.is_already_committed(
+                &ready_write_batch.table_bucket,
+                ready_write_batch.write_batch.batch_sequence(),
+            )
+        {
+            warn!(
+                "Batch for {} on bucket {} with sequence {} received 
OutOfOrderSequenceException \
+                 but has already been committed. Treating as success due to 
lost response.",
+                physical_table_path.as_ref(),
+                ready_write_batch.table_bucket.bucket_id(),
+                ready_write_batch.write_batch.batch_sequence(),
+            );
+            self.complete_batch(ready_write_batch);
+            return Ok(None);
+        }
+
         if self.can_retry(&ready_write_batch, error) {
             warn!(
                 "Retrying write batch for {} on bucket {} after error 
{error:?}: {message}",
@@ -680,18 +709,9 @@ impl Sender {
             return 
Ok(Self::is_invalid_metadata_error(error).then_some(physical_table_path));
         }
 
-        if error == FlussError::DuplicateSequenceException {
-            warn!(
-                "Duplicate sequence for {} on bucket {}: {message}",
-                physical_table_path.as_ref(),
-                ready_write_batch.table_bucket.bucket_id()
-            );
-            self.complete_batch(ready_write_batch);
-            return Ok(None);
-        }
-
-        // Generic error path. handle_failed_batch will detect 
OutOfOrderSequence /
-        // UnknownWriterId and reset all writer state internally (matching 
Java).
+        // Generic error path. handle_failed_batch will detect remaining
+        // OutOfOrderSequence (not already committed) / UnknownWriterId cases 
and
+        // reset all writer state internally (matching Java).
         // For other errors, only adjust sequences if the batch didn't exhaust 
its retries.
         let can_adjust = ready_write_batch.write_batch.attempts() < 
self.retries;
         self.fail_batch(

Reply via email to