This is an automated email from the ASF dual-hosted git repository. chenyz pushed a commit to branch builtin-udtf in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 9b5514d32126a99856993de07598c89a174edab1 Author: Chen YZ <[email protected]> AuthorDate: Fri Feb 28 16:07:40 2025 +0800 refactor --- .../process/function/TableFunctionOperator.java | 17 +++---- .../operator/process/function/partition/Slice.java | 38 ++++++++------- .../process/function/partition/SliceCache.java | 56 ++++++++++++++++++---- 3 files changed, 76 insertions(+), 35 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java index 0caa8cbb3a9..fa6659d885a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java @@ -190,14 +190,15 @@ public class TableFunctionOperator implements ProcessOperator { if (needPassThrough) { // handle pass through column only if needed Column passThroughIndex = passThroughIndexBuilder.build(); - for (int i = 0; i < passThroughIndex.getPositionCount(); i++) { - long index = passThroughIndex.getLong(i); - Record row = sliceCache.getOriginalRecord((int) index); - for (int j = 0; j < row.size(); j++) { - if (row.isNull(j)) { - passThroughColumnBuilders.get(j).appendNull(); - } else { - passThroughColumnBuilders.get(j).writeObject(row.getObject(j)); + for (Column[] passThroughColumns : sliceCache.getPassThroughResult(passThroughIndex)) { + for (int i = 0; i < passThroughColumns.length; i++) { + ColumnBuilder passThroughColumnBuilder = passThroughColumnBuilders.get(i); + for (int j = 0; j < passThroughColumns[i].getPositionCount(); j++) { + if (passThroughColumns[i].isNull(j)) { + passThroughColumnBuilder.appendNull(); + } else { + passThroughColumnBuilder.write(passThroughColumns[i], j); + } } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java index f488ef89a62..8757971123c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java @@ -28,18 +28,19 @@ import org.apache.tsfile.utils.Binary; import org.apache.tsfile.utils.DateUtils; import java.time.LocalDate; +import java.util.Arrays; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; +import java.util.stream.Collectors; /** Parts of partition. */ public class Slice { - private final int startIndex; - private final int endIndex; private final Column[] requiredColumns; private final Column[] passThroughColumns; private final List<Type> dataTypes; + private final long size; public Slice( int startIndex, @@ -48,34 +49,35 @@ public class Slice { List<Integer> requiredChannels, List<Integer> passThroughChannels, List<Type> dataTypes) { - this.startIndex = startIndex; - this.endIndex = endIndex; - this.requiredColumns = new Column[requiredChannels.size()]; - for (int i = 0; i < requiredChannels.size(); i++) { - requiredColumns[i] = columns[requiredChannels.get(i)]; - } - this.passThroughColumns = new Column[passThroughChannels.size()]; - for (int i = 0; i < passThroughChannels.size(); i++) { - passThroughColumns[i] = columns[passThroughChannels.get(i)]; - } + this.size = endIndex - startIndex; + List<Column> partitionColumns = + Arrays.stream(columns) + .map(i -> i.getRegion(startIndex, (int) size)) + .collect(Collectors.toList()); + this.requiredColumns = + requiredChannels.stream().map(partitionColumns::get).toArray(Column[]::new); + this.passThroughColumns = + passThroughChannels.stream().map(partitionColumns::get).toArray(Column[]::new); this.dataTypes = dataTypes; } - public int getSize() { - return endIndex - startIndex; + public long getSize() { + return size; } - public Record getPassThroughRecord(int offset) { - return getRecord(startIndex + offset, passThroughColumns); + public Column[] getPassThroughResult(int[] indexes) { + return Arrays.stream(passThroughColumns) + .map(i -> i.getPositions(indexes, 0, indexes.length)) + .toArray(Column[]::new); } public Iterator<Record> getRequiredRecordIterator() { return new Iterator<Record>() { - private int curIndex = startIndex; + private int curIndex = 0; @Override public boolean hasNext() { - return curIndex < endIndex; + return curIndex < size; } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java index cc18f5cb30e..f1bc61f669b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java @@ -19,7 +19,7 @@ package org.apache.iotdb.db.queryengine.execution.operator.process.function.partition; -import org.apache.iotdb.udf.api.relational.access.Record; +import org.apache.tsfile.block.column.Column; import java.util.ArrayList; import java.util.List; @@ -28,21 +28,59 @@ import java.util.List; public class SliceCache { private final List<Slice> slices = new ArrayList<>(); + private final List<Long> startOffsets = new ArrayList<>(); - public Record getOriginalRecord(long index) { - long previousSize = 0; - for (Slice slice : slices) { - long currentSize = slice.getSize(); - if (index < previousSize + currentSize) { - return slice.getPassThroughRecord((int) (index - previousSize)); + public List<Column[]> getPassThroughResult(Column indexes) { + List<Column[]> result = new ArrayList<>(); + int sliceIndex = findLastLessOrEqual(indexes.getLong(0)); + int indexStart = 0; + for (int i = 1; i < indexes.getPositionCount(); i++) { + int tmp = findLastLessOrEqual(indexes.getLong(i)); + if (tmp != sliceIndex) { + int[] indexArray = new int[i - indexStart]; + for (int j = indexStart; j < i; j++) { + indexArray[j - indexStart] = (int) (indexes.getLong(i) - startOffsets.get(sliceIndex)); + } + + result.add(slices.get(sliceIndex).getPassThroughResult(indexArray)); + indexStart = i; + sliceIndex = tmp; } - previousSize += currentSize; } - throw new IndexOutOfBoundsException("Index out of bound"); + int[] indexArray = new int[indexes.getPositionCount() - indexStart]; + for (int j = indexStart; j < indexes.getPositionCount(); j++) { + indexArray[j - indexStart] = (int) (indexes.getLong(j) - startOffsets.get(sliceIndex)); + } + result.add(slices.get(sliceIndex).getPassThroughResult(indexArray)); + return result; } public void addSlice(Slice slice) { slices.add(slice); + if (startOffsets.isEmpty()) { + startOffsets.add(0L); + } else { + startOffsets.add( + startOffsets.get(startOffsets.size() - 1) + + slices.get(startOffsets.size() - 1).getSize()); + } + } + + private int findLastLessOrEqual(long target) { + int left = 0; + int right = startOffsets.size() - 1; + int result = -1; + while (left <= right) { + int mid = left + (right - left) / 2; + + if (startOffsets.get(mid) <= target) { + result = mid; + left = mid + 1; + } else { + right = mid - 1; + } + } + return result; } public void clear() {
