This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 83e1071d19 [IOTDB-3734] Set safely deleted search index directly in
multi-leader (#6617)
83e1071d19 is described below
commit 83e1071d19169d6ea4e335ac1691a15df8b15fd5
Author: Alan Choo <[email protected]>
AuthorDate: Thu Jul 7 23:53:11 2022 +0800
[IOTDB-3734] Set safely deleted search index directly in multi-leader
(#6617)
---
.../common/request/IndexedConsensusRequest.java | 6 ++++++
.../multileader/MultiLeaderServerImpl.java | 6 ++----
.../multileader/logdispatcher/LogDispatcher.java | 4 ++++
.../multileader/wal/ConsensusReqReader.java | 6 ++++++
.../multileader/util/FakeConsensusReqReader.java | 3 +++
.../multileader/util/TestStateMachine.java | 1 -
.../statemachine/DataRegionStateMachine.java | 3 ---
.../plan/node/write/InsertMultiTabletsNode.java | 6 ------
.../plan/planner/plan/node/write/InsertNode.java | 22 ++--------------------
.../planner/plan/node/write/InsertRowsNode.java | 6 ------
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 6 ------
.../db/wal/allocation/FirstCreateStrategy.java | 5 +++--
.../org/apache/iotdb/db/wal/node/WALFakeNode.java | 5 +++++
.../java/org/apache/iotdb/db/wal/node/WALNode.java | 16 ++++++----------
14 files changed, 37 insertions(+), 58 deletions(-)
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 3578fd5656..0eaade43f5 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.consensus.common.request;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
+
import java.nio.ByteBuffer;
import java.util.Objects;
@@ -32,6 +34,10 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
private final IConsensusRequest request;
+ public IndexedConsensusRequest(long searchIndex, IConsensusRequest request) {
+ this(searchIndex, ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX,
request);
+ }
+
public IndexedConsensusRequest(
long searchIndex, long safelyDeletedSearchIndex, IConsensusRequest
request) {
this.searchIndex = searchIndex;
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
index 93f9cf567b..76023e6814 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/MultiLeaderServerImpl.java
@@ -177,14 +177,12 @@ public class MultiLeaderServerImpl {
public IndexedConsensusRequest buildIndexedConsensusRequestForLocalRequest(
IConsensusRequest request) {
- return new IndexedConsensusRequest(
- index.incrementAndGet(), getCurrentSafelyDeletedSearchIndex(),
request);
+ return new IndexedConsensusRequest(index.incrementAndGet(), request);
}
public IndexedConsensusRequest buildIndexedConsensusRequestForRemoteRequest(
ByteBufferConsensusRequest request) {
- return new IndexedConsensusRequest(
- ConsensusReqReader.DEFAULT_SEARCH_INDEX,
getCurrentSafelyDeletedSearchIndex(), request);
+ return new
IndexedConsensusRequest(ConsensusReqReader.DEFAULT_SEARCH_INDEX, request);
}
/**
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
index ae60598061..ce6adc5fff 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/logdispatcher/LogDispatcher.java
@@ -201,6 +201,10 @@ public class LogDispatcher {
syncStatus.addNextBatch(batch);
// sends batch asynchronously and migrates the retry logic into the
callback handler
sendBatchAsync(batch, new DispatchLogHandler(this, batch));
+ // update safely deleted search index to delete outdated info,
+ // indicating that insert nodes whose search index are before this
value can be deleted
+ // safely
+
reader.setSafelyDeletedSearchIndex(impl.getCurrentSafelyDeletedSearchIndex());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
index a39e3186a4..ea462924e4 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/multileader/wal/ConsensusReqReader.java
@@ -30,8 +30,14 @@ import java.util.concurrent.TimeoutException;
/** This interface provides search interface for consensus requests via index.
*/
public interface ConsensusReqReader {
+ /** this insert node doesn't need to participate in multi-leader consensus */
long DEFAULT_SEARCH_INDEX = -1;
+ /** multi-leader consensus cannot delete any insert nodes */
+ long DEFAULT_SAFELY_DELETED_SEARCH_INDEX = Long.MIN_VALUE;
+
+ void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex);
+
/**
* Gets the consensus request at the specified position.
*
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
index 01d90b62cc..a7f7134940 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/FakeConsensusReqReader.java
@@ -36,6 +36,9 @@ public class FakeConsensusReqReader implements
ConsensusReqReader, DataSet {
this.requestSets = requestSets;
}
+ @Override
+ public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {}
+
@Override
public IConsensusRequest getReq(long index) {
synchronized (requestSets) {
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
index a7803ba689..1a9507da07 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/multileader/util/TestStateMachine.java
@@ -65,7 +65,6 @@ public class TestStateMachine implements IStateMachine,
IStateMachine.EventApi {
requestSets.add(
new IndexedConsensusRequest(
((IndexedConsensusRequest) request).getSearchIndex(),
- -1,
new TestEntry(buffer.getInt(), Peer.deserialize(buffer))),
false);
} else {
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index ad33a65ae2..8ed5bbb7cd 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -106,9 +106,6 @@ public class DataRegionStateMachine extends
BaseStateMachine {
if (planNode instanceof InsertNode) {
((InsertNode) planNode)
.setSearchIndex(((IndexedConsensusRequest)
request).getSearchIndex());
- ((InsertNode) planNode)
- .setSafelyDeletedSearchIndex(
- ((IndexedConsensusRequest)
request).getSafelyDeletedSearchIndex());
}
} else {
planNode = getPlanNode(request);
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 1d1a0f682b..6085a55d8f 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -135,12 +135,6 @@ public class InsertMultiTabletsNode extends InsertNode
implements BatchInsertNod
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
}
- @Override
- public void setSafelyDeletedSearchIndex(long index) {
- safelyDeletedSearchIndex = index;
- insertTabletNodeList.forEach(plan ->
plan.setSafelyDeletedSearchIndex(index));
- }
-
@Override
public boolean validateAndSetSchema(SchemaTree schemaTree) {
for (InsertTabletNode insertTabletNode : insertTabletNodeList) {
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
index aa3f6aaebb..0b773ba147 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertNode.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.mpp.plan.planner.plan.node.write;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
import org.apache.iotdb.db.metadata.idtable.entry.IDeviceID;
@@ -32,9 +33,6 @@ import
org.apache.iotdb.tsfile.exception.NotImplementedException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -48,11 +46,8 @@ import java.util.stream.Collectors;
public abstract class InsertNode extends WritePlanNode {
- private final Logger logger = LoggerFactory.getLogger(InsertNode.class);
/** this insert node doesn't need to participate in multi-leader consensus */
- public static final long NO_CONSENSUS_INDEX = -1;
- /** no multi-leader consensus, all insert nodes can be safely deleted */
- public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX =
Long.MAX_VALUE;
+ public static final long NO_CONSENSUS_INDEX =
ConsensusReqReader.DEFAULT_SEARCH_INDEX;
/**
* if use id table, this filed is id form of device path <br>
@@ -81,11 +76,6 @@ public abstract class InsertNode extends WritePlanNode {
* value should start from 1
*/
protected long searchIndex = NO_CONSENSUS_INDEX;
- /**
- * this index pass info to wal, indicating that insert nodes whose search
index are before this
- * value can be deleted safely
- */
- protected long safelyDeletedSearchIndex =
DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
/** Physical address of data region after splitting */
protected TRegionReplicaSet dataRegionReplicaSet;
@@ -168,14 +158,6 @@ public abstract class InsertNode extends WritePlanNode {
this.searchIndex = searchIndex;
}
- public long getSafelyDeletedSearchIndex() {
- return safelyDeletedSearchIndex;
- }
-
- public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
- this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
- }
-
@Override
protected void serializeAttributes(ByteBuffer byteBuffer) {
throw new NotImplementedException("serializeAttributes of InsertNode is
not implemented");
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
index 509886aa21..a81d7198e1 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsNode.java
@@ -103,12 +103,6 @@ public class InsertRowsNode extends InsertNode implements
BatchInsertNode {
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
}
- @Override
- public void setSafelyDeletedSearchIndex(long index) {
- safelyDeletedSearchIndex = index;
- insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
- }
-
public Map<Integer, TSStatus> getResults() {
return results;
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index fc32377954..5f9772f699 100644
---
a/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/server/src/main/java/org/apache/iotdb/db/mpp/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -91,12 +91,6 @@ public class InsertRowsOfOneDeviceNode extends InsertNode
implements BatchInsert
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
}
- @Override
- public void setSafelyDeletedSearchIndex(long index) {
- safelyDeletedSearchIndex = index;
- insertRowNodeList.forEach(plan -> plan.setSafelyDeletedSearchIndex(index));
- }
-
public TSStatus[] getFailingStatus() {
return StatusUtils.getFailingStatus(results, insertRowNodeList.size());
}
diff --git
a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
index 92c102b464..2d7d3858b1 100644
---
a/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
+++
b/server/src/main/java/org/apache/iotdb/db/wal/allocation/FirstCreateStrategy.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.wal.allocation;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.consensus.multileader.wal.ConsensusReqReader;
import org.apache.iotdb.db.wal.node.IWALNode;
import org.apache.iotdb.db.wal.node.WALNode;
@@ -52,7 +53,7 @@ public class FirstCreateStrategy extends
AbstractNodeAllocationStrategy {
IWALNode walNode = createWALNode(applicantUniqueId);
if (walNode instanceof WALNode) {
// avoid deletion
- ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+
walNode.setSafelyDeletedSearchIndex(ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
}
return walNode;
@@ -73,7 +74,7 @@ public class FirstCreateStrategy extends
AbstractNodeAllocationStrategy {
createWALNode(applicantUniqueId, logDirectory, startFileVersion,
startSearchIndex);
if (walNode instanceof WALNode) {
// avoid deletion
- ((WALNode) walNode).setSafelyDeletedSearchIndex(Long.MIN_VALUE);
+
walNode.setSafelyDeletedSearchIndex(ConsensusReqReader.DEFAULT_SAFELY_DELETED_SEARCH_INDEX);
identifier2Nodes.put(applicantUniqueId, (WALNode) walNode);
}
} finally {
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
index 54a1ba09a0..22030be9e8 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALFakeNode.java
@@ -99,6 +99,11 @@ public class WALFakeNode implements IWALNode {
// do nothing
}
+ @Override
+ public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public IConsensusRequest getReq(long index) {
throw new UnsupportedOperationException();
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
index 1709ee22ec..3e50041e73 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/node/WALNode.java
@@ -77,8 +77,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import static
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.DEFAULT_SAFELY_DELETED_SEARCH_INDEX;
-
/**
* This class encapsulates {@link IWALBuffer} and {@link CheckpointManager}.
If search is enabled,
* the order of search index should be protected by the upper layer, and the
value should start from
@@ -87,6 +85,8 @@ import static
org.apache.iotdb.db.mpp.plan.planner.plan.node.write.InsertNode.DE
public class WALNode implements IWALNode {
private static final Logger logger = LoggerFactory.getLogger(WALNode.class);
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
+ /** no multi-leader consensus, all insert nodes can be safely deleted */
+ public static final long DEFAULT_SAFELY_DELETED_SEARCH_INDEX =
Long.MAX_VALUE;
/** unique identifier of this WALNode */
private final String identifier;
@@ -135,9 +135,6 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(long memTableId, InsertRowNode insertRowNode) {
- if (insertRowNode.getSafelyDeletedSearchIndex() !=
DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
- safelyDeletedSearchIndex = insertRowNode.getSafelyDeletedSearchIndex();
- }
WALEntry walEntry = new WALEntry(memTableId, insertRowNode);
return log(walEntry);
}
@@ -152,9 +149,6 @@ public class WALNode implements IWALNode {
@Override
public WALFlushListener log(
long memTableId, InsertTabletNode insertTabletNode, int start, int end) {
- if (insertTabletNode.getSafelyDeletedSearchIndex() !=
DEFAULT_SAFELY_DELETED_SEARCH_INDEX) {
- safelyDeletedSearchIndex =
insertTabletNode.getSafelyDeletedSearchIndex();
- }
WALEntry walEntry = new WALEntry(memTableId, insertTabletNode, start, end);
return log(walEntry);
}
@@ -235,7 +229,7 @@ public class WALNode implements IWALNode {
}
}
- logger.info(
+ logger.debug(
"Start deleting outdated wal files for wal node-{}, the first valid
version id is {}, and the safely deleted search index is {}.",
identifier,
firstValidVersionId,
@@ -464,6 +458,7 @@ public class WALNode implements IWALNode {
// endregion
// region Search interfaces for consensus group
+ @Override
public void setSafelyDeletedSearchIndex(long safelyDeletedSearchIndex) {
this.safelyDeletedSearchIndex = safelyDeletedSearchIndex;
}
@@ -692,7 +687,8 @@ public class WALNode implements IWALNode {
if (needUpdatingFilesToSearch || filesToSearch == null) {
updateFilesToSearch();
if (needUpdatingFilesToSearch) {
- logger.warn("update file to search failed");
+ logger.debug(
+ "update file to search failed, the next search index is {}",
nextSearchIndex);
return false;
}
}