This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3e3f6821c97e87878b4a7e131504969add276c34 Author: shuwenwei <[email protected]> AuthorDate: Fri Jan 16 19:02:38 2026 +0800 compact --- .../db/storageengine/dataregion/DataRegion.java | 6 +- .../dataregion/tsfile/TsFileManager.java | 12 + .../tableDiskUsageCache/TableDiskUsageCache.java | 55 +++- .../TableDiskUsageCacheWriter.java | 294 ++++++++++++--------- .../TsFileTableSizeCacheReader.java | 191 +++++++------ ...Writer.java => TsFileTableSizeCacheWriter.java} | 166 +++--------- .../utils/TsFileTableSizeCacheWriterTest.java | 183 +++++++++++++ 7 files changed, 562 insertions(+), 345 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index e0b9ab91cd9..1d4f873a20a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -137,6 +137,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeInd import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndexCacheRecorder; import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex; import org.apache.iotdb.db.storageengine.dataregion.utils.fileTimeIndexCache.FileTimeIndexCacheReader; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache; import org.apache.iotdb.db.storageengine.dataregion.utils.validate.TsFileValidator; import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager; import org.apache.iotdb.db.storageengine.dataregion.wal.node.IWALNode; @@ -1998,8 +1999,9 @@ public class DataRegion implements IDataRegionForQuery { "{} will close all files for deleting data folder {}", databaseName + "-" + dataRegionIdString, systemDir); - FileTimeIndexCacheRecorder.getInstance() - .removeFileTimeIndexCache(Integer.parseInt(dataRegionIdString)); + int regionId = Integer.parseInt(dataRegionIdString); + TableDiskUsageCache.getInstance().remove(databaseName, regionId); + FileTimeIndexCacheRecorder.getInstance().removeFileTimeIndexCache(regionId); writeLock("deleteFolder"); try { File dataRegionSystemFolder = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java index b7c1ba2c14f..86890370c31 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileManager.java @@ -146,6 +146,18 @@ public class TsFileManager { } } + public Pair<List<TsFileResource>, List<TsFileResource>> getTsFileListSnapshot( + long timePartition) { + readLock(); + try { + return new Pair<>( + new ArrayList<>(sequenceFiles.getOrDefault(timePartition, new TsFileResourceList())), + new ArrayList<>(unsequenceFiles.getOrDefault(timePartition, new TsFileResourceList()))); + } finally { + readUnlock(); + } + } + public List<TsFileResource> getTsFileList(boolean sequence, long startTime, long endTime) { // the iteration of ConcurrentSkipListMap is not concurrent secure // so we must add read lock here diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java index c63dca4a36a..e804030a614 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java @@ -33,6 +33,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; public class TableDiskUsageCache { private static final Logger LOGGER = LoggerFactory.getLogger(TableDiskUsageCache.class); @@ -51,7 +52,12 @@ public class TableDiskUsageCache { try { while (!Thread.currentThread().isInterrupted()) { try { - Operation operation = queue.take(); + Operation operation = queue.poll(1, TimeUnit.SECONDS); + if (operation == null) { + checkAndMayCloseIdleWriter(); + checkAndMayCompact(TimeUnit.SECONDS.toMillis(1)); + continue; + } operation.apply(this); } catch (InterruptedException e) { return; @@ -64,6 +70,24 @@ public class TableDiskUsageCache { } } + private void checkAndMayCompact(long maxRunTime) { + long startTime = System.currentTimeMillis(); + for (TableDiskUsageCacheWriter writer : writerMap.values()) { + if (System.currentTimeMillis() - startTime > maxRunTime) { + break; + } + if (writer.needCompact()) { + writer.compact(); + } + } + } + + private void checkAndMayCloseIdleWriter() { + for (TableDiskUsageCacheWriter writer : writerMap.values()) { + writer.closeIfIdle(); + } + } + public void write(String database, TsFileID tsFileID, Map<String, Long> tableSizeMap) { queue.add(new WriteOperation(database, tsFileID, tableSizeMap)); } @@ -83,6 +107,17 @@ public class TableDiskUsageCache { queue.add(operation); } + public void remove(String database, int regionId) { + RemoveRegionOperation operation = new RemoveRegionOperation(database, regionId); + queue.add(operation); + try { + operation.future.get(5, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception ignored) { + } + } + public abstract static class Operation { protected final String database; protected final int regionId; @@ -176,6 +211,24 @@ public class TableDiskUsageCache { } } + private static class RemoveRegionOperation extends Operation { + + private final CompletableFuture<Void> future = new CompletableFuture<>(); + + private RemoveRegionOperation(String database, int regionId) { + super(database, regionId); + } + + @Override + public void apply(TableDiskUsageCache tableDiskUsageCache) throws IOException { + TableDiskUsageCacheWriter writer = tableDiskUsageCache.writerMap.remove(regionId); + if (writer != null) { + writer.close(); + } + future.complete(null); + } + } + public static TableDiskUsageCache getInstance() { return TableDiskUsageCache.InstanceHolder.INSTANCE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java index 5c35b0701bf..d0e719fb65e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java @@ -19,53 +19,54 @@ package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; +import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.tsfile.utils.Pair; -import org.apache.tsfile.utils.ReadWriteForEncodingUtils; -import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.nio.channels.FileChannel; import java.nio.file.Files; -import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; public class TableDiskUsageCacheWriter { + private static final Logger logger = LoggerFactory.getLogger(TableDiskUsageCacheWriter.class); private static final String TSFILE_CACHE_KEY_FILENAME_PREFIX = "TableSizeKeyFile_"; private static final String TSFILE_CACHE_VALUE_FILENAME_PREFIX = "TableSizeValueFile_"; - public static final int KEY_FILE_OFFSET_RECORD_LENGTH = 5 * Long.BYTES; - public static final int KEY_FILE_REDIRECT_RECORD_LENGTH = 7 * Long.BYTES; + public static final int KEY_FILE_OFFSET_RECORD_LENGTH = 5 * Long.BYTES + 1; + public static final int KEY_FILE_REDIRECT_RECORD_LENGTH = 7 * Long.BYTES + 1; private static final String TEMP_CACHE_FILE_SUBFIX = ".tmp"; public static final byte KEY_FILE_RECORD_TYPE_OFFSET = 1; public static final byte KEY_FILE_RECORD_TYPE_REDIRECT = 2; private final int regionId; private int activeReaderNum = 0; + private long previousCompactionTimestamp = System.currentTimeMillis(); + private long lastWriteTimestamp = System.currentTimeMillis(); private int currentTsFileIndexFileVersion = 0; private final File dir; - private File currentKeyIndexFile; - private File currentValueIndexFile; - private FileOutputStream keyFileOutputStream; - private FileOutputStream valueFileOutputStream; - private BufferedOutputStream keyBufferedOutputStream; - private BufferedOutputStream valueBufferedOutputStream; - private long keyFileSize; - private long valueFileSize; + private TsFileTableSizeCacheWriter tsFileTableSizeCacheWriter; public TableDiskUsageCacheWriter(String database, int regionId) { this.regionId = regionId; this.dir = StorageEngine.getDataRegionSystemDir(database, regionId + ""); - recoverTsFileTableSizeIndexFile(); + recoverTsFileTableSizeIndexFile(true); } - private void recoverTsFileTableSizeIndexFile() { + private void recoverTsFileTableSizeIndexFile(boolean needRecover) { dir.mkdirs(); File[] files = dir.listFiles(); currentTsFileIndexFileVersion = 0; @@ -124,55 +125,16 @@ public class TableDiskUsageCacheWriter { currentTsFileIndexFileVersion, TSFILE_CACHE_VALUE_FILENAME_PREFIX, valueFiles); } } - currentKeyIndexFile = - keyFiles.isEmpty() - ? new File( - dir - + File.separator - + TSFILE_CACHE_KEY_FILENAME_PREFIX - + currentTsFileIndexFileVersion) - : keyFiles.get(0); - currentValueIndexFile = - valueFiles.isEmpty() - ? new File( - dir - + File.separator - + TSFILE_CACHE_VALUE_FILENAME_PREFIX - + currentTsFileIndexFileVersion) - : valueFiles.get(0); + File currentKeyIndexFile = generateKeyFile(currentTsFileIndexFileVersion, false); + File currentValueIndexFile = generateValueFile(currentTsFileIndexFileVersion, false); try { - cacheFileSelfCheck(); + this.tsFileTableSizeCacheWriter = + new TsFileTableSizeCacheWriter( + regionId, currentKeyIndexFile, currentValueIndexFile, needRecover); } catch (IOException ignored) { } } - private void cacheFileSelfCheck() throws IOException { - currentKeyIndexFile.createNewFile(); - currentValueIndexFile.createNewFile(); - TsFileTableSizeCacheReader cacheFileReader = - new TsFileTableSizeCacheReader( - currentKeyIndexFile.length(), - currentKeyIndexFile, - currentValueIndexFile.length(), - currentValueIndexFile, - regionId); - Pair<Long, Long> truncateSize = cacheFileReader.selfCheck(); - if (truncateSize.left != currentKeyIndexFile.length()) { - try (FileChannel channel = - FileChannel.open(currentKeyIndexFile.toPath(), StandardOpenOption.WRITE)) { - channel.truncate(truncateSize.left); - } - } - if (truncateSize.right != currentValueIndexFile.length()) { - try (FileChannel channel = - FileChannel.open(currentValueIndexFile.toPath(), StandardOpenOption.WRITE)) { - channel.truncate(truncateSize.right); - } - } - this.keyFileSize = truncateSize.left; - this.valueFileSize = truncateSize.right; - } - private void deleteOldVersionFiles(int maxVersion, String prefix, List<File> files) { for (File file : files) { try { @@ -186,80 +148,169 @@ public class TableDiskUsageCacheWriter { } public void write(TsFileID tsFileID, Map<String, Long> tableSizeMap) throws IOException { - if (keyFileOutputStream == null) { - keyFileOutputStream = new FileOutputStream(currentKeyIndexFile, true); - keyFileSize = currentKeyIndexFile.length(); - keyBufferedOutputStream = new BufferedOutputStream(keyFileOutputStream); - } - if (valueFileOutputStream == null) { - valueFileOutputStream = new FileOutputStream(currentValueIndexFile, true); - valueFileSize = currentValueIndexFile.length(); - valueBufferedOutputStream = new BufferedOutputStream(valueFileOutputStream); - } + tsFileTableSizeCacheWriter.write(tsFileID, tableSizeMap); + } - long valueOffset = valueFileSize; - valueFileSize += - ReadWriteForEncodingUtils.writeVarInt(tableSizeMap.size(), valueBufferedOutputStream); - for (Map.Entry<String, Long> entry : tableSizeMap.entrySet()) { - valueFileSize += ReadWriteIOUtils.writeVar(entry.getKey(), valueBufferedOutputStream); - valueFileSize += ReadWriteIOUtils.write(entry.getValue(), valueBufferedOutputStream); + public void write(TsFileID originTsFileID, TsFileID newTsFileID) throws IOException { + tsFileTableSizeCacheWriter.write(originTsFileID, newTsFileID); + } + + public void closeIfIdle() { + if (System.currentTimeMillis() - lastWriteTimestamp >= TimeUnit.MINUTES.toMillis(1)) { + close(); } - keyFileSize += ReadWriteIOUtils.write(KEY_FILE_RECORD_TYPE_OFFSET, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(tsFileID.timePartitionId, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(tsFileID.timestamp, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(tsFileID.fileVersion, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(tsFileID.compactionVersion, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(valueOffset, keyBufferedOutputStream); } - public void write(TsFileID originTsFileID, TsFileID newTsFileID) throws IOException { - if (keyFileOutputStream == null) { - keyFileOutputStream = new FileOutputStream(currentKeyIndexFile, true); - keyFileSize = currentKeyIndexFile.length(); - keyBufferedOutputStream = new BufferedOutputStream(keyFileOutputStream); + public boolean needCompact() { + if (activeReaderNum > 0) { + return false; + } + if (System.currentTimeMillis() - previousCompactionTimestamp <= TimeUnit.MINUTES.toMillis(2)) { + return false; + } + DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)); + if (dataRegion == null || dataRegion.isDeleted()) { + return false; } - keyFileSize += ReadWriteIOUtils.write(KEY_FILE_RECORD_TYPE_REDIRECT, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(newTsFileID.timePartitionId, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(newTsFileID.timestamp, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(newTsFileID.fileVersion, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(newTsFileID.compactionVersion, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(originTsFileID.timestamp, keyBufferedOutputStream); - keyFileSize += ReadWriteIOUtils.write(originTsFileID.fileVersion, keyBufferedOutputStream); - keyFileSize += - ReadWriteIOUtils.write(originTsFileID.compactionVersion, keyBufferedOutputStream); + TsFileManager tsFileManager = dataRegion.getTsFileManager(); + int fileNum = tsFileManager.size(true) + tsFileManager.size(false); + int estimatedEntryNumInCacheFile = (int) (keyFileLength() / KEY_FILE_OFFSET_RECORD_LENGTH); + int delta = estimatedEntryNumInCacheFile - fileNum; + return delta > 0.2 * estimatedEntryNumInCacheFile || delta >= 1000; } - public void compact() {} + public void compact() { + previousCompactionTimestamp = System.currentTimeMillis(); + this.tsFileTableSizeCacheWriter.close(); + TsFileTableSizeCacheReader cacheFileReader = + new TsFileTableSizeCacheReader( + tsFileTableSizeCacheWriter.getKeyFile().length(), + tsFileTableSizeCacheWriter.getKeyFile(), + tsFileTableSizeCacheWriter.getValueFile().length(), + tsFileTableSizeCacheWriter.getValueFile(), + regionId); + Map<Long, TimePartitionTableSizeQueryContext> contextMap = new HashMap<>(); + try { + cacheFileReader.openKeyFile(); + while (cacheFileReader.hasNextEntryInKeyFile()) { + TsFileTableSizeCacheReader.KeyFileEntry keyFileEntry = + cacheFileReader.readOneEntryFromKeyFile(); + TimePartitionTableSizeQueryContext context = + contextMap.computeIfAbsent( + keyFileEntry.getTimePartitionId(), + k -> new TimePartitionTableSizeQueryContext(Collections.emptyMap())); + if (keyFileEntry.originTsFileID == null) { + context.addCachedTsFileIDAndOffsetInValueFile(keyFileEntry.tsFileID, keyFileEntry.offset); + } else { + context.replaceCachedTsFileID(keyFileEntry.originTsFileID, keyFileEntry.tsFileID); + } + } + } catch (IOException e) { + return; + } finally { + cacheFileReader.closeCurrentFile(); + } - public void flush() throws IOException { - if (valueBufferedOutputStream != null) { - valueBufferedOutputStream.flush(); + List<Pair<TsFileID, Long>> validFilesOrderByOffset = new ArrayList<>(); + DataRegion dataRegion = StorageEngine.getInstance().getDataRegion(new DataRegionId(regionId)); + if (dataRegion == null || dataRegion.isDeleted()) { + return; } - if (keyFileOutputStream != null) { - keyBufferedOutputStream.flush(); + TsFileManager tsFileManager = dataRegion.getTsFileManager(); + for (Long timePartition : tsFileManager.getTimePartitions()) { + TimePartitionTableSizeQueryContext context = contextMap.get(timePartition); + if (context == null) { + continue; + } + Pair<List<TsFileResource>, List<TsFileResource>> resources = + tsFileManager.getTsFileListSnapshot(timePartition); + Stream.concat(resources.left.stream(), resources.right.stream()) + .forEach( + resource -> { + Long offset = context.getCachedTsFileIdOffset(resource.getTsFileID()); + if (offset != null) { + validFilesOrderByOffset.add(new Pair<>(resource.getTsFileID(), offset)); + } + }); } + validFilesOrderByOffset.sort(Comparator.comparingLong(Pair::getRight)); + + TsFileTableSizeCacheWriter targetFileWriter = null; + try { + targetFileWriter = + new TsFileTableSizeCacheWriter( + regionId, + generateKeyFile(currentTsFileIndexFileVersion + 1, true), + generateValueFile(currentTsFileIndexFileVersion + 1, true)); + cacheFileReader.openValueFile(); + for (Pair<TsFileID, Long> pair : validFilesOrderByOffset) { + TsFileID tsFileID = pair.getLeft(); + long offset = pair.getRight(); + Map<String, Long> tableSizeMap = cacheFileReader.readOneEntryFromValueFile(offset, true); + targetFileWriter.write(tsFileID, tableSizeMap); + } + targetFileWriter.close(); + + // replace + File targetKeyFile = generateKeyFile(currentTsFileIndexFileVersion + 1, false); + File targetValueFile = generateValueFile(currentTsFileIndexFileVersion + 1, false); + targetFileWriter.getKeyFile().renameTo(targetKeyFile); + targetFileWriter.getValueFile().renameTo(targetValueFile); + this.tsFileTableSizeCacheWriter.close(); + } catch (Exception e) { + logger.error("Failed to execute compaction for tsfile table size cache file", e); + } finally { + if (tsFileTableSizeCacheWriter != null) { + tsFileTableSizeCacheWriter.close(); + } + if (targetFileWriter != null) { + targetFileWriter.close(); + } + cacheFileReader.closeCurrentFile(); + this.recoverTsFileTableSizeIndexFile(false); + } + } + + private File generateKeyFile(int version, boolean isTempFile) { + return new File( + dir + + File.separator + + TSFILE_CACHE_KEY_FILENAME_PREFIX + + version + + (isTempFile ? TEMP_CACHE_FILE_SUBFIX : "")); + } + + private File generateValueFile(int version, boolean isTempFile) { + return new File( + dir + + File.separator + + TSFILE_CACHE_VALUE_FILENAME_PREFIX + + version + + (isTempFile ? TEMP_CACHE_FILE_SUBFIX : "")); + } + + public void flush() throws IOException { + tsFileTableSizeCacheWriter.flush(); } public File getKeyFile() { - return currentKeyIndexFile; + return tsFileTableSizeCacheWriter.getKeyFile(); } public File getValueFile() { - return currentValueIndexFile; + return tsFileTableSizeCacheWriter.getValueFile(); } public long keyFileLength() { - return keyFileSize; + return tsFileTableSizeCacheWriter.keyFileLength(); } public long valueFileLength() { - return valueFileSize; + return tsFileTableSizeCacheWriter.valueFileLength(); } - public void fsync() throws IOException { - flush(); - valueFileOutputStream.getFD().sync(); - keyFileOutputStream.getFD().sync(); + public void sync() throws IOException { + tsFileTableSizeCacheWriter.sync(); } public void increaseActiveReaderNum() { @@ -274,22 +325,9 @@ public class TableDiskUsageCacheWriter { return activeReaderNum; } + public void removeFiles() {} + public void close() { - try { - fsync(); - } catch (IOException ignored) { - } - try { - if (valueBufferedOutputStream != null) { - valueBufferedOutputStream.close(); - } - } catch (IOException ignored) { - } - try { - if (keyBufferedOutputStream != null) { - keyBufferedOutputStream.close(); - } - } catch (IOException ignored) { - } + this.tsFileTableSizeCacheWriter.close(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java index 7229497ae36..04c416e02db 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheReader.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; -import org.apache.iotdb.commons.exception.IoTDBException; import org.apache.iotdb.commons.exception.IoTDBRuntimeException; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.iotdb.db.utils.MmapUtil; @@ -37,13 +36,13 @@ import java.nio.channels.FileChannel; import java.nio.file.Path; import java.nio.file.StandardOpenOption; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; public class TsFileTableSizeCacheReader { - private long readSize = 0; private final File keyFile; private final long keyFileLength; private final File valueFile; @@ -73,48 +72,50 @@ public class TsFileTableSizeCacheReader { } public Pair<Long, Long> selfCheck() { - List<Pair<Long, Long>> offsetsByReadValueFile = new ArrayList<>(); + if (keyFileLength == 0 || valueFileLength == 0) { + return new Pair<>(0L, 0L); + } + List<Long> offsetsInKeyFile = new ArrayList<>(); + long lastCompleteEntryEndOffsetInKeyFile = 0; try { - openValueFile(); - while (readSize < valueFileLength) { - long offset = inputStream.position(); - int tableNum = ReadWriteForEncodingUtils.readVarInt(inputStream); - if (tableNum <= 0) { - break; - } - for (int i = 0; i < tableNum; i++) { - ReadWriteIOUtils.readVarIntString(inputStream); - ReadWriteIOUtils.readLong(inputStream); + openKeyFile(); + while (hasNextEntryInKeyFile()) { + KeyFileEntry keyFileEntry = readOneEntryFromKeyFile(); + lastCompleteEntryEndOffsetInKeyFile = inputStream.position(); + if (keyFileEntry.originTsFileID != null) { + continue; } - offsetsByReadValueFile.add(new Pair<>(offset, inputStream.position())); + offsetsInKeyFile.add(keyFileEntry.offset); } } catch (Exception ignored) { } finally { closeCurrentFile(); } - if (offsetsByReadValueFile.isEmpty()) { + if (offsetsInKeyFile.isEmpty()) { return new Pair<>(0L, 0L); } - Iterator<Pair<Long, Long>> valueOffsetIterator = offsetsByReadValueFile.iterator(); + + int keyIterIndex = 0; long keyFileTruncateSize = 0; long valueFileTruncateSize = 0; + try { - openKeyFile(); - while (readSize < keyFileLength) { - KeyFileEntry keyFileEntry = readOneEntryFromKeyFile(); - if (keyFileEntry.originTsFileID != null) { - continue; - } - if (!valueOffsetIterator.hasNext()) { - break; - } - Pair<Long, Long> startEndOffsetInValueFile = valueOffsetIterator.next(); - if (startEndOffsetInValueFile.left != keyFileEntry.offset) { + openValueFile(); + while (inputStream.position() < valueFileLength && keyIterIndex < offsetsInKeyFile.size()) { + long startOffsetInKeyFile = offsetsInKeyFile.get(keyIterIndex); + long endOffsetInKeyFile = + keyIterIndex == offsetsInKeyFile.size() - 1 + ? lastCompleteEntryEndOffsetInKeyFile + : offsetsInKeyFile.get(keyIterIndex + 1); + keyIterIndex++; + long startOffset = inputStream.position(); + if (startOffset != startOffsetInKeyFile) { break; } - keyFileTruncateSize = readSize; - valueFileTruncateSize = startEndOffsetInValueFile.right; + readOneEntryFromValueFile(startOffset, false); + keyFileTruncateSize = endOffsetInKeyFile; + valueFileTruncateSize = inputStream.position(); } } catch (Exception ignored) { } finally { @@ -131,7 +132,10 @@ public class TsFileTableSizeCacheReader { long previousTimePartition = 0; TimePartitionTableSizeQueryContext timePartitionContext = null; do { - if (readSize >= keyFileLength) { + if (keyFileLength == 0) { + return true; + } + if (!hasNextEntryInKeyFile()) { closeCurrentFile(); return true; } @@ -150,7 +154,6 @@ public class TsFileTableSizeCacheReader { keyFileEntry.tsFileID, keyFileEntry.originTsFileID); } } catch (IOException e) { - readSize = keyFileLength; closeCurrentFile(); throw e; } @@ -158,7 +161,11 @@ public class TsFileTableSizeCacheReader { return false; } - private KeyFileEntry readOneEntryFromKeyFile() throws IOException { + public boolean hasNextEntryInKeyFile() { + return keyFileLength > 0 && inputStream.position() < keyFileLength; + } + + public KeyFileEntry readOneEntryFromKeyFile() throws IOException { byte type = ReadWriteIOUtils.readByte(inputStream); long timePartition = ReadWriteIOUtils.readLong(inputStream); long timestamp = ReadWriteIOUtils.readLong(inputStream); @@ -170,7 +177,6 @@ public class TsFileTableSizeCacheReader { if (type == TableDiskUsageCacheWriter.KEY_FILE_RECORD_TYPE_OFFSET) { long offset = ReadWriteIOUtils.readLong(inputStream); keyFileEntry = new KeyFileEntry(tsFileID, offset); - readSize += TableDiskUsageCacheWriter.KEY_FILE_OFFSET_RECORD_LENGTH + 1; } else if (type == TableDiskUsageCacheWriter.KEY_FILE_RECORD_TYPE_REDIRECT) { long originTimestamp = ReadWriteIOUtils.readLong(inputStream); long originFileVersion = ReadWriteIOUtils.readLong(inputStream); @@ -179,7 +185,6 @@ public class TsFileTableSizeCacheReader { new TsFileID( regionId, timePartition, originTimestamp, originFileVersion, originCompactionVersion); keyFileEntry = new KeyFileEntry(tsFileID, originTsFileID); - readSize += TableDiskUsageCacheWriter.KEY_FILE_REDIRECT_RECORD_LENGTH + 1; } else { throw new IoTDBRuntimeException( "Unsupported record type in file: " + keyFile.getPath() + ", type: " + type, @@ -210,7 +215,6 @@ public class TsFileTableSizeCacheReader { long offset = pair.right; inputStream.seek(offset); - readSize = offset; int tableNum = ReadWriteForEncodingUtils.readVarInt(inputStream); for (int i = 0; i < tableNum; i++) { String tableName = ReadWriteIOUtils.readVarIntString(inputStream); @@ -221,6 +225,24 @@ public class TsFileTableSizeCacheReader { return false; } + public Map<String, Long> readOneEntryFromValueFile(long offset, boolean needResult) + throws IOException { + inputStream.seek(offset); + int tableNum = ReadWriteForEncodingUtils.readVarInt(inputStream); + if (tableNum <= 0) { + throw new IllegalArgumentException("tableNum should be greater than 0"); + } + Map<String, Long> tableSizeMap = needResult ? new HashMap<>(tableNum) : null; + for (int i = 0; i < tableNum; i++) { + String tableName = ReadWriteIOUtils.readVarIntString(inputStream); + long size = ReadWriteIOUtils.readLong(inputStream); + if (needResult) { + tableSizeMap.put(tableName, size); + } + } + return tableSizeMap; + } + public void closeCurrentFile() { if (inputStream != null) { try { @@ -228,11 +250,10 @@ public class TsFileTableSizeCacheReader { } catch (IOException ignored) { } inputStream = null; - readSize = 0; } } - private static class KeyFileEntry { + public static class KeyFileEntry { public TsFileID tsFileID; public TsFileID originTsFileID; public long offset; @@ -246,55 +267,61 @@ public class TsFileTableSizeCacheReader { this.tsFileID = tsFileID; this.originTsFileID = originTsFileID; } + + public long getTimePartitionId() { + return tsFileID.timePartitionId; + } } - private static class DirectBufferedSeekableFileInputStream extends InputStream { + public static final class DirectBufferedSeekableFileInputStream extends InputStream { private final FileChannel channel; - private ByteBuffer buffer; + private final ByteBuffer buffer; + // file offset of buffer[0] private long bufferStartPos = 0; - private long position = 0; - private final int seekThreshold; + // next read position + private long position = 0; public DirectBufferedSeekableFileInputStream(Path path, int bufferSize) throws IOException { this.channel = FileChannel.open(path, StandardOpenOption.READ); this.buffer = ByteBuffer.allocateDirect(bufferSize); - this.buffer.limit(0); - this.seekThreshold = bufferSize * 2; + this.buffer.limit(0); // mark empty } + /** Only support forward seek: newPos >= position */ public void seek(long newPos) throws IOException { + if (newPos < position) { + throw new UnsupportedOperationException("Backward seek is not supported"); + } + + // Fast path 0: no-op if (newPos == position) { return; } - if (newPos > position) { + long delta = newPos - position; - long bufferEnd = bufferStartPos + buffer.limit(); - - if (newPos < bufferEnd) { - buffer.position((int) (newPos - bufferStartPos)); - position = newPos; - return; - } - - long gap = newPos - position; + // Fast path 1: consume remaining buffer + if (delta <= buffer.remaining()) { + buffer.position(buffer.position() + (int) delta); + position = newPos; + return; + } - if (gap <= seekThreshold) { - discardBuffer(); - bufferStartPos = position; - refill(); - if (newPos < bufferStartPos + buffer.limit()) { - buffer.position((int) (newPos - bufferStartPos)); - position = newPos; - return; - } - } + // Fast path 2: still inside buffer window (rare but safe) + long bufferEnd = bufferStartPos + buffer.limit(); + if (newPos >= bufferStartPos && newPos < bufferEnd) { + buffer.position((int) (newPos - bufferStartPos)); + position = newPos; + return; } - discardBuffer(); + // Slow path: invalidate buffer and jump + buffer.clear(); + buffer.limit(0); + channel.position(newPos); bufferStartPos = newPos; position = newPos; @@ -317,57 +344,57 @@ public class TsFileTableSizeCacheReader { return 0; } - int totalRead = 0; - + int total = 0; while (len > 0) { if (!buffer.hasRemaining()) { if (!refill()) { - return totalRead == 0 ? -1 : totalRead; + return total == 0 ? -1 : total; } } - int n = Math.min(len, buffer.remaining()); buffer.get(dst, off, n); - off += n; len -= n; - totalRead += n; + total += n; position += n; } - - return totalRead; + return total; } private boolean refill() throws IOException { buffer.clear(); - bufferStartPos = channel.position(); + channel.position(position); + bufferStartPos = position; int read = channel.read(buffer); if (read <= 0) { buffer.limit(0); return false; } - buffer.flip(); return true; } - private void discardBuffer() { - buffer.clear(); - buffer.limit(0); - } - public long position() { return position; } + @Override + public int available() throws IOException { + long remainingInFile = channel.size() - position; + if (remainingInFile <= 0) { + return 0; + } + return (int) Math.min(Integer.MAX_VALUE, remainingInFile); + } + @Override public void close() throws IOException { - if (buffer != null) { + try { MmapUtil.clean(buffer); - buffer = null; + } finally { + channel.close(); } - channel.close(); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheWriter.java similarity index 57% copy from iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java copy to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheWriter.java index 5c35b0701bf..29287eb2af2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TsFileTableSizeCacheWriter.java @@ -19,7 +19,6 @@ package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache; -import org.apache.iotdb.db.storageengine.StorageEngine; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID; import org.apache.tsfile.utils.Pair; @@ -31,27 +30,16 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel; -import java.nio.file.Files; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.List; import java.util.Map; -public class TableDiskUsageCacheWriter { - private static final String TSFILE_CACHE_KEY_FILENAME_PREFIX = "TableSizeKeyFile_"; - private static final String TSFILE_CACHE_VALUE_FILENAME_PREFIX = "TableSizeValueFile_"; - public static final int KEY_FILE_OFFSET_RECORD_LENGTH = 5 * Long.BYTES; - public static final int KEY_FILE_REDIRECT_RECORD_LENGTH = 7 * Long.BYTES; - private static final String TEMP_CACHE_FILE_SUBFIX = ".tmp"; - public static final byte KEY_FILE_RECORD_TYPE_OFFSET = 1; - public static final byte KEY_FILE_RECORD_TYPE_REDIRECT = 2; +import static org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCacheWriter.KEY_FILE_RECORD_TYPE_OFFSET; +import static org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCacheWriter.KEY_FILE_RECORD_TYPE_REDIRECT; +public class TsFileTableSizeCacheWriter { private final int regionId; - private int activeReaderNum = 0; - private int currentTsFileIndexFileVersion = 0; - private final File dir; - private File currentKeyIndexFile; - private File currentValueIndexFile; + private final File currentKeyIndexFile; + private final File currentValueIndexFile; private FileOutputStream keyFileOutputStream; private FileOutputStream valueFileOutputStream; private BufferedOutputStream keyBufferedOutputStream; @@ -59,96 +47,28 @@ public class TableDiskUsageCacheWriter { private long keyFileSize; private long valueFileSize; - public TableDiskUsageCacheWriter(String database, int regionId) { - this.regionId = regionId; - this.dir = StorageEngine.getDataRegionSystemDir(database, regionId + ""); - recoverTsFileTableSizeIndexFile(); + public TsFileTableSizeCacheWriter( + int regionId, File currentKeyIndexFile, File currentValueIndexFile) throws IOException { + this(regionId, currentKeyIndexFile, currentValueIndexFile, true); } - private void recoverTsFileTableSizeIndexFile() { - dir.mkdirs(); - File[] files = dir.listFiles(); - currentTsFileIndexFileVersion = 0; - List<File> keyFiles = new ArrayList<>(); - List<File> valueFiles = new ArrayList<>(); - if (files != null) { - for (File file : files) { - String fileName = file.getName(); - boolean isKeyFile = fileName.startsWith(TSFILE_CACHE_KEY_FILENAME_PREFIX); - boolean isValueFile = !isKeyFile && fileName.startsWith(TSFILE_CACHE_VALUE_FILENAME_PREFIX); - boolean isTempFile = fileName.endsWith(TEMP_CACHE_FILE_SUBFIX); - if (!isKeyFile) { - if (isValueFile && !isTempFile) { - valueFiles.add(file); - } - continue; - } - if (isTempFile) { - try { - Files.delete(file.toPath()); - } catch (IOException ignored) { - } - } - int version; - try { - version = Integer.parseInt(fileName.substring(TSFILE_CACHE_KEY_FILENAME_PREFIX.length())); - currentTsFileIndexFileVersion = Math.max(currentTsFileIndexFileVersion, version); - } catch (NumberFormatException ignored) { - continue; - } - File valueFile = - new File(dir + File.separator + TSFILE_CACHE_VALUE_FILENAME_PREFIX + version); - // may have a valid value index file - if (!valueFile.exists()) { - File tempValueFile = new File(valueFile.getPath() + TEMP_CACHE_FILE_SUBFIX); - if (tempValueFile.exists()) { - tempValueFile.renameTo(valueFile); - valueFiles.add(valueFile); - } else { - // lost value file - try { - Files.delete(file.toPath()); - } catch (IOException ignored) { - } - continue; - } - } - keyFiles.add(file); - } - if (keyFiles.size() > 1) { - deleteOldVersionFiles( - currentTsFileIndexFileVersion, TSFILE_CACHE_KEY_FILENAME_PREFIX, keyFiles); - } - if (valueFiles.size() > 1) { - deleteOldVersionFiles( - currentTsFileIndexFileVersion, TSFILE_CACHE_VALUE_FILENAME_PREFIX, valueFiles); - } - } - currentKeyIndexFile = - keyFiles.isEmpty() - ? new File( - dir - + File.separator - + TSFILE_CACHE_KEY_FILENAME_PREFIX - + currentTsFileIndexFileVersion) - : keyFiles.get(0); - currentValueIndexFile = - valueFiles.isEmpty() - ? new File( - dir - + File.separator - + TSFILE_CACHE_VALUE_FILENAME_PREFIX - + currentTsFileIndexFileVersion) - : valueFiles.get(0); - try { - cacheFileSelfCheck(); - } catch (IOException ignored) { + public TsFileTableSizeCacheWriter( + int regionId, File currentKeyIndexFile, File currentValueIndexFile, boolean recover) + throws IOException { + this.regionId = regionId; + this.currentKeyIndexFile = currentKeyIndexFile; + this.currentValueIndexFile = currentValueIndexFile; + currentKeyIndexFile.createNewFile(); + currentValueIndexFile.createNewFile(); + if (recover) { + recover(); + } else { + keyFileSize = currentKeyIndexFile.length(); + valueFileSize = currentValueIndexFile.length(); } } - private void cacheFileSelfCheck() throws IOException { - currentKeyIndexFile.createNewFile(); - currentValueIndexFile.createNewFile(); + private void recover() throws IOException { TsFileTableSizeCacheReader cacheFileReader = new TsFileTableSizeCacheReader( currentKeyIndexFile.length(), @@ -173,18 +93,6 @@ public class TableDiskUsageCacheWriter { this.valueFileSize = truncateSize.right; } - private void deleteOldVersionFiles(int maxVersion, String prefix, List<File> files) { - for (File file : files) { - try { - int version = Integer.parseInt(file.getName().substring(prefix.length())); - if (version != maxVersion) { - Files.deleteIfExists(file.toPath()); - } - } catch (Exception e) { - } - } - } - public void write(TsFileID tsFileID, Map<String, Long> tableSizeMap) throws IOException { if (keyFileOutputStream == null) { keyFileOutputStream = new FileOutputStream(currentKeyIndexFile, true); @@ -229,8 +137,6 @@ public class TableDiskUsageCacheWriter { ReadWriteIOUtils.write(originTsFileID.compactionVersion, keyBufferedOutputStream); } - public void compact() {} - public void flush() throws IOException { if (valueBufferedOutputStream != null) { valueBufferedOutputStream.flush(); @@ -256,38 +162,34 @@ public class TableDiskUsageCacheWriter { return valueFileSize; } - public void fsync() throws IOException { + public void sync() throws IOException { flush(); - valueFileOutputStream.getFD().sync(); - keyFileOutputStream.getFD().sync(); - } - - public void increaseActiveReaderNum() { - activeReaderNum++; - } - - public void decreaseActiveReaderNum() { - activeReaderNum--; - } - - public int getActiveReaderNum() { - return activeReaderNum; + if (valueFileOutputStream != null) { + valueFileOutputStream.getFD().sync(); + } + if (keyFileOutputStream != null) { + keyFileOutputStream.getFD().sync(); + } } public void close() { try { - fsync(); + sync(); } catch (IOException ignored) { } try { if (valueBufferedOutputStream != null) { valueBufferedOutputStream.close(); + valueBufferedOutputStream = null; + valueFileOutputStream = null; } } catch (IOException ignored) { } try { if (keyBufferedOutputStream != null) { keyBufferedOutputStream.close(); + keyBufferedOutputStream = null; + keyFileOutputStream = null; } } catch (IOException ignored) { } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileTableSizeCacheWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileTableSizeCacheWriterTest.java new file mode 100644 index 00000000000..53cf08ab747 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileTableSizeCacheWriterTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.utils; + +import org.apache.iotdb.commons.consensus.DataRegionId; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.db.exception.StorageEngineException; +import org.apache.iotdb.db.storageengine.StorageEngine; +import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager; +import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCacheWriter; +import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TsFileTableSizeCacheReader; + +import org.apache.tsfile.exception.write.WriteProcessException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; + +public class TsFileTableSizeCacheWriterTest extends AbstractCompactionTest { + + @Before + public void setUp() + throws IOException, WriteProcessException, MetadataException, InterruptedException { + super.setUp(); + } + + @After + public void tearDown() throws IOException, StorageEngineException { + super.tearDown(); + } + + @Test + public void testCompactEmptyTargetFile() throws IOException { + DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.getDatabaseName()).thenReturn("root.test"); + Mockito.when(dataRegion.getDataRegionIdString()).thenReturn("0"); + StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion); + TsFileManager tsFileManager = new TsFileManager("root.test", "0", ""); + Mockito.when(dataRegion.getTsFileManager()).thenReturn(tsFileManager); + TableDiskUsageCacheWriter writer = + new TableDiskUsageCacheWriter(dataRegion.getDatabaseName(), 0); + File oldKeyFile = writer.getKeyFile(); + File oldValueFile = writer.getValueFile(); + Assert.assertEquals("TableSizeKeyFile_0", oldKeyFile.getName()); + Assert.assertEquals("TableSizeValueFile_0", oldValueFile.getName()); + + TsFileResource resource1 = createEmptyFileAndResourceWithName("1-1-0-0.tsfile", 1, true); + TsFileResource resource2 = createEmptyFileAndResourceWithName("2-2-0-0.tsfile", 1, true); + TsFileResource resource3 = createEmptyFileAndResourceWithName("3-3-0-0.tsfile", 1, false); + + writer.write(resource1.getTsFileID(), Collections.singletonMap("table1", 10L)); + writer.write(resource2.getTsFileID(), Collections.singletonMap("table1", 20L)); + writer.write(resource3.getTsFileID(), Collections.singletonMap("table2", 200L)); + + writer.compact(); + + Assert.assertEquals("TableSizeKeyFile_1", writer.getKeyFile().getName()); + Assert.assertEquals("TableSizeValueFile_1", writer.getValueFile().getName()); + writer.close(); + } + + @Test + public void testCompactTargetFile1() throws IOException { + DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.getDatabaseName()).thenReturn("root.test"); + Mockito.when(dataRegion.getDataRegionIdString()).thenReturn("0"); + StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion); + TsFileManager tsFileManager = new TsFileManager("root.test", "0", ""); + Mockito.when(dataRegion.getTsFileManager()).thenReturn(tsFileManager); + TableDiskUsageCacheWriter writer = + new TableDiskUsageCacheWriter(dataRegion.getDatabaseName(), 0); + File oldKeyFile = writer.getKeyFile(); + File oldValueFile = writer.getValueFile(); + Assert.assertEquals("TableSizeKeyFile_0", oldKeyFile.getName()); + Assert.assertEquals("TableSizeValueFile_0", oldValueFile.getName()); + + TsFileResource resource1 = createEmptyFileAndResourceWithName("1-1-0-0.tsfile", 1, true); + TsFileResource resource2 = createEmptyFileAndResourceWithName("2-2-0-0.tsfile", 1, true); + TsFileResource resource3 = createEmptyFileAndResourceWithName("3-3-0-0.tsfile", 1, false); + tsFileManager.add(resource1, true); + tsFileManager.add(resource3, false); + + writer.write(resource1.getTsFileID(), Collections.singletonMap("table1", 10L)); + writer.write(resource2.getTsFileID(), Collections.singletonMap("table1", 10L)); + writer.write(resource3.getTsFileID(), Collections.singletonMap("table2", 200L)); + + writer.compact(); + + File targetKeyFile = writer.getKeyFile(); + File targetValueFile = writer.getValueFile(); + Assert.assertFalse(oldKeyFile.exists()); + Assert.assertFalse(oldValueFile.exists()); + Assert.assertEquals("TableSizeKeyFile_1", targetKeyFile.getName()); + Assert.assertEquals("TableSizeValueFile_1", targetValueFile.getName()); + writer.close(); + + TsFileTableSizeCacheReader reader = + new TsFileTableSizeCacheReader( + targetKeyFile.length(), targetKeyFile, targetValueFile.length(), targetValueFile, 1); + reader.openKeyFile(); + int count = 0; + while (reader.hasNextEntryInKeyFile()) { + TsFileTableSizeCacheReader.KeyFileEntry keyFileEntry = reader.readOneEntryFromKeyFile(); + count++; + } + reader.closeCurrentFile(); + Assert.assertEquals(2, count); + } + + @Test + public void testCompactTargetFile2() throws IOException { + DataRegion dataRegion = Mockito.mock(DataRegion.class); + Mockito.when(dataRegion.getDatabaseName()).thenReturn("root.test"); + Mockito.when(dataRegion.getDataRegionIdString()).thenReturn("0"); + StorageEngine.getInstance().setDataRegion(new DataRegionId(0), dataRegion); + TsFileManager tsFileManager = new TsFileManager("root.test", "0", ""); + Mockito.when(dataRegion.getTsFileManager()).thenReturn(tsFileManager); + TableDiskUsageCacheWriter writer = + new TableDiskUsageCacheWriter(dataRegion.getDatabaseName(), 0); + File oldKeyFile = writer.getKeyFile(); + File oldValueFile = writer.getValueFile(); + Assert.assertEquals("TableSizeKeyFile_0", oldKeyFile.getName()); + Assert.assertEquals("TableSizeValueFile_0", oldValueFile.getName()); + + TsFileResource resource1 = createEmptyFileAndResourceWithName("1-1-0-0.tsfile", 1, true); + TsFileResource resource2 = createEmptyFileAndResourceWithName("2-2-0-0.tsfile", 1, true); + TsFileResource resource3 = createEmptyFileAndResourceWithName("3-3-0-0.tsfile", 1, false); + tsFileManager.add(resource1, true); + tsFileManager.add(resource2, true); + + writer.write(resource1.getTsFileID(), Collections.singletonMap("table1", 10L)); + writer.write(resource3.getTsFileID(), Collections.singletonMap("table2", 200L)); + writer.write(resource3.getTsFileID(), resource2.getTsFileID()); + + writer.compact(); + + File targetKeyFile = writer.getKeyFile(); + File targetValueFile = writer.getValueFile(); + Assert.assertFalse(oldKeyFile.exists()); + Assert.assertFalse(oldValueFile.exists()); + Assert.assertEquals("TableSizeKeyFile_1", targetKeyFile.getName()); + Assert.assertEquals("TableSizeValueFile_1", targetValueFile.getName()); + writer.close(); + + TsFileTableSizeCacheReader reader = + new TsFileTableSizeCacheReader( + targetKeyFile.length(), targetKeyFile, targetValueFile.length(), targetValueFile, 1); + reader.openKeyFile(); + int count = 0; + while (reader.hasNextEntryInKeyFile()) { + TsFileTableSizeCacheReader.KeyFileEntry keyFileEntry = reader.readOneEntryFromKeyFile(); + Assert.assertNotEquals(3, keyFileEntry.tsFileID.fileVersion); + count++; + } + reader.closeCurrentFile(); + Assert.assertEquals(2, count); + } +}
