This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 0f4d86532 [client] Fix infinite retry loop in idempotent writer on
lost response with OutOfOrderSequenceException (#2827)
0f4d86532 is described below
commit 0f4d86532b48d229706f894dff652c40f825b003
Author: Liebing <[email protected]>
AuthorDate: Mon Mar 16 19:33:24 2026 +0800
[client] Fix infinite retry loop in idempotent writer on lost response with
OutOfOrderSequenceException (#2827)
---
.../fluss/client/write/IdempotenceManager.java | 17 ++++
.../java/org/apache/fluss/client/write/Sender.java | 27 +++++--
.../org/apache/fluss/client/write/SenderTest.java | 94 ++++++++++++++++++++++
3 files changed, 132 insertions(+), 6 deletions(-)
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
index 7015b4e9a..1bc8f2bc1 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/IdempotenceManager.java
@@ -294,6 +294,23 @@ public class IdempotenceManager {
return false;
}
+    /**
+     * Tells whether {@code batch} is already known to be committed on the server.
+     *
+     * <p>The batch counts as committed when some batch has already been acknowledged for {@code
+     * tableBucket} and the given batch's sequence is less than or equal to that last acknowledged
+     * sequence. This relies on the server accepting a bucket's batches only in contiguous sequence
+     * order: a batch with an equal or higher sequence could not have been acknowledged unless this
+     * batch had been written first (or unless it is that very batch).
+     *
+     * @param batch the batch to check
+     * @param tableBucket the table-bucket the batch targets
+     * @return {@code true} if the batch is already committed on the server; {@code false}
+     *     otherwise, including when no batch has been acknowledged for the bucket yet
+     */
+    synchronized boolean isAlreadyCommitted(WriteBatch batch, TableBucket tableBucket) {
+        // An empty Optional means nothing has been acknowledged for this bucket yet.
+        return lastAckedBatchSequence(tableBucket)
+                .map(lastAcked -> batch.batchSequence() <= lastAcked)
+                .orElse(false);
+    }
+
void maybeWaitForWriterId(Set<PhysicalTablePath> tablePaths) throws
Throwable {
if (isWriterIdValid()) {
return;
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
index 66b7719b1..5fc5ba274 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/Sender.java
@@ -532,7 +532,27 @@ public class Sender implements Runnable {
ReadyWriteBatch readyWriteBatch, ApiError error) {
Set<PhysicalTablePath> invalidMetadataTables = new HashSet<>();
WriteBatch writeBatch = readyWriteBatch.writeBatch();
- if (canRetry(readyWriteBatch, error.error())) {
+ if (error.error() == Errors.DUPLICATE_SEQUENCE_EXCEPTION) {
+ // If we have received a duplicate batch sequence error, it means
that the batch
+ // sequence has advanced beyond the sequence of the current batch.
+ // The only thing we can do is to return success to the user.
+ completeBatch(readyWriteBatch);
+ } else if (error.error() == Errors.OUT_OF_ORDER_SEQUENCE_EXCEPTION
+ && idempotenceManager.idempotenceEnabled()
+ && idempotenceManager.isAlreadyCommitted(
+ writeBatch, readyWriteBatch.tableBucket())) {
+ // The batch received OUT_OF_ORDER_SEQUENCE_EXCEPTION but its
sequence is already
+ // <= lastAckedBatchSequence, which means it was successfully
written on the server
+ // but the response was lost. Complete it as success to avoid
infinite retry loop.
+ LOG.warn(
+ "Batch for table-bucket {} with sequence {} received "
+ + "OUT_OF_ORDER_SEQUENCE_EXCEPTION but has already
been committed "
+ + "(lastAckedBatchSequence={}). Treating as
success due to lost response.",
+ readyWriteBatch.tableBucket(),
+ writeBatch.batchSequence(),
+
idempotenceManager.lastAckedBatchSequence(readyWriteBatch.tableBucket()));
+ completeBatch(readyWriteBatch);
+ } else if (canRetry(readyWriteBatch, error.error())) {
// if batch failed because of retrievable exception, we need to
retry send all those
// batches.
LOG.warn(
@@ -575,11 +595,6 @@ public class Sender implements Runnable {
}
invalidMetadataTables.add(writeBatch.physicalTablePath());
}
- } else if (error.error() == Errors.DUPLICATE_SEQUENCE_EXCEPTION) {
- // If we have received a duplicate batch sequence error, it means
that the batch
- // sequence has advanced beyond the sequence of the current batch.
- // The only thing we can do is to return success to the user.
- completeBatch(readyWriteBatch);
} else {
LOG.warn(
"Get error write response on table bucket {}, fail. Error:
{}",
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
index 16ba2f2ec..cd59e8bdf 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/SenderTest.java
@@ -610,6 +610,100 @@ final class SenderTest {
assertThat(future1.get()).isNull();
}
+ /**
+ * Verifies that a batch whose response was lost (e.g. due to a request timeout), but which was
+ * in fact written on the server, is completed successfully once later batches with higher
+ * sequence numbers have been acknowledged. The batch must not be retried forever because every
+ * retry is rejected with {@link org.apache.fluss.exception.OutOfOrderSequenceException}.
+ *
+ * <p>Scenario:
+ *
+ * <ol>
+ * <li>Send batch1(seq=0) ~ batch5(seq=4). All 5 batches are successfully written on the
+ * server (server {@code lastBatchSeq=4}).
+ * <li>The responses for batch2~5 (seq=1~4) arrive normally, so the client's {@code
+ * lastAckedBatchSequence} becomes 4.
+ * <li>Send batch6(seq=5) and acknowledge it successfully. Server {@code lastBatchSeq=5}.
+ * <li>The response for batch1(seq=0) is lost, simulated with {@code REQUEST_TIME_OUT}; batch1
+ * is re-enqueued for retry.
+ * <li>The client retries batch1(seq=0). Since the server's {@code lastBatchSeq} is 5 and
+ * {@code 0 != 5 + 1}, the server returns {@code OUT_OF_ORDER_SEQUENCE_EXCEPTION}.
+ * <li>The client detects {@code batch1.seq(0) <= lastAckedBatchSequence(5)}, i.e. batch1 is
+ * already committed, and completes batch1 successfully without further retries.
+ * </ol>
+ *
+ * <p>Finally, the test checks that completing batch1 leaves {@code lastAckedBatchSequence} at 5
+ * and leaves no in-flight batches for the bucket.
+ */
+ @Test
+ void
testCorrectHandlingOfOutOfOrderResponsesWhenResponseLostButSubsequentBatchesSucceeded()
+ throws Exception {
+ IdempotenceManager idempotenceManager = createIdempotenceManager(true);
+ Sender sender1 = setupWithIdempotenceState(idempotenceManager);
+ sender1.runOnce();
+ assertThat(idempotenceManager.isWriterIdValid()).isTrue();
+ assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(0);
+
+ // Send batch1 (seq=0); its response will be lost later in the test.
+ CompletableFuture<Exception> future1 = new CompletableFuture<>();
+ appendToAccumulator(tb1, row(1, "a"), (tb, leo, e) ->
future1.complete(e));
+ sender1.runOnce();
+ assertThat(future1.isDone()).isFalse();
+ assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(1);
+
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isNotPresent();
+
+ // Send batch2~5 (seq=1~4), keeping their futures so each completion can be checked.
+ int numFollowingBatches = 4;
+ List<CompletableFuture<Exception>> followingFutures = new
ArrayList<>();
+ for (int i = 0; i < numFollowingBatches; i++) {
+ CompletableFuture<Exception> future = new CompletableFuture<>();
+ followingFutures.add(future);
+ appendToAccumulator(tb1, row(i + 2, "b"), (tb, leo, e) ->
future.complete(e));
+ sender1.runOnce();
+ assertThat(future.isDone()).isFalse();
+ }
+ assertThat(idempotenceManager.nextSequence(tb1)).isEqualTo(5);
+
+ // Acknowledge batch2~5 (seq=1~4) normally; lastAckedBatchSequence advances with each one.
+ for (int seq = 1; seq <= numFollowingBatches; seq++) {
+ finishIdempotentProduceLogRequest(
+ seq, tb1, 1, createProduceLogResponse(tb1, seq, seq + 1L));
+
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(seq));
+ assertThat(followingFutures.get(seq - 1).isDone()).isTrue();
+ assertThat(followingFutures.get(seq - 1).get()).isNull();
+ }
+
+ // Send batch6 (seq=5) and acknowledge it successfully. The server's lastBatchSeq is now 5,
+ // while batch1 (seq=0) is still awaiting its response.
+ CompletableFuture<Exception> future6 = new CompletableFuture<>();
+ appendToAccumulator(tb1, row(6, "f"), (tb, leo, e) ->
future6.complete(e));
+ sender1.runOnce(); // drain and send batch6 (seq=5)
+ finishIdempotentProduceLogRequest(5, tb1, 1,
createProduceLogResponse(tb1, 5L, 6L));
+
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(5));
+ assertThat(future6.isDone()).isTrue();
+ assertThat(future6.get()).isNull();
+
+ // All 6 batches have been written on the server (server lastBatchSeq=5).
+ // The response for batch1 (seq=0) is lost, simulated by REQUEST_TIME_OUT; its future stays
+ // pending because the batch is re-enqueued for retry.
+ finishIdempotentProduceLogRequest(
+ 0, tb1, 0, createProduceLogResponse(tb1,
Errors.REQUEST_TIME_OUT));
+ assertThat(future1.isDone()).isFalse();
+
+ // Retry batch1 (seq=0). Server lastBatchSeq=5 and 0 != 5+1, so the server returns
+ // OUT_OF_ORDER_SEQUENCE_EXCEPTION.
+ sender1.runOnce(); // send retried batch1
+ finishIdempotentProduceLogRequest(
+ 0, tb1, 0, createProduceLogResponse(tb1,
Errors.OUT_OF_ORDER_SEQUENCE_EXCEPTION));
+
+ // batch1.seq(0) <= lastAckedBatchSequence(5), so the client must treat batch1 as already
+ // committed on the server (only its response was lost) and complete it successfully
+ // instead of entering an infinite retry loop.
+ assertThat(future1.isDone()).isTrue();
+ assertThat(future1.get()).isNull();
+ // Completing the already-committed batch1 must not change lastAckedBatchSequence (still 5).
+
assertThat(idempotenceManager.lastAckedBatchSequence(tb1)).isEqualTo(Optional.of(5));
+ // No batches remain in flight for tb1.
+ assertThat(sender1.numOfInFlightBatches(tb1)).isEqualTo(0);
+ }
+
@Test
void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
IdempotenceManager idempotenceManager = createIdempotenceManager(true);