This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/read_tsfile_table_function by
this push:
new 8587cbd3366 add tests
8587cbd3366 is described below
commit 8587cbd3366987271a61d5ff240b28b3fdc4fe8b
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jun 16 12:04:34 2026 +0800
add tests
---
.../recent/IoTDBReadTsFileTableFunctionIT.java | 42 +++++
.../operator/source/AlignedSeriesScanUtil.java | 3 +-
.../execution/operator/source/FileLoaderUtils.java | 5 +-
.../relational/ExternalTsFileSeriesScanUtil.java | 4 +-
.../readTsFile/ExternalTsFileQueryResource.java | 53 ++++--
.../tvf/readTsFile/TsFileSchemaCollector.java | 24 ++-
.../buffer/TimeSeriesMetadataCache.java | 9 +-
.../ExternalTsFileQueryResourceTest.java | 193 +++++++++++++++++++++
8 files changed, 298 insertions(+), 35 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
index c033491e95b..870a37aedaf 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
@@ -286,6 +286,48 @@ public class IoTDBReadTsFileTableFunctionIT {
DATABASE_NAME);
}
+ @Test
+ public void testReadMultipleTsFilesWithConflictingTagAndFieldColumns()
throws Exception {
+ File tsFile1 = new File(tmpDir, "tag-field-conflict-1.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+ generateTable(writer, "table1", Arrays.asList("shared"),
Arrays.asList("s1"), 1, 2);
+ }
+ File tsFile2 = new File(tmpDir, "tag-field-conflict-2.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+ generateTable(writer, "table1", new ArrayList<>(),
Arrays.asList("shared"), 3, 4);
+ }
+
+ tableAssertTestFail(
+ "SELECT * FROM read_tsfile(PATHS => '"
+ + toSqlPath(tsFile1)
+ + ","
+ + toSqlPath(tsFile2)
+ + "', TABLE_NAME => 'table1')",
+ "conflicting categories when merging table schema",
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testReadMultipleTsFilesWithConflictingFieldAndTagColumns()
throws Exception {
+ File tsFile1 = new File(tmpDir, "field-tag-conflict-1.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+ generateTable(writer, "table1", new ArrayList<>(),
Arrays.asList("shared"), 1, 2);
+ }
+ File tsFile2 = new File(tmpDir, "field-tag-conflict-2.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+ generateTable(writer, "table1", Arrays.asList("shared"),
Arrays.asList("s1"), 3, 4);
+ }
+
+ tableAssertTestFail(
+ "SELECT * FROM read_tsfile(PATHS => '"
+ + toSqlPath(tsFile1)
+ + ","
+ + toSqlPath(tsFile2)
+ + "', TABLE_NAME => 'table1')",
+ "conflicting categories when merging table schema",
+ DATABASE_NAME);
+ }
+
@Test
public void testReadTsFileWithoutTableNameWhenMultipleTablesExist() throws
Exception {
File tsFile = new File(tmpDir, "multiple-tables.tsfile");
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
index e80f305e73b..331a5a5b267 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/AlignedSeriesScanUtil.java
@@ -38,7 +38,6 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
import java.util.stream.Collectors;
public class AlignedSeriesScanUtil extends SeriesScanUtil {
@@ -102,7 +101,7 @@ public class AlignedSeriesScanUtil extends SeriesScanUtil {
scanOptions.getGlobalTimeFilter(),
isSeq,
ignoreAllNullRows,
- Optional.empty());
+ null);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
index 79235360cdf..e5761050509 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/FileLoaderUtils.java
@@ -57,7 +57,6 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -200,7 +199,7 @@ public class FileLoaderUtils {
Filter globalTimeFilter,
boolean isSeq,
boolean ignoreAllNullRows,
- Optional<long[]> rootMeasurementMetadataIndexNodeOffset)
+ long[] rootMeasurementMetadataIndexNodeOffset)
throws IOException {
final long t1 = System.nanoTime();
boolean loadFromMem = false;
@@ -293,7 +292,7 @@ public class FileLoaderUtils {
FragmentInstanceContext context,
Filter globalTimeFilter,
boolean ignoreAllNullRows,
- Optional<long[]> rootMeasurementMetadataIndexNodeOffset)
+ long[] rootMeasurementMetadataIndexNodeOffset)
throws IOException {
AbstractAlignedTimeSeriesMetadata alignedTimeSeriesMetadata = null;
// load all the TimeseriesMetadata of vector, the first one is for time
column and the
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
index c22384160ae..7726a510a14 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/ExternalTsFileSeriesScanUtil.java
@@ -36,7 +36,6 @@ import org.apache.tsfile.read.filter.basic.Filter;
import java.io.IOException;
import java.util.List;
-import java.util.Optional;
public class ExternalTsFileSeriesScanUtil extends AlignedSeriesScanUtil {
@@ -84,8 +83,7 @@ public class ExternalTsFileSeriesScanUtil extends
AlignedSeriesScanUtil {
globalTimeFilter,
resource.isSeq(),
context.isIgnoreAllNullRows(),
- Optional.of(
- new long[] {currentDeviceOffset.getStartOffset(),
currentDeviceOffset.getEndOffset()}));
+ new long[] {currentDeviceOffset.getStartOffset(),
currentDeviceOffset.getEndOffset()});
}
@FunctionalInterface
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
index 4a7ce9f89b3..dfb1fd8b625 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
@@ -20,12 +20,14 @@
package
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
-import org.apache.iotdb.commons.queryengine.execution.MemoryEstimationHelper;
+import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.commons.schema.filter.SchemaFilter;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.ExternalTsFileDeviceFilterVisitor;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.ThreadSafeMemoryReservationManager;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource;
@@ -79,7 +81,9 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private static final long TSFILE_READER_MEMORY_RESERVE_SIZE_IN_BYTES = 4L *
1024;
private final QueryId queryId;
- private final MPPQueryContext queryContext;
+ // This resource outlives the frontend planning phase, whose MPPQueryContext
memory manager is
+ // released after dispatch. Keep a dedicated manager and release it when
this resource closes.
+ private final MemoryReservationManager
externalTsFileResourceMemoryReservationManager;
private final Path queryTempRoot;
private final String tableName;
private final List<String> tsFilePaths;
@@ -98,8 +102,10 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
List<String> tsFilePaths,
LongConsumer ioSizeRecorder,
boolean useExactTempRoot) {
- this.queryContext = requireNonNull(queryContext, "queryContext is null");
- this.queryId = queryContext.getQueryId();
+ this.queryId = requireNonNull(queryContext, "queryContext is
null").getQueryId();
+ this.externalTsFileResourceMemoryReservationManager =
+ new ThreadSafeMemoryReservationManager(
+ queryId, ExternalTsFileQueryResource.class.getName());
this.queryTempRoot =
useExactTempRoot
? requireNonNull(tempRoot, "tempRoot is null")
@@ -146,7 +152,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
}
private void acquireMemoryForTsFileReaders() {
- queryContext.reserveMemoryForFrontEndImmediately(
+ externalTsFileResourceMemoryReservationManager.reserveMemoryImmediately(
tsFilePaths.size() * TSFILE_READER_MEMORY_RESERVE_SIZE_IN_BYTES);
}
@@ -160,6 +166,11 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
}
}
+ @TestOnly
+ public void setDeviceEntryComparator(Comparator<DeviceEntry>
deviceEntryComparator) {
+ this.deviceEntryComparator = deviceEntryComparator;
+ }
+
public List<String> getTsFilePaths() {
return tsFilePaths;
}
@@ -193,10 +204,14 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
}
closed = true;
- releaseFileReaderReferences();
+ try {
+ releaseFileReaderReferences();
- if (Files.exists(queryTempRoot)) {
- FileUtils.deleteFileOrDirectory(queryTempRoot.toFile(), true);
+ if (Files.exists(queryTempRoot)) {
+ FileUtils.deleteFileOrDirectory(queryTempRoot.toFile(), true);
+ }
+ } finally {
+
externalTsFileResourceMemoryReservationManager.releaseAllReservedMemory();
}
}
@@ -248,7 +263,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private long reservedBytes;
private long unreservedBytes;
- private DeviceTaskPartition(int partitionIndex) {
+ DeviceTaskPartition(int partitionIndex) {
this.partitionIndex = partitionIndex;
}
@@ -260,12 +275,12 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
return deviceEntryIndexes;
}
- private void add(DeviceTask deviceTask) {
+ void add(DeviceTask deviceTask) {
pendingDeviceTasks.add(deviceTask);
unreservedBytes += deviceTask.ramBytesUsed();
}
- private void flush() {
+ void flush() {
if (pendingDeviceTasks.isEmpty()) {
return;
}
@@ -315,7 +330,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private boolean reserveUnreservedMemory() {
try {
- queryContext.reserveMemoryForFrontEndImmediately(unreservedBytes);
+
externalTsFileResourceMemoryReservationManager.reserveMemoryImmediately(unreservedBytes);
} catch (MemoryNotEnoughException e) {
return false;
}
@@ -330,7 +345,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private void releaseDeviceTaskMemory() {
if (reservedBytes != 0) {
- queryContext.releaseMemoryReservedForFrontEnd(reservedBytes);
+
externalTsFileResourceMemoryReservationManager.releaseMemoryCumulatively(reservedBytes);
reservedBytes = 0;
}
unreservedBytes = 0;
@@ -340,7 +355,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
return !deviceEntryIndexes.isEmpty();
}
- private void finish() {
+ void finish() {
if (pendingDeviceTasks.isEmpty()) {
return;
}
@@ -420,7 +435,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private QueryDataSource currentDeviceQueryDataSource;
private Map<TsFileResource, DeviceOffset> currentDeviceOffsetMap;
- private DeviceTaskRunReader(DeviceTaskPartition partition) throws
IOException {
+ DeviceTaskRunReader(DeviceTaskPartition partition) throws IOException {
Comparator<DeviceEntry> comparator = deviceEntryComparator;
this.usePriorityQueue = comparator != null;
this.runCursors =
@@ -699,7 +714,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
}
}
- private static class DeviceTask implements Accountable {
+ static class DeviceTask implements Accountable {
private static final long INSTANCE_SIZE =
RamUsageEstimator.shallowSizeOfInstance(DeviceTask.class);
@@ -707,7 +722,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private final int deviceEntryIndex;
private final List<DeviceOffset> deviceOffsets;
- private DeviceTask(int deviceEntryIndex, List<DeviceOffset> deviceOffsets)
{
+ DeviceTask(int deviceEntryIndex, List<DeviceOffset> deviceOffsets) {
this.deviceEntryIndex = deviceEntryIndex;
this.deviceOffsets = deviceOffsets;
}
@@ -738,7 +753,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
@Override
public long ramBytesUsed() {
return INSTANCE_SIZE
- + MemoryEstimationHelper.ARRAY_LIST_INSTANCE_SIZE
+ + RamUsageEstimator.shallowSizeOfInstance(ArrayList.class)
+ RamUsageEstimator.NUM_BYTES_ARRAY_HEADER
+ (long) RamUsageEstimator.NUM_BYTES_OBJECT_REF *
deviceOffsets.size()
+ deviceOffsets.size() * DeviceOffset.INSTANCE_SIZE;
@@ -754,7 +769,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
private final long startOffset;
private final long endOffset;
- private DeviceOffset(int fileIndex, long startOffset, long endOffset) {
+ DeviceOffset(int fileIndex, long startOffset, long endOffset) {
this.fileIndex = fileIndex;
this.startOffset = startOffset;
this.endOffset = endOffset;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
index aa2ccdeab8b..395b6c9d6ca 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/TsFileSchemaCollector.java
@@ -188,6 +188,7 @@ final class TsFileSchemaCollector {
private IMeasurementSchema timeColumnSchema;
private final List<IMeasurementSchema> tagColumnSchemas = new
ArrayList<>();
private final Map<String, IMeasurementSchema> fieldColumnSchemaMap = new
LinkedHashMap<>();
+ private final Map<String, ColumnCategory> columnCategoryMap = new
LinkedHashMap<>();
private MergedTableSchemaBuilder(String tableName, TableSchema
tableSchema) {
this.tableName = tableName.toLowerCase(Locale.ENGLISH);
@@ -202,15 +203,18 @@ final class TsFileSchemaCollector {
List<ColumnCategory> columnCategories = tableSchema.getColumnTypes();
for (int i = 0; i < columnCategories.size(); i++) {
- if (columnCategories.get(i) == ColumnCategory.TIME) {
+ ColumnCategory currentCategory = columnCategories.get(i);
+ if (currentCategory == ColumnCategory.TIME) {
if (currentTimeColumn != null) {
throw new UDFArgumentNotValidException(
"Multiple time columns found when merging table schema for
table " + tableName);
}
currentTimeColumn = columnSchemas.get(i);
- } else if (columnCategories.get(i) == ColumnCategory.TAG) {
+ } else if (currentCategory == ColumnCategory.TAG) {
+ checkAndRecordColumnCategory(columnSchemas.get(i), currentCategory);
currentTagColumns.add(columnSchemas.get(i));
- } else if (columnCategories.get(i) == ColumnCategory.FIELD) {
+ } else if (currentCategory == ColumnCategory.FIELD) {
+ checkAndRecordColumnCategory(columnSchemas.get(i), currentCategory);
currentFieldColumns.add(columnSchemas.get(i));
}
}
@@ -265,6 +269,20 @@ final class TsFileSchemaCollector {
}
}
+ private void checkAndRecordColumnCategory(
+ IMeasurementSchema columnSchema, ColumnCategory currentCategory) {
+ String columnName =
columnSchema.getMeasurementName().toLowerCase(Locale.ENGLISH);
+ ColumnCategory existingCategory = columnCategoryMap.get(columnName);
+ if (existingCategory != null && existingCategory != currentCategory) {
+ throw new UDFArgumentNotValidException(
+ "Column "
+ + columnSchema.getMeasurementName()
+ + " has conflicting categories when merging table schema for
table "
+ + tableName);
+ }
+ columnCategoryMap.putIfAbsent(columnName, currentCategory);
+ }
+
private TableSchema build() {
List<IMeasurementSchema> columnSchemas = new ArrayList<>();
List<ColumnCategory> columnCategories = new ArrayList<>();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
index 1e1df3e3c57..255cce7c738 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/buffer/TimeSeriesMetadataCache.java
@@ -53,7 +53,6 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -146,7 +145,7 @@ public class TimeSeriesMetadataCache {
boolean debug,
QueryContext queryContext)
throws IOException {
- return get(filePath, key, allSensors, ignoreNotExists, debug,
queryContext, Optional.empty());
+ return get(filePath, key, allSensors, ignoreNotExists, debug,
queryContext, null);
}
@SuppressWarnings({"squid:S1860", "squid:S6541", "squid:S3776"}) // Suppress
synchronize warning
@@ -157,7 +156,7 @@ public class TimeSeriesMetadataCache {
boolean ignoreNotExists,
boolean debug,
QueryContext queryContext,
- Optional<long[]> deviceMetadataIndexNodeOffset)
+ long[] deviceMetadataIndexNodeOffset)
throws IOException {
long startTime = System.nanoTime();
long loadBloomFilterTime = 0;
@@ -176,7 +175,7 @@ public class TimeSeriesMetadataCache {
TsFileSequenceReader reader =
FileReaderManager.getInstance()
.get(filePath, key.tsFileID, true, bloomFilterIoSizeRecorder,
externalTsFile);
- if (!deviceMetadataIndexNodeOffset.isPresent()) {
+ if (deviceMetadataIndexNodeOffset == null) {
BloomFilter bloomFilter =
reader.readBloomFilter(bloomFilterIoSizeRecorder);
queryContext.getQueryStatistics().getLoadBloomFilterFromDiskCount().incrementAndGet();
if (bloomFilter != null
@@ -218,7 +217,7 @@ public class TimeSeriesMetadataCache {
if (timeseriesMetadata == null) {
cacheHit = false;
- if (!deviceMetadataIndexNodeOffset.isPresent()) {
+ if (deviceMetadataIndexNodeOffset == null) {
long loadBloomFilterStartTime = System.nanoTime();
// bloom filter part
BloomFilter bloomFilter =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResourceTest.java
new file mode 100644
index 00000000000..c1db965364d
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResourceTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile;
+
+import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceOffset;
+import
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTask;
+import
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskPartition;
+import
org.apache.iotdb.db.queryengine.plan.relational.function.tvf.readTsFile.ExternalTsFileQueryResource.DeviceTaskRunReader;
+import
org.apache.iotdb.db.queryengine.plan.relational.metadata.AlignedDeviceEntry;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.DeviceEntry;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.utils.Binary;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ExternalTsFileQueryResourceTest {
+
+ @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ private ExternalTsFileQueryResource resource;
+
+ @After
+ public void tearDown() {
+ if (resource != null) {
+ resource.close();
+ }
+ }
+
+ @Test
+ public void testDeviceTaskRunReaderMergesMultipleRunsWithComparator() throws
Exception {
+ resource = newResource("merge_comparator", Arrays.asList("file-0.tsfile",
"file-1.tsfile"));
+ addDevices("d1", "d2", "d3", "d4", "d5");
+
resource.setDeviceEntryComparator(Comparator.comparing(DeviceEntry::getDeviceID));
+ DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+
+ partition.add(task(2, offset(0, 30, 39)));
+ partition.add(task(4, offset(1, 50, 59)));
+ partition.flush();
+ partition.add(task(0, offset(0, 10, 19)));
+ partition.add(task(3, offset(1, 40, 49)));
+ partition.flush();
+ partition.add(task(1, offset(0, 20, 29)));
+ partition.flush();
+
+ try (DeviceTaskRunReader reader = resource.new
DeviceTaskRunReader(partition)) {
+ assertDeviceOrder(reader, "d1", "d2", "d3", "d4", "d5");
+ }
+ }
+
+ @Test
+ public void testDeviceTaskRunReaderReadsRunsInFifoOrderWithoutComparator()
throws Exception {
+ resource = newResource("fifo", Collections.singletonList("file-0.tsfile"));
+ addDevices("d1", "d2", "d3", "d4");
+ DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+
+ partition.add(task(2, offset(0, 30, 39)));
+ partition.flush();
+ partition.add(task(0, offset(0, 10, 19)));
+ partition.add(task(1, offset(0, 20, 29)));
+ partition.flush();
+ partition.add(task(3, offset(0, 40, 49)));
+ partition.flush();
+
+ try (DeviceTaskRunReader reader = resource.new
DeviceTaskRunReader(partition)) {
+ assertDeviceOrder(reader, "d3", "d1", "d2", "d4");
+ }
+ }
+
+ @Test
+ public void
testDeviceTaskRunReaderReadsDiskRunBeforePendingMemoryTasksWithoutComparator()
+ throws Exception {
+ resource = newResource("disk_memory",
Collections.singletonList("file-0.tsfile"));
+ addDevices("d1", "d2", "d3");
+ DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+
+ partition.add(task(1, offset(0, 20, 29)));
+ partition.flush();
+ partition.add(task(0, offset(0, 10, 19)));
+ partition.add(task(2, offset(0, 30, 39)));
+ partition.finish();
+
+ try (DeviceTaskRunReader reader = resource.new
DeviceTaskRunReader(partition)) {
+ assertDeviceOrder(reader, "d2", "d1", "d3");
+ }
+ }
+
+ @Test
+ public void testDeviceTaskRunReaderUsesSharedTsFileResourceAsOffsetMapKey()
throws Exception {
+ resource = newResource("offset_map", Arrays.asList("file-0.tsfile",
"file-1.tsfile"));
+ addDevices("d1");
+ DeviceTaskPartition partition = resource.new DeviceTaskPartition(0);
+ partition.add(task(0, offset(0, 11, 22), offset(1, 33, 44)));
+ partition.finish();
+
+ try (DeviceTaskRunReader reader = resource.new
DeviceTaskRunReader(partition)) {
+ assertTrue(reader.nextDevice());
+ Map<TsFileResource, DeviceOffset> offsetMap =
reader.getCurrentDeviceOffsetMap();
+ List<TsFileResource> sharedResources =
resource.getSharedTsFileResources();
+
+ assertEquals(2, offsetMap.size());
+ assertTrue(offsetMap.containsKey(sharedResources.get(0)));
+ assertTrue(offsetMap.containsKey(sharedResources.get(1)));
+ assertOffset(offsetMap.get(sharedResources.get(0)), 0, 11, 22);
+ assertOffset(offsetMap.get(sharedResources.get(1)), 1, 33, 44);
+ assertEquals(sharedResources,
reader.getCurrentDeviceQueryDataSource().getUnseqResources());
+ assertFalse(reader.nextDevice());
+ }
+ }
+
+ private ExternalTsFileQueryResource newResource(String queryId, List<String>
fileNames)
+ throws Exception {
+ File root = temporaryFolder.newFolder(queryId);
+ List<String> tsFilePaths = new ArrayList<>(fileNames.size());
+ for (String fileName : fileNames) {
+ tsFilePaths.add(new File(root, fileName).getAbsolutePath());
+ }
+ MPPQueryContext queryContext = new MPPQueryContext(new QueryId(queryId));
+ return new ExternalTsFileQueryResource(
+ queryContext, root.toPath().resolve("tmp"), "table1", tsFilePaths,
ignored -> {}, true);
+ }
+
+ private void addDevices(String... deviceNames) {
+ for (String deviceName : deviceNames) {
+ resource
+ .getSharedDeviceEntries()
+ .add(
+ new AlignedDeviceEntry(
+ IDeviceID.Factory.DEFAULT_FACTORY.create(deviceName), new
Binary[0]));
+ }
+ }
+
+ private DeviceTask task(int deviceEntryIndex, DeviceOffset... offsets) {
+ return new DeviceTask(deviceEntryIndex, Arrays.asList(offsets));
+ }
+
+ private DeviceOffset offset(int fileIndex, long startOffset, long endOffset)
{
+ return new DeviceOffset(fileIndex, startOffset, endOffset);
+ }
+
+ private void assertDeviceOrder(DeviceTaskRunReader reader, String...
expectedDeviceNames)
+ throws Exception {
+ for (String expectedDeviceName : expectedDeviceNames) {
+ assertTrue(reader.nextDevice());
+ assertEquals(expectedDeviceName,
reader.getCurrentDevice().getDeviceID().toString());
+ }
+ assertFalse(reader.nextDevice());
+ }
+
+ private void assertOffset(
+ DeviceOffset offset,
+ int expectedFileIndex,
+ long expectedStartOffset,
+ long expectedEndOffset) {
+ assertEquals(expectedFileIndex, offset.getFileIndex());
+ assertEquals(expectedStartOffset, offset.getStartOffset());
+ assertEquals(expectedEndOffset, offset.getEndOffset());
+ }
+}