This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch test_wal_sync
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1d5b65520762fa5787146705884515dd562f435a
Author: Jinrui.Zhang <[email protected]>
AuthorDate: Mon Jun 27 18:07:05 2022 +0800

    change wal sync to iterator
---
 .../multileader/logdispatcher/LogDispatcher.java       | 18 +++++++++++++++++-
 .../java/org/apache/iotdb/db/wal/node/WALNode.java     |  2 +-
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index 1d651a0ba5..9d592d004e 100644
--- a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -137,6 +137,9 @@ public class LogDispatcher {
         (ConsensusReqReader) impl.getStateMachine().read(new 
GetConsensusReqReaderPlan());
     private volatile boolean stopped = false;
 
+    private ConsensusReqReader.ReqIterator walEntryiterator;
+    private long iteratorIndex = 1;
+
     public LogDispatcherThread(Peer peer, MultiLeaderConfig config) {
       this.peer = peer;
       this.config = config;
@@ -146,6 +149,7 @@ public class LogDispatcher {
           new IndexController(
               impl.getStorageDir(), 
Utils.fromTEndPointToString(peer.getEndpoint()), false);
       this.syncStatus = new SyncStatus(controller, config);
+      this.walEntryiterator = reader.getReqIterator(iteratorIndex);
     }
 
     public IndexController getController() {
@@ -283,10 +287,22 @@ public class LogDispatcher {
 
     private long constructBatchFromWAL(
         long currentIndex, long maxIndex, List<TLogBatch> logBatches) {
+      if (iteratorIndex != currentIndex) {
+        walEntryiterator.skipTo(currentIndex);
+        iteratorIndex = currentIndex;
+      }
+
       while (currentIndex < maxIndex
           && logBatches.size() < 
config.getReplication().getMaxRequestPerBatch()) {
+        try {
+          walEntryiterator.waitForNextReady();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
         // TODO iterator
-        IConsensusRequest data = reader.getReq(currentIndex++);
+        IConsensusRequest data = walEntryiterator.next();
+        iteratorIndex++;
+        currentIndex++;
         if (data != null) {
           logBatches.add(new TLogBatch(data.serializeToByteBuffer()));
         }
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 5d85114446..8f207cee32 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -237,7 +237,7 @@ public class WALNode implements IWALNode {
         }
       }
 
-      logger.debug(
+      logger.info(
           "Start deleting outdated wal files for wal node-{}, the first valid 
version id is {}, and the safely deleted search index is {}.",
           identifier,
           firstValidVersionId,

Reply via email to