This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch cacheMetadataIndexNodeOffsetsForQuery
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/cacheMetadataIndexNodeOffsetsForQuery by this push:
new 0693cbff6c0 support search devices from different tables
0693cbff6c0 is described below
commit 0693cbff6c06217c8e160ca2872c4019f2c0a518
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 29 18:08:05 2025 +0800
support search devices from different tables
---
.../fragment/DeviceMetadataIndexEntryCache.java | 70 +++++++++++++++-------
.../buffer/TimeSeriesMetadataCache.java | 14 ++---
2 files changed, 51 insertions(+), 33 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java
index 316cdb4ae02..874518dac1a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/DeviceMetadataIndexEntryCache.java
@@ -30,21 +30,28 @@ import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.LongConsumer;
public class DeviceMetadataIndexEntryCache {
private static final long MAX_CACHED_SIZE = 32 * 1024 * 1024;
private final FragmentInstanceContext context;
private TreeMap<IDeviceID, Integer> deviceIndexMap;
- private final Map<String, long[]> deviceMetadataIndexNodeOffsetsCache = new HashMap<>();
+ private final Map<String, long[]> deviceMetadataIndexNodeOffsetsCache = new ConcurrentHashMap<>();
private List<IDeviceID> sortedDevices;
private int[] deviceIdxArr;
+ private final AtomicInteger cachedFileNum;
+ private LongConsumer ioSizeRecorder;
public DeviceMetadataIndexEntryCache(FragmentInstanceContext context) {
this.context = context;
+ this.cachedFileNum = new AtomicInteger(0);
+ this.ioSizeRecorder =
+ context.getQueryStatistics().getLoadTimeSeriesMetadataActualIOSize()::addAndGet;
}
public void addDevices(AbstractDataSourceOperator operator, List<DeviceEntry> deviceEntries) {
@@ -66,7 +73,8 @@ public class DeviceMetadataIndexEntryCache {
}
public Pair<long[], Boolean> getCachedDeviceMetadataIndexNodeOffset(
- int deviceIndex, String filePath) throws IOException {
+ IDeviceID device, int deviceIndex, String filePath, boolean ignoreNotExists)
+ throws IOException {
// cache is disabled
if (deviceIndex < 0) {
return new Pair<>(null, true);
@@ -80,6 +88,9 @@ public class DeviceMetadataIndexEntryCache {
long startOffset = resourceCache[2 * indexAfterSort];
// the device does not exist in the file
if (startOffset < 0) {
+ if (!ignoreNotExists) {
+ throw new IOException("Device {" + device + "} is not in tsFileMetaData of " + filePath);
+ }
return new Pair<>(null, false);
}
long endOffset = resourceCache[2 * indexAfterSort + 1];
@@ -87,20 +98,23 @@ public class DeviceMetadataIndexEntryCache {
}
private long[] loadOffsetsToCache(String filePath) throws IOException {
- long[] offsets = deviceMetadataIndexNodeOffsetsCache.get(filePath);
- if (offsets != null) {
- return offsets;
- }
- if (!reserveMemory()) {
- return null;
- }
TsFileSequenceReader reader = FileReaderManager.getInstance().get(filePath, true);
IDeviceID firstDevice = getSortedDevices().get(0);
- offsets =
- reader.getDeviceMetadataIndexNodeOffsets(
- firstDevice.isTableModel() ? firstDevice.getTableName() : "", sortedDevices, null);
- deviceMetadataIndexNodeOffsetsCache.put(filePath, offsets);
- return offsets;
+ return deviceMetadataIndexNodeOffsetsCache.computeIfAbsent(
+ filePath,
+ k -> {
+ if (!reserveMemory()) {
+ return null;
+ }
+ try {
+ return reader.getDeviceMetadataIndexNodeOffsets(
+ firstDevice.isTableModel() ? firstDevice.getTableName() : null,
+ sortedDevices,
+ ioSizeRecorder);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
private synchronized List<IDeviceID> getSortedDevices() {
@@ -123,14 +137,24 @@ public class DeviceMetadataIndexEntryCache {
private boolean reserveMemory() {
long costOfOneFile = RamUsageEstimator.sizeOfLongArray(sortedDevices.size());
- if (costOfOneFile * (deviceMetadataIndexNodeOffsetsCache.size() + 1) > MAX_CACHED_SIZE) {
- return false;
- }
- try {
- context.getMemoryReservationContext().reserveMemoryCumulatively(costOfOneFile);
- return true;
- } catch (Exception ignored) {
- return false;
+ int currentNum = cachedFileNum.get();
+ boolean memoryReserved = false;
+ while (true) {
+ if (costOfOneFile * (currentNum + 1) > MAX_CACHED_SIZE) {
+ return false;
+ }
+ try {
+ if (!memoryReserved) {
+ context.getMemoryReservationContext().reserveMemoryCumulatively(costOfOneFile);
+ memoryReserved = true;
+ }
+ } catch (Exception ignored) {
+ return false;
+ }
+ if (cachedFileNum.compareAndSet(currentNum, currentNum + 1)) {
+ return true;
+ }
+ currentNum = cachedFileNum.get();
}
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index a0be5f0e1c5..e457cb64df6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -151,12 +151,9 @@ public class TimeSeriesMetadataCache {
pair =
((FragmentInstanceContext) queryContext)
.getMetadataIndexEntryCache()
- .getCachedDeviceMetadataIndexNodeOffset(deviceIndexInFI, filePath);
+ .getCachedDeviceMetadataIndexNodeOffset(
+ key.device, deviceIndexInFI, filePath, ignoreNotExists);
if (!pair.right) {
- if (!ignoreNotExists) {
- throw new IOException(
- "Device {" + key.device + "} is not in tsFileMetaData of " + filePath);
- }
return null;
}
}
@@ -220,12 +217,9 @@ public class TimeSeriesMetadataCache {
pair =
((FragmentInstanceContext) queryContext)
.getMetadataIndexEntryCache()
- .getCachedDeviceMetadataIndexNodeOffset(deviceIndexInFI, filePath);
+ .getCachedDeviceMetadataIndexNodeOffset(
+ key.device, deviceIndexInFI, filePath, ignoreNotExists);
if (!pair.right) {
- if (!ignoreNotExists) {
- throw new IOException(
- "Device {" + key.device + "} is not in tsFileMetaData of " + filePath);
- }
return null;
}
}