This is an automated email from the ASF dual-hosted git repository.

ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2df0914bd [client][backport] Skip blocked bucket instead of stopping drain loop in idempotent writer (#2953)
2df0914bd is described below

commit 2df0914bd1b149be51bbf8ac1b644539940b2af8
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Mar 31 12:47:36 2026 +0100

    [client][backport] Skip blocked bucket instead of stopping drain loop in idempotent writer (#2953)
---
 .../fluss/client/write/RecordAccumulator.java      |  3 +-
 .../fluss/client/write/RecordAccumulatorTest.java  | 44 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 991ea7c8e..70988d0e2 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -729,7 +729,8 @@ public final class RecordAccumulator {
                     break;
                 } else {
                     if (shouldStopDrainBatchesForBucket(first, tableBucket)) {
-                        break;
+                        // Buckets are independent — skip this one, keep draining others.
+                        continue;
                     }
                 }
 
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 773c12a44..d9e2291ce 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -602,6 +602,50 @@ class RecordAccumulatorTest {
         assertThat(tableBucketsInBatch).containsExactlyInAnyOrder(tb);
     }
 
+    @Test
+    void testDrainContinuesWhenBucketAtMaxInflight() throws Exception {
+        int batchSize = 1024;
+        conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(0));
+        conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(10L * batchSize));
+        conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(256));
+        conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(batchSize));
+
+        IdempotenceManager idempotenceManager =
+                new IdempotenceManager(true, /* maxInflightPerBucket */ 1, null, null);
+        idempotenceManager.setWriterId(1L);
+
+        RecordAccumulator accum =
+                new RecordAccumulator(
+                        conf, idempotenceManager, TestingWriterMetricGroup.newInstance(), clock);
+
+        cluster = updateCluster(Arrays.asList(bucket1, bucket2));
+        IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+
+        // Drain both buckets so each has 1 in-flight batch.
+        accum.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false);
+        accum.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false);
+
+        Map<Integer, List<ReadyWriteBatch>> firstDrain =
+                accum.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE);
+        List<ReadyWriteBatch> firstBatches = firstDrain.get(node1.id());
+        assertThat(firstBatches).hasSize(2);
+
+        // Complete only tb2, leaving tb1 at max in-flight.
+        ReadyWriteBatch tb2Batch =
+                firstBatches.stream().filter(b -> b.tableBucket().equals(tb2)).findFirst().get();
+        idempotenceManager.handleCompletedBatch(tb2Batch);
+
+        // Append again to both. On drain, tb1 should be skipped but tb2 should still be drained.
+        accum.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false);
+        accum.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false);
+
+        Map<Integer, List<ReadyWriteBatch>> secondDrain =
+                accum.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE);
+        List<ReadyWriteBatch> secondBatches = secondDrain.get(node1.id());
+        assertThat(secondBatches).hasSize(1);
+        assertThat(secondBatches.get(0).tableBucket()).isEqualTo(tb2);
+    }
+
     /** Return the offset delta. */
     private int expectedNumAppends(IndexedRow row, int batchSize) {
         int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);

Reply via email to