This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f4d86532 [client] Fix infinite retry loop in idempotent writer on lost response with OutOfOrderSequenceException (#2827)
0f4d86532 is described below

commit 0f4d86532b48d229706f894dff652c40f825b003
Author: Liebing <[email protected]>
AuthorDate: Mon Mar 16 19:33:24 2026 +0800

    [client] Fix infinite retry loop in idempotent writer on lost response with OutOfOrderSequenceException (#2827)
---
 .../fluss/client/write/IdempotenceManager.java     | 17 ++++
 .../java/org/apache/fluss/client/write/Sender.java | 27 +++++--
 .../org/apache/fluss/client/write/SenderTest.java  | 94 ++++++++++++++++++++++
 3 files changed, 132 insertions(+), 6 deletions(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
index 7015b4e9a..1bc8f2bc1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
@@ -294,6 +294,23 @@ public class IdempotenceManager {
         return false;
     }
 
+    /**
+     * Returns true if the batch has already been committed on the server.
+     *
+     * <p>If the batch's sequence is less than or equal to {@code 
lastAckedBatchSequence}, it means
+     * a higher-sequence batch has already been acknowledged. This implies the 
previous batch was
+     * also successfully written on the server (otherwise the higher-sequence 
batch could not have
+     * been committed).
+     *
+     * @param batch the batch to check
+     * @param tableBucket the target table-bucket
+     * @return true if the batch is already committed on the server
+     */
+    synchronized boolean isAlreadyCommitted(WriteBatch batch, TableBucket 
tableBucket) {
+        Optional<Integer> lastAcked = lastAckedBatchSequence(tableBucket);
+        return lastAcked.isPresent() && batch.batchSequence() <= 
lastAcked.get();
+    }
+
     void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) throws 
Throwable {
         if (isWriterIdValid()) {
             return;
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index 66b7719b1..5fc5ba274 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -532,7 +532,27 @@ public class Sender implements Runnable {
             ReadyWriteBatch readyWriteBatch, ApiError error) {
         Set<PhysicalTablePath> invalidMetadataTables = new HashSet<>();
         WriteBatch writeBatch = readyWriteBatch.writeBatch();
-        if (canRetry(readyWriteBatch, error.error())) {
+        if (error.error() == Errors.DUPLICATE_SEQUENCE_EXCEPTION) {
+            // If we have received a duplicate batch sequence error, it means 
that the batch
+            // sequence has advanced beyond the sequence of the current batch.
+            // The only thing we can do is to return success to the user.
+            completeBatch(readyWriteBatch);
+        } else if (error.error() == Errors.OUT_OF_ORDER_SEQUENCE_EXCEPTION
+                && idempotenceManager.idempotenceEnabled()
+                && idempotenceManager.isAlreadyCommitted(
+                        writeBatch, readyWriteBatch.tableBucket())) {
+            // The batch received OUT_OF_ORDER_SEQUENCE_EXCEPTION but its 
sequence is already
+            // <= lastAckedBatchSequence, which means it was successfully 
written on the server
+            // but the response was lost. Complete it as success to avoid 
infinite retry loop.
+            LOG.warn(
+                    "Batch for table-bucket {} with sequence {} received "
+                            + "OUT_OF_ORDER_SEQUENCE_EXCEPTION but has already 
been committed "
+                            + "(lastAckedBatchSequence={}). Treating as 
success due to lost response.",
+                    readyWriteBatch.tableBucket(),
+                    writeBatch.batchSequence(),
+                    
idempotenceManager.lastAckedBatchSequence(readyWriteBatch.tableBucket()));
+            completeBatch(readyWriteBatch);
+        } else if (canRetry(readyWriteBatch, error.error())) {
             // if batch failed because of retrievable exception, we need to 
retry send all those
             // batches.
             LOG.warn(
@@ -575,11 +595,6 @@ public class Sender implements Runnable {
                 }
                 invalidMetadataTables.add(writeBatch.physicalTablePath());
             }
-        } else if (error.error() == Errors.DUPLICATE_SEQUENCE_EXCEPTION) {
-            // If we have received a duplicate batch sequence error, it means 
that the batch
-            // sequence has advanced beyond the sequence of the current batch.
-            // The only thing we can do is to return success to the user.
-            completeBatch(readyWriteBatch);
         } else {
             LOG.warn(
                     "Get error write response on table bucket {}, fail. Error: 
{}",
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 16ba2f2ec..cd59e8bdf 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -610,6 +610,100 @@ final class SenderTest {
         assertThat(future1.get()).isNull();
     }
 
+    /**
+     * Tests that when a batch's response is lost (e.g., due to request 
timeout) but the batch was
+     * successfully written on the server, and subsequent batches with higher 
sequence numbers are
+     * acknowledged, the client should treat the retried batch as already 
committed instead of
+     * entering an infinite retry loop with {@link
+     * org.apache.fluss.exception.OutOfOrderSequenceException}.
+     *
+     * <p>Detailed scenario:
+     *
+     * <ol>
+     *   <li>Send batch1(seq=0) ~ batch5(seq=4). All 5 batches are 
successfully written on the
+     *       server (server {@code lastBatchSeq=4}).
+     *   <li>batch2~5 (seq=1~4) responses return normally. Client {@code 
lastAckedBatchSequence=4}.
+     *   <li>Send batch6(seq=5) and ack successfully. Server {@code 
lastBatchSeq=5}.
+     *   <li>batch1(seq=0) response is lost due to {@code REQUEST_TIME_OUT}. 
batch1 is re-enqueued
+     *       for retry.
+     *   <li>Client retries batch1(seq=0). Since server {@code lastBatchSeq=5} 
and {@code 0 != 5+1},
+     *       server returns {@code OUT_OF_ORDER_SEQUENCE_EXCEPTION}.
+     *   <li>Client detects {@code batch1.seq(0) <= 
lastAckedBatchSequence(5)}: batch1 is already
+     *       committed. Client completes batch1 successfully without further 
retries.
+     * </ol>
+     */
+    @Test
+    void 
testCorrectHandlingOfOutOfOrderResponsesWhenResponseLostButSubsequentBatchesSucceeded()
+            throws Exception {
+        IdempotenceManager idempotenceManager = createIdempotenceManager(true);
+        Sender sender1 = setupWithIdempotenceState(idempotenceManager);
+        sender1.runOnce();
+        assertThat(idempotenceManager.isWriterIdValid()).isTrue();
+        assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(0);
+
+        // Send batch1 (seq=0): its response will be lost later.
+        CompletableFuture<Exception> future1 = new CompletableFuture<>();
+        appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) -> 
future1.complete(e));
+        sender1.runOnce();
+        assertThat(future1.isDone()).isFalse();
+        assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
+        
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
+
+        // Send batch2~5 (seq=1~4) and collect their futures.
+        int numFollowingBatches = 4;
+        List<CompletableFuture<Exception>> followingFutures = new 
ArrayList<>();
+        for (int i = 0; i < numFollowingBatches; i++) {
+            CompletableFuture<Exception> future = new CompletableFuture<>();
+            followingFutures.add(future);
+            appendToAccumulator(tb1, row(i + 2, "b"), (tb, leo, e) -> 
future.complete(e));
+            sender1.runOnce();
+            assertThat(future.isDone()).isFalse();
+        }
+        assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(5);
+
+        // batch2~5 (seq=1~4) responses return normally.
+        for (int seq = 1; seq <= numFollowingBatches; seq++) {
+            finishIdempotentProduceLogRequest(
+                    seq, tb1, 1, createProduceLogResponse(tb1, seq, seq + 1L));
+            
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(seq));
+            assertThat(followingFutures.get(seq - 1).isDone()).isTrue();
+            assertThat(followingFutures.get(seq - 1).get()).isNull();
+        }
+
+        // Send batch6 (seq=5) and ack successfully.
+        // Now server lastBatchSeq=5. batch1 (seq=0) is still waiting response.
+        CompletableFuture<Exception> future6 = new CompletableFuture<>();
+        appendToAccumulator(tb1, row(6, "f"), (tb, leo, e) -> 
future6.complete(e));
+        sender1.runOnce(); // drain and send batch6 (seq=5)
+        finishIdempotentProduceLogRequest(5, tb1, 1, 
createProduceLogResponse(tb1, 5L, 6L));
+        
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(5));
+        assertThat(future6.isDone()).isTrue();
+        assertThat(future6.get()).isNull();
+
+        // All 6 batches are written successfully on the server (server 
lastBatchSeq=5).
+        // batch1 (seq=0) response is lost, simulated by REQUEST_TIME_OUT.
+        finishIdempotentProduceLogRequest(
+                0, tb1, 0, createProduceLogResponse(tb1, 
Errors.REQUEST_TIME_OUT));
+        assertThat(future1.isDone()).isFalse();
+
+        // Now retry batch1 (seq=0). Server lastBatchSeq=5, so 0 != 5+1,
+        // server returns OUT_OF_ORDER_SEQUENCE_EXCEPTION.
+        sender1.runOnce(); // send retried batch1
+        finishIdempotentProduceLogRequest(
+                0, tb1, 0, createProduceLogResponse(tb1, 
Errors.OUT_OF_ORDER_SEQUENCE_EXCEPTION));
+
+        // The client should detect that batch1.seq(0) <= 
lastAckedBatchSequence(5),
+        // meaning batch1 was already committed on the server (its response 
was just lost).
+        // It should complete batch1 successfully instead of entering an 
infinite retry loop.
+        assertThat(future1.isDone()).isTrue();
+        assertThat(future1.get()).isNull();
+        // lastAckedBatchSequence should remain at 5 (not changed by 
completing already-committed
+        // batch1)
+        
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(5));
+        // No more inflight batches
+        assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
+    }
+
     @Test
     void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
         IdempotenceManager idempotenceManager = createIdempotenceManager(true);

Reply via email to