This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit e6aa8dea1f123a222cc6218b53d9a66c9adccb77
Author: OneSizeFitQuorum <[email protected]>
AuthorDate: Wed Nov 9 21:19:35 2022 +0800

    Control the size of each pending batch to be no larger than maxSizePerBatch
    
    Signed-off-by: OneSizeFitQuorum <[email protected]>
---
 .../common/request/IndexedConsensusRequest.java    | 26 ++++-----
 .../iotdb/consensus/config/MultiLeaderConfig.java  | 38 +++++++++---
 .../multileader/MultiLeaderServerImpl.java         |  2 +-
 .../multileader/client/DispatchLogHandler.java     |  2 +-
 .../logdispatcher/AccumulatingBatch.java           | 67 ++++++++++++++++++++++
 .../multileader/logdispatcher/LogDispatcher.java   | 59 ++++++++++---------
 .../multileader/logdispatcher/PendingBatch.java    | 23 ++++----
 .../multileader/logdispatcher/SyncStatus.java      |  4 +-
 .../multileader/logdispatcher/SyncStatusTest.java  | 15 +++--
 .../plan/node/write/InsertMultiTabletsNode.java    |  2 +-
 10 files changed, 167 insertions(+), 71 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 6abca549b6..ca236da1c3 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -20,7 +20,7 @@
 package org.apache.iotdb.consensus.common.request;
 
 import java.nio.ByteBuffer;
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
@@ -31,8 +31,8 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
   private final long searchIndex;
 
   private final long syncIndex;
-  private List<IConsensusRequest> requests;
-  private List<ByteBuffer> serializedRequests;
+  private final List<IConsensusRequest> requests;
+  private final List<ByteBuffer> serializedRequests = new ArrayList<>();
   private long serializedSize = 0;
 
   public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> 
requests) {
@@ -41,15 +41,6 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
     this.syncIndex = -1L;
   }
 
-  public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long 
searchIndex) {
-    this.searchIndex = searchIndex;
-    this.serializedRequests = serializedRequests;
-    for (ByteBuffer byteBuffer : serializedRequests) {
-      serializedSize += byteBuffer.capacity();
-    }
-    this.syncIndex = -1L;
-  }
-
   public IndexedConsensusRequest(
       long searchIndex, long syncIndex, List<IConsensusRequest> requests) {
     this.searchIndex = searchIndex;
@@ -70,10 +61,13 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
     return serializedRequests;
   }
 
-  public List<ByteBuffer> buildSerializedRequests() {
-    List<ByteBuffer> result = new LinkedList<>();
-    this.requests.forEach(r -> result.add(r.serializeToByteBuffer()));
-    return result;
+  public void buildSerializedRequests() {
+    this.requests.forEach(
+        r -> {
+          ByteBuffer buffer = r.serializeToByteBuffer();
+          this.serializedRequests.add(buffer);
+          this.serializedSize += buffer.capacity();
+        });
   }
 
   public long getSerializedSize() {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index cd0d58c4ae..8c7abec0ce 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -196,7 +196,9 @@ public class MultiLeaderConfig {
   }
 
   public static class Replication {
-    private final int maxRequestPerBatch;
+    private final int maxRequestNumPerBatch;
+
+    private final int maxSizePerBatch;
     private final int maxPendingBatch;
     private final int maxWaitingTimeForAccumulatingBatchInMs;
     private final long basicRetryWaitTimeMs;
@@ -207,7 +209,8 @@ public class MultiLeaderConfig {
     private final Long allocateMemoryForConsensus;
 
     private Replication(
-        int maxRequestPerBatch,
+        int maxRequestNumPerBatch,
+        int maxSizePerBatch,
         int maxPendingBatch,
         int maxWaitingTimeForAccumulatingBatchInMs,
         long basicRetryWaitTimeMs,
@@ -216,7 +219,8 @@ public class MultiLeaderConfig {
         long throttleTimeOutMs,
         long checkpointGap,
         long allocateMemoryForConsensus) {
-      this.maxRequestPerBatch = maxRequestPerBatch;
+      this.maxRequestNumPerBatch = maxRequestNumPerBatch;
+      this.maxSizePerBatch = maxSizePerBatch;
       this.maxPendingBatch = maxPendingBatch;
       this.maxWaitingTimeForAccumulatingBatchInMs = 
maxWaitingTimeForAccumulatingBatchInMs;
       this.basicRetryWaitTimeMs = basicRetryWaitTimeMs;
@@ -227,8 +231,12 @@ public class MultiLeaderConfig {
       this.allocateMemoryForConsensus = allocateMemoryForConsensus;
     }
 
-    public int getMaxRequestPerBatch() {
-      return maxRequestPerBatch;
+    public int getMaxRequestNumPerBatch() {
+      return maxRequestNumPerBatch;
+    }
+
+    public int getMaxSizePerBatch() {
+      return maxSizePerBatch;
     }
 
     public int getMaxPendingBatch() {
@@ -268,7 +276,8 @@ public class MultiLeaderConfig {
     }
 
     public static class Builder {
-      private int maxRequestPerBatch = 30;
+      private int maxRequestNumPerBatch = 30;
+      private int maxSizePerBatch = 4 * 1024 * 1024;
       // (IMPORTANT) Value of this variable should be the same with 
MAX_REQUEST_CACHE_SIZE
       // in DataRegionStateMachine
       private int maxPendingBatch = 5;
@@ -280,8 +289,13 @@ public class MultiLeaderConfig {
       private long checkpointGap = 500;
       private long allocateMemoryForConsensus;
 
-      public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) 
{
-        this.maxRequestPerBatch = maxRequestPerBatch;
+      public Replication.Builder setMaxRequestNumPerBatch(int 
maxRequestNumPerBatch) {
+        this.maxRequestNumPerBatch = maxRequestNumPerBatch;
+        return this;
+      }
+
+      public Builder setMaxSizePerBatch(int maxSizePerBatch) {
+        this.maxSizePerBatch = maxSizePerBatch;
         return this;
       }
 
@@ -316,6 +330,11 @@ public class MultiLeaderConfig {
         return this;
       }
 
+      public Builder setCheckpointGap(long checkpointGap) {
+        this.checkpointGap = checkpointGap;
+        return this;
+      }
+
       public Replication.Builder setAllocateMemoryForConsensus(long 
allocateMemoryForConsensus) {
         this.allocateMemoryForConsensus = allocateMemoryForConsensus;
         return this;
@@ -323,7 +342,8 @@ public class MultiLeaderConfig {
 
       public Replication build() {
         return new Replication(
-            maxRequestPerBatch,
+            maxRequestNumPerBatch,
+            maxSizePerBatch,
             maxPendingBatch,
             maxWaitingTimeForAccumulatingBatchInMs,
             basicRetryWaitTimeMs,
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index e965973459..46f22472aa 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -99,7 +99,7 @@ public class MultiLeaderServerImpl {
   private final LogDispatcher logDispatcher;
   private final MultiLeaderConfig config;
   private final ConsensusReqReader reader;
-  private boolean active;
+  private volatile boolean active;
   private String latestSnapshotId;
   private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> 
syncClientManager;
   private final MultiLeaderServerMetrics metrics;
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
index 705aa980e9..5d733f9cc3 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java
@@ -74,7 +74,7 @@ public class DispatchLogHandler implements 
AsyncMethodCallback<TSyncLogRes> {
             "syncLogTimePerRequest",
             Tag.REGION.toString(),
             this.thread.getPeer().getGroupId().toString())
-        .update((System.currentTimeMillis() - createTime) / 
batch.getBatches().size());
+        .update((System.currentTimeMillis() - createTime) / 
batch.getBatch().getBatchList().size());
   }
 
   private boolean needRetry(int statusCode) {
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java
new file mode 100644
index 0000000000..afc0537512
--- /dev/null
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.multileader.logdispatcher;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.config.MultiLeaderConfig;
+import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class AccumulatingBatch {
+
+  private final MultiLeaderConfig config;
+  private final List<TLogBatch> batchList;
+  private long serializedSize;
+
+  public AccumulatingBatch(MultiLeaderConfig config) {
+    this.config = config;
+    this.batchList = new ArrayList<>();
+    this.serializedSize = 0;
+  }
+
+  @TestOnly
+  public AccumulatingBatch(List<TLogBatch> batches) {
+    this.config = MultiLeaderConfig.newBuilder().build();
+    this.batchList = batches;
+    this.serializedSize =
+        batches.stream().mapToLong(x -> x.getData() == null ? 0 : 
x.getData().length).sum();
+  }
+
+  public void addTLogBatch(TLogBatch batch) {
+    batchList.add(batch);
+    // TODO Maybe we need to add in additional fields for more accurate 
calculations
+    serializedSize += batch.getData().length;
+  }
+
+  public boolean canAccumulate() {
+    return batchList.size() < 
config.getReplication().getMaxRequestNumPerBatch()
+        && serializedSize < config.getReplication().getMaxSizePerBatch();
+  }
+
+  public List<TLogBatch> getBatchList() {
+    return batchList;
+  }
+
+  public long getSerializedSize() {
+    return serializedSize;
+  }
+}
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 606731e6a7..198284dc78 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -59,6 +58,7 @@ import java.util.stream.Collectors;
 
 /** Manage all asynchronous replication threads and corresponding async 
clients */
 public class LogDispatcher {
+
   private static final Logger logger = 
LoggerFactory.getLogger(LogDispatcher.class);
   private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L;
   private final MultiLeaderServerImpl impl;
@@ -156,7 +156,7 @@ public class LogDispatcher {
   }
 
   public void offer(IndexedConsensusRequest request) {
-    List<ByteBuffer> serializedRequests = request.buildSerializedRequests();
+    request.buildSerializedRequests();
     // we put the serialization step outside the synchronized block because it 
is stateless and
     // time-consuming
     synchronized (this) {
@@ -167,8 +167,7 @@ public class LogDispatcher {
                 impl.getThisNode().getGroupId(),
                 thread.getPeer().getEndpoint().getIp(),
                 thread.getPendingRequestSize());
-            if (!thread.offer(
-                new IndexedConsensusRequest(serializedRequests, 
request.getSearchIndex()))) {
+            if (!thread.offer(request)) {
               logger.debug(
                   "{}: Log queue of {} is full, ignore the log to this node, 
searchIndex: {}",
                   impl.getThisNode().getGroupId(),
@@ -180,6 +179,7 @@ public class LogDispatcher {
   }
 
   public class LogDispatcherThread implements Runnable {
+
     private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10;
     private static final long START_INDEX = 1;
     private final MultiLeaderConfig config;
@@ -198,7 +198,7 @@ public class LogDispatcher {
         MultiLeaderMemoryManager.getInstance();
     private volatile boolean stopped = false;
 
-    private ConsensusReqReader.ReqIterator walEntryIterator;
+    private final ConsensusReqReader.ReqIterator walEntryIterator;
 
     private final LogDispatcherThreadMetrics metrics;
 
@@ -300,7 +300,7 @@ public class LogDispatcher {
             if (request != null) {
               bufferedRequest.add(request);
               // If write pressure is low, we simply sleep a little to reduce 
the number of RPC
-              if (pendingRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
+              if (pendingRequest.size() <= 
config.getReplication().getMaxRequestNumPerBatch()) {
                 
Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs());
               }
             }
@@ -315,7 +315,9 @@ public class LogDispatcher {
                   "constructBatch",
                   Tag.REGION.toString(),
                   peer.getGroupId().toString())
-              .update((System.currentTimeMillis() - startTime) / 
batch.getBatches().size());
+              .update(
+                  (System.currentTimeMillis() - startTime)
+                      / batch.getBatch().getBatchList().size());
           // we may block here if the synchronization pipeline is full
           syncStatus.addNextBatch(batch);
           // sends batch asynchronously and migrates the retry logic into the 
callback handler
@@ -342,12 +344,10 @@ public class LogDispatcher {
     }
 
     public PendingBatch getBatch() {
-      PendingBatch batch;
-      List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
       long maxIndexWhenBufferedRequestEmpty = startIndex;
       logger.debug("[GetBatch] startIndex: {}", startIndex);
-      if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
+      if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestNumPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
         logger.debug(
             "{} : pendingRequest Size: {}, bufferedRequest size: {}",
@@ -357,7 +357,7 @@ public class LogDispatcher {
         synchronized (impl.getIndexObject()) {
           pendingRequest.drainTo(
               bufferedRequest,
-              config.getReplication().getMaxRequestPerBatch() - 
bufferedRequest.size());
+              config.getReplication().getMaxRequestNumPerBatch() - 
bufferedRequest.size());
           maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1;
         }
         // remove all request that searchIndex < startIndex
@@ -372,6 +372,8 @@ public class LogDispatcher {
           }
         }
       }
+      PendingBatch batch;
+      AccumulatingBatch logBatches = new AccumulatingBatch(config);
       // This condition will be executed in several scenarios:
       // 1. restart
       // 2. The getBatch() is invoked immediately at the moment the 
PendingRequests are consumed
@@ -386,25 +388,32 @@ public class LogDispatcher {
         // Notice that prev searchIndex >= startIndex
         Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
+
         // Prevents gap between logs. For example, some requests are not 
written into the queue when
         // the queue is full. In this case, requests need to be loaded from 
the WAL
         constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches);
-        if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
+        if (!logBatches.canAccumulate()) {
           batch = new PendingBatch(logBatches);
           logger.debug("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
           return batch;
         }
+
         constructBatchIndexedFromConsensusRequest(prev, logBatches);
         iterator.remove();
         releaseReservedMemory(prev);
-        while (iterator.hasNext()
-            && logBatches.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
+        if (!logBatches.canAccumulate()) {
+          batch = new PendingBatch(logBatches);
+          logger.debug("{} : accumulated a {} from queue", 
impl.getThisNode().getGroupId(), batch);
+          return batch;
+        }
+
+        while (iterator.hasNext() && logBatches.canAccumulate()) {
           IndexedConsensusRequest current = iterator.next();
           // Prevents gap between logs. For example, some logs are not written 
into the queue when
           // the queue is full. In this case, requests need to be loaded from 
the WAL
           if (current.getSearchIndex() != prev.getSearchIndex() + 1) {
             constructBatchFromWAL(prev.getSearchIndex(), 
current.getSearchIndex(), logBatches);
-            if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
+            if (!logBatches.canAccumulate()) {
               batch = new PendingBatch(logBatches);
               logger.debug(
                   "gap {} : accumulated a {} from queue and wal when gap",
@@ -433,7 +442,9 @@ public class LogDispatcher {
         AsyncMultiLeaderServiceClient client = 
clientManager.borrowClient(peer.getEndpoint());
         TSyncLogReq req =
             new TSyncLogReq(
-                selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), 
batch.getBatches());
+                selfPeerId,
+                peer.getGroupId().convertToTConsensusGroupId(),
+                batch.getBatch().getBatchList());
         logger.debug(
             "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}",
             batch.getStartIndex(),
@@ -450,8 +461,8 @@ public class LogDispatcher {
       return syncStatus;
     }
 
-    private long constructBatchFromWAL(
-        long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
+    private void constructBatchFromWAL(
+        long currentIndex, long maxIndex, AccumulatingBatch logBatches) {
       logger.debug(
           String.format(
               "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d",
@@ -460,8 +471,7 @@ public class LogDispatcher {
       long targetIndex = currentIndex;
       // Even if there is no WAL files, these code won't produce error.
       walEntryIterator.skipTo(targetIndex);
-      while (targetIndex < maxIndex
-          && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
+      while (targetIndex < maxIndex && logBatches.canAccumulate()) {
         logger.debug("construct from WAL for one Entry, index : {}", 
targetIndex);
         try {
           walEntryIterator.waitForNextReady();
@@ -490,19 +500,16 @@ public class LogDispatcher {
         targetIndex = data.getSearchIndex() + 1;
         // construct request from wal
         for (IConsensusRequest innerRequest : data.getRequests()) {
-          logBatches.add(
+          logBatches.addTLogBatch(
               new TLogBatch(innerRequest.serializeToByteBuffer(), 
data.getSearchIndex(), true));
         }
       }
-      return logBatches.size() > 0
-          ? logBatches.get(logBatches.size() - 1).searchIndex
-          : currentIndex;
     }
 
     private void constructBatchIndexedFromConsensusRequest(
-        IndexedConsensusRequest request, List<TLogBatch> logBatches) {
+        IndexedConsensusRequest request, AccumulatingBatch logBatches) {
       for (ByteBuffer innerRequest : request.getSerializedRequests()) {
-        logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), 
false));
+        logBatches.addTLogBatch(new TLogBatch(innerRequest, 
request.getSearchIndex(), false));
       }
     }
   }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
index a6ee43392f..71a7f72059 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java
@@ -27,19 +27,20 @@ public class PendingBatch {
 
   private final long startIndex;
   private final long endIndex;
-  private final List<TLogBatch> batches;
+  private final AccumulatingBatch batch;
   // indicates whether this batch has been successfully synchronized to 
another node
   private boolean synced;
 
-  public PendingBatch(List<TLogBatch> batches) {
-    if (!batches.isEmpty()) {
-      this.startIndex = batches.get(0).getSearchIndex();
-      this.endIndex = batches.get(batches.size() - 1).getSearchIndex();
+  public PendingBatch(AccumulatingBatch batch) {
+    List<TLogBatch> batchList = batch.getBatchList();
+    if (!batchList.isEmpty()) {
+      this.startIndex = batchList.get(0).getSearchIndex();
+      this.endIndex = batchList.get(batchList.size() - 1).getSearchIndex();
     } else {
       this.startIndex = 0;
       this.endIndex = 0;
     }
-    this.batches = batches;
+    this.batch = batch;
     this.synced = false;
   }
 
@@ -51,8 +52,8 @@ public class PendingBatch {
     return endIndex;
   }
 
-  public List<TLogBatch> getBatches() {
-    return batches;
+  public AccumulatingBatch getBatch() {
+    return batch;
   }
 
   public boolean isSynced() {
@@ -64,7 +65,7 @@ public class PendingBatch {
   }
 
   public boolean isEmpty() {
-    return batches.isEmpty();
+    return batch.getBatchList().isEmpty();
   }
 
   @Override
@@ -75,7 +76,9 @@ public class PendingBatch {
         + ", endIndex="
         + endIndex
         + ", size="
-        + batches.size()
+        + batch.getBatchList().size()
+        + ", serializedSize="
+        + batch.getSerializedSize()
         + '}';
   }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
index e9901d931a..3549c1158b 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java
@@ -30,7 +30,7 @@ public class SyncStatus {
 
   private final MultiLeaderConfig config;
   private final IndexController controller;
-  private final List<PendingBatch> pendingBatches = new LinkedList<>();
+  private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>();
 
   public SyncStatus(IndexController controller, MultiLeaderConfig config) {
     this.controller = controller;
@@ -80,7 +80,7 @@ public class SyncStatus {
       return 1
           + (pendingBatches.isEmpty()
               ? controller.getCurrentIndex()
-              : pendingBatches.get(pendingBatches.size() - 1).getEndIndex());
+              : pendingBatches.getLast().getEndIndex());
     }
   }
 
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
index c4363428fb..d9b591d1b6 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java
@@ -66,7 +66,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new 
PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new 
AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -95,7 +96,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new 
PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new 
AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -130,7 +132,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new 
PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new 
AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -176,7 +179,8 @@ public class SyncStatusTest {
     for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) {
       TLogBatch logBatch = new TLogBatch();
       logBatch.setSearchIndex(i);
-      PendingBatch batch = new 
PendingBatch(Collections.singletonList(logBatch));
+      PendingBatch batch =
+          new PendingBatch(new 
AccumulatingBatch(Collections.singletonList(logBatch)));
       batchList.add(batch);
       status.addNextBatch(batch);
     }
@@ -195,7 +199,8 @@ public class SyncStatusTest {
             () -> {
               TLogBatch logBatch = new TLogBatch();
               
logBatch.setSearchIndex(config.getReplication().getMaxPendingBatch());
-              PendingBatch batch = new 
PendingBatch(Collections.singletonList(logBatch));
+              PendingBatch batch =
+                  new PendingBatch(new 
AccumulatingBatch(Collections.singletonList(logBatch)));
               batchList.add(batch);
               try {
                 status.addNextBatch(batch);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index be8e90b914..6ce2eeff80 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -92,7 +92,7 @@ public class InsertMultiTabletsNode extends InsertNode 
implements BatchInsertNod
   List<InsertTabletNode> insertTabletNodeList;
 
   /** record the result of insert tablets */
-  private Map<Integer, TSStatus> results = new HashMap<>();
+  private final Map<Integer, TSStatus> results = new HashMap<>();
 
   public InsertMultiTabletsNode(PlanNodeId id) {
     super(id);

Reply via email to