This is an automated email from the ASF dual-hosted git repository. tanxinyu pushed a commit to branch ca in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit b76b380b6331d50f2c2fdc0a9ed1d16eb4afd011 Author: BUAAserein <[email protected]> AuthorDate: Tue Apr 11 21:04:37 2023 +0800 improve iot --- .../consensus/iot/IoTConsensusServerImpl.java | 17 +++++++++ .../consensus/iot/logdispatcher/LogDispatcher.java | 42 ++++++++++++---------- 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java index 201322fa48..032d9ad187 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensusServerImpl.java @@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; +import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.enums.Metric; import org.apache.iotdb.commons.service.metric.enums.PerformanceOverviewMetrics; @@ -81,6 +82,7 @@ import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; @@ -115,6 +117,8 @@ public class IoTConsensusServerImpl { private final String consensusGroupId; + private ExecutorService executorService; + public IoTConsensusServerImpl( String storageDir, Peer thisNode, @@ -143,6 +147,9 @@ public class IoTConsensusServerImpl { this.searchIndex = new AtomicLong(currentSearchIndex); this.consensusGroupId = thisNode.getGroupId().toString(); this.metrics = new IoTConsensusServerMetrics(this); + this.executorService = + IoTDBThreadPoolFactory.newFixedThreadPool( + Runtime.getRuntime().availableProcessors(), "Serialization"); } public IStateMachine getStateMachine() { @@ -158,6 +165,16 @@ public class IoTConsensusServerImpl { public void stop() { logDispatcher.stop(); stateMachine.stop(); + executorService.shutdownNow(); + int timeout = 10; + try { + if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) { + logger.error("Unable to shutdown serialization service after {} seconds", timeout); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + logger.error("Unexpected Interruption when closing serialization service "); + } MetricService.getInstance().removeMetricSet(this.metrics); } diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java index f6da0efd52..dac55282f7 100644 --- a/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java +++ b/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java @@ -158,24 +158,27 @@ public class LogDispatcher { public void offer(IndexedConsensusRequest request) { // we don't need to serialize and offer request when replicaNum is 1. if (!threads.isEmpty()) { - request.buildSerializedRequests(); - synchronized (this) { - threads.forEach( - thread -> { - logger.debug( - "{}->{}: Push a log to the queue, where the queue length is {}", - impl.getThisNode().getGroupId(), - thread.getPeer().getEndpoint().getIp(), - thread.getPendingEntriesSize()); - if (!thread.offer(request)) { - logger.debug( - "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}", - impl.getThisNode().getGroupId(), - thread.getPeer(), - request.getSearchIndex()); - } - }); - } + executorService.submit( + () -> { + request.buildSerializedRequests(); + synchronized (this) { + threads.forEach( + thread -> { + logger.debug( + "{}->{}: Push a log to the queue, where the queue length is {}", + impl.getThisNode().getGroupId(), + thread.getPeer().getEndpoint().getIp(), + thread.getPendingEntriesSize()); + if (!thread.offer(request)) { + logger.debug( + "{}: Log queue of {} is full, ignore the log to this node, searchIndex: {}", + impl.getThisNode().getGroupId(), + thread.getPeer(), + request.getSearchIndex()); + } + }); + } + }); } } @@ -252,6 +255,9 @@ public class LogDispatcher { /** try to offer a request into queue with memory control. */ public boolean offer(IndexedConsensusRequest indexedConsensusRequest) { + if (indexedConsensusRequest.getSearchIndex() - getCurrentSyncIndex() > 10000) { + return false; + } if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getSerializedSize(), true)) { return false; }
