This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch reduceDuplicatedDeviceIdInCompactionSelection
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 1d10c10a806c7f8a9d9860e8461809abbbab0f89 Author: shuwenwei <[email protected]> AuthorDate: Mon Sep 1 10:03:22 2025 +0800 reduce duplicated DeviceID in compaction selection --- .../compaction/execute/utils/CompactionUtils.java | 5 ++++ .../schedule/CompactionScheduleContext.java | 27 +++++++++++++++++++ .../selector/utils/TsFileResourceCandidate.java | 7 ++++- .../dataregion/tsfile/TsFileResource.java | 13 ++++++--- .../tsfile/timeindex/ArrayDeviceTimeIndex.java | 5 ++-- .../dataregion/tsfile/timeindex/FileTimeIndex.java | 3 ++- .../dataregion/tsfile/timeindex/ITimeIndex.java | 10 ++++--- .../tsfile/timeindex/PlainDeviceTimeIndex.java | 3 ++- .../compaction/CompactionSchedulerTest.java | 31 ++++++++++++++++++++++ 9 files changed, 93 insertions(+), 11 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java index 57191a99a4e..b7d87233deb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java @@ -462,6 +462,11 @@ public class CompactionUtils { public static ArrayDeviceTimeIndex buildDeviceTimeIndex(TsFileResource resource) throws IOException { + return buildDeviceTimeIndex(resource, IDeviceID.Deserializer.DEFAULT_DESERIALIZER); + } + + public static ArrayDeviceTimeIndex buildDeviceTimeIndex( + TsFileResource resource, IDeviceID.Deserializer deserializer) throws IOException { long resourceFileSize = new File(resource.getTsFilePath() + TsFileResource.RESOURCE_SUFFIX).length(); CompactionTaskManager.getInstance().getCompactionReadOperationRateLimiter().acquire(1); diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java index 1f01cac0b25..f9721b14796 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleContext.java @@ -30,6 +30,11 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.Sett import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ArrayDeviceTimeIndex; +import org.apache.tsfile.file.metadata.IDeviceID; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -49,6 +54,7 @@ public class CompactionScheduleContext { // end region private final Map<TsFileResource, ArrayDeviceTimeIndex> partitionFileDeviceInfoCache; + private final Map<IDeviceID, IDeviceID> deviceIdCache; private long cachedDeviceInfoSize = 0; private final Set<Long> timePartitionsDelayInsertionSelection; @@ -56,6 +62,7 @@ public class CompactionScheduleContext { public CompactionScheduleContext() { this.partitionFileDeviceInfoCache = new HashMap<>(); this.timePartitionsDelayInsertionSelection = new HashSet<>(); + this.deviceIdCache = new HashMap<>(); } public void delayInsertionSelection(long timePartitionId) { @@ -81,6 +88,7 @@ public class CompactionScheduleContext { public void clearTimePartitionDeviceInfoCache() { partitionFileDeviceInfoCache.clear(); + deviceIdCache.clear(); CompactionMetrics.getInstance() .decreaseSelectionCachedDeviceTimeIndexSize(cachedDeviceInfoSize); cachedDeviceInfoSize = 0; @@ -192,4 +200,23 
@@ public class CompactionScheduleContext { public ICrossCompactionPerformer getCrossCompactionPerformer() { return IoTDBDescriptor.getInstance().getConfig().getCrossCompactionPerformer().createInstance(); } + + public IDeviceID.Deserializer getCachedDeviceIdDeserializer() { + return new CachedIDeviceIdDeserializer(); + } + + private class CachedIDeviceIdDeserializer implements IDeviceID.Deserializer { + + @Override + public IDeviceID deserializeFrom(ByteBuffer byteBuffer) { + IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(byteBuffer); + return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId); + } + + @Override + public IDeviceID deserializeFrom(InputStream inputStream) throws IOException { + IDeviceID deviceId = IDeviceID.Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream); + return deviceIdCache.computeIfAbsent(deviceId, k -> deviceId); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java index a2b0b2d40bf..acde3c607f7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/utils/TsFileResourceCandidate.java @@ -100,7 +100,12 @@ public class TsFileResourceCandidate { deviceTimeIndex = new ArrayDeviceTimeIndex(); return; } - deviceTimeIndex = CompactionUtils.buildDeviceTimeIndex(resource); + deviceTimeIndex = + CompactionUtils.buildDeviceTimeIndex( + resource, + compactionScheduleContext == null + ? 
IDeviceID.Deserializer.DEFAULT_DESERIALIZER + : compactionScheduleContext.getCachedDeviceIdDeserializer()); } finally { resource.readUnlock(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java index eb03718accc..77c1fbb5376 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java @@ -318,7 +318,8 @@ public class TsFileResource implements PersistentResource { try (InputStream inputStream = fsFactory.getBufferedInputStream(file + RESOURCE_SUFFIX)) { // The first byte is VERSION_NUMBER, second byte is timeIndexType. ReadWriteIOUtils.readByte(inputStream); - timeIndex = ITimeIndex.createTimeIndex(inputStream); + timeIndex = + ITimeIndex.createTimeIndex(inputStream, IDeviceID.Deserializer.DEFAULT_DESERIALIZER); maxPlanIndex = ReadWriteIOUtils.readLong(inputStream); minPlanIndex = ReadWriteIOUtils.readLong(inputStream); @@ -674,7 +675,8 @@ public class TsFileResource implements PersistentResource { return timeIndex.getDevices(file.getPath(), this); } - public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException { + public ArrayDeviceTimeIndex buildDeviceTimeIndex(IDeviceID.Deserializer deserializer) + throws IOException { readLock(); try { if (!resourceFileExists()) { @@ -684,7 +686,8 @@ public class TsFileResource implements PersistentResource { FSFactoryProducer.getFSFactory() .getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) { ReadWriteIOUtils.readByte(inputStream); - ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream); + ITimeIndex timeIndexFromResourceFile = + ITimeIndex.createTimeIndex(inputStream, deserializer); if (!(timeIndexFromResourceFile instanceof 
ArrayDeviceTimeIndex)) { throw new IOException("cannot build DeviceTimeIndex from resource " + file.getPath()); } @@ -698,6 +701,10 @@ public class TsFileResource implements PersistentResource { } } + public ArrayDeviceTimeIndex buildDeviceTimeIndex() throws IOException { + return buildDeviceTimeIndex(IDeviceID.Deserializer.DEFAULT_DESERIALIZER); + } + /** * Used for compaction to verify tsfile, also used to verify TimeIndex version when loading tsfile */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java index f8503c79af5..8499b6d6b3d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ArrayDeviceTimeIndex.java @@ -113,7 +113,8 @@ public class ArrayDeviceTimeIndex implements ITimeIndex { } @Override - public ArrayDeviceTimeIndex deserialize(InputStream inputStream) throws IOException { + public ArrayDeviceTimeIndex deserialize( + InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException { int deviceNum = ReadWriteIOUtils.readInt(inputStream); startTimes = new long[deviceNum]; @@ -127,7 +128,7 @@ public class ArrayDeviceTimeIndex implements ITimeIndex { } for (int i = 0; i < deviceNum; i++) { - IDeviceID deviceID = Deserializer.DEFAULT_DESERIALIZER.deserializeFrom(inputStream); + IDeviceID deviceID = deserializer.deserializeFrom(inputStream); int index = ReadWriteIOUtils.readInt(inputStream); deviceToIndex.put(deviceID, index); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java index 0e967fc622d..e4a812012a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/FileTimeIndex.java @@ -72,7 +72,8 @@ public class FileTimeIndex implements ITimeIndex { } @Override - public FileTimeIndex deserialize(InputStream inputStream) throws IOException { + public FileTimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer) + throws IOException { throw new UnsupportedOperationException(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java index 214ae53750c..d705a2417d7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/ITimeIndex.java @@ -53,7 +53,8 @@ public interface ITimeIndex { * @param inputStream inputStream * @return TimeIndex */ - ITimeIndex deserialize(InputStream inputStream) throws IOException; + ITimeIndex deserialize(InputStream inputStream, IDeviceID.Deserializer deserializer) + throws IOException; /** * deserialize from byte buffer @@ -218,11 +219,14 @@ public interface ITimeIndex { */ byte getTimeIndexType(); - static ITimeIndex createTimeIndex(InputStream inputStream) throws IOException { + static ITimeIndex createTimeIndex(InputStream inputStream, IDeviceID.Deserializer deserializer) + throws IOException { byte timeIndexType = ReadWriteIOUtils.readByte(inputStream); if (timeIndexType == -1) { throw new IOException("The end of stream has been reached"); } - return 
TimeIndexLevel.valueOf(timeIndexType).getTimeIndex().deserialize(inputStream); + return TimeIndexLevel.valueOf(timeIndexType) + .getTimeIndex() + .deserialize(inputStream, deserializer); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java index b962a447b06..309067d1672 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/timeindex/PlainDeviceTimeIndex.java @@ -41,7 +41,8 @@ public class PlainDeviceTimeIndex extends ArrayDeviceTimeIndex implements ITimeI } @Override - public PlainDeviceTimeIndex deserialize(InputStream inputStream) throws IOException { + public PlainDeviceTimeIndex deserialize( + InputStream inputStream, IDeviceID.Deserializer deserializer) throws IOException { int deviceNum = ReadWriteIOUtils.readInt(inputStream); startTimes = new long[deviceNum]; diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java index 3d914cbeb3e..01242fabba9 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/CompactionSchedulerTest.java @@ -29,6 +29,7 @@ import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.CrossCompactionPerformer; import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerSeqCompactionPerformer; import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.constant.InnerUnseqCompactionPerformer; +import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduleContext; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionScheduler; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.CompactionTaskManager; import org.apache.iotdb.db.storageengine.dataregion.compaction.schedule.constant.CompactionPriority; @@ -42,7 +43,10 @@ import org.apache.iotdb.db.storageengine.rescon.memory.SystemInfo; import org.apache.iotdb.db.utils.EnvironmentUtils; import org.apache.iotdb.db.utils.constant.TestConstant; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -1839,6 +1843,33 @@ public class CompactionSchedulerTest { } } + @Test + public void testManyDuplicatedDevicesInDifferentResources() throws IOException { + String sgName = COMPACTION_TEST_SG + "test18"; + TsFileResource resource1 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName); + IDeviceID device = new StringArrayDeviceID("root.test.d1"); + resource1.updateStartTime(device, 1); + resource1.updateStartTime(device, 2); + resource1.serialize(); + resource1.degradeTimeIndex(); + + TsFileResource resource2 = CompactionFileGeneratorUtils.generateTsFileResource(true, 0, sgName); + device = new StringArrayDeviceID("root.test.d1"); + resource2.updateStartTime(device, 1); + resource2.updateStartTime(device, 2); + resource2.serialize(); + resource2.degradeTimeIndex(); + + CompactionScheduleContext context = new CompactionScheduleContext(); + IDeviceID.Deserializer deserializer = context.getCachedDeviceIdDeserializer(); + 
+ IDeviceID deserializedFromResource1 = + resource1.buildDeviceTimeIndex(deserializer).getDevices().iterator().next(); + IDeviceID deserializedFromResource2 = + resource2.buildDeviceTimeIndex(deserializer).getDevices().iterator().next(); + Assert.assertSame(deserializedFromResource1, deserializedFromResource2); + } + public void stopCompactionTaskManager() { CompactionTaskManager.getInstance().clearCandidateQueue(); long sleepTime = 0;
