This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch ml_optimize_reader
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 05539bcdfab2cd082372d18e5744498a1d8fcc7e
Author: ZhangHongYin <[email protected]>
AuthorDate: Tue Sep 13 10:47:26 2022 +0800

    [IOTDB-4372] adapt PlanNodeIterator. (#7275)
---
 .../multileader/logdispatcher/LogDispatcher.java   | 26 ++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index e3539752be..abd491dc5d 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -361,8 +361,30 @@ public class LogDispatcher {
           logger.warn("wait for next WAL entry is interrupted");
         }
         IndexedConsensusRequest data = walEntryiterator.next();
-        currentIndex = data.getSearchIndex();
-        iteratorIndex = currentIndex;
+        if (currentIndex > data.getSearchIndex()) {
+          // if the index of request is smaller than currentIndex, then 
continue
+          logger.warn(
+              "search for one Entry which index is {}, but find a smaller one, 
index : {}",
+              currentIndex,
+              data.getSearchIndex());
+          continue;
+        } else if (currentIndex < data.getSearchIndex()) {
+          logger.warn(
+              "search for one Entry which index is {}, but find a larger one, 
index : {}",
+              currentIndex,
+              data.getSearchIndex());
+          if (data.getSearchIndex() >= maxIndex) {
+            // if the index of request is larger than maxIndex, then finish
+            walEntryiterator.skipTo(currentIndex);
+            break;
+          }
+          // if the index of request is larger than currentIndex, and smaller 
than maxIndex, then
+          // skip to index
+          currentIndex = data.getSearchIndex();
+          walEntryiterator.skipTo(currentIndex);
+          iteratorIndex = currentIndex;
+        }
+        // construct request from wal
         for (IConsensusRequest innerRequest : data.getRequests()) {
           logBatches.add(new TLogBatch(innerRequest.serializeToByteBuffer(), 
currentIndex, true));
         }

Reply via email to