This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by this push:
new 9c7aa65752 change deserialize and write to batch level to speed up the concurrency
9c7aa65752 is described below
commit 9c7aa65752909831344b21b713d0470aa30b8ef7
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Aug 8 21:19:15 2022 +0800
change deserialize and write to batch level to speed up the concurrency
---
.../request/BatchIndexedConsensusRequest.java | 63 ++++++++++++
.../service/MultiLeaderRPCServiceProcessor.java | 25 +++--
.../statemachine/DataRegionStateMachine.java | 111 +++++++++++++--------
3 files changed, 144 insertions(+), 55 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
new file mode 100644
index 0000000000..a4a38fc84f
--- /dev/null
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+public class BatchIndexedConsensusRequest implements IConsensusRequest {
+
+ private boolean isStartSyncIndexInitialized;
+ private long startSyncIndex;
+ private long endSyncIndex;
+ private final List<IndexedConsensusRequest> requests;
+
+ public BatchIndexedConsensusRequest() {
+ this.requests = new LinkedList<>();
+ this.isStartSyncIndexInitialized = false;
+ }
+
+ public void add(IndexedConsensusRequest request) {
+ if (!isStartSyncIndexInitialized) {
+ startSyncIndex = request.getSyncIndex();
+ isStartSyncIndexInitialized = true;
+ }
+ endSyncIndex = request.getSyncIndex();
+ this.requests.add(request);
+ }
+
+ public long getStartSyncIndex() {
+ return startSyncIndex;
+ }
+
+ public long getEndSyncIndex() {
+ return endSyncIndex;
+ }
+
+ public List<IndexedConsensusRequest> getRequests() {
+ return requests;
+ }
+
+ @Override
+ public ByteBuffer serializeToByteBuffer() {
+ return null;
+ }
+}
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 86b926ab4f..1b0328975b 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.service;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
@@ -69,7 +70,7 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
resultHandler.onComplete(new
TSyncLogRes(Collections.singletonList(status)));
return;
}
- List<TSStatus> statuses = new ArrayList<>();
+ BatchIndexedConsensusRequest requestsInThisBatch = new
BatchIndexedConsensusRequest();
// We use synchronized to ensure atomicity of executing multiple logs
if (!req.getBatches().isEmpty()) {
List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -81,11 +82,9 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
: new ByteBufferConsensusRequest(batch.data);
// merge TLogBatch with same search index into one request
if (batch.getSearchIndex() != currentSearchIndex) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests)));
+ requestsInThisBatch.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests));
consensusRequests = new ArrayList<>();
currentSearchIndex = batch.getSearchIndex();
}
@@ -93,15 +92,15 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
}
// write last request
if (!consensusRequests.isEmpty()) {
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- currentSearchIndex, consensusRequests)));
+ requestsInThisBatch.add(
+ impl.buildIndexedConsensusRequestForRemoteRequest(
+ currentSearchIndex, consensusRequests));
}
}
- logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
- resultHandler.onComplete(new TSyncLogRes(statuses));
+ TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
+ logger.debug(
+ "Execute TSyncLogReq for {} with result {}", req.consensusGroupId,
writeStatus.subStatus);
+ resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
} catch (Exception e) {
resultHandler.onError(e);
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c26c711cb0..12746e9234 100644
--- a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.commons.StepTracker;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
@@ -66,12 +66,12 @@ public class DataRegionStateMachine extends
BaseStateMachine {
private static final int MAX_REQUEST_CACHE_SIZE = 5;
private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
- private final PriorityQueue<InsertNode> requestCache;
+ private final PriorityQueue<InsertNodeWrapper> requestCache;
private long nextSyncIndex = -1;
public DataRegionStateMachine(DataRegion region) {
this.region = region;
- this.requestCache = new
PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
+ this.requestCache = new PriorityQueue<>();
}
@Override
@@ -116,28 +116,28 @@ public class DataRegionStateMachine extends
BaseStateMachine {
}
}
- private TSStatus cacheAndInsertLatestNode(long syncIndex, InsertNode
insertNode) {
+ private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper
insertNodeWrapper) {
long cacheRequestStartTime = System.nanoTime();
- logger.info("syncIndex = {}, nextSyncIndex = {}", syncIndex,
nextSyncIndex);
- insertNode.setSyncIndex(syncIndex);
+ logger.info(
+ "syncIndex = {}, nextSyncIndex = {}",
insertNodeWrapper.startSyncIndex, nextSyncIndex);
synchronized (requestCache) {
- requestCache.add(insertNode);
+ requestCache.add(insertNodeWrapper);
// If the peek is not hold by current thread, it should notify the
corresponding thread to
// process the peek when the queue is full
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getSyncIndex() != syncIndex) {
+ && requestCache.peek().getStartSyncIndex() !=
insertNodeWrapper.getStartSyncIndex()) {
requestCache.notifyAll();
}
while (true) {
- if (insertNode.getSyncIndex() == nextSyncIndex) {
- requestCache.remove(insertNode);
- nextSyncIndex++;
+ if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
+ requestCache.remove(insertNodeWrapper);
+ nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
break;
}
if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
- && requestCache.peek().getSyncIndex() ==
insertNode.getSyncIndex()) {
+ && requestCache.peek().getStartSyncIndex() ==
insertNodeWrapper.getStartSyncIndex()) {
requestCache.remove();
- nextSyncIndex = insertNode.getSyncIndex() + 1;
+ nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
break;
}
try {
@@ -147,58 +147,85 @@ public class DataRegionStateMachine extends
BaseStateMachine {
}
}
StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime,
System.nanoTime());
- logger.info("queue size {}, syncIndex = {}", requestCache.size(),
insertNode.getSyncIndex());
- TSStatus tsStatus = write(insertNode);
+ logger.info(
+ "queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+ requestCache.size(),
+ insertNodeWrapper.getStartSyncIndex(),
+ insertNodeWrapper.getEndSyncIndex());
+ List<TSStatus> subStatus = new LinkedList<>();
+ for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+ subStatus.add(write(insertNode));
+ }
// TODO: think about notifying until processing the last request in this
batch
requestCache.notifyAll();
- return tsStatus;
+ return new TSStatus().setSubStatus(subStatus);
}
}
private static class InsertNodeWrapper implements
Comparable<InsertNodeWrapper> {
- private final long syncIndex;
- private final InsertNode insertNode;
+ private final long startSyncIndex;
+ private final long endSyncIndex;
+ private final List<InsertNode> insertNodes;
- public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
- this.syncIndex = syncIndex;
- this.insertNode = insertNode;
+ public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
+ this.startSyncIndex = startSyncIndex;
+ this.endSyncIndex = endSyncIndex;
+ this.insertNodes = new LinkedList<>();
}
@Override
public int compareTo(@NotNull InsertNodeWrapper o) {
- return Long.compare(syncIndex, o.syncIndex);
+ return Long.compare(startSyncIndex, o.startSyncIndex);
+ }
+
+ public void add(InsertNode insertNode) {
+ this.insertNodes.add(insertNode);
}
- public long getSyncIndex() {
- return syncIndex;
+ public long getStartSyncIndex() {
+ return startSyncIndex;
}
- public InsertNode getInsertNode() {
- return insertNode;
+ public long getEndSyncIndex() {
+ return endSyncIndex;
+ }
+
+ public List<InsertNode> getInsertNodes() {
+ return insertNodes;
}
}
+ private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest
batchRequest) {
+ InsertNodeWrapper insertNodeWrapper =
+ new InsertNodeWrapper(batchRequest.getStartSyncIndex(),
batchRequest.getEndSyncIndex());
+ for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
+ insertNodeWrapper.add(grabInsertNode(indexedRequest));
+ }
+ return insertNodeWrapper;
+ }
+
+ private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) {
+ List<InsertNode> insertNodes = new
ArrayList<>(indexedRequest.getRequests().size());
+ for (IConsensusRequest req : indexedRequest.getRequests()) {
+ // PlanNode in IndexedConsensusRequest should always be InsertNode
+ InsertNode innerNode = (InsertNode) getPlanNode(req);
+ innerNode.setSearchIndex(indexedRequest.getSearchIndex());
+ insertNodes.add(innerNode);
+ }
+ return mergeInsertNodes(insertNodes);
+ }
+
@Override
public TSStatus write(IConsensusRequest request) {
PlanNode planNode;
try {
if (request instanceof IndexedConsensusRequest) {
IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest)
request;
- List<InsertNode> insertNodes = new
ArrayList<>(indexedRequest.getRequests().size());
- for (IConsensusRequest req : indexedRequest.getRequests()) {
- // PlanNode in IndexedConsensusRequest should always be InsertNode
- InsertNode innerNode = (InsertNode) getPlanNode(req);
- innerNode.setSearchIndex(indexedRequest.getSearchIndex());
- insertNodes.add(innerNode);
- }
- if (indexedRequest.getSearchIndex() ==
ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
- TSStatus status =
- cacheAndInsertLatestNode(
- indexedRequest.getSyncIndex(),
mergeInsertNodes(insertNodes));
- return status;
- } else {
- planNode = mergeInsertNodes(insertNodes);
- }
+ planNode = grabInsertNode(indexedRequest);
+ } else if (request instanceof BatchIndexedConsensusRequest) {
+ InsertNodeWrapper insertNodeWrapper =
+ deserializeAndWrap((BatchIndexedConsensusRequest) request);
+ return cacheAndInsertLatestNode(insertNodeWrapper);
} else {
planNode = getPlanNode(request);
}