This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d20e48b09 Optimize multiLeaderConsensus performance (#6413)
8d20e48b09 is described below

commit 8d20e48b09bbd90a0e1c5eb92a28dbc63680e7d0
Author: Potato <[email protected]>
AuthorDate: Sat Jun 25 00:13:58 2022 +0800

    Optimize multiLeaderConsensus performance (#6413)
    
    * optimize multiLeaderConsensus performance
    
    * fix UT
---
 .../resources/conf/iotdb-confignode.properties     |  4 +-
 .../iotdb/consensus/config/MultiLeaderConfig.java  |  8 ++--
 .../multileader/MultiLeaderConsensus.java          |  2 +-
 .../multileader/logdispatcher/LogDispatcher.java   | 34 +++++++++++------
 .../multileader/MultiLeaderConsensusTest.java      | 44 ++++++++++++----------
 5 files changed, 53 insertions(+), 39 deletions(-)

diff --git a/confignode/src/assembly/resources/conf/iotdb-confignode.properties 
b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
index f7fb90f633..59803ee9ab 100644
--- a/confignode/src/assembly/resources/conf/iotdb-confignode.properties
+++ b/confignode/src/assembly/resources/conf/iotdb-confignode.properties
@@ -56,7 +56,7 @@ target_confignode=0.0.0.0:22277
 
 # DataRegion consensus protocol type
 # These consensus protocols are currently supported:
-# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, 
only supports stand-alone machine)
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(Consensus 
patterns optimized specifically for single replica)
 # 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
 # 3. org.apache.iotdb.consensus.multileader.MultiLeaderConsensus(weak 
consistency, high performance)
 # Datatype: String
@@ -64,7 +64,7 @@ target_confignode=0.0.0.0:22277
 
 # SchemaRegion consensus protocol type
 # These consensus protocols are currently supported:
-# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(No protocol, 
only supports stand-alone machine)
+# 1. org.apache.iotdb.consensus.standalone.StandAloneConsensus(Consensus 
patterns optimized specifically for single replica)
 # 2. org.apache.iotdb.consensus.ratis.RatisConsensus(Raft protocol)
 # Datatype: String
 # 
schema_region_consensus_protocol_class=org.apache.iotdb.consensus.standalone.StandAloneConsensus
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
index 7f4c0b4de9..119bd17b0a 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/config/MultiLeaderConfig.java
@@ -205,10 +205,10 @@ public class MultiLeaderConfig {
     }
 
     public static class Builder {
-      private int maxPendingRequestNumPerNode = 1000;
-      private int maxRequestPerBatch = 100;
-      private int maxPendingBatch = 20;
-      private int maxWaitingTimeForAccumulatingBatchInMs = 10;
+      private int maxPendingRequestNumPerNode = 200;
+      private int maxRequestPerBatch = 40;
+      private int maxPendingBatch = 6;
+      private int maxWaitingTimeForAccumulatingBatchInMs = 500;
       private long basicRetryWaitTimeMs = TimeUnit.MILLISECONDS.toMillis(100);
       private long maxRetryWaitTimeMs = TimeUnit.SECONDS.toMillis(20);
 
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
index 8a95c1288e..eecfa473c6 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensus.java
@@ -124,9 +124,9 @@ public class MultiLeaderConsensus implements IConsensus {
 
   @Override
   public void stop() {
+    clientManager.close();
     
stateMachineMap.values().parallelStream().forEach(MultiLeaderServerImpl::stop);
     registerManager.deregisterAll();
-    clientManager.close();
   }
 
   @Override
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index a2add314ae..1d651a0ba5 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -74,7 +74,8 @@ public class LogDispatcher {
             .collect(Collectors.toList());
     if (!threads.isEmpty()) {
       this.executorService =
-          IoTDBThreadPoolFactory.newFixedThreadPool(threads.size(), 
"LogDispatcher");
+          IoTDBThreadPoolFactory.newFixedThreadPool(
+              threads.size(), "LogDispatcher-" + 
impl.getThisNode().getGroupId());
     }
   }
 
@@ -107,8 +108,15 @@ public class LogDispatcher {
   public void offer(IndexedConsensusRequest request) {
     threads.forEach(
         thread -> {
-          if (!thread.offer(request)) {
-            logger.debug("Log queue of {} is full, ignore the log to this 
node", thread.getPeer());
+          logger.debug(
+              "{}: Push a log to the queue, where the queue length is {}",
+              impl.getThisNode().getGroupId(),
+              thread.getPendingRequest().size());
+          if (!thread.getPendingRequest().offer(request)) {
+            logger.debug(
+                "{}: Log queue of {} is full, ignore the log to this node",
+                impl.getThisNode().getGroupId(),
+                thread.getPeer());
           }
         });
   }
@@ -156,8 +164,8 @@ public class LogDispatcher {
       return config;
     }
 
-    public boolean offer(IndexedConsensusRequest request) {
-      return pendingRequest.offer(request);
+    public BlockingQueue<IndexedConsensusRequest> getPendingRequest() {
+      return pendingRequest;
     }
 
     public void stop() {
@@ -199,7 +207,7 @@ public class LogDispatcher {
       PendingBatch batch;
       List<TLogBatch> logBatches = new ArrayList<>();
       long startIndex = syncStatus.getNextSendingIndex();
-      long maxIndex = impl.getController().getCurrentIndex();
+      long maxIndex = impl.getController().getCurrentIndex() + 1;
       long endIndex;
       if (bufferedRequest.size() <= 
config.getReplication().getMaxRequestPerBatch()) {
         // Use drainTo instead of poll to reduce lock overhead
@@ -211,7 +219,7 @@ public class LogDispatcher {
         // only execute this after a restart
         endIndex = constructBatchFromWAL(startIndex, maxIndex, logBatches);
         batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug("accumulated a {} from wal", batch);
+        logger.debug("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
       } else {
         Iterator<IndexedConsensusRequest> iterator = 
bufferedRequest.iterator();
         IndexedConsensusRequest prev = iterator.next();
@@ -220,7 +228,7 @@ public class LogDispatcher {
         endIndex = constructBatchFromWAL(startIndex, prev.getSearchIndex(), 
logBatches);
         if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
           batch = new PendingBatch(startIndex, endIndex, logBatches);
-          logger.debug("accumulated a {} from wal", batch);
+          logger.debug("{} : accumulated a {} from wal", 
impl.getThisNode().getGroupId(), batch);
           return batch;
         }
         constructBatchIndexedFromConsensusRequest(prev, logBatches);
@@ -236,7 +244,10 @@ public class LogDispatcher {
                 constructBatchFromWAL(prev.getSearchIndex(), 
current.getSearchIndex(), logBatches);
             if (logBatches.size() == 
config.getReplication().getMaxRequestPerBatch()) {
               batch = new PendingBatch(startIndex, endIndex, logBatches);
-              logger.debug("accumulated a {} from queue and wal", batch);
+              logger.debug(
+                  "{} : accumulated a {} from queue and wal",
+                  impl.getThisNode().getGroupId(),
+                  batch);
               return batch;
             }
           }
@@ -249,7 +260,8 @@ public class LogDispatcher {
           iterator.remove();
         }
         batch = new PendingBatch(startIndex, endIndex, logBatches);
-        logger.debug("accumulated a {} from queue and wal", batch);
+        logger.debug(
+            "{} : accumulated a {} from queue and wal", 
impl.getThisNode().getGroupId(), batch);
       }
       return batch;
     }
@@ -276,8 +288,6 @@ public class LogDispatcher {
         // TODO iterator
         IConsensusRequest data = reader.getReq(currentIndex++);
         if (data != null) {
-          // since WAL can no longer recover FragmentInstance, but only 
PlanNode, we need to give
-          // special flags to use different deserialization methods in the 
dataRegion stateMachine
           logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
         }
       }
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
index fa86215466..5b3b8253f8 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/MultiLeaderConsensusTest.java
@@ -365,25 +365,27 @@ public class MultiLeaderConsensusTest {
     public void stop() {}
 
     @Override
-    public synchronized TSStatus write(IConsensusRequest request) {
-      IConsensusRequest innerRequest = ((IndexedConsensusRequest) 
request).getRequest();
-      if (innerRequest instanceof ByteBufferConsensusRequest) {
-        ByteBuffer buffer = innerRequest.serializeToByteBuffer();
-        requestSet.add(
-            new IndexedConsensusRequest(
-                ((IndexedConsensusRequest) request).getSearchIndex(),
-                -1,
-                new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
-      } else {
-        requestSet.add(((IndexedConsensusRequest) request));
+    public TSStatus write(IConsensusRequest request) {
+      synchronized (requestSet) {
+        IConsensusRequest innerRequest = ((IndexedConsensusRequest) 
request).getRequest();
+        if (innerRequest instanceof ByteBufferConsensusRequest) {
+          ByteBuffer buffer = innerRequest.serializeToByteBuffer();
+          requestSet.add(
+              new IndexedConsensusRequest(
+                  ((IndexedConsensusRequest) request).getSearchIndex(),
+                  -1,
+                  new TestEntry(buffer.getInt(), Peer.deserialize(buffer))));
+        } else {
+          requestSet.add(((IndexedConsensusRequest) request));
+        }
+        return new TSStatus();
       }
-      return new TSStatus();
     }
 
     @Override
     public synchronized DataSet read(IConsensusRequest request) {
       if (request instanceof GetConsensusReqReaderPlan) {
-        return new FakeConsensusReqReader(new ArrayList<>(requestSet));
+        return new FakeConsensusReqReader(requestSet);
       }
       return null;
     }
@@ -399,20 +401,22 @@ public class MultiLeaderConsensusTest {
 
   public static class FakeConsensusReqReader implements ConsensusReqReader, 
DataSet {
 
-    private final List<IndexedConsensusRequest> requestList;
+    private final Set<IndexedConsensusRequest> requestSet;
 
-    public FakeConsensusReqReader(List<IndexedConsensusRequest> requestList) {
-      this.requestList = requestList;
+    public FakeConsensusReqReader(Set<IndexedConsensusRequest> requestSet) {
+      this.requestSet = requestSet;
     }
 
     @Override
     public IConsensusRequest getReq(long index) {
-      for (IndexedConsensusRequest indexedConsensusRequest : requestList) {
-        if (indexedConsensusRequest.getSearchIndex() == index) {
-          return indexedConsensusRequest;
+      synchronized (requestSet) {
+        for (IndexedConsensusRequest indexedConsensusRequest : requestSet) {
+          if (indexedConsensusRequest.getSearchIndex() == index) {
+            return indexedConsensusRequest;
+          }
         }
+        return null;
       }
-      return null;
     }
 
     @Override

Reply via email to