popduke commented on code in PR #230:
URL: https://github.com/apache/bifromq/pull/230#discussion_r2887063488


##########
base-kv/base-kv-store-client/src/main/java/org/apache/bifromq/basekv/client/scheduler/BatchMutationCall.java:
##########
@@ -81,25 +68,56 @@ protected abstract void handleOutput(Queue<ICallTask<ReqT, RespT, MutationCallBa
     @Override
     public void reset(boolean abort) {
         if (abort) {
-            batchCallTasks = new ArrayDeque<>();
+            pendingCallTasks = new ArrayDeque<>();
         }
     }
 
     @Override
     public CompletableFuture<Void> execute() {
-        return execute(batchCallTasks);
+        return executeBatches();
     }
 
-    private CompletableFuture<Void> execute(Deque<MutationCallTaskBatch<ReqT, RespT>> batchCallTasks) {
+    private CompletableFuture<Void> executeBatches() {
         CompletableFuture<Void> chained = CompletableFuture.completedFuture(null);
         MutationCallTaskBatch<ReqT, RespT> batchCallTask;
-        while ((batchCallTask = batchCallTasks.poll()) != null) {
+        while ((batchCallTask = buildNextBatch()) != null) {
             MutationCallTaskBatch<ReqT, RespT> current = batchCallTask;
             chained = chained.thenCompose(v -> fireSingleBatch(current));
         }
         return chained;
     }
 
+    private MutationCallTaskBatch<ReqT, RespT> buildNextBatch() {
+        if (pendingCallTasks.isEmpty()) {
+            return null;
+        }
+        MutationCallTaskBatch<ReqT, RespT> batchCallTask = null;
+        long batchVer = -1;
+        int size = pendingCallTasks.size();
+        for (int i = 0; i < size; i++) {
+            ICallTask<ReqT, RespT, MutationCallBatcherKey> task = pendingCallTasks.pollFirst();
+            if (task == null) {
+                break;
+            }
+            if (batchCallTask == null) {
+                batchVer = task.batcherKey().ver;
+                batchCallTask = newBatch(batchVer);
+                batchCallTask.add(task);
+                continue;
+            }
+            if (task.batcherKey().ver != batchVer) {
+                pendingCallTasks.addLast(task);

Review Comment:
   > If another caller adds a new request via the add(ICallTask<ReqT, RespT, 
MutationCallBatcherKey> callTask) method while the previous one is also put 
back at the tail, will that introduce ordering issues?
   
   In that case the two requests come from different publishers and are concurrent by nature, so there is no ordering between them that needs to be preserved.
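
To make the scenario concrete, here is a minimal standalone sketch of the re-queue behaviour, not the actual BatchMutationCall code. `VersionBatchingSketch`, `Task`, and `drain` are hypothetical names, and because the diff hunk above is cut off right after `addLast(task)`, the rest of the loop (skip the deferred task, otherwise add it to the batch) is an assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of version-based batching over a pending deque.
class VersionBatchingSketch {
    // Stand-in for ICallTask: a request tagged with its publisher and batcher key version.
    static final class Task {
        final String publisher;
        final long ver;

        Task(String publisher, long ver) {
            this.publisher = publisher;
            this.ver = ver;
        }

        @Override
        public String toString() {
            return publisher + "@v" + ver;
        }
    }

    private final Deque<Task> pending = new ArrayDeque<>();

    void add(Task task) {
        pending.addLast(task);
    }

    // The first polled task fixes the batch version; tasks with another
    // version are moved to the tail and picked up by a later batch.
    List<Task> buildNextBatch() {
        if (pending.isEmpty()) {
            return null;
        }
        List<Task> batch = new ArrayList<>();
        long batchVer = -1;
        int size = pending.size(); // visit each currently pending task once
        for (int i = 0; i < size; i++) {
            Task task = pending.pollFirst();
            if (batch.isEmpty()) {
                batchVer = task.ver;
                batch.add(task);
                continue;
            }
            if (task.ver != batchVer) {
                pending.addLast(task); // deferred, as in the diff
                continue;              // assumed: the hunk is truncated here
            }
            batch.add(task);           // assumed: same version joins the batch
        }
        return batch;
    }

    // Builds batches until nothing is pending.
    List<List<Task>> drain() {
        List<List<Task>> batches = new ArrayList<>();
        List<Task> batch;
        while ((batch = buildNextBatch()) != null) {
            batches.add(batch);
        }
        return batches;
    }

    public static void main(String[] args) {
        VersionBatchingSketch sketch = new VersionBatchingSketch();
        sketch.add(new Task("p1", 1));
        sketch.add(new Task("p2", 2));
        sketch.add(new Task("p3", 1));
        System.out.println(sketch.drain()); // [[p1@v1, p3@v1], [p2@v2]]
    }
}
```

In this sketch p3 overtakes p2 even though p2 was queued first, while tasks sharing a version keep their relative order. Whether that overtaking is acceptable depends on the point made above: the deferred request and the one that passes it are issued by different publishers.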



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
