This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch test_wal_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/test_wal_sync by this push:
new abbf55208b add time consumed metrics for wal construction
abbf55208b is described below
commit abbf55208b439ba7b8273240f3c80cf80f5d1b04
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Jun 27 21:14:21 2022 +0800
add time consumed metrics for wal construction
---
.../multileader/logdispatcher/LogDispatcher.java | 50 ++++++++++++++--------
1 file changed, 31 insertions(+), 19 deletions(-)
diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 9d592d004e..087076dfa2 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -52,7 +52,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-/** Manage all asynchronous replication threads and corresponding async clients */
+/**
+ * Manage all asynchronous replication threads and corresponding async clients
+ */
public class LogDispatcher {
private final Logger logger = LoggerFactory.getLogger(LogDispatcher.class);
@@ -287,27 +289,37 @@ public class LogDispatcher {
private long constructBatchFromWAL(
long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
- if (iteratorIndex != currentIndex) {
- walEntryiterator.skipTo(currentIndex);
- iteratorIndex = currentIndex;
- }
-
- while (currentIndex < maxIndex
- && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
- try {
- walEntryiterator.waitForNextReady();
- } catch (InterruptedException e) {
- e.printStackTrace();
+ long startTime = System.nanoTime();
+ int count = 0;
+ try {
+ if (iteratorIndex != currentIndex) {
+ walEntryiterator.skipTo(currentIndex);
+ iteratorIndex = currentIndex;
}
- // TODO iterator
- IConsensusRequest data = walEntryiterator.next();
- iteratorIndex++;
- currentIndex++;
- if (data != null) {
- logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+ while (currentIndex < maxIndex
+ && logBatches.size() < config.getReplication().getMaxRequestPerBatch()) {
+ try {
+ walEntryiterator.waitForNextReady();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ // TODO iterator
+ IConsensusRequest data = walEntryiterator.next();
+ iteratorIndex++;
+ currentIndex++;
+ if (data != null) {
+ logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
+ count++;
+ }
}
+ return currentIndex - 1;
+ } finally {
+ double timeConsumed = (System.nanoTime() * 1.0 - startTime) / 1000_000;
+ logger.info(
+ String.format(
+ "[DataRegion[%s]->%s]construct batch[%d] time consumed: %.3f",
+ peer.getGroupId().getId(), peer.getEndpoint().ip, count, timeConsumed));
}
- return currentIndex - 1;
}
private void constructBatchIndexedFromConsensusRequest(