This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 4866f6d2cfa Add logs for PartitionTableAutoCleaner (#14934)
4866f6d2cfa is described below
commit 4866f6d2cfa7a106c0b3eb0db085f4bdda54b558
Author: Yongzao <[email protected]>
AuthorDate: Tue Feb 25 00:26:44 2025 +0800
Add logs for PartitionTableAutoCleaner (#14934)
* Finished
* optimize autoCleanPartitionTable
* Update SeriesPartitionTable.java
---------
Co-authored-by: Potato <[email protected]>
---
.../partition/DatabasePartitionTable.java | 14 +++++++++++++-
.../procedure/PartitionTableAutoCleaner.java | 3 +++
.../iotdb/commons/partition/DataPartitionTable.java | 10 ++++++++--
.../commons/partition/SeriesPartitionTable.java | 20 ++++++++++++++++----
4 files changed, 40 insertions(+), 7 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
index fda7e291315..6544f1a7c7c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/partition/DatabasePartitionTable.java
@@ -617,7 +617,19 @@ public class DatabasePartitionTable {
* @param currentTimeSlot The current TimeSlot
*/
public void autoCleanPartitionTable(long TTL, TTimePartitionSlot
currentTimeSlot) {
- dataPartitionTable.autoCleanPartitionTable(TTL, currentTimeSlot);
+ long[] removedTimePartitionSlots =
+ dataPartitionTable.autoCleanPartitionTable(TTL,
currentTimeSlot).stream()
+ .map(TTimePartitionSlot::getStartTime)
+ .collect(Collectors.toList())
+ .stream()
+ .mapToLong(Long::longValue)
+ .toArray();
+ if (removedTimePartitionSlots.length > 0) {
+ LOGGER.info(
+ "[PartitionTableCleaner] The TimePartitions: {} are removed from
Database: {}",
+ removedTimePartitionSlots,
+ databaseName);
+ }
}
@Override
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
index 4363610f7ca..8466c06b1a7 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/PartitionTableAutoCleaner.java
@@ -46,6 +46,9 @@ public class PartitionTableAutoCleaner<Env> extends
InternalProcedure<Env> {
public PartitionTableAutoCleaner(ConfigManager configManager) {
super(COMMON_CONFIG.getTTLCheckInterval());
this.configManager = configManager;
+ LOGGER.info(
+ "[PartitionTableCleaner] The PartitionTableAutoCleaner is started with
cycle={}ms",
+ COMMON_CONFIG.getTTLCheckInterval());
}
@Override
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
index 64e1233daec..91346f0c69c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/DataPartitionTable.java
@@ -35,9 +35,11 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -270,10 +272,14 @@ public class DataPartitionTable {
* @param TTL The Time To Live
* @param currentTimeSlot The current TimeSlot
*/
- public void autoCleanPartitionTable(long TTL, TTimePartitionSlot
currentTimeSlot) {
+ public Set<TTimePartitionSlot> autoCleanPartitionTable(
+ long TTL, TTimePartitionSlot currentTimeSlot) {
+ Set<TTimePartitionSlot> removedTimePartitionSlots = new HashSet<>();
dataPartitionMap.forEach(
(seriesPartitionSlot, seriesPartitionTable) ->
- seriesPartitionTable.autoCleanPartitionTable(TTL,
currentTimeSlot));
+ removedTimePartitionSlots.addAll(
+ seriesPartitionTable.autoCleanPartitionTable(TTL,
currentTimeSlot)));
+ return removedTimePartitionSlots;
}
public void serialize(OutputStream outputStream, TProtocol protocol)
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
index 579d2fa99ef..f99d06c9c0d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/SeriesPartitionTable.java
@@ -35,6 +35,8 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -245,10 +247,20 @@ public class SeriesPartitionTable {
* @param TTL The Time To Live
* @param currentTimeSlot The current TimeSlot
*/
- public void autoCleanPartitionTable(long TTL, TTimePartitionSlot
currentTimeSlot) {
- seriesPartitionMap
- .entrySet()
- .removeIf(entry -> entry.getKey().getStartTime() + TTL <
currentTimeSlot.getStartTime());
+ public List<TTimePartitionSlot> autoCleanPartitionTable(
+ long TTL, TTimePartitionSlot currentTimeSlot) {
+ List<TTimePartitionSlot> removedTimePartitions = new ArrayList<>();
+ Iterator<Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>>> iterator =
+ seriesPartitionMap.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<TTimePartitionSlot, List<TConsensusGroupId>> entry =
iterator.next();
+ TTimePartitionSlot timePartitionSlot = entry.getKey();
+ if (timePartitionSlot.getStartTime() + TTL <
currentTimeSlot.getStartTime()) {
+ removedTimePartitions.add(timePartitionSlot);
+ iterator.remove();
+ }
+ }
+ return removedTimePartitions;
}
public void serialize(OutputStream outputStream, TProtocol protocol)