This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch iotdb-3791
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/iotdb-3791 by this push:
new bcd3f12c61 diable SyncLogProcessor
bcd3f12c61 is described below
commit bcd3f12c618d1b7d0f0ca1fcbb413ec06e1c8761
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Thu Jul 14 18:09:15 2022 +0800
diable SyncLogProcessor
---
.../multileader/logdispatcher/LogDispatcher.java | 3 +-
.../service/MultiLeaderRPCServiceProcessor.java | 72 ++++++++++------------
2 files changed, 34 insertions(+), 41 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 0685799915..64e037aaf6 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -192,6 +192,7 @@ public class LogDispatcher {
// we may block here if there is no requests in the queue
IndexedConsensusRequest request =
pendingRequest.poll(PENDING_REQUEST_TAKING_TIME_OUT_IN_SEC,
TimeUnit.SECONDS);
+ getBatchStartTime = System.nanoTime();
if (request != null) {
bufferedRequest.add(request);
// If write pressure is low, we simply sleep a little to reduce
the number of RPC
@@ -200,7 +201,6 @@ public class LogDispatcher {
}
}
}
- StepTracker.trace("getBatch()", 10, getBatchStartTime,
System.nanoTime());
// we may block here if the synchronization pipeline is full
StepTracker.trace("getBatch()", 10, getBatchStartTime,
System.nanoTime());
@@ -352,6 +352,7 @@ public class LogDispatcher {
StepTracker.trace("walEntryiterator.next()", 400, nextStartTime,
System.nanoTime());
currentIndex = data.getSearchIndex();
iteratorIndex = currentIndex;
+ StepTracker.trace("walDataRequestSize", 400, 0,
data.getRequests().size() * 1000_000L);
for (IConsensusRequest innerRequest : data.getRequests()) {
long newTlogBatchStartTime = System.nanoTime();
logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(),
true));
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
index 87555bbea0..4041b50e0e 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/service/MultiLeaderRPCServiceProcessor.java
@@ -19,26 +19,17 @@
package org.apache.iotdb.consensus.multileader.service;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.StepTracker;
-import org.apache.iotdb.commons.consensus.ConsensusGroupId;
-import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
-import org.apache.iotdb.consensus.common.request.MultiLeaderConsensusRequest;
import org.apache.iotdb.consensus.multileader.MultiLeaderConsensus;
-import org.apache.iotdb.consensus.multileader.MultiLeaderServerImpl;
import
org.apache.iotdb.consensus.multileader.thrift.MultiLeaderConsensusIService;
-import org.apache.iotdb.consensus.multileader.thrift.TLogBatch;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogReq;
import org.apache.iotdb.consensus.multileader.thrift.TSyncLogRes;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.thrift.async.AsyncMethodCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIService.AsyncIface {
@@ -54,37 +45,38 @@ public class MultiLeaderRPCServiceProcessor implements
MultiLeaderConsensusIServ
public void syncLog(TSyncLogReq req, AsyncMethodCallback<TSyncLogRes>
resultHandler) {
long startTime = System.nanoTime();
try {
- ConsensusGroupId groupId =
-
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
- MultiLeaderServerImpl impl = consensus.getImpl(groupId);
- if (impl == null) {
- String message =
- String.format(
- "Unexpected consensusGroupId %s for TSyncLogReq which size is
%s",
- groupId, req.getBatches().size());
- logger.error(message);
- TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
- status.setMessage(message);
- resultHandler.onComplete(new
TSyncLogRes(Collections.singletonList(status)));
- return;
- }
- List<TSStatus> statuses = new ArrayList<>();
- // We use synchronized to ensure atomicity of executing multiple logs
- synchronized (impl.getStateMachine()) {
- for (TLogBatch batch : req.getBatches()) {
- long writeOneBatch = System.nanoTime();
- statuses.add(
- impl.getStateMachine()
- .write(
- impl.buildIndexedConsensusRequestForRemoteRequest(
- batch.isFromWAL()
- ? new MultiLeaderConsensusRequest(batch.data)
- : new ByteBufferConsensusRequest(batch.data))));
- StepTracker.trace("writeOneBatch", 400, writeOneBatch,
System.nanoTime());
- }
- }
- logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId, statuses);
- resultHandler.onComplete(new TSyncLogRes(statuses));
+ // ConsensusGroupId groupId =
+ //
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.getConsensusGroupId());
+ // MultiLeaderServerImpl impl = consensus.getImpl(groupId);
+ // if (impl == null) {
+ // String message =
+ // String.format(
+ // "Unexpected consensusGroupId %s for TSyncLogReq which
size is %s",
+ // groupId, req.getBatches().size());
+ // logger.error(message);
+ // TSStatus status = new
TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
+ // status.setMessage(message);
+ // resultHandler.onComplete(new
TSyncLogRes(Collections.singletonList(status)));
+ // return;
+ // }
+ // List<TSStatus> statuses = new ArrayList<>();
+ // // We use synchronized to ensure atomicity of executing multiple
logs
+ // synchronized (impl.getStateMachine()) {
+ // for (TLogBatch batch : req.getBatches()) {
+ // long writeOneBatch = System.nanoTime();
+ // statuses.add(
+ // impl.getStateMachine()
+ // .write(
+ //
impl.buildIndexedConsensusRequestForRemoteRequest(
+ // batch.isFromWAL()
+ // ? new
MultiLeaderConsensusRequest(batch.data)
+ // : new
ByteBufferConsensusRequest(batch.data))));
+ // StepTracker.trace("writeOneBatch", 400, writeOneBatch,
System.nanoTime());
+ // }
+ // }
+ // logger.debug("Execute TSyncLogReq for {} with result {}",
req.consensusGroupId,
+ // statuses);
+ resultHandler.onComplete(new TSyncLogRes(new ArrayList<>()));
} catch (Exception e) {
resultHandler.onError(e);
} finally {