This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_0808_test_exp1_parallel
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ml_0808_test_exp1_parallel by 
this push:
     new 9c7aa65752 change deserialize and write to batch level to speed up the 
concurrency
9c7aa65752 is described below

commit 9c7aa65752909831344b21b713d0470aa30b8ef7
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Aug 8 21:19:15 2022 +0800

    change deserialize and write to batch level to speed up the concurrency
---
 .../request/BatchIndexedConsensusRequest.java      |  63 ++++++++++++
 .../service/MultiLeaderRPCServiceProcessor.java    |  25 +++--
 .../statemachine/DataRegionStateMachine.java       | 111 +++++++++++++--------
 3 files changed, 144 insertions(+), 55 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
new file mode 100644
index 0000000000..a4a38fc84f
--- /dev/null
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/BatchIndexedConsensusRequest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.consensus.common.request;
+
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.List;
+
+public class BatchIndexedConsensusRequest implements IConsensusRequest { // Ordered group of IndexedConsensusRequests plus the sync index of the first and last one added.
+
+  private boolean isStartSyncIndexInitialized; // false until the first add() call
+  private long startSyncIndex; // sync index of the first request added; keeps the field default (0) while the batch is empty
+  private long endSyncIndex; // sync index of the most recently added request; also 0 while the batch is empty
+  private final List<IndexedConsensusRequest> requests; // requests in the order they were added
+
+  public BatchIndexedConsensusRequest() { // Creates an empty batch.
+    this.requests = new LinkedList<>();
+    this.isStartSyncIndexInitialized = false;
+  }
+
+  public void add(IndexedConsensusRequest request) { // Appends a request and updates the tracked start/end sync index.
+    if (!isStartSyncIndexInitialized) { // only the first added request defines the start index
+      startSyncIndex = request.getSyncIndex();
+      isStartSyncIndexInitialized = true;
+    }
+    endSyncIndex = request.getSyncIndex(); // overwritten on every add; no ordering check is performed here
+    this.requests.add(request);
+  }
+
+  public long getStartSyncIndex() { // Returns 0 if add() has never been called.
+    return startSyncIndex;
+  }
+
+  public long getEndSyncIndex() { // Returns 0 if add() has never been called.
+    return endSyncIndex;
+  }
+
+  public List<IndexedConsensusRequest> getRequests() { // Returns the internal list itself, not a copy.
+    return requests;
+  }
+
+  @Override
+  public ByteBuffer serializeToByteBuffer() { // Always returns null. NOTE(review): presumably this batch is never serialized - confirm with callers.
+    return null;
+  }
+}
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 86b926ab4f..1b0328975b 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.consensus.multileader.service;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
@@ -69,7 +70,7 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
         resultHandler.onComplete(new 
TSyncLogRes(Collections.singletonList(status)));
         return;
       }
-      List<TSStatus> statuses = new ArrayList<>();
+      BatchIndexedConsensusRequest requestsInThisBatch = new 
BatchIndexedConsensusRequest();
       // We use synchronized to ensure atomicity of executing multiple logs
       if (!req.getBatches().isEmpty()) {
         List<IConsensusRequest> consensusRequests = new ArrayList<>();
@@ -81,11 +82,9 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
                   : new ByteBufferConsensusRequest(batch.data);
           // merge TLogBatch with same search index into one request
           if (batch.getSearchIndex() != currentSearchIndex) {
-            statuses.add(
-                impl.getStateMachine()
-                    .write(
-                        impl.buildIndexedConsensusRequestForRemoteRequest(
-                            currentSearchIndex, consensusRequests)));
+            requestsInThisBatch.add(
+                impl.buildIndexedConsensusRequestForRemoteRequest(
+                    currentSearchIndex, consensusRequests));
             consensusRequests = new ArrayList<>();
             currentSearchIndex = batch.getSearchIndex();
           }
@@ -93,15 +92,15 @@ public class MultiLeaderRPCServiceProcessor implements 
MultiLeaderConsensusIServ
         }
         // write last request
         if (!consensusRequests.isEmpty()) {
-          statuses.add(
-              impl.getStateMachine()
-                  .write(
-                      impl.buildIndexedConsensusRequestForRemoteRequest(
-                          currentSearchIndex, consensusRequests)));
+          requestsInThisBatch.add(
+              impl.buildIndexedConsensusRequestForRemoteRequest(
+                  currentSearchIndex, consensusRequests));
         }
       }
-      logger.debug("Execute TSyncLogReq for {} with result {}", 
req.consensusGroupId, statuses);
-      resultHandler.onComplete(new TSyncLogRes(statuses));
+      TSStatus writeStatus = impl.getStateMachine().write(requestsInThisBatch);
+      logger.debug(
+          "Execute TSyncLogReq for {} with result {}", req.consensusGroupId, 
writeStatus.subStatus);
+      resultHandler.onComplete(new TSyncLogRes(writeStatus.subStatus));
     } catch (Exception e) {
       resultHandler.onError(e);
     } finally {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index c26c711cb0..12746e9234 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -24,9 +24,9 @@ import org.apache.iotdb.commons.StepTracker;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.request.BatchIndexedConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
-import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.consensus.multileader.wal.GetConsensusReqReaderPlan;
 import org.apache.iotdb.db.consensus.statemachine.visitor.DataExecutionVisitor;
 import org.apache.iotdb.db.engine.StorageEngineV2;
@@ -50,7 +50,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 
@@ -66,12 +66,12 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
   private static final int MAX_REQUEST_CACHE_SIZE = 5;
   private static final long CACHE_WINDOW_TIME_IN_MS = 10_000;
 
-  private final PriorityQueue<InsertNode> requestCache;
+  private final PriorityQueue<InsertNodeWrapper> requestCache;
   private long nextSyncIndex = -1;
 
   public DataRegionStateMachine(DataRegion region) {
     this.region = region;
-    this.requestCache = new 
PriorityQueue<>(Comparator.comparingLong(InsertNode::getSyncIndex));
+    this.requestCache = new PriorityQueue<>();
   }
 
   @Override
@@ -116,28 +116,28 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
     }
   }
 
-  private TSStatus cacheAndInsertLatestNode(long syncIndex, InsertNode 
insertNode) {
+  private TSStatus cacheAndInsertLatestNode(InsertNodeWrapper 
insertNodeWrapper) {
     long cacheRequestStartTime = System.nanoTime();
-    logger.info("syncIndex = {}, nextSyncIndex = {}", syncIndex, 
nextSyncIndex);
-    insertNode.setSyncIndex(syncIndex);
+    logger.info(
+        "syncIndex = {}, nextSyncIndex = {}", 
insertNodeWrapper.startSyncIndex, nextSyncIndex);
     synchronized (requestCache) {
-      requestCache.add(insertNode);
+      requestCache.add(insertNodeWrapper);
       // If the peek is not hold by current thread, it should notify the 
corresponding thread to
       // process the peek when the queue is full
       if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
-          && requestCache.peek().getSyncIndex() != syncIndex) {
+          && requestCache.peek().getStartSyncIndex() != 
insertNodeWrapper.getStartSyncIndex()) {
         requestCache.notifyAll();
       }
       while (true) {
-        if (insertNode.getSyncIndex() == nextSyncIndex) {
-          requestCache.remove(insertNode);
-          nextSyncIndex++;
+        if (insertNodeWrapper.getStartSyncIndex() == nextSyncIndex) {
+          requestCache.remove(insertNodeWrapper);
+          nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
           break;
         }
         if (requestCache.size() == MAX_REQUEST_CACHE_SIZE
-            && requestCache.peek().getSyncIndex() == 
insertNode.getSyncIndex()) {
+            && requestCache.peek().getStartSyncIndex() == 
insertNodeWrapper.getStartSyncIndex()) {
           requestCache.remove();
-          nextSyncIndex = insertNode.getSyncIndex() + 1;
+          nextSyncIndex = insertNodeWrapper.getEndSyncIndex() + 1;
           break;
         }
         try {
@@ -147,58 +147,85 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
         }
       }
       StepTracker.trace("cacheAndQueueRequest", cacheRequestStartTime, 
System.nanoTime());
-      logger.info("queue size {}, syncIndex = {}", requestCache.size(), 
insertNode.getSyncIndex());
-      TSStatus tsStatus = write(insertNode);
+      logger.info(
+          "queue size {}, startSyncIndex = {}, endSyncIndex = {}",
+          requestCache.size(),
+          insertNodeWrapper.getStartSyncIndex(),
+          insertNodeWrapper.getEndSyncIndex());
+      List<TSStatus> subStatus = new LinkedList<>();
+      for (InsertNode insertNode : insertNodeWrapper.getInsertNodes()) {
+        subStatus.add(write(insertNode));
+      }
       // TODO: think about notifying until processing the last request in this 
batch
       requestCache.notifyAll();
-      return tsStatus;
+      return new TSStatus().setSubStatus(subStatus);
     }
   }
 
   private static class InsertNodeWrapper implements 
Comparable<InsertNodeWrapper> {
-    private final long syncIndex;
-    private final InsertNode insertNode;
+    private final long startSyncIndex;
+    private final long endSyncIndex;
+    private final List<InsertNode> insertNodes;
 
-    public InsertNodeWrapper(long syncIndex, InsertNode insertNode) {
-      this.syncIndex = syncIndex;
-      this.insertNode = insertNode;
+    public InsertNodeWrapper(long startSyncIndex, long endSyncIndex) {
+      this.startSyncIndex = startSyncIndex;
+      this.endSyncIndex = endSyncIndex;
+      this.insertNodes = new LinkedList<>();
     }
 
     @Override
     public int compareTo(@NotNull InsertNodeWrapper o) {
-      return Long.compare(syncIndex, o.syncIndex);
+      return Long.compare(startSyncIndex, o.startSyncIndex);
+    }
+
+    public void add(InsertNode insertNode) {
+      this.insertNodes.add(insertNode);
     }
 
-    public long getSyncIndex() {
-      return syncIndex;
+    public long getStartSyncIndex() {
+      return startSyncIndex;
     }
 
-    public InsertNode getInsertNode() {
-      return insertNode;
+    public long getEndSyncIndex() {
+      return endSyncIndex;
+    }
+
+    public List<InsertNode> getInsertNodes() {
+      return insertNodes;
     }
   }
 
+  private InsertNodeWrapper deserializeAndWrap(BatchIndexedConsensusRequest 
batchRequest) { // Builds a wrapper holding the batch's start/end sync index and one InsertNode per indexed request.
+    InsertNodeWrapper insertNodeWrapper =
+        new InsertNodeWrapper(batchRequest.getStartSyncIndex(), 
batchRequest.getEndSyncIndex());
+    for (IndexedConsensusRequest indexedRequest : batchRequest.getRequests()) {
+      insertNodeWrapper.add(grabInsertNode(indexedRequest)); // appended in the batch's iteration order
+    }
+    return insertNodeWrapper; // holds no nodes when the batch has no requests
+  }
+
+  private InsertNode grabInsertNode(IndexedConsensusRequest indexedRequest) { // Turns the inner requests of one IndexedConsensusRequest into a single merged InsertNode.
+    List<InsertNode> insertNodes = new 
ArrayList<>(indexedRequest.getRequests().size());
+    for (IConsensusRequest req : indexedRequest.getRequests()) {
+      // PlanNode in IndexedConsensusRequest should always be InsertNode
+      InsertNode innerNode = (InsertNode) getPlanNode(req); // unchecked cast: a non-InsertNode plan would throw ClassCastException here
+      innerNode.setSearchIndex(indexedRequest.getSearchIndex()); // every inner node gets the search index of its enclosing request
+      insertNodes.add(innerNode);
+    }
+    return mergeInsertNodes(insertNodes); // mergeInsertNodes is defined outside this hunk; its handling of an empty list is not visible here
+  }
+
   @Override
   public TSStatus write(IConsensusRequest request) {
     PlanNode planNode;
     try {
       if (request instanceof IndexedConsensusRequest) {
         IndexedConsensusRequest indexedRequest = (IndexedConsensusRequest) 
request;
-        List<InsertNode> insertNodes = new 
ArrayList<>(indexedRequest.getRequests().size());
-        for (IConsensusRequest req : indexedRequest.getRequests()) {
-          // PlanNode in IndexedConsensusRequest should always be InsertNode
-          InsertNode innerNode = (InsertNode) getPlanNode(req);
-          innerNode.setSearchIndex(indexedRequest.getSearchIndex());
-          insertNodes.add(innerNode);
-        }
-        if (indexedRequest.getSearchIndex() == 
ConsensusReqReader.DEFAULT_SEARCH_INDEX) {
-          TSStatus status =
-              cacheAndInsertLatestNode(
-                  indexedRequest.getSyncIndex(), 
mergeInsertNodes(insertNodes));
-          return status;
-        } else {
-          planNode = mergeInsertNodes(insertNodes);
-        }
+        planNode = grabInsertNode(indexedRequest);
+      } else if (request instanceof BatchIndexedConsensusRequest) {
+        InsertNodeWrapper insertNodeWrapper =
+            deserializeAndWrap((BatchIndexedConsensusRequest) request);
+        return cacheAndInsertLatestNode(insertNodeWrapper);
       } else {
         planNode = getPlanNode(request);
       }

Reply via email to