This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 83e1071d19 [IOTDB-3734] Set safely deleted search index directly in multi-leader (#6617)
83e1071d19 is described below

commit 83e1071d19169d6ea4e335ac1691a15df8b15fd5
Author: Alan Choo <[email protected]>
AuthorDate: Thu Jul 7 23:53:11 2022 +0800

    [IOTDB-3734] Set safely deleted search index directly in multi-leader (#6617)
---
 .../common/request/IndexedConsensusRequest.java    |  6 ++++++
 .../multileader/MultiLeaderServerImpl.java         |  6 ++----
 .../multileader/logdispatcher/LogDispatcher.java   |  4 ++++
 .../multileader/wal/ConsensusReqReader.java        |  6 ++++++
 .../multileader/util/FakeConsensusReqReader.java   |  3 +++
 .../multileader/util/TestStateMachine.java         |  1 -
 .../statemachine/DataRegionStateMachine.java       |  3 ---
 .../plan/node/write/InsertMultiTabletsNode.java    |  6 ------
 .../plan/planner/plan/node/write/InsertNode.java   | 22 ++--------------------
 .../planner/plan/node/write/InsertRowsNode.java    |  6 ------
 .../plan/node/write/InsertRowsOfOneDeviceNode.java |  6 ------
 .../db/wal/allocation/FirstCreateStrategy.java     |  5 +++--
 .../org/apache/iotdb/db/wal/node/WALFakeNode.java  |  5 +++++
 .../java/org/apache/iotdb/db/wal/node/WALNode.java | 16 ++++++----------
 14 files changed, 37 insertions(+), 58 deletions(-)

diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 3578fd5656..0eaade43f5 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.consensus.common.request;
 
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+
 import java.nio.ByteBuffer;
 import java.util.Objects;
 
@@ -32,6 +34,10 @@ public class IndexedConsensusRequest implements 
IConsensusRequest {
 
   private final IConsensusRequest request;
 
+  public IndexedConsensusRequest(long searchIndex, IConsensusRequest request) {
+    this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX, 
request);
+  }
+
   public IndexedConsensusRequest(
       long searchIndex, long safelyDeletedSearchIndex, IConsensusRequest 
request) {
     this.searchIndex = searchIndex;
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 93f9cf567b..76023e6814 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -177,14 +177,12 @@ public class MultiLeaderServerImpl {
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
       IConsensusRequest request) {
-    return new IndexedConsensusRequest(
-        index.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(), 
request);
+    return new IndexedConsensusRequest(index.incrementAndGet(), request);
   }
 
   public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
       ByteBufferConsensusRequest request) {
-    return new IndexedConsensusRequest(
-        ConsensusReqReader.DEFAULT_SEARCH_INDEX, 
getCurrentSafelyDeletedSearchIndex(), request);
+    return new 
IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, request);
   }
 
   /**
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ae60598061..ce6adc5fff 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -201,6 +201,10 @@ public class LogDispatcher {
           syncStatus.addNextBatch(batch);
           // sends batch asynchronously and migrates the retry logic into the 
callback handler
           sendBatchAsync(batch, new DispatchLogHandler(this, batch));
+          // update safely deleted search index to delete outdated info,
+          // indicating that insert nodes whose search index are before this 
value can be deleted
+          // safely
+          
reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index a39e3186a4..ea462924e4 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -30,8 +30,14 @@ import java.util.concurrent.TimeoutException;
 /** This interface provides search interface for consensus requests via index. 
*/
 public interface ConsensusReqReader {
 
+  /** this insert node doesn't need to participate in multi-leader consensus */
   long DEFAULT_SEARCH_INDEX = -1;
 
+  /** multi-leader consensus cannot delete any insert nodes */
+  long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MIN_VALUE;
+
+  void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex);
+
   /**
    * Gets the consensus request at the specified position.
    *
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
index 01d90b62cc..a7f7134940 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
@@ -36,6 +36,9 @@ public class FakeConsensusReqReader implements 
ConsensusReqReader, DataSet {
     this.requestSets = requestSets;
   }
 
+  @Override
+  public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {}
+
   @Override
   public IConsensusRequest getReq(long index) {
     synchronized (requestSets) {
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index a7803ba689..1a9507da07 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -65,7 +65,6 @@ public class TestStateMachine implements IStateMachine, 
IStateMachine.EventApi {
         requestSets.add(
             new IndexedConsensusRequest(
                 ((IndexedConsensusRequest) request).getSearchIndex(),
-                -1,
                 new TestEntry(buffer.getInt(), Peer.deserialize(buffer))),
             false);
       } else {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index ad33a65ae2..8ed5bbb7cd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -106,9 +106,6 @@ public class DataRegionStateMachine extends 
BaseStateMachine {
         if (planNode instanceof InsertNode) {
           ((InsertNode) planNode)
               .setSearchIndex(((IndexedConsensusRequest) 
request).getSearchIndex());
-          ((InsertNode) planNode)
-              .setSafelyDeletedSearchIndex(
-                  ((IndexedConsensusRequest) 
request).getSafelyDeletedSearchIndex());
         }
       } else {
         planNode = getPlanNode(request);
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 1d1a0f682b..6085a55d8f 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -135,12 +135,6 @@ public class InsertMultiTabletsNode extends InsertNode 
implements BatchInsertNod
     insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
-  @Override
-  public void setSafelyDeletedSearchIndex(long index) {
-    safelyDeletedSearchIndex = index;
-    insertTabletNodeList.forEach(plan -> 
plan.setSafelyDeletedSearchIndex(index));
-  }
-
   @Override
   public boolean validateAndSetSchema(SchemaTree schemaTree) {
     for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index aa3f6aaebb..0b773ba147 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
 
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
 import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
@@ -32,9 +33,6 @@ import 
org.apache.iotdb.tsfile.exception.NotImplementedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
@@ -48,11 +46,8 @@ import java.util.stream.Collectors;
 
 public abstract class InsertNode extends WritePlanNode {
 
-  private final Logger logger = LoggerFactory.getLogger(InsertNode.class);
   /** this insert node doesn't need to participate in multi-leader consensus */
-  public static final long NO_CONSENSUS_INDEX = -1;
-  /** no multi-leader consensus, all insert nodes can be safely deleted */
-  public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = 
Long.MAX_VALUE;
+  public static final long NO_CONSENSUS_INDEX = 
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
 
   /**
    * if use id table, this filed is id form of device path <br>
@@ -81,11 +76,6 @@ public abstract class InsertNode extends WritePlanNode {
    * value should start from 1
    */
   protected long searchIndex = NO_CONSENSUS_INDEX;
-  /**
-   * this index pass info to wal, indicating that insert nodes whose search 
index are before this
-   * value can be deleted safely
-   */
-  protected long safelyDeletedSearchIndex = 
DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
 
   /** Physical address of data region after splitting */
   protected TRegionReplicaSet dataRegionReplicaSet;
@@ -168,14 +158,6 @@ public abstract class InsertNode extends WritePlanNode {
     this.searchIndex = searchIndex;
   }
 
-  public long getSafelyDeletedSearchIndex() {
-    return safelyDeletedSearchIndex;
-  }
-
-  public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
-    this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
-  }
-
   @Override
   protected void serializeAttributes(ByteBuffer byteBuffer) {
     throw new NotImplementedException("serializeAttributes of InsertNode is 
not implemented");
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 509886aa21..a81d7198e1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -103,12 +103,6 @@ public class InsertRowsNode extends InsertNode implements 
BatchInsertNode {
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
-  @Override
-  public void setSafelyDeletedSearchIndex(long index) {
-    safelyDeletedSearchIndex = index;
-    insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
-  }
-
   public Map<Integer, TSStatus> getResults() {
     return results;
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index fc32377954..5f9772f699 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -91,12 +91,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode 
implements BatchInsert
     insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
   }
 
-  @Override
-  public void setSafelyDeletedSearchIndex(long index) {
-    safelyDeletedSearchIndex = index;
-    insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
-  }
-
   public TSStatus[] getFailingStatus() {
     return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
 
b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
index 92c102b464..2d7d3858b1 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.db.wal.allocation;
 
 import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
 import org.apache.iotdb.db.wal.node.IWALNode;
 import org.apache.iotdb.db.wal.node.WALNode;
 
@@ -52,7 +53,7 @@ public class FirstCreateStrategy extends 
AbstractNodeAllocationStrategy {
       IWALNode walNode = createWALNode(applicantUniqueId);
       if (walNode instanceof WALNode) {
         // avoid deletion
-        ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+        
walNode.setSafelyDeletedSearchIndex(ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
         identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
       }
       return walNode;
@@ -73,7 +74,7 @@ public class FirstCreateStrategy extends 
AbstractNodeAllocationStrategy {
           createWALNode(applicantUniqueId, logDirectory, startFileVersion, 
startSearchIndex);
       if (walNode instanceof WALNode) {
         // avoid deletion
-        ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+        
walNode.setSafelyDeletedSearchIndex(ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
         identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
       }
     } finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java 
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 54a1ba09a0..22030be9e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -99,6 +99,11 @@ public class WALFakeNode implements IWALNode {
     // do nothing
   }
 
+  @Override
+  public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public IConsensusRequest getReq(long index) {
     throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java 
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 1709ee22ec..3e50041e73 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -77,8 +77,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
-
 /**
  * This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}. 
If search is enabled,
  * the order of search index should be protected by the upper layer, and the 
value should start from
@@ -87,6 +85,8 @@ import static 
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.DE
 public class WALNode implements IWALNode {
   private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+  /** no multi-leader consensus, all insert nodes can be safely deleted */
+  public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = 
Long.MAX_VALUE;
 
   /** unique identifier of this WALNode */
   private final String identifier;
@@ -135,9 +135,6 @@ public class WALNode implements IWALNode {
 
   @Override
   public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
-    if (insertRowNode.getSafelyDeletedSearchIndex() != 
DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
-      safelyDeletedSearchIndex = insertRowNode.getSafelyDeletedSearchIndex();
-    }
     WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
     return log(walEntry);
   }
@@ -152,9 +149,6 @@ public class WALNode implements IWALNode {
   @Override
   public WALFlushListener log(
       long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
-    if (insertTabletNode.getSafelyDeletedSearchIndex() != 
DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
-      safelyDeletedSearchIndex = 
insertTabletNode.getSafelyDeletedSearchIndex();
-    }
     WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
     return log(walEntry);
   }
@@ -235,7 +229,7 @@ public class WALNode implements IWALNode {
         }
       }
 
-      logger.info(
+      logger.debug(
           "Start deleting outdated wal files for wal node-{}, the first valid 
version id is {}, and the safely deleted search index is {}.",
           identifier,
           firstValidVersionId,
@@ -464,6 +458,7 @@ public class WALNode implements IWALNode {
   // endregion
 
   // region Search interfaces for consensus group
+  @Override
   public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
     this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
   }
@@ -692,7 +687,8 @@ public class WALNode implements IWALNode {
       if (needUpdatingFilesToSearch || filesToSearch == null) {
         updateFilesToSearch();
         if (needUpdatingFilesToSearch) {
-          logger.warn("update file to search failed");
+          logger.debug(
+              "update file to search failed, the next search index is {}", 
nextSearchIndex);
           return false;
         }
       }

Reply via email to