This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch read_tsfile_table_function
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/read_tsfile_table_function by
this push:
new ec2f68ede93 refactor & add it
ec2f68ede93 is described below
commit ec2f68ede93e4489baf94f40d064f2c9322e412b
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jun 15 15:30:33 2026 +0800
refactor & add it
---
.../recent/IoTDBReadTsFileTableFunctionIT.java | 101 ++++++++++++++++++-
.../fragment/FragmentInstanceContext.java | 2 +-
.../readTsFile/ExternalTsFileQueryDataSource.java | 2 +-
.../readTsFile/ExternalTsFileQueryResource.java | 109 ++++++++++++---------
.../node/ExternalTsFileAggregationScanNode.java | 3 +-
.../planner/node/ExternalTsFileScanNode.java | 3 +-
6 files changed, 164 insertions(+), 56 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
index ed236b901ac..c033491e95b 100644
--- a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBReadTsFileTableFunctionIT.java
@@ -209,6 +209,40 @@ public class IoTDBReadTsFileTableFunctionIT {
DATABASE_NAME);
}
+ @Test
+ public void testReadMultipleTsFilesWithTagSchemaMerge() throws Exception {
+ File tsFile1 = new File(tmpDir, "tag-schema-merge-1.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(tsFile1)) {
+ generateTable(writer, "table1", Arrays.asList("tag1"),
Arrays.asList("s1"), 1, 2);
+ }
+ File tsFile2 = new File(tmpDir, "tag-schema-merge-2.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(tsFile2)) {
+ generateTable(writer, "table1", Arrays.asList("tag1", "tag2"),
Arrays.asList("s1"), 3, 4);
+ }
+
+ String[] expectedHeader = new String[] {"time", "tag1", "tag2", "s1"};
+ String[] retArray =
+ new String[] {
+ "1970-01-01T00:00:00.001Z,tag1_1,null,1,",
+ "1970-01-01T00:00:00.002Z,tag1_1,null,2,",
+ "1970-01-01T00:00:00.003Z,tag1_1,tag2_1,3,",
+ "1970-01-01T00:00:00.004Z,tag1_1,tag2_1,4,",
+ "1970-01-01T00:00:00.001Z,tag1_2,null,1,",
+ "1970-01-01T00:00:00.002Z,tag1_2,null,2,",
+ "1970-01-01T00:00:00.003Z,tag1_2,tag2_2,3,",
+ "1970-01-01T00:00:00.004Z,tag1_2,tag2_2,4,",
+ };
+ tableResultSetEqualTest(
+ "SELECT time, tag1, tag2, s1 FROM read_tsfile(PATHS => '"
+ + toSqlPath(tsFile1)
+ + ","
+ + toSqlPath(tsFile2)
+ + "', TABLE_NAME => 'table1') ORDER BY tag1, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
@Test
public void testReadMultipleTsFilesWithConflictingFieldType() throws
Exception {
File tsFile1 = new File(tmpDir, "conflict-1.tsfile");
@@ -266,6 +300,53 @@ public class IoTDBReadTsFileTableFunctionIT {
DATABASE_NAME);
}
+ @Test
+ public void testReadSpecifiedInvalidFileFails() throws IOException {
+ File invalidFile = new File(tmpDir, "invalid-file.txt");
+ Files.write(invalidFile.toPath(), new byte[] {1, 2, 3});
+
+ tableAssertTestFail(
+ "SELECT * FROM read_tsfile(PATHS => '" + toSqlPath(invalidFile) + "')",
+ "not a valid TsFile",
+ DATABASE_NAME);
+ }
+
+ @Test
+ public void testReadDirectoryOnlyScansValidTsFileSuffixFiles() throws
Exception {
+ File scanDir = new File(tmpDir, "scan-dir");
+ File nestedDir = new File(scanDir, "nested");
+ Files.createDirectories(nestedDir.toPath());
+
+ File validTsFile = new File(nestedDir, "valid.tsfile");
+ try (TsFileWriter writer = new TsFileWriter(validTsFile)) {
+ generateTable(writer, "table1", Arrays.asList("tag1"),
Arrays.asList("s1"), 1, 2);
+ }
+
+ File invalidTsFile = new File(scanDir, "invalid.tsfile");
+ Files.write(invalidTsFile.toPath(), new byte[] {1, 2, 3});
+
+ File validFileWithoutTsFileSuffix = new File(scanDir, "valid.data");
+ try (TsFileWriter writer = new TsFileWriter(validFileWithoutTsFileSuffix))
{
+ generateTable(writer, "table1", Arrays.asList("tag1"),
Arrays.asList("s1"), 3, 4);
+ }
+
+ String[] expectedHeader = new String[] {"time", "tag1", "s1"};
+ String[] retArray =
+ new String[] {
+ "1970-01-01T00:00:00.001Z,tag1_1,1,",
+ "1970-01-01T00:00:00.002Z,tag1_1,2,",
+ "1970-01-01T00:00:00.001Z,tag1_2,1,",
+ "1970-01-01T00:00:00.002Z,tag1_2,2,",
+ };
+ tableResultSetEqualTest(
+ "SELECT time, tag1, s1 FROM read_tsfile(PATHS => '"
+ + toSqlPath(scanDir)
+ + "', TABLE_NAME => 'table1') ORDER BY tag1, time",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
@Test
public void testReadTsFileWithInvalidPaths() throws IOException {
File missingFile = new File(tmpDir, "missing.tsfile");
@@ -365,13 +446,25 @@ public class IoTDBReadTsFileTableFunctionIT {
File[] files = tmpDir.listFiles();
if (files != null) {
for (File file : files) {
- try {
- Files.delete(file.toPath());
- } catch (IOException ignored) {
- // ignore
+ deleteRecursively(file);
+ }
+ }
+ }
+
+ private static void deleteRecursively(File file) {
+ if (file.isDirectory()) {
+ File[] children = file.listFiles();
+ if (children != null) {
+ for (File child : children) {
+ deleteRecursively(child);
}
}
}
+ try {
+ Files.delete(file.toPath());
+ } catch (IOException ignored) {
+ // ignore
+ }
}
private static void deleteTmpDir() {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
index ac6969d4553..c5df6082ab5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java
@@ -656,7 +656,7 @@ public class FragmentInstanceContext extends QueryContext {
}
this.sharedQueryDataSource = new
ExternalTsFileQueryDataSource(externalTsFileQueryResource);
- closedUnseqFileNum =
externalTsFileQueryResource.getTsFileResources().size();
+ closedUnseqFileNum =
externalTsFileQueryResource.getSharedTsFileResources().size();
return true;
} finally {
addInitQueryDataSourceCost(System.nanoTime() - startTime);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java
index fd18da081c7..c9b8a08ea0f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryDataSource.java
@@ -44,6 +44,6 @@ public class ExternalTsFileQueryDataSource extends
QueryDataSource {
@Override
public boolean isEmpty() {
- return externalTsFileQueryResource.getTsFileResources().isEmpty();
+ return externalTsFileQueryResource.getSharedTsFileResources().isEmpty();
}
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
index 49de26e2dff..4a7ce9f89b3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/function/tvf/readTsFile/ExternalTsFileQueryResource.java
@@ -51,6 +51,7 @@ import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
@@ -59,6 +60,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.Queue;
import java.util.function.LongConsumer;
import static java.util.Objects.requireNonNull;
@@ -74,14 +76,16 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
public static final String EXTERNAL_TSFILE_TMP_DIR = "external-tsfile";
+ private static final long TSFILE_READER_MEMORY_RESERVE_SIZE_IN_BYTES = 4L *
1024;
+
private final QueryId queryId;
private final MPPQueryContext queryContext;
private final Path queryTempRoot;
private final String tableName;
private final List<String> tsFilePaths;
- private final List<TsFileResource> tsFileResources;
+ private final List<TsFileResource> sharedTsFileResources;
private final LongConsumer ioSizeRecorder;
- private final List<DeviceEntry> deviceEntries = new ArrayList<>();
+ private final List<DeviceEntry> sharedDeviceEntries = new ArrayList<>();
private final List<DeviceTaskPartition> deviceTaskPartitions = new
ArrayList<>();
private Comparator<DeviceEntry> deviceEntryComparator;
@@ -103,7 +107,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
this.tableName = tableName;
this.tsFilePaths =
Collections.unmodifiableList(new
ArrayList<>(requireNonNull(tsFilePaths, "tsFilePaths")));
- this.tsFileResources = createTsFileResources(this.tsFilePaths);
+ this.sharedTsFileResources = createTsFileResources(this.tsFilePaths);
this.ioSizeRecorder = requireNonNull(ioSizeRecorder, "ioSizeRecorder is
null");
for (String tsFilePath : tsFilePaths) {
FileReaderManager.getInstance().increaseExternalFileReaderReference(tsFilePath);
@@ -113,6 +117,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
public void collectDeviceEntries(
SchemaFilter schemaFilter, Comparator<DeviceEntry> comparator, int
partitionCount) {
checkNotClosed();
+ deviceEntryComparator = comparator;
acquireMemoryForTsFileReaders();
ExternalTsFileDeviceFilterVisitor deviceFilterVisitor = new
ExternalTsFileDeviceFilterVisitor();
try (DeviceCollector deviceCollector = new DeviceCollector()) {
@@ -124,8 +129,8 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
continue;
}
DeviceEntry deviceEntry = new AlignedDeviceEntry(deviceID, new
Binary[0]);
- int deviceEntryIndex = deviceEntries.size();
- deviceEntries.add(deviceEntry);
+ int deviceEntryIndex = sharedDeviceEntries.size();
+ sharedDeviceEntries.add(deviceEntry);
DeviceTask deviceTask =
new DeviceTask(deviceEntryIndex,
deviceCollector.getCurrentDeviceOffsets());
DeviceTaskPartition partition =
@@ -133,16 +138,16 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
Math.floorMod(deviceID.hashCode(),
deviceTaskPartitions.size()));
partition.add(deviceTask);
if (partition.shouldFlush()) {
- partition.flush(comparator);
+ partition.flush();
}
}
- deviceEntryComparator = comparator;
- collectDeviceTaskPartitions(comparator);
+ collectDeviceTaskPartitions();
}
}
private void acquireMemoryForTsFileReaders() {
- queryContext.reserveMemoryForFrontEndImmediately((long) tsFilePaths.size()
* 4 * 1024);
+ queryContext.reserveMemoryForFrontEndImmediately(
+ tsFilePaths.size() * TSFILE_READER_MEMORY_RESERVE_SIZE_IN_BYTES);
}
public DeviceTaskRunReader getDeviceTaskRunReader(int partitionIndex) {
@@ -159,12 +164,12 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
return tsFilePaths;
}
- public List<TsFileResource> getTsFileResources() {
- return tsFileResources;
+ public List<TsFileResource> getSharedTsFileResources() {
+ return sharedTsFileResources;
}
- public List<DeviceEntry> getDeviceEntries() {
- return deviceEntries;
+ public List<DeviceEntry> getSharedDeviceEntries() {
+ return sharedDeviceEntries;
}
public List<DeviceTaskPartition> getDeviceTaskPartitions() {
@@ -260,11 +265,11 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
unreservedBytes += deviceTask.ramBytesUsed();
}
- private void flush(Comparator<DeviceEntry> comparator) {
+ private void flush() {
if (pendingDeviceTasks.isEmpty()) {
return;
}
- sortPendingDeviceTasks(comparator);
+ sortPendingDeviceTasks();
try {
runFiles.add(
writeDeviceTaskRun(
@@ -281,20 +286,20 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
releaseDeviceTaskMemory();
}
- private void sortPendingDeviceTasks(Comparator<DeviceEntry> comparator) {
- if (comparator != null) {
+ private void sortPendingDeviceTasks() {
+ if (deviceEntryComparator != null) {
pendingDeviceTasks.sort(
(left, right) ->
- comparator.compare(
- deviceEntries.get(left.deviceEntryIndex),
- deviceEntries.get(right.deviceEntryIndex)));
+ deviceEntryComparator.compare(
+ sharedDeviceEntries.get(left.deviceEntryIndex),
+ sharedDeviceEntries.get(right.deviceEntryIndex)));
} else {
pendingDeviceTasks.sort(
(left, right) ->
- deviceEntries
+ sharedDeviceEntries
.get(left.deviceEntryIndex)
.getDeviceID()
-
.compareTo(deviceEntries.get(right.deviceEntryIndex).getDeviceID()));
+
.compareTo(sharedDeviceEntries.get(right.deviceEntryIndex).getDeviceID()));
}
}
@@ -335,27 +340,29 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
return !deviceEntryIndexes.isEmpty();
}
- private void finish(Comparator<DeviceEntry> comparator) {
+ private void finish() {
if (pendingDeviceTasks.isEmpty()) {
return;
}
- sortPendingDeviceTasks(comparator);
+ sortPendingDeviceTasks();
for (DeviceTask deviceTask : pendingDeviceTasks) {
deviceEntryIndexes.add(deviceTask.deviceEntryIndex);
}
}
- private void sortDeviceEntries(Comparator<DeviceEntry> comparator) {
- if (comparator != null) {
+ private void sortDeviceEntries() {
+ if (deviceEntryComparator != null) {
deviceEntryIndexes.sort(
- (left, right) -> comparator.compare(deviceEntries.get(left),
deviceEntries.get(right)));
+ (left, right) ->
+ deviceEntryComparator.compare(
+ sharedDeviceEntries.get(left),
sharedDeviceEntries.get(right)));
} else {
deviceEntryIndexes.sort(
(left, right) ->
- deviceEntries
+ sharedDeviceEntries
.get(left)
.getDeviceID()
- .compareTo(deviceEntries.get(right).getDeviceID()));
+ .compareTo(sharedDeviceEntries.get(right).getDeviceID()));
}
}
@@ -378,16 +385,16 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
}
}
- private void collectDeviceTaskPartitions(Comparator<DeviceEntry> comparator)
{
+ private void collectDeviceTaskPartitions() {
Iterator<DeviceTaskPartition> iterator = deviceTaskPartitions.iterator();
while (iterator.hasNext()) {
DeviceTaskPartition partition = iterator.next();
- partition.finish(comparator);
+ partition.finish();
if (!partition.hasDeviceTasks()) {
iterator.remove();
continue;
}
- partition.sortDeviceEntries(comparator);
+ partition.sortDeviceEntries();
}
}
@@ -407,23 +414,24 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
public class DeviceTaskRunReader implements AutoCloseable {
- private final PriorityQueue<DeviceTaskRunCursor> runCursors;
+ private final Queue<DeviceTaskRunCursor> runCursors;
+ private final boolean usePriorityQueue;
private DeviceEntry currentDevice;
private QueryDataSource currentDeviceQueryDataSource;
private Map<TsFileResource, DeviceOffset> currentDeviceOffsetMap;
private DeviceTaskRunReader(DeviceTaskPartition partition) throws
IOException {
- Comparator<DeviceTaskRunCursor> cursorComparator =
- (left, right) ->
- deviceEntryComparator == null
- ? left.getCurrentDeviceEntry()
- .getDeviceID()
- .compareTo(right.getCurrentDeviceEntry().getDeviceID())
- : deviceEntryComparator.compare(
- left.getCurrentDeviceEntry(),
right.getCurrentDeviceEntry());
- this.runCursors = new PriorityQueue<>(cursorComparator);
+ Comparator<DeviceEntry> comparator = deviceEntryComparator;
+ this.usePriorityQueue = comparator != null;
+ this.runCursors =
+ usePriorityQueue
+ ? new PriorityQueue<>(
+ (left, right) ->
+ comparator.compare(
+ left.getCurrentDeviceEntry(),
right.getCurrentDeviceEntry()))
+ : new ArrayDeque<>();
for (Path runFile : partition.getRunFiles()) {
- DeviceTaskRunCursor cursor = new DiskDeviceTaskRunCursor(runFile,
deviceEntries);
+ DeviceTaskRunCursor cursor = new DiskDeviceTaskRunCursor(runFile,
sharedDeviceEntries);
if (cursor.hasCurrentDeviceTask()) {
runCursors.add(cursor);
} else {
@@ -431,7 +439,7 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
}
}
DeviceTaskRunCursor memoryCursor =
- new MemoryDeviceTaskRunCursor(partition.getPendingDeviceTasks(),
deviceEntries);
+ new MemoryDeviceTaskRunCursor(partition.getPendingDeviceTasks(),
sharedDeviceEntries);
if (memoryCursor.hasCurrentDeviceTask()) {
runCursors.add(memoryCursor);
}
@@ -441,20 +449,25 @@ public class ExternalTsFileQueryResource implements
AutoCloseable {
if (runCursors.isEmpty()) {
return false;
}
- DeviceTaskRunCursor cursor = runCursors.poll();
+ DeviceTaskRunCursor cursor = usePriorityQueue ? runCursors.poll() :
runCursors.peek();
DeviceTask result = cursor.getCurrentDeviceTask();
cursor.advance();
if (cursor.hasCurrentDeviceTask()) {
- runCursors.add(cursor);
+ if (usePriorityQueue) {
+ runCursors.add(cursor);
+ }
} else {
+ if (!usePriorityQueue) {
+ runCursors.poll();
+ }
cursor.close();
}
- currentDevice = deviceEntries.get(result.deviceEntryIndex);
+ currentDevice = sharedDeviceEntries.get(result.deviceEntryIndex);
List<TsFileResource> unseqResources = new
ArrayList<>(result.deviceOffsets.size());
currentDeviceOffsetMap = new HashMap<>(result.deviceOffsets.size());
for (DeviceOffset deviceOffset : result.deviceOffsets) {
- TsFileResource tsFileResource =
tsFileResources.get(deviceOffset.getFileIndex());
+ TsFileResource tsFileResource =
sharedTsFileResources.get(deviceOffset.getFileIndex());
unseqResources.add(tsFileResource);
currentDeviceOffsetMap.put(tsFileResource, deviceOffset);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
index dd338dc2d90..d3200ff987c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileAggregationScanNode.java
@@ -76,7 +76,8 @@ public class ExternalTsFileAggregationScanNode extends
AggregationTableScanNode
qualifiedObjectName,
outputSymbols,
assignments,
- Lists.transform(deviceEntryIndexes,
externalTsFileQueryResource.getDeviceEntries()::get),
+ Lists.transform(
+ deviceEntryIndexes,
externalTsFileQueryResource.getSharedDeviceEntries()::get),
tagAndAttributeIndexMap,
scanOrder,
timePredicate,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
index be83256c404..d9f66d4aa93 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/ExternalTsFileScanNode.java
@@ -80,7 +80,8 @@ public class ExternalTsFileScanNode extends
DeviceTableScanNode {
qualifiedObjectName,
outputSymbols,
assignments,
- Lists.transform(deviceEntryIndexes,
externalTsFileQueryResource.getDeviceEntries()::get),
+ Lists.transform(
+ deviceEntryIndexes,
externalTsFileQueryResource.getSharedDeviceEntries()::get),
tagAndAttributeIndexMap,
scanOrder,
timePredicate,