This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new f0ec9c4a071 [To dev/1.3] Cherry-pick some IoTConsensus related commits
to 1.3 (#16812)
f0ec9c4a071 is described below
commit f0ec9c4a071f309cbc2a2f00fc4d118011b28d9e
Author: Jiang Tian <[email protected]>
AuthorDate: Thu Nov 27 17:21:57 2025 +0800
[To dev/1.3] Cherry-pick some IoTConsensus related commits to 1.3 (#16812)
* more accurate memory size (#15713)
* Fix stuck when stopping a DataNode with large unremovable WAL files
(#15727)
* Fix stuck when stopping a DataNode with large unremovable WAL files
* spotless
* add shutdown hook watcher
* Fix logDispatcher stuck
* add re-interrupt
* Add a multiplier to avoid receiver OOM in IoTConsensus (#16102)
* Fix negative iot queue size & missing search index for deletion & missed
request when performing empty table deleting (#16022)
* Fix double memory free of iotconsensus queue request during region
deletion
* Fix missing searchIndex and lost deletion when no TsFile is involved.
* Fix ref count of IoTConsensus request not decreased in allocation failure
(#16169)
* fix IoTConsensus memory management
* Fix ref count of IoTConsensus request not decreased in allocation failure
* fix log level
* remove irrelevant codes from 2.0
* Remove a table test
* Interrupt wal-delete thread when WALManager is closed (#15442)
---------
Co-authored-by: Xiangpeng Hu <[email protected]>
---
.../it/env/cluster/config/MppDataNodeConfig.java | 12 ++
.../it/env/cluster/node/AbstractNodeWrapper.java | 4 +
.../it/env/remote/config/RemoteDataNodeConfig.java | 10 ++
.../apache/iotdb/itbase/env/DataNodeConfig.java | 4 +
...IoTDBRegionOperationReliabilityITFramework.java | 23 +++-
.../iotdb/db/it/IoTDBCustomizedClusterIT.java | 148 +++++++++++++++++++++
.../DeserializedBatchIndexedConsensusRequest.java | 7 +
.../common/request/IndexedConsensusRequest.java | 10 ++
.../consensus/iot/client/DispatchLogHandler.java | 5 +
.../iotdb/consensus/iot/logdispatcher/Batch.java | 25 ++--
.../logdispatcher/IoTConsensusMemoryManager.java | 126 ++++++++++++++----
.../consensus/iot/logdispatcher/LogDispatcher.java | 38 ++++--
.../consensus/iot/logdispatcher/SyncStatus.java | 23 +++-
.../service/IoTConsensusRPCServiceProcessor.java | 3 +-
.../plan/node/pipe/PipeEnrichedInsertNode.java | 4 +-
.../planner/plan/node/write/DeleteDataNode.java | 9 +-
.../plan/node/write/InsertMultiTabletsNode.java | 3 +-
.../planner/plan/node/write/InsertRowsNode.java | 3 +-
.../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 +-
.../plan/planner/plan/node/write/SearchNode.java | 3 +-
.../iotdb/db/service/DataNodeShutdownHook.java | 31 ++++-
.../storageengine/dataregion/wal/WALManager.java | 14 +-
.../src/main/thrift/iotconsensus.thrift | 2 +
23 files changed, 437 insertions(+), 73 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 01636b7bf0b..9ac380503cd 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -100,4 +100,16 @@ public class MppDataNodeConfig extends MppBaseConfig
implements DataNodeConfig {
setProperty("cache_last_values_for_load",
String.valueOf(cacheLastValuesForLoad));
return this;
}
+
+ @Override
+ public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+ setProperty("wal_throttle_threshold_in_byte",
String.valueOf(walThrottleSize));
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setDeleteWalFilesPeriodInMs(long
deleteWalFilesPeriodInMs) {
+ setProperty("delete_wal_files_period_in_ms",
String.valueOf(deleteWalFilesPeriodInMs));
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index f1e25f9e1d6..005db9050ff 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -758,4 +758,8 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
return -1;
}
}
+
+ public Process getInstance() {
+ return instance;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index 80d9d98d23d..b2550ae689f 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -63,4 +63,14 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
public DataNodeConfig setCacheLastValuesForLoad(boolean
cacheLastValuesForLoad) {
return this;
}
+
+ @Override
+ public DataNodeConfig setWalThrottleSize(long walThrottleSize) {
+ return this;
+ }
+
+ @Override
+ public DataNodeConfig setDeleteWalFilesPeriodInMs(long
deleteWalFilesPeriodInMs) {
+ return this;
+ }
}
diff --git
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index c6112a0e639..c27eb369c0d 100644
---
a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++
b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -39,4 +39,8 @@ public interface DataNodeConfig {
DataNodeConfig setLoadLastCacheStrategy(String strategyName);
DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
+
+ DataNodeConfig setWalThrottleSize(long walThrottleSize);
+
+ DataNodeConfig setDeleteWalFilesPeriodInMs(long deleteWalFilesPeriodInMs);
}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
index 0dccc669eeb..1d14574a8ee 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/regionmigration/IoTDBRegionOperationReliabilityITFramework.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.session.Session;
import org.apache.thrift.TException;
import org.apache.tsfile.read.common.Field;
+import org.apache.tsfile.utils.Pair;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
@@ -307,7 +308,7 @@ public class IoTDBRegionOperationReliabilityITFramework {
}
}
- protected Set<Integer> getAllDataNodes(Statement statement) throws Exception
{
+ public static Set<Integer> getAllDataNodes(Statement statement) throws
Exception {
ResultSet result = statement.executeQuery(SHOW_DATANODES);
Set<Integer> allDataNodeId = new HashSet<>();
while (result.next()) {
@@ -444,6 +445,26 @@ public class IoTDBRegionOperationReliabilityITFramework {
return regionMap;
}
+ public static Map<Integer, Pair<Integer, Set<Integer>>>
getDataRegionMapWithLeader(
+ Statement statement) throws Exception {
+ ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
+ Map<Integer, Pair<Integer, Set<Integer>>> regionMap = new HashMap<>();
+ while (showRegionsResult.next()) {
+ if (String.valueOf(TConsensusGroupType.DataRegion)
+ .equals(showRegionsResult.getString(ColumnHeaderConstant.TYPE))) {
+ int regionId =
showRegionsResult.getInt(ColumnHeaderConstant.REGION_ID);
+ int dataNodeId =
showRegionsResult.getInt(ColumnHeaderConstant.DATA_NODE_ID);
+ Pair<Integer, Set<Integer>> leaderNodesPair =
+ regionMap.computeIfAbsent(regionId, id -> new Pair<>(-1, new
HashSet<>()));
+ leaderNodesPair.getRight().add(dataNodeId);
+ if
(showRegionsResult.getString(ColumnHeaderConstant.ROLE).equals("Leader")) {
+ leaderNodesPair.setLeft(dataNodeId);
+ }
+ }
+ }
+ return regionMap;
+ }
+
public static Map<Integer, Set<Integer>> getAllRegionMap(Statement
statement) throws Exception {
ResultSet showRegionsResult = statement.executeQuery(SHOW_REGIONS);
Map<Integer, Set<Integer>> regionMap = new HashMap<>();
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
new file mode 100644
index 00000000000..5812e255d16
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it;
+
+import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
+import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+import org.apache.iotdb.session.Session;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Date;
+
+import static org.junit.Assert.fail;
+
+/** Tests that may not be satisfied with the default cluster settings. */
+@RunWith(IoTDBTestRunner.class)
+@Category({ClusterIT.class})
+public class IoTDBCustomizedClusterIT {
+
+ private final Logger logger =
LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
+
+ /**
+ * When the wal size exceeds `walThrottleSize` * 0.8, the timed
wal-delete-thread will try
+ * deleting wal forever, which will block the DataNode from exiting, because
task of deleting wal
+ * submitted by the ShutdownHook cannot be executed. This test ensures that
this blocking is
+ * fixed.
+ */
+ @Test
+ public void testWalThrottleStuck()
+ throws SQLException,
+ IoTDBConnectionException,
+ StatementExecutionException,
+ InterruptedException {
+ SimpleEnv simpleEnv = new SimpleEnv();
+ simpleEnv
+ .getConfig()
+ .getDataNodeConfig()
+ .setWalThrottleSize(1)
+ .setDeleteWalFilesPeriodInMs(100);
+ simpleEnv
+ .getConfig()
+ .getCommonConfig()
+ .setDataReplicationFactor(3)
+ .setSchemaReplicationFactor(3)
+
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+ try {
+ simpleEnv.initClusterEnvironment(1, 3);
+
+ int leaderIndex = -1;
+ try (Connection connection = simpleEnv.getConnection();
+ Statement statement = connection.createStatement()) {
+ // write the first data
+ statement.execute("INSERT INTO root.db1.d1 (time, s1) values (1,1)");
+ // find the leader of the data region
+ int port = -1;
+
+ ResultSet resultSet = statement.executeQuery("SHOW REGIONS");
+ while (resultSet.next()) {
+ String regionType = resultSet.getString("Type");
+ if (regionType.equals("DataRegion")) {
+ String role = resultSet.getString("Role");
+ if (role.equals("Leader")) {
+ port = resultSet.getInt("RpcPort");
+ break;
+ }
+ }
+ }
+
+ if (port == -1) {
+ fail("Leader not found");
+ }
+
+ for (int i = 0; i < simpleEnv.getDataNodeWrapperList().size(); i++) {
+ if (simpleEnv.getDataNodeWrapperList().get(i).getPort() == port) {
+ leaderIndex = i;
+ break;
+ }
+ }
+ }
+
+ // stop a follower
+ int followerIndex = (leaderIndex + 1) %
simpleEnv.getDataNodeWrapperList().size();
+ simpleEnv.getDataNodeWrapperList().get(followerIndex).stop();
+ System.out.println(
+ new Date()
+ + ":Stopping data node "
+ +
simpleEnv.getDataNodeWrapperList().get(followerIndex).getIpAndPortString());
+
+ DataNodeWrapper leader =
simpleEnv.getDataNodeWrapperList().get(leaderIndex);
+ // write to the leader to generate wal that cannot be synced
+ try (Session session = new Session(leader.getIp(), leader.getPort())) {
+ session.open();
+
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
values (1,1)");
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
values (1,1)");
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
values (1,1)");
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
values (1,1)");
+ session.executeNonQueryStatement("INSERT INTO root.db1.d1 (time, s1)
values (1,1)");
+ }
+
+ // wait for wal-delete thread to be scheduled
+ Thread.sleep(1000);
+
+ // stop the leader
+ leader.getInstance().destroy();
+ System.out.println(new Date() + ":Stopping data node " +
leader.getIpAndPortString());
+ // confirm the death of the leader
+ long startTime = System.currentTimeMillis();
+ while (leader.isAlive()) {
+ if (System.currentTimeMillis() - startTime > 30000) {
+ fail("Leader does not exit after 30s");
+ }
+ }
+ } finally {
+ simpleEnv.cleanClusterEnvironment();
+ }
+ }
+}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
index 3cb81fa95c7..9cdeaf60c30 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/DeserializedBatchIndexedConsensusRequest.java
@@ -29,6 +29,7 @@ public class DeserializedBatchIndexedConsensusRequest
private final long startSyncIndex;
private final long endSyncIndex;
private final List<IConsensusRequest> insertNodes;
+ private long memorySize;
public DeserializedBatchIndexedConsensusRequest(
long startSyncIndex, long endSyncIndex, int size) {
@@ -52,6 +53,7 @@ public class DeserializedBatchIndexedConsensusRequest
public void add(IConsensusRequest insertNode) {
this.insertNodes.add(insertNode);
+ this.memorySize += insertNode.getMemorySize();
}
@Override
@@ -82,4 +84,9 @@ public class DeserializedBatchIndexedConsensusRequest
public ByteBuffer serializeToByteBuffer() {
return null;
}
+
+ @Override
+ public long getMemorySize() {
+ return memorySize;
+ }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
index 1147abc049e..2bf01d4ef86 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/common/request/IndexedConsensusRequest.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.atomic.AtomicLong;
/** only used for iot consensus. */
public class IndexedConsensusRequest implements IConsensusRequest {
@@ -34,6 +35,7 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
private final List<IConsensusRequest> requests;
private final List<ByteBuffer> serializedRequests;
private long memorySize = 0;
+ private AtomicLong referenceCnt = new AtomicLong();
public IndexedConsensusRequest(long searchIndex, List<IConsensusRequest>
requests) {
this.searchIndex = searchIndex;
@@ -100,4 +102,12 @@ public class IndexedConsensusRequest implements
IConsensusRequest {
public int hashCode() {
return Objects.hash(searchIndex, requests);
}
+
+ public long incRef() {
+ return referenceCnt.getAndIncrement();
+ }
+
+ public long decRef() {
+ return referenceCnt.getAndDecrement();
+ }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
index 9b0979bf6be..3b812f6c297 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/client/DispatchLogHandler.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.consensus.iot.client;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.iot.logdispatcher.Batch;
+import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher;
import
org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcher.LogDispatcherThread;
import org.apache.iotdb.consensus.iot.logdispatcher.LogDispatcherThreadMetrics;
import org.apache.iotdb.consensus.iot.thrift.TSyncLogEntriesRes;
@@ -89,6 +90,10 @@ public class DispatchLogHandler implements
AsyncMethodCallback<TSyncLogEntriesRe
}
completeBatch(batch);
}
+ if (response.isSetReceiverMemSize()) {
+
LogDispatcher.getReceiverMemSizeSum().addAndGet(response.getReceiverMemSize());
+ LogDispatcher.getSenderMemSizeSum().addAndGet(batch.getMemorySize());
+ }
logDispatcherThreadMetrics.recordSyncLogTimePerRequest(System.nanoTime() -
createTime);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
index 7a556c85a04..72b68ab96ac 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/Batch.java
@@ -22,7 +22,6 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
import org.apache.iotdb.consensus.iot.thrift.TLogEntry;
-import java.nio.Buffer;
import java.util.ArrayList;
import java.util.List;
@@ -37,7 +36,7 @@ public class Batch {
private long logEntriesNumFromWAL = 0L;
- private long serializedSize;
+ private long memorySize;
// indicates whether this batch has been successfully synchronized to
another node
private boolean synced;
@@ -60,14 +59,20 @@ public class Batch {
if (entry.fromWAL) {
logEntriesNumFromWAL++;
}
- // TODO Maybe we need to add in additional fields for more accurate
calculations
- serializedSize +=
- entry.getData() == null ? 0 :
entry.getData().stream().mapToInt(Buffer::capacity).sum();
+ memorySize += entry.getMemorySize();
}
public boolean canAccumulate() {
+ // When reading entries from the WAL, the memory size is calculated based
on the serialized
+ // size, which can be significantly smaller than the actual size.
+ // Thus, we add a multiplier to sender's memory size to estimate the
receiver's memory cost.
+ // The multiplier is calculated based on the receiver's feedback.
+ long receiverMemSize = LogDispatcher.getReceiverMemSizeSum().get();
+ long senderMemSize = LogDispatcher.getSenderMemSizeSum().get();
+ double multiplier = senderMemSize > 0 ? (double) receiverMemSize /
senderMemSize : 1.0;
+ multiplier = Math.max(multiplier, 1.0);
return logEntries.size() <
config.getReplication().getMaxLogEntriesNumPerBatch()
- && serializedSize < config.getReplication().getMaxSizePerBatch();
+ && ((long) (memorySize * multiplier)) <
config.getReplication().getMaxSizePerBatch();
}
public long getStartIndex() {
@@ -94,8 +99,8 @@ public class Batch {
return logEntries.isEmpty();
}
- public long getSerializedSize() {
- return serializedSize;
+ public long getMemorySize() {
+ return memorySize;
}
public long getLogEntriesNumFromWAL() {
@@ -111,8 +116,8 @@ public class Batch {
+ endIndex
+ ", size="
+ logEntries.size()
- + ", serializedSize="
- + serializedSize
+ + ", memorySize="
+ + memorySize
+ '}';
}
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
index df54918c60e..bb46f20486f 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java
@@ -20,11 +20,12 @@
package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.commons.service.metric.MetricService;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
public class IoTConsensusMemoryManager {
@@ -39,41 +40,101 @@ public class IoTConsensusMemoryManager {
MetricService.getInstance().addMetricSet(new
IoTConsensusMemoryManagerMetrics(this));
}
- public boolean reserve(long size, boolean fromQueue) {
- AtomicBoolean result = new AtomicBoolean(false);
- memorySizeInByte.updateAndGet(
- memorySize -> {
- long remainSize =
- (fromQueue ? maxMemorySizeForQueueInByte : maxMemorySizeInByte)
- memorySize;
- if (size > remainSize) {
- logger.debug(
- "consensus memory limited. required: {}, used: {}, total: {}",
- size,
- memorySize,
- maxMemorySizeInByte);
- result.set(false);
- return memorySize;
- } else {
- logger.debug(
- "{} add {} bytes, total memory size: {} bytes.",
- Thread.currentThread().getName(),
- size,
- memorySize + size);
- result.set(true);
- return memorySize + size;
- }
- });
- if (result.get()) {
+ public boolean reserve(IndexedConsensusRequest request) {
+ long prevRef = request.incRef();
+ if (prevRef == 0) {
+ boolean reserved = reserve(request.getMemorySize(), true);
+ if (reserved) {
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Reserving {} bytes for request {} succeeds, current total usage
{}",
+ request.getMemorySize(),
+ request.getSearchIndex(),
+ memorySizeInByte.get());
+ }
+ } else {
+ request.decRef();
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Reserving {} bytes for request {} fails, current total usage
{}",
+ request.getMemorySize(),
+ request.getSearchIndex(),
+ memorySizeInByte.get());
+ }
+ }
+ return reserved;
+ } else if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Skip memory reservation for {} because its ref count is not 0",
+ request.getSearchIndex());
+ }
+ return true;
+ }
+
+ public boolean reserve(Batch batch) {
+ boolean reserved = reserve(batch.getMemorySize(), false);
+ if (reserved && logger.isDebugEnabled()) {
+ logger.debug(
+ "Reserving {} bytes for batch {}-{} succeeds, current total usage
{}",
+ batch.getMemorySize(),
+ batch.getStartIndex(),
+ batch.getEndIndex(),
+ memorySizeInByte.get());
+ } else if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Reserving {} bytes for batch {}-{} fails, current total usage {}",
+ batch.getMemorySize(),
+ batch.getStartIndex(),
+ batch.getEndIndex(),
+ memorySizeInByte.get());
+ }
+ return reserved;
+ }
+
+ private boolean reserve(long size, boolean fromQueue) {
+ boolean result = memorySizeInByte.addAndGet(size) < maxMemorySizeInByte;
+ if (result) {
if (fromQueue) {
- queueMemorySizeInByte.addAndGet(size);
+ result = queueMemorySizeInByte.addAndGet(size) <
maxMemorySizeForQueueInByte;
+ if (!result) {
+ queueMemorySizeInByte.addAndGet(-size);
+ }
} else {
syncMemorySizeInByte.addAndGet(size);
}
+ } else {
+ memorySizeInByte.addAndGet(-size);
+ }
+ return result;
+ }
+
+ public void free(IndexedConsensusRequest request) {
+ long prevRef = request.decRef();
+ if (prevRef == 1) {
+ free(request.getMemorySize(), true);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Freed {} bytes for request {}, current total usage {}",
+ request.getMemorySize(),
+ request.getSearchIndex(),
+ memorySizeInByte.get());
+ }
}
- return result.get();
}
- public void free(long size, boolean fromQueue) {
+ public void free(Batch batch) {
+ free(batch.getMemorySize(), false);
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Freed {} bytes for batch {}-{}, current total usage {}",
+ batch.getMemorySize(),
+ batch.getStartIndex(),
+ batch.getEndIndex(),
+ getMemorySizeInByte());
+ }
+ }
+
+ private void free(long size, boolean fromQueue) {
long currentUsedMemory = memorySizeInByte.addAndGet(-size);
if (fromQueue) {
queueMemorySizeInByte.addAndGet(-size);
@@ -92,6 +153,13 @@ public class IoTConsensusMemoryManager {
this.maxMemorySizeForQueueInByte = maxMemorySizeForQueue;
}
+ @TestOnly
+ public void reset() {
+ this.memorySizeInByte.set(0);
+ this.queueMemorySizeInByte.set(0);
+ this.syncMemorySizeInByte.set(0);
+ }
+
long getMemorySizeInByte() {
return memorySizeInByte.get();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
index 2d4e7cd5e01..374691bf38b 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/LogDispatcher.java
@@ -69,6 +69,9 @@ public class LogDispatcher {
private final AtomicLong logEntriesFromWAL = new AtomicLong(0);
private final AtomicLong logEntriesFromQueue = new AtomicLong(0);
+ private static final AtomicLong senderMemSizeSum = new AtomicLong(0);
+ private static final AtomicLong receiverMemSizeSum = new AtomicLong(0);
+
public LogDispatcher(
IoTConsensusServerImpl impl,
IClientManager<TEndPoint, AsyncIoTConsensusServiceClient> clientManager)
{
@@ -105,8 +108,8 @@ public class LogDispatcher {
public synchronized void stop() {
if (!threads.isEmpty()) {
threads.forEach(LogDispatcherThread::setStopped);
- threads.forEach(LogDispatcherThread::processStopped);
executorService.shutdownNow();
+ threads.forEach(LogDispatcherThread::processStopped);
int timeout = 10;
try {
if (!executorService.awaitTermination(timeout, TimeUnit.SECONDS)) {
@@ -280,7 +283,7 @@ public class LogDispatcher {
/** try to offer a request into queue with memory control. */
public boolean offer(IndexedConsensusRequest indexedConsensusRequest) {
- if
(!iotConsensusMemoryManager.reserve(indexedConsensusRequest.getMemorySize(),
true)) {
+ if (!iotConsensusMemoryManager.reserve(indexedConsensusRequest)) {
return false;
}
boolean success;
@@ -288,19 +291,19 @@ public class LogDispatcher {
success = pendingEntries.offer(indexedConsensusRequest);
} catch (Throwable t) {
// If exception occurs during request offer, the reserved memory
should be released
-
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+ iotConsensusMemoryManager.free(indexedConsensusRequest);
throw t;
}
if (!success) {
// If offer failed, the reserved memory should be released
-
iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(), true);
+ iotConsensusMemoryManager.free(indexedConsensusRequest);
}
return success;
}
/** try to remove a request from queue with memory control. */
private void releaseReservedMemory(IndexedConsensusRequest
indexedConsensusRequest) {
- iotConsensusMemoryManager.free(indexedConsensusRequest.getMemorySize(),
true);
+ iotConsensusMemoryManager.free(indexedConsensusRequest);
}
public void stop() {
@@ -320,17 +323,13 @@ public class LogDispatcher {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
- long requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : pendingEntries) {
- requestSize += indexedConsensusRequest.getMemorySize();
+ iotConsensusMemoryManager.free(indexedConsensusRequest);
}
pendingEntries.clear();
- iotConsensusMemoryManager.free(requestSize, true);
- requestSize = 0;
for (IndexedConsensusRequest indexedConsensusRequest : bufferedEntries) {
- requestSize += indexedConsensusRequest.getMemorySize();
+ iotConsensusMemoryManager.free(indexedConsensusRequest);
}
- iotConsensusMemoryManager.free(requestSize, true);
syncStatus.free();
MetricService.getInstance().removeMetricSet(logDispatcherThreadMetrics);
}
@@ -567,7 +566,8 @@ public class LogDispatcher {
data.buildSerializedRequests();
// construct request from wal
logBatches.addTLogEntry(
- new TLogEntry(data.getSerializedRequests(), data.getSearchIndex(),
true));
+ new TLogEntry(
+ data.getSerializedRequests(), data.getSearchIndex(), true,
data.getMemorySize()));
}
// In the case of corrupt Data, we return true so that we can send a
batch as soon as
// possible, avoiding potential duplication
@@ -577,7 +577,19 @@ public class LogDispatcher {
private void constructBatchIndexedFromConsensusRequest(
IndexedConsensusRequest request, Batch logBatches) {
logBatches.addTLogEntry(
- new TLogEntry(request.getSerializedRequests(),
request.getSearchIndex(), false));
+ new TLogEntry(
+ request.getSerializedRequests(),
+ request.getSearchIndex(),
+ false,
+ request.getMemorySize()));
}
}
+
+ public static AtomicLong getReceiverMemSizeSum() {
+ return receiverMemSizeSum;
+ }
+
+ public static AtomicLong getSenderMemSizeSum() {
+ return senderMemSizeSum;
+ }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
index e11b6302114..accc9f7667d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/SyncStatus.java
@@ -21,12 +21,16 @@ package org.apache.iotdb.consensus.iot.logdispatcher;
import org.apache.iotdb.consensus.config.IoTConsensusConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
public class SyncStatus {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SyncStatus.class);
private final IoTConsensusConfig config;
private final IndexController controller;
private final LinkedList<Batch> pendingBatches = new LinkedList<>();
@@ -44,10 +48,19 @@ public class SyncStatus {
* @throws InterruptedException
*/
public synchronized void addNextBatch(Batch batch) throws
InterruptedException {
- while (pendingBatches.size() >=
config.getReplication().getMaxPendingBatchesNum()
- || !iotConsensusMemoryManager.reserve(batch.getSerializedSize(),
false)) {
+ while ((pendingBatches.size() >=
config.getReplication().getMaxPendingBatchesNum()
+ || !iotConsensusMemoryManager.reserve(batch))
+ && !Thread.interrupted()) {
wait();
}
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "Reserved {} bytes for batch {}-{}, current total usage {}",
+ batch.getMemorySize(),
+ batch.getStartIndex(),
+ batch.getEndIndex(),
+ iotConsensusMemoryManager.getMemorySizeInByte());
+ }
pendingBatches.add(batch);
}
@@ -64,7 +77,7 @@ public class SyncStatus {
while (current.isSynced()) {
controller.update(current.getEndIndex(), false);
iterator.remove();
- iotConsensusMemoryManager.free(current.getSerializedSize(), false);
+ iotConsensusMemoryManager.free(current);
if (iterator.hasNext()) {
current = iterator.next();
} else {
@@ -77,13 +90,11 @@ public class SyncStatus {
}
public synchronized void free() {
- long size = 0;
for (Batch pendingBatch : pendingBatches) {
- size += pendingBatch.getSerializedSize();
+ iotConsensusMemoryManager.free(pendingBatch);
}
pendingBatches.clear();
controller.update(0L, true);
- iotConsensusMemoryManager.free(size, false);
}
/** Gets the first index that is not currently synchronized. */
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
index 2bac66738fd..71c14aebaa1 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceProcessor.java
@@ -129,7 +129,8 @@ public class IoTConsensusRPCServiceProcessor implements
IoTConsensusIService.Ifa
"execute TSyncLogEntriesReq for {} with result {}",
req.consensusGroupId,
writeStatus.subStatus);
- return new TSyncLogEntriesRes(writeStatus.subStatus);
+ return new TSyncLogEntriesRes(writeStatus.subStatus)
+ .setReceiverMemSize(deserializedRequest.getMemorySize());
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
index e152ed09bda..3f5bdf205b2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/pipe/PipeEnrichedInsertNode.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.SearchNode;
import org.apache.iotdb.db.trigger.executor.TriggerFireVisitor;
import org.apache.tsfile.enums.TSDataType;
@@ -225,8 +226,9 @@ public class PipeEnrichedInsertNode extends InsertNode {
}
@Override
- public void setSearchIndex(long searchIndex) {
+ public SearchNode setSearchIndex(final long searchIndex) {
insertNode.setSearchIndex(searchIndex);
+ return this;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
index 0e7eae3f9d7..c4c4159f66d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/DeleteDataNode.java
@@ -354,9 +354,10 @@ public class DeleteDataNode extends SearchNode implements
WALEntryValue {
.distinct()
.collect(Collectors.toList());
return new DeleteDataNode(
- firstOne.getPlanNodeId(),
- pathList,
- firstOne.getDeleteStartTime(),
- firstOne.getDeleteEndTime());
+ firstOne.getPlanNodeId(),
+ pathList,
+ firstOne.getDeleteStartTime(),
+ firstOne.getDeleteEndTime())
+ .setSearchIndex(firstOne.searchIndex);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
index 6e401b4d54b..773881e398e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertMultiTabletsNode.java
@@ -134,9 +134,10 @@ public class InsertMultiTabletsNode extends InsertNode {
}
@Override
- public void setSearchIndex(long index) {
+ public SearchNode setSearchIndex(long index) {
searchIndex = index;
insertTabletNodeList.forEach(plan -> plan.setSearchIndex(index));
+ return this;
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
index caf86e67bab..7ec81d626df 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsNode.java
@@ -128,9 +128,10 @@ public class InsertRowsNode extends InsertNode implements
WALEntryValue {
}
@Override
- public void setSearchIndex(long index) {
+ public SearchNode setSearchIndex(long index) {
searchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+ return this;
}
public Map<Integer, TSStatus> getResults() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
index 021e51b69af..c7b193841d4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java
@@ -98,9 +98,10 @@ public class InsertRowsOfOneDeviceNode extends InsertNode {
}
@Override
- public void setSearchIndex(long index) {
+ public SearchNode setSearchIndex(long index) {
searchIndex = index;
insertRowNodeList.forEach(plan -> plan.setSearchIndex(index));
+ return this;
}
public TSStatus[] getFailingStatus() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
index ecae2cd7197..e9c614e7a98 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/SearchNode.java
@@ -45,8 +45,9 @@ public abstract class SearchNode extends WritePlanNode {
}
/** Search index should start from 1 */
- public void setSearchIndex(long searchIndex) {
+ public SearchNode setSearchIndex(long searchIndex) {
this.searchIndex = searchIndex;
+ return this;
}
public abstract SearchNode merge(List<SearchNode> searchNodes);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
index 1a6af1cf9b0..4d0f4a5a412 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNodeShutdownHook.java
@@ -52,15 +52,43 @@ public class DataNodeShutdownHook extends Thread {
private static final Logger logger =
LoggerFactory.getLogger(DataNodeShutdownHook.class);
private final TDataNodeLocation nodeLocation;
+ private Thread watcherThread;
public DataNodeShutdownHook(TDataNodeLocation nodeLocation) {
super(ThreadName.DATANODE_SHUTDOWN_HOOK.getName());
this.nodeLocation = nodeLocation;
}
+ private void startWatcher() {
+ Thread hookThread = Thread.currentThread();
+ watcherThread =
+ new Thread(
+ () -> {
+ while (!Thread.interrupted()) {
+ try {
+ Thread.sleep(10000);
+ StackTraceElement[] stackTrace = hookThread.getStackTrace();
+ StringBuilder stackTraceBuilder =
+ new StringBuilder("Stack trace of shutdown hook:\n");
+ for (StackTraceElement traceElement : stackTrace) {
+
stackTraceBuilder.append(traceElement.toString()).append("\n");
+ }
+ logger.info(stackTraceBuilder.toString());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ }
+ },
+ "ShutdownHookWatcher");
+ watcherThread.setDaemon(true);
+ watcherThread.start();
+ }
+
@Override
public void run() {
logger.info("DataNode exiting...");
+ startWatcher();
// Stop external rpc service firstly.
ExternalRPCService.getInstance().stop();
@@ -77,7 +105,6 @@ public class DataNodeShutdownHook extends Thread {
.equals(ConsensusFactory.RATIS_CONSENSUS)) {
StorageEngine.getInstance().syncCloseAllProcessor();
}
- WALManager.getInstance().syncDeleteOutdatedFilesInWALNodes();
// We did this work because the RatisConsensus recovery mechanism is
different from other
// consensus algorithms, which will replace the underlying storage engine
based on its
@@ -141,6 +168,8 @@ public class DataNodeShutdownHook extends Thread {
"DataNode exits. Jvm memory usage: {}",
MemUtils.bytesCntToStr(
Runtime.getRuntime().totalMemory() -
Runtime.getRuntime().freeMemory()));
+
+ watcherThread.interrupt();
}
private void triggerSnapshotForAllDataRegion() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
index 96d4931fa43..e5fbdbca9af 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/WALManager.java
@@ -164,6 +164,7 @@ public class WALManager implements IService {
logger.info("Start rebooting wal delete thread.");
if (walDeleteThread != null) {
shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
+ walDeleteThread = null;
}
logger.info("Stop wal delete thread successfully, and now restart it.");
registerScheduleTask(0, config.getDeleteWalFilesPeriodInMs());
@@ -177,7 +178,7 @@ public class WALManager implements IService {
// threshold, the system continues to delete expired files until the disk
size is smaller than
// the threshold.
boolean firstLoop = true;
- while (firstLoop || shouldThrottle()) {
+ while ((firstLoop || shouldThrottle()) && !Thread.interrupted()) {
deleteOutdatedFilesInWALNodes();
if (firstLoop && shouldThrottle()) {
logger.warn(
@@ -186,6 +187,10 @@ public class WALManager implements IService {
getThrottleThreshold());
}
firstLoop = false;
+ if (Thread.interrupted()) {
+ logger.info("Timed wal delete thread is interrupted.");
+ return;
+ }
}
}
@@ -264,16 +269,19 @@ public class WALManager implements IService {
if (config.getWalMode() == WALMode.DISABLE) {
return;
}
-
+ logger.info("Stopping WALManager");
if (walDeleteThread != null) {
shutdownThread(walDeleteThread, ThreadName.WAL_DELETE);
walDeleteThread = null;
}
+ logger.info("Deleting outdated files before exiting");
+ deleteOutdatedFilesInWALNodes();
clear();
+ logger.info("WALManager stopped");
}
private void shutdownThread(ExecutorService thread, ThreadName threadName) {
- thread.shutdown();
+ thread.shutdownNow();
try {
if (!thread.awaitTermination(30, TimeUnit.SECONDS)) {
logger.warn("Waiting thread {} to be terminated is timeout",
threadName.getName());
diff --git
a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
index d0b4808977e..829443f9552 100644
--- a/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
+++ b/iotdb-protocol/thrift-consensus/src/main/thrift/iotconsensus.thrift
@@ -26,6 +26,7 @@ struct TLogEntry {
1: required list<binary> data
2: required i64 searchIndex
3: required bool fromWAL
+ 4: required i64 memorySize
}
struct TSyncLogEntriesReq {
@@ -37,6 +38,7 @@ struct TSyncLogEntriesReq {
struct TSyncLogEntriesRes {
1: required list<common.TSStatus> statuses
+ 2: optional i64 receiverMemSize
}
struct TInactivatePeerReq {