This is an automated email from the ASF dual-hosted git repository.
popduke pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/bifromq.git
The following commit(s) were added to refs/heads/main by this push:
new a7f85e520 Refactor BatchMutationCall to optimize task batching and
execution flow (#230)
a7f85e520 is described below
commit a7f85e5201488a71ce70d703b169a5d46194e8ea
Author: Yonny(Yu) Hao <[email protected]>
AuthorDate: Thu Mar 5 09:44:13 2026 +0800
Refactor BatchMutationCall to optimize task batching and execution flow
(#230)
---
.../basekv/client/scheduler/BatchMutationCall.java | 56 +++++++----
.../client/scheduler/BatchMutationCallTest.java | 109 ++++++++++++++++++---
.../client/scheduler/TestBatchMutationCall.java | 32 +++++-
3 files changed, 163 insertions(+), 34 deletions(-)
diff --git
a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
index e5c6dee94..6c9a43ee2 100644
---
a/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
+++
b/base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java
@@ -42,7 +42,7 @@ import org.apache.bifromq.basescheduler.ICallTask;
public abstract class BatchMutationCall<ReqT, RespT> implements
IBatchCall<ReqT, RespT, MutationCallBatcherKey> {
protected final MutationCallBatcherKey batcherKey;
private final IMutationPipeline storePipeline;
- private Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks = new
ArrayDeque<>();
+ private Deque<ICallTask<ReqT, RespT, MutationCallBatcherKey>>
pendingCallTasks = new ArrayDeque<>();
protected BatchMutationCall(IMutationPipeline storePipeline,
MutationCallBatcherKey batcherKey) {
this.batcherKey = batcherKey;
@@ -51,20 +51,7 @@ public abstract class BatchMutationCall<ReqT, RespT>
implements IBatchCall<ReqT,
@Override
public final void add(ICallTask<ReqT, RespT, MutationCallBatcherKey>
callTask) {
- MutationCallTaskBatch<ReqT, RespT> lastBatchCallTask;
- MutationCallBatcherKey batcherKey = callTask.batcherKey();
- assert callTask.batcherKey().id.equals(batcherKey.id);
- if ((lastBatchCallTask = batchCallTasks.peekLast()) != null) {
- if (!lastBatchCallTask.isBatchable(callTask)) {
- lastBatchCallTask = newBatch(batcherKey.ver);
- batchCallTasks.add(lastBatchCallTask);
- }
- lastBatchCallTask.add(callTask);
- } else {
- lastBatchCallTask = newBatch(batcherKey.ver);
- lastBatchCallTask.add(callTask);
- batchCallTasks.add(lastBatchCallTask);
- }
+ pendingCallTasks.add(callTask);
}
protected MutationCallTaskBatch<ReqT, RespT> newBatch(long ver) {
@@ -81,25 +68,56 @@ public abstract class BatchMutationCall<ReqT, RespT>
implements IBatchCall<ReqT,
@Override
public void reset(boolean abort) {
if (abort) {
- batchCallTasks = new ArrayDeque<>();
+ pendingCallTasks = new ArrayDeque<>();
}
}
@Override
public CompletableFuture<Void> execute() {
- return execute(batchCallTasks);
+ return executeBatches();
}
- private CompletableFuture<Void> execute(Deque<MutationCallTaskBatch<ReqT,
RespT>> batchCallTasks) {
+ private CompletableFuture<Void> executeBatches() {
CompletableFuture<Void> chained =
CompletableFuture.completedFuture(null);
MutationCallTaskBatch<ReqT, RespT> batchCallTask;
- while ((batchCallTask = batchCallTasks.poll()) != null) {
+ while ((batchCallTask = buildNextBatch()) != null) {
MutationCallTaskBatch<ReqT, RespT> current = batchCallTask;
chained = chained.thenCompose(v -> fireSingleBatch(current));
}
return chained;
}
+ private MutationCallTaskBatch<ReqT, RespT> buildNextBatch() {
+ if (pendingCallTasks.isEmpty()) {
+ return null;
+ }
+ MutationCallTaskBatch<ReqT, RespT> batchCallTask = null;
+ long batchVer = -1;
+ int size = pendingCallTasks.size();
+ for (int i = 0; i < size; i++) {
+ ICallTask<ReqT, RespT, MutationCallBatcherKey> task =
pendingCallTasks.pollFirst();
+ if (task == null) {
+ break;
+ }
+ if (batchCallTask == null) {
+ batchVer = task.batcherKey().ver;
+ batchCallTask = newBatch(batchVer);
+ batchCallTask.add(task);
+ continue;
+ }
+ if (task.batcherKey().ver != batchVer) {
+ pendingCallTasks.addLast(task);
+ continue;
+ }
+ if (batchCallTask.isBatchable(task)) {
+ batchCallTask.add(task);
+ } else {
+ pendingCallTasks.addLast(task);
+ }
+ }
+ return batchCallTask;
+ }
+
private CompletableFuture<Void>
fireSingleBatch(MutationCallTaskBatch<ReqT, RespT> batchCallTask) {
RWCoProcInput input = makeBatch(batchCallTask.batchedTasks);
long reqId = System.nanoTime();
diff --git
a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
index 72b1c3457..f13fb9114 100644
---
a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
+++
b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCallTest.java
@@ -30,6 +30,7 @@ import static org.testng.Assert.assertEquals;
import com.google.protobuf.ByteString;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
@@ -77,9 +78,11 @@ public class BatchMutationCallTest {
@Test
public void addToSameBatch() {
- when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {{
- put(FULL_BOUNDARY, setting(id, "V1", 0));
- }});
+ when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {
+ {
+ put(FULL_BOUNDARY, setting(id, "V1", 0));
+ }
+ });
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
when(mutationPipeline1.execute(any()))
@@ -103,7 +106,8 @@ public class BatchMutationCallTest {
String[] keys =
request.getRwCoProc().getRaw().toStringUtf8().split("_");
assertEquals(keys.length, Sets.newSet(keys).size());
}
- // the resp order preserved
+ Collections.sort(reqList);
+ Collections.sort(respList);
assertEquals(reqList, respList);
}
@@ -124,19 +128,24 @@ public class BatchMutationCallTest {
int req = ThreadLocalRandom.current().nextInt(1, 1001);
reqList.add(req);
if (req < 500) {
- when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {{
- put(FULL_BOUNDARY, setting(id, "V1", 0));
- }});
+ when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {
+ {
+ put(FULL_BOUNDARY, setting(id, "V1", 0));
+ }
+ });
} else {
- when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {{
- put(FULL_BOUNDARY, setting(id, "V2", 0));
- }});
+ when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {
+ {
+ put(FULL_BOUNDARY, setting(id, "V2", 0));
+ }
+ });
}
futures.add(scheduler.schedule(ByteString.copyFromUtf8(Integer.toString(req)))
.thenAccept((v) ->
respList.add(Integer.parseInt(v.toStringUtf8()))));
}
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
- // the resp order preserved
+ Collections.sort(reqList);
+ Collections.sort(respList);
assertEquals(reqList, respList);
}
@@ -166,4 +175,82 @@ public class BatchMutationCallTest {
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
assertEquals(execCount.get(), n);
}
+
+ @Test
+ public void reScanWhenHitNonBatchable() {
+ when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {
+ {
+ put(FULL_BOUNDARY, setting(id, "V1", 0));
+ }
+ });
+
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
+ when(mutationPipeline1.execute(any()))
+ .thenReturn(CompletableFuture.supplyAsync(() ->
KVRangeRWReply.newBuilder().build()));
+
+ MutationCallScheduler<ByteString, ByteString, TestBatchMutationCall>
scheduler =
+ new MutationCallScheduler<>(NonBatchableBatchCall::new,
Duration.ofMillis(1000).toNanos(), storeClient) {
+ @Override
+ protected ByteString rangeKey(ByteString call) {
+ return call;
+ }
+ };
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ List<ByteString> reqs = List.of(
+ ByteString.copyFromUtf8("k1"),
+ ByteString.copyFromUtf8("k_dup"), // will mark non-batchable in
first batch
+ ByteString.copyFromUtf8("k2"));
+ List<ByteString> resps = new CopyOnWriteArrayList<>();
+ reqs.forEach(req ->
futures.add(scheduler.schedule(req).thenAccept(resps::add)));
+
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
+ List<String> reqSorted =
reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
+ List<String> respSorted =
resps.stream().map(ByteString::toStringUtf8).sorted().toList();
+ assertEquals(reqSorted, respSorted);
+ }
+
+ @Test
+ public void mixDifferentVersions() {
+
when(storeClient.createMutationPipeline("V1")).thenReturn(mutationPipeline1);
+
when(storeClient.createMutationPipeline("V2")).thenReturn(mutationPipeline2);
+ when(mutationPipeline1.execute(any()))
+ .thenReturn(CompletableFuture.supplyAsync(() ->
KVRangeRWReply.newBuilder().build()));
+ when(mutationPipeline2.execute(any()))
+ .thenReturn(CompletableFuture.supplyAsync(() ->
KVRangeRWReply.newBuilder().build()));
+ TestMutationCallScheduler scheduler = new
TestMutationCallScheduler(storeClient, Duration.ofMillis(1000));
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ List<ByteString> reqs = new ArrayList<>();
+ List<ByteString> resps = new CopyOnWriteArrayList<>();
+ for (int i = 0; i < 20; i++) {
+ ByteString req = ByteString.copyFromUtf8("k" + i);
+ reqs.add(req);
+ if (i % 2 == 0) {
+ when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {
+ {
+ put(FULL_BOUNDARY, setting(id, "V1", 0));
+ }
+ });
+ } else {
+ when(storeClient.latestEffectiveRouter()).thenReturn(new
TreeMap<>(BoundaryUtil::compare) {
+ {
+ put(FULL_BOUNDARY, setting(id, "V2", 1));
+ }
+ });
+ }
+ futures.add(scheduler.schedule(req).thenAccept(resps::add));
+ }
+
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
+ List<String> reqSorted =
reqs.stream().map(ByteString::toStringUtf8).sorted().toList();
+ List<String> respSorted =
resps.stream().map(ByteString::toStringUtf8).sorted().toList();
+ assertEquals(reqSorted, respSorted);
+ }
+
+ private static class NonBatchableBatchCall extends TestBatchMutationCall {
+ protected NonBatchableBatchCall(IMutationPipeline pipeline,
MutationCallBatcherKey batcherKey) {
+ super(pipeline, batcherKey);
+ }
+
+ @Override
+ protected NonBatchableFirstBatch newBatch(long ver) {
+ return new NonBatchableFirstBatch(ver);
+ }
+ }
}
diff --git
a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
index 276634b09..cbb06fff6 100644
---
a/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
+++
b/base-kv/base-kv-store-client/src/test/java/org/apache/bifromq/basekv/client/scheduler/TestBatchMutationCall.java
@@ -19,16 +19,16 @@
package org.apache.bifromq.basekv.client.scheduler;
-import org.apache.bifromq.basekv.client.IMutationPipeline;
-import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
-import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
-import org.apache.bifromq.basescheduler.ICallTask;
import com.google.common.collect.Iterables;
import com.google.protobuf.ByteString;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Queue;
import java.util.Set;
+import org.apache.bifromq.basekv.client.IMutationPipeline;
+import org.apache.bifromq.basekv.store.proto.RWCoProcInput;
+import org.apache.bifromq.basekv.store.proto.RWCoProcOutput;
+import org.apache.bifromq.basescheduler.ICallTask;
public class TestBatchMutationCall extends BatchMutationCall<ByteString,
ByteString> {
protected TestBatchMutationCall(IMutationPipeline pipeline,
MutationCallBatcherKey batcherKey) {
@@ -86,4 +86,28 @@ public class TestBatchMutationCall extends
BatchMutationCall<ByteString, ByteStr
return !keys.contains(callTask.call());
}
}
+
+ static class NonBatchableFirstBatch extends
MutationCallTaskBatch<ByteString, ByteString> {
+ private final Set<ByteString> keys = new HashSet<>();
+ private boolean sawNonBatchable;
+
+ protected NonBatchableFirstBatch(long ver) {
+ super(ver);
+ }
+
+ @Override
+ protected void add(ICallTask<ByteString, ByteString,
MutationCallBatcherKey> callTask) {
+ super.add(callTask);
+ keys.add(callTask.call());
+ }
+
+ @Override
+ protected boolean isBatchable(ICallTask<ByteString, ByteString,
MutationCallBatcherKey> callTask) {
+ if (!sawNonBatchable &&
callTask.call().toStringUtf8().contains("dup")) {
+ sawNonBatchable = true;
+ return false;
+ }
+ return !keys.contains(callTask.call());
+ }
+ }
}