This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch iotdb-3791
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/iotdb-3791 by this push:
     new bcd3f12c61 disable SyncLogProcessor
bcd3f12c61 is described below

commit bcd3f12c618d1b7d0f0ca1fcbb413ec06e1c8761
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu Jul 14 18:09:15 2022 +0800

    disable SyncLogProcessor
---
 .../multileader/logdispatcher/LogDispatcher.java   |  3 +-
 .../service/MultiLeaderRPCServiceProcessor.java    | 72 ++++++++++------------
 2 files changed, 34 insertions(+), 41 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 0685799915..64e037aaf6 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -192,6 +192,7 @@ public class LogDispatcher {
             // we may block here if there is no requests in the queue
             IndexedConsensusRequest request =
                 pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC, TimeUnit.SECONDS);
+            getBatchStartTime = System.nanoTime();
             if (request != null) {
               bufferedRequest.add(request);
               // If write pressure is low, we simply sleep a little to reduce the number of RPC
@@ -200,7 +201,6 @@ public class LogDispatcher {
               }
             }
           }
-          StepTracker.trace("getBatch()", 10, getBatchStartTime, System.nanoTime());
           // we may block here if the synchronization pipeline is full
           StepTracker.trace("getBatch()", 10, getBatchStartTime, System.nanoTime());
 
@@ -352,6 +352,7 @@ public class LogDispatcher {
         StepTracker.trace("walEntryiterator.next()", 400, nextStartTime, System.nanoTime());
         currentIndex = data.getSearchIndex();
         iteratorIndex = currentIndex;
+        StepTracker.trace("walDataRequestSize", 400, 0, data.getRequests().size() * 1000_000L);
         for (IConsensusRequest innerRequest : data.getRequests()) {
           long newTlogBatchStartTime = System.nanoTime();
           logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), true));
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 87555bbea0..4041b50e0e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -19,26 +19,17 @@
 
 package org.apache.iotdb.consensus.multileader.service;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.StepTracker;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
 import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
-import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
 import org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
 import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.thrift.async.AsyncMethodCallback;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 
 public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIService.AsyncIface {
 
@@ -54,37 +45,38 @@ public class MultiLeaderRPCServiceProcessor implements MultiLeaderConsensusIServ
   public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes> resultHandler) {
     long startTime = System.nanoTime();
     try {
-      ConsensusGroupId groupId =
-          ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
-      MultiLeaderServerImpl impl = consensus.getImpl(groupId);
-      if (impl == null) {
-        String message =
-            String.format(
-                "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
-                groupId, req.getBatches().size());
-        logger.error(message);
-        TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
-        status.setMessage(message);
-        resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
-        return;
-      }
-      List<TSStatus> statuses = new ArrayList<>();
-      // We use synchronized to ensure atomicity of executing multiple logs
-      synchronized (impl.getStateMachine()) {
-        for (TLogBatch batch : req.getBatches()) {
-          long writeOneBatch = System.nanoTime();
-          statuses.add(
-              impl.getStateMachine()
-                  .write(
-                      impl.buildIndexedConsensusRequestForRemoteRequest(
-                          batch.isFromWAL()
-                              ? new MultiLeaderConsensusRequest(batch.data)
-                              : new ByteBufferConsensusRequest(batch.data))));
-          StepTracker.trace("writeOneBatch", 400, writeOneBatch, System.nanoTime());
-        }
-      }
-      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId, statuses);
-      resultHandler.onComplete(new TSyncLogRes(statuses));
+      //      ConsensusGroupId groupId =
+      //          ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+      //      MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+      //      if (impl == null) {
+      //        String message =
+      //            String.format(
+      //                "Unexpected consensusGroupId %s for TSyncLogReq which size is %s",
+      //                groupId, req.getBatches().size());
+      //        logger.error(message);
+      //        TSStatus status = new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+      //        status.setMessage(message);
+      //        resultHandler.onComplete(new TSyncLogRes(Collections.singletonList(status)));
+      //        return;
+      //      }
+      //      List<TSStatus> statuses = new ArrayList<>();
+      //      // We use synchronized to ensure atomicity of executing multiple logs
+      //      synchronized (impl.getStateMachine()) {
+      //        for (TLogBatch batch : req.getBatches()) {
+      //          long writeOneBatch = System.nanoTime();
+      //          statuses.add(
+      //              impl.getStateMachine()
+      //                  .write(
+      //                      impl.buildIndexedConsensusRequestForRemoteRequest(
+      //                          batch.isFromWAL()
+      //                              ? new MultiLeaderConsensusRequest(batch.data)
+      //                              : new ByteBufferConsensusRequest(batch.data))));
+      //          StepTracker.trace("writeOneBatch", 400, writeOneBatch, System.nanoTime());
+      //        }
+      //      }
+      //      logger.debug("Execute TSyncLogReq for {} with result {}", req.consensusGroupId,
+      // statuses);
+      resultHandler.onComplete(new TSyncLogRes(new ArrayList<>()));
     } catch (Exception e) {
       resultHandler.onError(e);
     } finally {

Reply via email to