This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
     new 2df0914bd [client][backport] Skip blocked bucket instead of stopping drain loop in idempotent writer (#2953)
2df0914bd is described below
commit 2df0914bd1b149be51bbf8ac1b644539940b2af8
Author: Anton Borisov <[email protected]>
AuthorDate: Tue Mar 31 12:47:36 2026 +0100
    [client][backport] Skip blocked bucket instead of stopping drain loop in idempotent writer (#2953)
---
.../fluss/client/write/RecordAccumulator.java | 3 +-
.../fluss/client/write/RecordAccumulatorTest.java | 44 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
index 991ea7c8e..70988d0e2 100644
--- a/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
+++ b/fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java
@@ -729,7 +729,8 @@ public final class RecordAccumulator {
                         break;
                     } else {
                         if (shouldStopDrainBatchesForBucket(first, tableBucket)) {
-                            break;
+                            // Buckets are independent — skip this one, keep draining others.
+                            continue;
                         }
                     }
diff --git a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
index 773c12a44..d9e2291ce 100644
--- a/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
+++ b/fluss-client/src/test/java/org/apache/fluss/client/write/RecordAccumulatorTest.java
@@ -602,6 +602,50 @@ class RecordAccumulatorTest {
         assertThat(tableBucketsInBatch).containsExactlyInAnyOrder(tb);
     }
+    @Test
+    void testDrainContinuesWhenBucketAtMaxInflight() throws Exception {
+        int batchSize = 1024;
+        conf.set(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT, Duration.ofMillis(0));
+        conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, new MemorySize(10L * batchSize));
+        conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_PAGE_SIZE, new MemorySize(256));
+        conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, new MemorySize(batchSize));
+
+        IdempotenceManager idempotenceManager =
+                new IdempotenceManager(true, /* maxInflightPerBucket */ 1, null, null);
+        idempotenceManager.setWriterId(1L);
+
+        RecordAccumulator accum =
+                new RecordAccumulator(
+                        conf, idempotenceManager, TestingWriterMetricGroup.newInstance(), clock);
+
+        cluster = updateCluster(Arrays.asList(bucket1, bucket2));
+        IndexedRow row = indexedRow(DATA1_ROW_TYPE, new Object[] {1, "a"});
+
+        // Drain both buckets so each has 1 in-flight batch.
+        accum.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false);
+        accum.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false);
+
+        Map<Integer, List<ReadyWriteBatch>> firstDrain =
+                accum.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE);
+        List<ReadyWriteBatch> firstBatches = firstDrain.get(node1.id());
+        assertThat(firstBatches).hasSize(2);
+
+        // Complete only tb2, leaving tb1 at max in-flight.
+        ReadyWriteBatch tb2Batch =
+                firstBatches.stream().filter(b -> b.tableBucket().equals(tb2)).findFirst().get();
+        idempotenceManager.handleCompletedBatch(tb2Batch);
+
+        // Append again to both. On drain, tb1 should be skipped but tb2 should still be drained.
+        accum.append(createRecord(row), writeCallback, cluster, tb1.getBucket(), false);
+        accum.append(createRecord(row), writeCallback, cluster, tb2.getBucket(), false);
+
+        Map<Integer, List<ReadyWriteBatch>> secondDrain =
+                accum.drain(cluster, Collections.singleton(node1.id()), Integer.MAX_VALUE);
+        List<ReadyWriteBatch> secondBatches = secondDrain.get(node1.id());
+        assertThat(secondBatches).hasSize(1);
+        assertThat(secondBatches.get(0).tableBucket()).isEqualTo(tb2);
+    }
+
+
/** Return the offset delta. */
private int expectedNumAppends(IndexedRow row, int batchSize) {
int size = recordBatchHeaderSize(CURRENT_LOG_MAGIC_VALUE);