This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch builtin-udtf
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 9b5514d32126a99856993de07598c89a174edab1
Author: Chen YZ <[email protected]>
AuthorDate: Fri Feb 28 16:07:40 2025 +0800

    refactor
---
 .../process/function/TableFunctionOperator.java    | 17 +++----
 .../operator/process/function/partition/Slice.java | 38 ++++++++-------
 .../process/function/partition/SliceCache.java     | 56 ++++++++++++++++++----
 3 files changed, 76 insertions(+), 35 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
index 0caa8cbb3a9..fa6659d885a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/TableFunctionOperator.java
@@ -190,14 +190,15 @@ public class TableFunctionOperator implements 
ProcessOperator {
     if (needPassThrough) {
       // handle pass through column only if needed
       Column passThroughIndex = passThroughIndexBuilder.build();
-      for (int i = 0; i < passThroughIndex.getPositionCount(); i++) {
-        long index = passThroughIndex.getLong(i);
-        Record row = sliceCache.getOriginalRecord((int) index);
-        for (int j = 0; j < row.size(); j++) {
-          if (row.isNull(j)) {
-            passThroughColumnBuilders.get(j).appendNull();
-          } else {
-            passThroughColumnBuilders.get(j).writeObject(row.getObject(j));
+      for (Column[] passThroughColumns : 
sliceCache.getPassThroughResult(passThroughIndex)) {
+        for (int i = 0; i < passThroughColumns.length; i++) {
+          ColumnBuilder passThroughColumnBuilder = 
passThroughColumnBuilders.get(i);
+          for (int j = 0; j < passThroughColumns[i].getPositionCount(); j++) {
+            if (passThroughColumns[i].isNull(j)) {
+              passThroughColumnBuilder.appendNull();
+            } else {
+              passThroughColumnBuilder.write(passThroughColumns[i], j);
+            }
           }
         }
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
index f488ef89a62..8757971123c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/Slice.java
@@ -28,18 +28,19 @@ import org.apache.tsfile.utils.Binary;
 import org.apache.tsfile.utils.DateUtils;
 
 import java.time.LocalDate;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
 
 /** Parts of partition. */
 public class Slice {
 
-  private final int startIndex;
-  private final int endIndex;
   private final Column[] requiredColumns;
   private final Column[] passThroughColumns;
   private final List<Type> dataTypes;
+  private final long size;
 
   public Slice(
       int startIndex,
@@ -48,34 +49,35 @@ public class Slice {
       List<Integer> requiredChannels,
       List<Integer> passThroughChannels,
       List<Type> dataTypes) {
-    this.startIndex = startIndex;
-    this.endIndex = endIndex;
-    this.requiredColumns = new Column[requiredChannels.size()];
-    for (int i = 0; i < requiredChannels.size(); i++) {
-      requiredColumns[i] = columns[requiredChannels.get(i)];
-    }
-    this.passThroughColumns = new Column[passThroughChannels.size()];
-    for (int i = 0; i < passThroughChannels.size(); i++) {
-      passThroughColumns[i] = columns[passThroughChannels.get(i)];
-    }
+    this.size = endIndex - startIndex;
+    List<Column> partitionColumns =
+        Arrays.stream(columns)
+            .map(i -> i.getRegion(startIndex, (int) size))
+            .collect(Collectors.toList());
+    this.requiredColumns =
+        
requiredChannels.stream().map(partitionColumns::get).toArray(Column[]::new);
+    this.passThroughColumns =
+        
passThroughChannels.stream().map(partitionColumns::get).toArray(Column[]::new);
     this.dataTypes = dataTypes;
   }
 
-  public int getSize() {
-    return endIndex - startIndex;
+  public long getSize() {
+    return size;
   }
 
-  public Record getPassThroughRecord(int offset) {
-    return getRecord(startIndex + offset, passThroughColumns);
+  public Column[] getPassThroughResult(int[] indexes) {
+    return Arrays.stream(passThroughColumns)
+        .map(i -> i.getPositions(indexes, 0, indexes.length))
+        .toArray(Column[]::new);
   }
 
   public Iterator<Record> getRequiredRecordIterator() {
     return new Iterator<Record>() {
-      private int curIndex = startIndex;
+      private int curIndex = 0;
 
       @Override
       public boolean hasNext() {
-        return curIndex < endIndex;
+        return curIndex < size;
       }
 
       @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
index cc18f5cb30e..f1bc61f669b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/SliceCache.java
@@ -19,7 +19,7 @@
 
 package 
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition;
 
-import org.apache.iotdb.udf.api.relational.access.Record;
+import org.apache.tsfile.block.column.Column;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -28,21 +28,59 @@ import java.util.List;
 public class SliceCache {
 
   private final List<Slice> slices = new ArrayList<>();
+  private final List<Long> startOffsets = new ArrayList<>();
 
-  public Record getOriginalRecord(long index) {
-    long previousSize = 0;
-    for (Slice slice : slices) {
-      long currentSize = slice.getSize();
-      if (index < previousSize + currentSize) {
-        return slice.getPassThroughRecord((int) (index - previousSize));
+  public List<Column[]> getPassThroughResult(Column indexes) {
+    List<Column[]> result = new ArrayList<>();
+    int sliceIndex = findLastLessOrEqual(indexes.getLong(0));
+    int indexStart = 0;
+    for (int i = 1; i < indexes.getPositionCount(); i++) {
+      int tmp = findLastLessOrEqual(indexes.getLong(i));
+      if (tmp != sliceIndex) {
+        int[] indexArray = new int[i - indexStart];
+        for (int j = indexStart; j < i; j++) {
+          indexArray[j - indexStart] = (int) (indexes.getLong(i) - 
startOffsets.get(sliceIndex));
+        }
+
+        result.add(slices.get(sliceIndex).getPassThroughResult(indexArray));
+        indexStart = i;
+        sliceIndex = tmp;
       }
-      previousSize += currentSize;
     }
-    throw new IndexOutOfBoundsException("Index out of bound");
+    int[] indexArray = new int[indexes.getPositionCount() - indexStart];
+    for (int j = indexStart; j < indexes.getPositionCount(); j++) {
+      indexArray[j - indexStart] = (int) (indexes.getLong(j) - 
startOffsets.get(sliceIndex));
+    }
+    result.add(slices.get(sliceIndex).getPassThroughResult(indexArray));
+    return result;
   }
 
   public void addSlice(Slice slice) {
     slices.add(slice);
+    if (startOffsets.isEmpty()) {
+      startOffsets.add(0L);
+    } else {
+      startOffsets.add(
+          startOffsets.get(startOffsets.size() - 1)
+              + slices.get(startOffsets.size() - 1).getSize());
+    }
+  }
+
+  private int findLastLessOrEqual(long target) {
+    int left = 0;
+    int right = startOffsets.size() - 1;
+    int result = -1;
+    while (left <= right) {
+      int mid = left + (right - left) / 2;
+
+      if (startOffsets.get(mid) <= target) {
+        result = mid;
+        left = mid + 1;
+      } else {
+        right = mid - 1;
+      }
+    }
+    return result;
   }
 
   public void clear() {

Reply via email to