This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch multi_leader_memory_pendingBatch_control in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e6aa8dea1f123a222cc6218b53d9a66c9adccb77 Author: OneSizeFitQuorum <[email protected]> AuthorDate: Wed Nov 9 21:19:35 2022 +0800 Control the size of each pending batch to be no larger than maxSizePerBatch Signed-off-by: OneSizeFitQuorum <[email protected]> --- .../common/request/IndexedConsensusRequest.java | 26 ++++----- .../iotdb/consensus/config/MultiLeaderConfig.java | 38 +++++++++--- .../multileader/MultiLeaderServerImpl.java | 2 +- .../multileader/client/DispatchLogHandler.java | 2 +- .../logdispatcher/AccumulatingBatch.java | 67 ++++++++++++++++++++++ .../multileader/logdispatcher/LogDispatcher.java | 59 ++++++++++--------- .../multileader/logdispatcher/PendingBatch.java | 23 ++++---- .../multileader/logdispatcher/SyncStatus.java | 4 +- .../multileader/logdispatcher/SyncStatusTest.java | 15 +++-- .../plan/node/write/InsertMultiTabletsNode.java | 2 +- 10 files changed, 167 insertions(+), 71 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java index 6abca549b6..ca236da1c3 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java @@ -20,7 +20,7 @@ package org.apache.iotdb.consensus.common.request; import java.nio.ByteBuffer; -import java.util.LinkedList; +import java.util.ArrayList; import java.util.List; import java.util.Objects; @@ -31,8 +31,8 @@ public class IndexedConsensusRequest implements IConsensusRequest { private final long searchIndex; private final long syncIndex; - private List<IConsensusRequest> requests; - private List<ByteBuffer> serializedRequests; + private final List<IConsensusRequest> requests; + private final List<ByteBuffer> serializedRequests = new ArrayList<>(); private long serializedSize = 0; public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest> requests) { @@ -41,15 +41,6 @@ public class IndexedConsensusRequest implements IConsensusRequest { this.syncIndex = -1L; } - public IndexedConsensusRequest(List<ByteBuffer> serializedRequests, long searchIndex) { - this.searchIndex = searchIndex; - this.serializedRequests = serializedRequests; - for (ByteBuffer byteBuffer : serializedRequests) { - serializedSize += byteBuffer.capacity(); - } - this.syncIndex = -1L; - } - public IndexedConsensusRequest( long searchIndex, long syncIndex, List<IConsensusRequest> requests) { this.searchIndex = searchIndex; @@ -70,10 +61,13 @@ public class IndexedConsensusRequest implements IConsensusRequest { return serializedRequests; } - public List<ByteBuffer> buildSerializedRequests() { - List<ByteBuffer> result = new LinkedList<>(); - this.requests.forEach(r -> result.add(r.serializeToByteBuffer())); - return result; + public void buildSerializedRequests() { + this.requests.forEach( + r -> { + ByteBuffer buffer = r.serializeToByteBuffer(); + this.serializedRequests.add(buffer); + this.serializedSize += buffer.capacity(); + }); } public long getSerializedSize() { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java index cd0d58c4ae..8c7abec0ce 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java @@ -196,7 +196,9 @@ public class MultiLeaderConfig { } public static class Replication { - private final int maxRequestPerBatch; + private final int maxRequestNumPerBatch; + + private final int maxSizePerBatch; private final int maxPendingBatch; private final int maxWaitingTimeForAccumulatingBatchInMs; private final long basicRetryWaitTimeMs; @@ -207,7 +209,8 @@ public class MultiLeaderConfig { private final Long allocateMemoryForConsensus; private Replication( - int maxRequestPerBatch, + int maxRequestNumPerBatch, + int maxSizePerBatch, int maxPendingBatch, int maxWaitingTimeForAccumulatingBatchInMs, long basicRetryWaitTimeMs, @@ -216,7 +219,8 @@ public class MultiLeaderConfig { long throttleTimeOutMs, long checkpointGap, long allocateMemoryForConsensus) { - this.maxRequestPerBatch = maxRequestPerBatch; + this.maxRequestNumPerBatch = maxRequestNumPerBatch; + this.maxSizePerBatch = maxSizePerBatch; this.maxPendingBatch = maxPendingBatch; this.maxWaitingTimeForAccumulatingBatchInMs = maxWaitingTimeForAccumulatingBatchInMs; this.basicRetryWaitTimeMs = basicRetryWaitTimeMs; @@ -227,8 +231,12 @@ public class MultiLeaderConfig { this.allocateMemoryForConsensus = allocateMemoryForConsensus; } - public int getMaxRequestPerBatch() { - return maxRequestPerBatch; + public int getMaxRequestNumPerBatch() { + return maxRequestNumPerBatch; + } + + public int getMaxSizePerBatch() { + return maxSizePerBatch; } public int getMaxPendingBatch() { @@ -268,7 +276,8 @@ public class MultiLeaderConfig { } public static class Builder { - private int maxRequestPerBatch = 30; + private int maxRequestNumPerBatch = 30; + private int maxSizePerBatch = 4 * 1024 * 1024; // (IMPORTANT) Value of this variable should be the same with MAX_REQUEST_CACHE_SIZE // in DataRegionStateMachine private int maxPendingBatch = 5; @@ -280,8 +289,13 @@ public class MultiLeaderConfig { private long checkpointGap = 500; private long allocateMemoryForConsensus; - public Replication.Builder setMaxRequestPerBatch(int maxRequestPerBatch) { - this.maxRequestPerBatch = maxRequestPerBatch; + public Replication.Builder setMaxRequestNumPerBatch(int maxRequestNumPerBatch) { + this.maxRequestNumPerBatch = maxRequestNumPerBatch; + return this; + } + + public Builder setMaxSizePerBatch(int maxSizePerBatch) { + this.maxSizePerBatch = maxSizePerBatch; return this; } @@ -316,6 +330,11 @@ public class MultiLeaderConfig { return this; } + public Builder setCheckpointGap(long checkpointGap) { + this.checkpointGap = checkpointGap; + return this; + } + public Replication.Builder setAllocateMemoryForConsensus(long allocateMemoryForConsensus) { this.allocateMemoryForConsensus = allocateMemoryForConsensus; return this; @@ -323,7 +342,8 @@ public class MultiLeaderConfig { public Replication build() { return new Replication( - maxRequestPerBatch, + maxRequestNumPerBatch, + maxSizePerBatch, maxPendingBatch, maxWaitingTimeForAccumulatingBatchInMs, basicRetryWaitTimeMs, diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java index e965973459..46f22472aa 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java @@ -99,7 +99,7 @@ public class MultiLeaderServerImpl { private final LogDispatcher logDispatcher; private final MultiLeaderConfig config; private final ConsensusReqReader reader; - private boolean active; + private volatile boolean active; private String latestSnapshotId; private final IClientManager<TEndPoint, SyncMultiLeaderServiceClient> syncClientManager; private final MultiLeaderServerMetrics metrics; diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java index 705aa980e9..5d733f9cc3 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/client/DispatchLogHandler.java @@ -74,7 +74,7 @@ public class DispatchLogHandler implements AsyncMethodCallback<TSyncLogRes> { "syncLogTimePerRequest", Tag.REGION.toString(), this.thread.getPeer().getGroupId().toString()) - .update((System.currentTimeMillis() - createTime) / batch.getBatches().size()); + .update((System.currentTimeMillis() - createTime) / batch.getBatch().getBatchList().size()); } private boolean needRetry(int statusCode) { diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java new file mode 100644 index 0000000000..afc0537512 --- /dev/null +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/AccumulatingBatch.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.multileader.logdispatcher; + +import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.consensus.config.MultiLeaderConfig; +import org.apache.iotdb.consensus.multileader.thrift.TLogBatch; + +import java.util.ArrayList; +import java.util.List; + +public class AccumulatingBatch { + + private final MultiLeaderConfig config; + private final List<TLogBatch> batchList; + private long serializedSize; + + public AccumulatingBatch(MultiLeaderConfig config) { + this.config = config; + this.batchList = new ArrayList<>(); + this.serializedSize = 0; + } + + @TestOnly + public AccumulatingBatch(List<TLogBatch> batches) { + this.config = MultiLeaderConfig.newBuilder().build(); + this.batchList = batches; + this.serializedSize = + batches.stream().mapToLong(x -> x.getData() == null ? 0 : x.getData().length).sum(); + } + + public void addTLogBatch(TLogBatch batch) { + batchList.add(batch); + // TODO Maybe we need to add in additional fields for more accurate calculations + serializedSize += batch.getData().length; + } + + public boolean canAccumulate() { + return batchList.size() < config.getReplication().getMaxRequestNumPerBatch() + && serializedSize < config.getReplication().getMaxSizePerBatch(); + } + + public List<TLogBatch> getBatchList() { + return batchList; + } + + public long getSerializedSize() { + return serializedSize; + } +} diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java index 606731e6a7..198284dc78 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java @@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Iterator; import java.util.LinkedList; import java.util.List; @@ -59,6 +58,7 @@ import java.util.stream.Collectors; /** Manage all asynchronous replication threads and corresponding async clients */ public class LogDispatcher { + private static final Logger logger = LoggerFactory.getLogger(LogDispatcher.class); private static final long DEFAULT_INITIAL_SYNC_INDEX = 0L; private final MultiLeaderServerImpl impl; @@ -156,7 +156,7 @@ public class LogDispatcher { } public void offer(IndexedConsensusRequest request) { - List<ByteBuffer> serializedRequests = request.buildSerializedRequests(); + request.buildSerializedRequests(); // we put the serialization step outside the synchronized block because it is stateless and // time-consuming synchronized (this) { @@ -167,8 +167,7 @@ public class LogDispatcher { impl.getThisNode().getGroupId(), thread.getPeer().getEndpoint().getIp(), thread.getPendingRequestSize()); - if (!thread.offer( - new IndexedConsensusRequest(serializedRequests, request.getSearchIndex()))) { + if (!thread.offer(request)) { logger.debug( "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}", impl.getThisNode().getGroupId(), @@ -180,6 +179,7 @@ public class LogDispatcher { } public class LogDispatcherThread implements Runnable { + private static final long PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC = 10; private static final long START_INDEX = 1; private final MultiLeaderConfig config; @@ -198,7 +198,7 @@ public class LogDispatcher { MultiLeaderMemoryManager.getInstance(); private volatile boolean stopped = false; - private ConsensusReqReader.ReqIterator walEntryIterator; + private final ConsensusReqReader.ReqIterator walEntryIterator; private final LogDispatcherThreadMetrics metrics; @@ -300,7 +300,7 @@ public class LogDispatcher { if (request != null) { bufferedRequest.add(request); // If write pressure is low, we simply sleep a little to reduce the number of RPC - if (pendingRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { + if (pendingRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) { Thread.sleep(config.getReplication().getMaxWaitingTimeForAccumulatingBatchInMs()); } } @@ -315,7 +315,9 @@ public class LogDispatcher { "constructBatch", Tag.REGION.toString(), peer.getGroupId().toString()) - .update((System.currentTimeMillis() - startTime) / batch.getBatches().size()); + .update( + (System.currentTimeMillis() - startTime) + / batch.getBatch().getBatchList().size()); // we may block here if the synchronization pipeline is full syncStatus.addNextBatch(batch); // sends batch asynchronously and migrates the retry logic into the callback handler @@ -342,12 +344,10 @@ public class LogDispatcher { } public PendingBatch getBatch() { - PendingBatch batch; - List<TLogBatch> logBatches = new ArrayList<>(); long startIndex = syncStatus.getNextSendingIndex(); long maxIndexWhenBufferedRequestEmpty = startIndex; logger.debug("[GetBatch] startIndex: {}", startIndex); - if (bufferedRequest.size() <= config.getReplication().getMaxRequestPerBatch()) { + if (bufferedRequest.size() <= config.getReplication().getMaxRequestNumPerBatch()) { // Use drainTo instead of poll to reduce lock overhead logger.debug( "{} : pendingRequest Size: {}, bufferedRequest size: {}", @@ -357,7 +357,7 @@ public class LogDispatcher { synchronized (impl.getIndexObject()) { pendingRequest.drainTo( bufferedRequest, - config.getReplication().getMaxRequestPerBatch() - bufferedRequest.size()); + config.getReplication().getMaxRequestNumPerBatch() - bufferedRequest.size()); maxIndexWhenBufferedRequestEmpty = impl.getIndex() + 1; } // remove all request that searchIndex < startIndex @@ -372,6 +372,8 @@ public class LogDispatcher { } } } + PendingBatch batch; + AccumulatingBatch logBatches = new AccumulatingBatch(config); // This condition will be executed in several scenarios: // 1. restart // 2. The getBatch() is invoked immediately at the moment the PendingRequests are consumed @@ -386,25 +388,32 @@ public class LogDispatcher { // Notice that prev searchIndex >= startIndex Iterator<IndexedConsensusRequest> iterator = bufferedRequest.iterator(); IndexedConsensusRequest prev = iterator.next(); + // Prevents gap between logs. For example, some requests are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL constructBatchFromWAL(startIndex, prev.getSearchIndex(), logBatches); - if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { + if (!logBatches.canAccumulate()) { batch = new PendingBatch(logBatches); logger.debug("{} : accumulated a {} from wal", impl.getThisNode().getGroupId(), batch); return batch; } + constructBatchIndexedFromConsensusRequest(prev, logBatches); iterator.remove(); releaseReservedMemory(prev); - while (iterator.hasNext() - && logBatches.size() <= config.getReplication().getMaxRequestPerBatch()) { + if (!logBatches.canAccumulate()) { + batch = new PendingBatch(logBatches); + logger.debug("{} : accumulated a {} from queue", impl.getThisNode().getGroupId(), batch); + return batch; + } + + while (iterator.hasNext() && logBatches.canAccumulate()) { IndexedConsensusRequest current = iterator.next(); // Prevents gap between logs. For example, some logs are not written into the queue when // the queue is full. In this case, requests need to be loaded from the WAL if (current.getSearchIndex() != prev.getSearchIndex() + 1) { constructBatchFromWAL(prev.getSearchIndex(), current.getSearchIndex(), logBatches); - if (logBatches.size() == config.getReplication().getMaxRequestPerBatch()) { + if (!logBatches.canAccumulate()) { batch = new PendingBatch(logBatches); logger.debug( "gap {} : accumulated a {} from queue and wal when gap", @@ -433,7 +442,9 @@ public class LogDispatcher { AsyncMultiLeaderServiceClient client = clientManager.borrowClient(peer.getEndpoint()); TSyncLogReq req = new TSyncLogReq( - selfPeerId, peer.getGroupId().convertToTConsensusGroupId(), batch.getBatches()); + selfPeerId, + peer.getGroupId().convertToTConsensusGroupId(), + batch.getBatch().getBatchList()); logger.debug( "Send Batch[startIndex:{}, endIndex:{}] to ConsensusGroup:{}", batch.getStartIndex(), @@ -450,8 +461,8 @@ public class LogDispatcher { return syncStatus; } - private long constructBatchFromWAL( - long currentIndex, long maxIndex, List<TLogBatch> logBatches) { + private void constructBatchFromWAL( + long currentIndex, long maxIndex, AccumulatingBatch logBatches) { logger.debug( String.format( "DataRegion[%s]->%s: currentIndex: %d, maxIndex: %d", @@ -460,8 +471,7 @@ public class LogDispatcher { long targetIndex = currentIndex; // Even if there is no WAL files, these code won't produce error. walEntryIterator.skipTo(targetIndex); - while (targetIndex < maxIndex - && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) { + while (targetIndex < maxIndex && logBatches.canAccumulate()) { logger.debug("construct from WAL for one Entry, index : {}", targetIndex); try { walEntryIterator.waitForNextReady(); @@ -490,19 +500,16 @@ public class LogDispatcher { targetIndex = data.getSearchIndex() + 1; // construct request from wal for (IConsensusRequest innerRequest : data.getRequests()) { - logBatches.add( + logBatches.addTLogBatch( new TLogBatch(innerRequest.serializeToByteBuffer(), data.getSearchIndex(), true)); } } - return logBatches.size() > 0 - ? logBatches.get(logBatches.size() - 1).searchIndex - : currentIndex; } private void constructBatchIndexedFromConsensusRequest( - IndexedConsensusRequest request, List<TLogBatch> logBatches) { + IndexedConsensusRequest request, AccumulatingBatch logBatches) { for (ByteBuffer innerRequest : request.getSerializedRequests()) { - logBatches.add(new TLogBatch(innerRequest, request.getSearchIndex(), false)); + logBatches.addTLogBatch(new TLogBatch(innerRequest, request.getSearchIndex(), false)); } } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java index a6ee43392f..71a7f72059 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/PendingBatch.java @@ -27,19 +27,20 @@ public class PendingBatch { private final long startIndex; private final long endIndex; - private final List<TLogBatch> batches; + private final AccumulatingBatch batch; // indicates whether this batch has been successfully synchronized to another node private boolean synced; - public PendingBatch(List<TLogBatch> batches) { - if (!batches.isEmpty()) { - this.startIndex = batches.get(0).getSearchIndex(); - this.endIndex = batches.get(batches.size() - 1).getSearchIndex(); + public PendingBatch(AccumulatingBatch batch) { + List<TLogBatch> batchList = batch.getBatchList(); + if (!batchList.isEmpty()) { + this.startIndex = batchList.get(0).getSearchIndex(); + this.endIndex = batchList.get(batchList.size() - 1).getSearchIndex(); } else { this.startIndex = 0; this.endIndex = 0; } - this.batches = batches; + this.batch = batch; this.synced = false; } @@ -51,8 +52,8 @@ public class PendingBatch { return endIndex; } - public List<TLogBatch> getBatches() { - return batches; + public AccumulatingBatch getBatch() { + return batch; } public boolean isSynced() { @@ -64,7 +65,7 @@ public class PendingBatch { } public boolean isEmpty() { - return batches.isEmpty(); + return batch.getBatchList().isEmpty(); } @Override @@ -75,7 +76,9 @@ public class PendingBatch { + ", endIndex=" + endIndex + ", size=" - + batches.size() + + batch.getBatchList().size() + + ", serializedSize=" + + batch.getSerializedSize() + '}'; } } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java index e9901d931a..3549c1158b 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatus.java @@ -30,7 +30,7 @@ public class SyncStatus { private final MultiLeaderConfig config; private final IndexController controller; - private final List<PendingBatch> pendingBatches = new LinkedList<>(); + private final LinkedList<PendingBatch> pendingBatches = new LinkedList<>(); public SyncStatus(IndexController controller, MultiLeaderConfig config) { this.controller = controller; @@ -80,7 +80,7 @@ public class SyncStatus { return 1 + (pendingBatches.isEmpty() ? controller.getCurrentIndex() - : pendingBatches.get(pendingBatches.size() - 1).getEndIndex()); + : pendingBatches.getLast().getEndIndex()); } } diff --git a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java index c4363428fb..d9b591d1b6 100644 --- a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java +++ b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/logdispatcher/SyncStatusTest.java @@ -66,7 +66,8 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = + new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch))); batchList.add(batch); status.addNextBatch(batch); } @@ -95,7 +96,8 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = + new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch))); batchList.add(batch); status.addNextBatch(batch); } @@ -130,7 +132,8 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = + new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch))); batchList.add(batch); status.addNextBatch(batch); } @@ -176,7 +179,8 @@ public class SyncStatusTest { for (long i = 0; i < config.getReplication().getMaxPendingBatch(); i++) { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(i); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = + new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch))); batchList.add(batch); status.addNextBatch(batch); } @@ -195,7 +199,8 @@ public class SyncStatusTest { () -> { TLogBatch logBatch = new TLogBatch(); logBatch.setSearchIndex(config.getReplication().getMaxPendingBatch()); - PendingBatch batch = new PendingBatch(Collections.singletonList(logBatch)); + PendingBatch batch = + new PendingBatch(new AccumulatingBatch(Collections.singletonList(logBatch))); batchList.add(batch); try { status.addNextBatch(batch); diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java index be8e90b914..6ce2eeff80 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java @@ -92,7 +92,7 @@ public class InsertMultiTabletsNode extends InsertNode implements BatchInsertNod List<InsertTabletNode> insertTabletNodeList; /** record the result of insert tablets */ - private Map<Integer, TSStatus> results = new HashMap<>(); + private final Map<Integer, TSStatus> results = new HashMap<>(); public InsertMultiTabletsNode(PlanNodeId id) { super(id);
