This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new a7c541d4c99 Use PartitionRecognizer and PartitionCache to handle
window function's partition. (#17280)
a7c541d4c99 is described below
commit a7c541d4c9945d4a42a6bf3f3b9e1c19206f55ee
Author: Zhihao Shen <[email protected]>
AuthorDate: Fri Mar 13 15:08:50 2026 +0800
Use PartitionRecognizer and PartitionCache to handle window function's
partition. (#17280)
---
.../process/function/partition/PartitionCache.java | 10 +
.../rowpattern/PatternPartitionExecutor.java | 36 +--
.../process/window/TableWindowOperator.java | 254 ++++++---------------
.../process/window/partition/Partition.java | 213 +++++++++--------
.../window/partition/PartitionExecutor.java | 49 ++--
5 files changed, 235 insertions(+), 327 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
index 0591de961e2..1797c8fc00e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/function/partition/PartitionCache.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.queryengine.execution.operator.process.function.part
import org.apache.tsfile.block.column.Column;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/** Used to manage the slices of the partition. It is all in memory now. */
@@ -97,6 +98,14 @@ public class PartitionCache {
&& passThroughIndex < getSliceOffset(sliceIndex) +
slices.get(sliceIndex).getSize();
}
+ /**
+ * Returns a read-only view of the cached slices. The view is backed by the internal list, so a
+ * later {@code clear()} empties it as well; copy anything that must outlive the cache.
+ */
+ public List<Slice> getSlices() {
+ return Collections.unmodifiableList(slices);
+ }
+
+ /** Returns whether the cache currently holds no slices. */
+ public boolean isEmpty() {
+ return slices.isEmpty();
+ }
+
public long getEstimatedSize() {
return estimatedSize;
}
@@ -104,6 +113,7 @@ public class PartitionCache {
+ /** Drops all cached slices and their start offsets, and resets the size estimate. */
public void clear() {
slices.clear();
startOffsets.clear();
+ // Reset the estimate too; otherwise getEstimatedSize() would keep reporting the bytes of
+ // slices that are no longer cached.
+ estimatedSize = 0;
}
public void close() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
index d76fe1232c5..402cc7e8312 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/rowpattern/PatternPartitionExecutor.java
@@ -32,7 +32,6 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.RowsPerMatch
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.SkipToPosition;
import com.google.common.collect.ImmutableList;
-import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -205,18 +204,12 @@ public final class PatternPartitionExecutor {
// for ALL ROWS PER MATCH WITH UNMATCHED ROWS
// the output for unmatched row refers to no pattern match.
private void outputUnmatchedRow(TsBlockBuilder builder) {
- // Copy origin data
int index = currentPosition - partitionStart;
- Partition.PartitionIndex partitionIndex =
partition.getPartitionIndex(index);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
- TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
int channel = 0;
for (int i = 0; i < outputChannels.size(); i++) {
- Column column = tsBlock.getColumn(outputChannels.get(i));
ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
- columnBuilder.write(column, offsetInTsBlock);
+ partition.writeTo(columnBuilder, outputChannels.get(i), index);
channel++;
}
@@ -231,18 +224,12 @@ public final class PatternPartitionExecutor {
// the output for empty match refers to empty pattern match.
private void outputEmptyMatch(TsBlockBuilder builder) {
- // Copy origin data
int index = currentPosition - partitionStart;
- Partition.PartitionIndex partitionIndex =
partition.getPartitionIndex(index);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
- TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
int channel = 0;
for (int i = 0; i < outputChannels.size(); i++) {
- Column column = tsBlock.getColumn(outputChannels.get(i));
ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
- columnBuilder.write(column, offsetInTsBlock);
+ partition.writeTo(columnBuilder, outputChannels.get(i), index);
channel++;
}
@@ -268,20 +255,13 @@ public final class PatternPartitionExecutor {
int patternStart,
int searchStart,
int searchEnd) {
- // Copy origin data
int index = currentPosition - partitionStart;
- Partition.PartitionIndex partitionIndex =
partition.getPartitionIndex(index);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
- TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
int channel = 0;
// PARTITION BY
for (int i = 0; i < outputChannels.size(); i++) {
- Column column = tsBlock.getColumn(outputChannels.get(i));
ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
- columnBuilder.write(column, offsetInTsBlock);
-
+ partition.writeTo(columnBuilder, outputChannels.get(i), index);
channel++;
}
@@ -348,18 +328,10 @@ public final class PatternPartitionExecutor {
// Called by method outputAllRowsPerMatch
private void outputRow(
TsBlockBuilder builder, ArrayView labels, int position, int searchStart,
int searchEnd) {
- // Copy origin data
- Partition.PartitionIndex partitionIndex =
partition.getPartitionIndex(position);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
- TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
-
- // map the column data of the current row from the input table to the
output table
int channel = 0;
for (int i = 0; i < outputChannels.size(); i++) {
- Column column = tsBlock.getColumn(outputChannels.get(i));
ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
- columnBuilder.write(column, offsetInTsBlock);
+ partition.writeTo(columnBuilder, outputChannels.get(i), position);
channel++;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
index b6fb47d601f..32e1a44ce45 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/TableWindowOperator.java
@@ -23,15 +23,18 @@ import
org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.process.ProcessOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.PartitionRecognizer;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionCache;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.PartitionState;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
import
org.apache.iotdb.db.queryengine.execution.operator.process.window.function.WindowFunction;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.Partition;
import
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.PartitionExecutor;
import
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition.frame.FrameInfo;
-import
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.RowComparator;
import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
-import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -65,10 +68,8 @@ public class TableWindowOperator implements ProcessOperator {
private final List<FrameInfo> frameInfoList;
// Partition
- private final List<Integer> partitionChannels;
- private final RowComparator partitionComparator;
- private final List<TsBlock> cachedTsBlocks;
- private int startIndexInFirstBlock;
+ private final PartitionRecognizer partitionRecognizer;
+ private final PartitionCache partitionCache;
// Sort
private final List<Integer> sortChannels;
@@ -80,6 +81,7 @@ public class TableWindowOperator implements ProcessOperator {
private long totalMemorySize;
private long maxUsedMemory;
private final long maxRuntime;
+ private boolean noMoreDataSignaled;
public TableWindowOperator(
OperatorContext operatorContext,
@@ -91,38 +93,32 @@ public class TableWindowOperator implements ProcessOperator
{
List<FrameInfo> frameInfoList,
List<Integer> partitionChannels,
List<Integer> sortChannels) {
- // Common part(among all other operators)
this.operatorContext = operatorContext;
this.inputOperator = inputOperator;
this.inputDataTypes = ImmutableList.copyOf(inputDataTypes);
this.outputChannels = ImmutableList.copyOf(outputChannels);
this.tsBlockBuilder = new TsBlockBuilder(outputDataTypes);
- // Basic information part
this.windowFunctions = ImmutableList.copyOf(windowFunctions);
this.frameInfoList = ImmutableList.copyOf(frameInfoList);
- // Partition Part
- this.partitionChannels = ImmutableList.copyOf(partitionChannels);
- // Acquire partition channels' data types
- List<TSDataType> partitionDataTypes = new ArrayList<>();
- for (Integer channel : partitionChannels) {
- partitionDataTypes.add(inputDataTypes.get(channel));
+ List<Integer> requiredChannels = new ArrayList<>(inputDataTypes.size());
+ for (int i = 0; i < inputDataTypes.size(); i++) {
+ requiredChannels.add(i);
}
- this.partitionComparator = new RowComparator(partitionDataTypes);
+ this.partitionRecognizer =
+ new PartitionRecognizer(
+ partitionChannels, requiredChannels, Collections.emptyList(),
inputDataTypes);
+ this.partitionCache = new PartitionCache();
- // Ordering part
this.sortChannels = ImmutableList.copyOf(sortChannels);
- // Transformation part
this.cachedPartitionExecutors = new LinkedList<>();
- // Misc
- this.cachedTsBlocks = new ArrayList<>();
- this.startIndexInFirstBlock = -1;
this.maxRuntime =
this.operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
this.totalMemorySize = 0;
this.maxUsedMemory = 0;
+ this.noMoreDataSignaled = false;
this.memoryReservationManager =
operatorContext
.getDriverContext()
@@ -144,178 +140,88 @@ public class TableWindowOperator implements
ProcessOperator {
public TsBlock next() throws Exception {
+ // Work in priority order: drain queued partitions, then feed new input to the partition
+ // recognizer, then finalize the last partition once input ends, then emit leftover builder
+ // rows. Returns null when no output block is ready yet.
long startTime = System.nanoTime();
- // Transform is not finished
+ // Keep transforming partitions that are already queued.
if (!cachedPartitionExecutors.isEmpty()) {
TsBlock tsBlock = transform(startTime);
if (tsBlock != null) {
return tsBlock;
}
- // Receive more data when result TsBlock builder is not full
- // In this case, all partition executors are done
+ // transform() produced no block: fall through and pull more input.
}
if (inputOperator.hasNextWithTimer()) {
- // This TsBlock is pre-sorted with PARTITION BY and ORDER BY channels
+ // Input is expected to be pre-sorted by the PARTITION BY and ORDER BY channels.
TsBlock preSortedBlock = inputOperator.nextWithTimer();
- // StreamSort Operator sometimes returns null
+ // The upstream operator (e.g. StreamSort) may return null or an empty block.
if (preSortedBlock == null || preSortedBlock.isEmpty()) {
return null;
}
- cachedPartitionExecutors = partition(preSortedBlock);
- if (cachedPartitionExecutors.isEmpty()) {
- // No partition found
- // i.e., partition crosses multiple TsBlocks
- return null;
- }
+ // Slice the block into partitions; every completed partition is queued as an executor.
+ partitionRecognizer.addTsBlock(preSortedBlock);
+ processRecognizerStates();
- // May return null if builder is not full
- return transform(startTime);
- } else if (!cachedTsBlocks.isEmpty()) {
- // Form last partition
- TsBlock lastTsBlock = cachedTsBlocks.get(cachedTsBlocks.size() - 1);
- int endIndexOfLastTsBlock = lastTsBlock.getPositionCount();
- PartitionExecutor partitionExecutor =
- new PartitionExecutor(
- cachedTsBlocks,
- inputDataTypes,
- startIndexInFirstBlock,
- endIndexOfLastTsBlock,
- outputChannels,
- windowFunctions,
- frameInfoList,
- sortChannels);
- cachedPartitionExecutors.addLast(partitionExecutor);
- cachedTsBlocks.clear();
- releaseAllCachedTsBlockMemory();
-
- TsBlock tsBlock = transform(startTime);
- if (tsBlock == null) {
- // TsBlockBuilder is not full
- // Force build since this is the last partition
- tsBlock =
- tsBlockBuilder.build(
- new RunLengthEncodedColumn(
- TIME_COLUMN_TEMPLATE, tsBlockBuilder.getPositionCount()));
- tsBlockBuilder.reset();
+ // No partition completed yet: the cached partition may continue in the next input block.
+ if (!cachedPartitionExecutors.isEmpty()) {
+ return transform(startTime);
+ }
+ return null;
+ } else if (!noMoreDataSignaled) {
+ // Input is exhausted: signal the recognizer once (guarded by noMoreDataSignaled) so the
+ // partition still in the cache can be finalized.
+ partitionRecognizer.noMoreData();
+ noMoreDataSignaled = true;
+ processRecognizerStates();
+
+ if (!cachedPartitionExecutors.isEmpty()) {
+ TsBlock tsBlock = transform(startTime);
+ // No more input will arrive, so emit a partially filled builder instead of waiting for
+ // it to fill up.
+ if (tsBlock == null && !tsBlockBuilder.isEmpty()) {
+ tsBlock = getTsBlockFromTsBlockBuilder();
+ }
+ return tsBlock;
}
-
- return tsBlock;
} else if (!tsBlockBuilder.isEmpty()) {
- // Return remaining data in result TsBlockBuilder
- // This happens when last partition is too large
- // And TsBlockBuilder is not full at the end of transform
+ // Input is done and noMoreData() was already signalled: flush the rows left in the builder.
return getTsBlockFromTsBlockBuilder();
}
return null;
}
- private LinkedList<PartitionExecutor> partition(TsBlock tsBlock) {
- LinkedList<PartitionExecutor> partitionExecutors = new LinkedList<>();
-
- int partitionStartInCurrentBlock = 0;
- int partitionEndInCurrentBlock = partitionStartInCurrentBlock + 1;
-
- // In this stage, we only consider partition channels
- List<Column> partitionColumns = extractPartitionColumns(tsBlock);
-
- // Previous TsBlocks forms a partition
- if (!cachedTsBlocks.isEmpty()) {
- TsBlock lastTsBlock = cachedTsBlocks.get(cachedTsBlocks.size() - 1);
- int endIndexOfLastTsBlock = lastTsBlock.getPositionCount();
-
- // Whether the first row of current TsBlock is not equal to
- // last row of previous cached TsBlocks
- List<Column> lastPartitionColumns = extractPartitionColumns(lastTsBlock);
- if (!partitionComparator.equal(
- partitionColumns, 0, lastPartitionColumns, endIndexOfLastTsBlock -
1)) {
- PartitionExecutor partitionExecutor =
- new PartitionExecutor(
- cachedTsBlocks,
- inputDataTypes,
- startIndexInFirstBlock,
- endIndexOfLastTsBlock,
- outputChannels,
- windowFunctions,
- frameInfoList,
- sortChannels);
-
- partitionExecutors.addLast(partitionExecutor);
- cachedTsBlocks.clear();
- releaseAllCachedTsBlockMemory();
- startIndexInFirstBlock = -1;
+ /**
+ * Repeatedly pulls states from the partition recognizer until it reports INIT, NEED_MORE_DATA
+ * or FINISHED. Slices are appended to the partition cache; a partition boundary turns the
+ * slices cached so far into a queued PartitionExecutor.
+ */
+ private void processRecognizerStates() {
+ while (true) {
+ PartitionState state = partitionRecognizer.nextState();
+ switch (state.getStateType()) {
+ case INIT:
+ case NEED_MORE_DATA:
+ // Nothing more to do until the recognizer is given more input (or noMoreData()).
+ return;
+ case FINISHED:
+ // FINISHED (presumably reported after noMoreData()): finalize the partition still cached.
+ finalizeCurrentPartition();
+ return;
+ case NEW_PARTITION:
+ // The slice opens a new partition, so the one cached so far is complete.
+ finalizeCurrentPartition();
+ addSliceToCache(state.getSlice());
+ break;
+ case ITERATING:
+ // The slice continues the partition currently being cached.
+ addSliceToCache(state.getSlice());
+ break;
+ default:
+ break;
}
}
+ }
- // Try to find all partitions
- int count = tsBlock.getPositionCount();
- while (count == 1 || partitionEndInCurrentBlock < count) {
- // Try to find one partition
- while (partitionEndInCurrentBlock < count
- && partitionComparator.equalColumns(
- partitionColumns, partitionStartInCurrentBlock,
partitionEndInCurrentBlock)) {
- partitionEndInCurrentBlock++;
- }
-
- if (partitionEndInCurrentBlock != count) {
- // Find partition
- PartitionExecutor partitionExecutor;
- if (partitionStartInCurrentBlock != 0 || startIndexInFirstBlock == -1)
{
- // Small partition within this TsBlock
- partitionExecutor =
- new PartitionExecutor(
- Collections.singletonList(tsBlock),
- inputDataTypes,
- partitionStartInCurrentBlock,
- partitionEndInCurrentBlock,
- outputChannels,
- windowFunctions,
- frameInfoList,
- sortChannels);
- } else {
- // Large partition crosses multiple TsBlocks
- reserveOneTsBlockMemory(tsBlock);
- cachedTsBlocks.add(tsBlock);
- partitionExecutor =
- new PartitionExecutor(
- cachedTsBlocks,
- inputDataTypes,
- startIndexInFirstBlock,
- partitionEndInCurrentBlock,
- outputChannels,
- windowFunctions,
- frameInfoList,
- sortChannels);
- // Clear TsBlock of last partition
- cachedTsBlocks.clear();
- releaseAllCachedTsBlockMemory();
- }
- partitionExecutors.addLast(partitionExecutor);
-
- partitionStartInCurrentBlock = partitionEndInCurrentBlock;
- // Reset cross-TsBlock tracking after partition completion
- startIndexInFirstBlock = -1;
- } else {
- // Last partition of TsBlock
- // The beginning of next TsBlock may have rows in this partition
- if (startIndexInFirstBlock == -1) {
- startIndexInFirstBlock = partitionStartInCurrentBlock;
- }
- reserveOneTsBlockMemory(tsBlock);
- cachedTsBlocks.add(tsBlock);
- // For count == 1
- break;
- }
+ /**
+ * Turns the slices cached so far into one Partition and queues a PartitionExecutor for it, then
+ * releases the memory reserved for those slices and empties the cache. No-op when the cache is
+ * empty.
+ */
+ private void finalizeCurrentPartition() {
+ if (!partitionCache.isEmpty()) {
+ // Partition stores each slice's column array in its own list, so clearing the cache below
+ // does not affect it.
+ Partition partition = new Partition(partitionCache.getSlices());
+ PartitionExecutor partitionExecutor =
+ new PartitionExecutor(
+ partition,
+ inputDataTypes,
+ outputChannels,
+ windowFunctions,
+ frameInfoList,
+ sortChannels);
+ cachedPartitionExecutors.addLast(partitionExecutor);
+ // Release before clear(): the released amount is read from partitionCache.getEstimatedSize().
+ // NOTE(review): the reservation is dropped while the queued executor still references these
+ // columns, so totalMemorySize / the memoryReservationManager reservation do not cover
+ // partitions waiting to be transformed — confirm this is intended.
+ releasePartitionCacheMemory();
+ partitionCache.clear();
}
-
- return partitionExecutors;
}
private TsBlock transform(long startTime) {
while (!cachedPartitionExecutors.isEmpty()) {
PartitionExecutor partitionExecutor =
cachedPartitionExecutors.getFirst();
- // Reset window functions for new partition
partitionExecutor.resetWindowFunctions();
while (System.nanoTime() - startTime < maxRuntime
@@ -333,19 +239,9 @@ public class TableWindowOperator implements
ProcessOperator {
}
}
- // Reach partition end, but builder is not full yet
return null;
}
- private List<Column> extractPartitionColumns(TsBlock tsBlock) {
- List<Column> partitionColumns = new ArrayList<>(partitionChannels.size());
- for (int channel : partitionChannels) {
- Column partitionColumn = tsBlock.getColumn(channel);
- partitionColumns.add(partitionColumn);
- }
- return partitionColumns;
- }
-
private TsBlock getTsBlockFromTsBlockBuilder() {
TsBlock result =
tsBlockBuilder.build(
@@ -358,13 +254,14 @@ public class TableWindowOperator implements
ProcessOperator {
public boolean hasNext() throws Exception {
+ // Cached slices belong to a partition that has not been closed yet; once input is exhausted,
+ // next() finalizes them by signalling noMoreData() to the recognizer.
+ // NOTE(review): this assumes PartitionRecognizer has moved every received row into
+ // partitionCache before returning NEED_MORE_DATA. If it can hold rows internally until
+ // noMoreData(), this should also check !noMoreDataSignaled — TODO confirm.
return !cachedPartitionExecutors.isEmpty()
|| inputOperator.hasNext()
- || !cachedTsBlocks.isEmpty()
+ || !partitionCache.isEmpty()
|| !tsBlockBuilder.isEmpty();
}
@Override
public void close() throws Exception {
inputOperator.close();
+ partitionCache.close();
if (totalMemorySize != 0) {
memoryReservationManager.releaseMemoryCumulatively(totalMemorySize);
}
@@ -375,19 +272,19 @@ public class TableWindowOperator implements
ProcessOperator {
return !this.hasNextWithTimer();
}
- private void reserveOneTsBlockMemory(TsBlock tsBlock) {
- long reserved = tsBlock.getTotalInstanceSize();
+ /**
+ * Reserves memory for {@code slice}, updates the reserved-memory high-water mark, and appends
+ * the slice to the partition cache.
+ */
+ private void addSliceToCache(Slice slice) {
+ long reserved = slice.getEstimatedSize();
memoryReservationManager.reserveMemoryCumulatively(reserved);
totalMemorySize += reserved;
maxUsedMemory = Math.max(maxUsedMemory, totalMemorySize);
operatorContext.recordSpecifiedInfo(MAX_RESERVED_MEMORY,
Long.toString(maxUsedMemory));
+ // NOTE(review): releasePartitionCacheMemory() releases partitionCache.getEstimatedSize(), so
+ // this relies on addSlice() growing that estimate by exactly slice.getEstimatedSize();
+ // otherwise reservations leak or are over-released — confirm in PartitionCache.addSlice.
+ partitionCache.addSlice(slice);
}
- private void releaseAllCachedTsBlockMemory() {
- long released =
cachedTsBlocks.stream().mapToInt(TsBlock::getTotalInstanceSize).sum();
+ /** Releases the memory reserved for every slice currently held in the partition cache. */
+ private void releasePartitionCacheMemory() {
+ long released = partitionCache.getEstimatedSize();
memoryReservationManager.releaseMemoryCumulatively(released);
totalMemorySize -= released;
- // No need to update maxUsedMemory
+ // maxUsedMemory is a high-water mark, so releasing memory leaves it unchanged.
operatorContext.recordSpecifiedInfo(MAX_RESERVED_MEMORY,
Long.toString(maxUsedMemory));
}
@@ -415,6 +312,7 @@ public class TableWindowOperator implements ProcessOperator
{
return INSTANCE_SIZE
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(inputOperator)
+
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext)
- + tsBlockBuilder.getRetainedSizeInBytes();
+ + tsBlockBuilder.getRetainedSizeInBytes()
+ + partitionCache.getEstimatedSize();
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
index b8acd7fae11..ac23bbc9fef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/Partition.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.queryengine.execution.operator.process.window.partition;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.function.partition.Slice;
import
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.ColumnList;
import org.apache.tsfile.block.column.Column;
@@ -27,40 +28,45 @@ import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.utils.Binary;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
public class Partition {
- private final List<TsBlock> tsBlocks;
+ private final List<Column[]> segments;
private int cachedPositionCount = -1;
public Partition(List<TsBlock> tsBlocks, int startIndexInFirstBlock, int
endIndexInLastBlock) {
+ this.segments = new ArrayList<>(tsBlocks.size());
if (tsBlocks.size() == 1) {
int length = endIndexInLastBlock - startIndexInFirstBlock;
- this.tsBlocks =
-
Collections.singletonList(tsBlocks.get(0).getRegion(startIndexInFirstBlock,
length));
- return;
+ TsBlock region = tsBlocks.get(0).getRegion(startIndexInFirstBlock,
length);
+ segments.add(region.getValueColumns());
+ } else {
+ TsBlock firstBlock = tsBlocks.get(0).subTsBlock(startIndexInFirstBlock);
+ segments.add(firstBlock.getValueColumns());
+ for (int i = 1; i < tsBlocks.size() - 1; i++) {
+ segments.add(tsBlocks.get(i).getValueColumns());
+ }
+ TsBlock lastBlock = tsBlocks.get(tsBlocks.size() - 1).getRegion(0,
endIndexInLastBlock);
+ segments.add(lastBlock.getValueColumns());
}
+ }
- this.tsBlocks = new ArrayList<>(tsBlocks.size());
- // First TsBlock
- TsBlock firstBlock = tsBlocks.get(0).subTsBlock(startIndexInFirstBlock);
- this.tsBlocks.add(firstBlock);
- // Middle TsBlock
- for (int i = 1; i < tsBlocks.size() - 1; i++) {
- this.tsBlocks.add(tsBlocks.get(i));
+ /**
+ * Builds a partition from recognizer slices. Each slice contributes one segment (its required
+ * columns), in slice order.
+ */
+ public Partition(List<Slice> slices) {
+ this.segments = new ArrayList<>(slices.size());
+ for (Slice slice : slices) {
+ segments.add(slice.getRequiredColumns());
}
- // Last TsBlock
- TsBlock lastBlock = tsBlocks.get(tsBlocks.size() - 1).getRegion(0,
endIndexInLastBlock);
- this.tsBlocks.add(lastBlock);
+ }
+
+ /**
+ * Wraps already-computed segments without copying; used by getRegion(int, int).
+ *
+ * <p>NOTE(review): {@code directSegments} is never read. It only exists because {@code
+ * List<Column[]>} and {@code List<Slice>} erase to the same type, which would clash with the
+ * public constructor above; a private static factory would express this without a dummy flag.
+ */
+ private Partition(List<Column[]> segments, boolean directSegments) {
+ this.segments = segments;
}
public int getPositionCount() {
if (cachedPositionCount == -1) {
- // Lazy initialized
cachedPositionCount = 0;
- for (TsBlock block : tsBlocks) {
- cachedPositionCount += block.getPositionCount();
+ for (Column[] segment : segments) {
+ cachedPositionCount += segment[0].getPositionCount();
}
}
@@ -68,144 +74,149 @@ public class Partition {
}
public int getValueColumnCount() {
- return tsBlocks.get(0).getValueColumnCount();
- }
-
- public TsBlock getTsBlock(int tsBlockIndex) {
- return tsBlocks.get(tsBlockIndex);
+ return segments.get(0).length;
}
public List<Column[]> getAllColumns() {
- List<Column[]> allColumns = new ArrayList<>();
- for (TsBlock block : tsBlocks) {
- allColumns.add(block.getAllColumns());
- }
-
- return allColumns;
+ return segments;
}
public boolean getBoolean(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).getBoolean(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].getBoolean(offset);
}
public int getInt(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).getInt(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].getInt(offset);
}
public long getLong(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).getLong(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].getLong(offset);
}
public float getFloat(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).getFloat(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].getFloat(offset);
}
public double getDouble(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).getDouble(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].getDouble(offset);
}
public Binary getBinary(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).getBinary(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].getBinary(offset);
}
public boolean isNull(int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- TsBlock tsBlock = tsBlocks.get(tsBlockIndex);
- return tsBlock.getColumn(channel).isNull(offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ return segments.get(segmentIndex)[channel].isNull(offset);
}
+ /**
+ * Appends the value of column {@code channel} at partition-relative row {@code rowIndex} to
+ * {@code builder}.
+ *
+ * <p>NOTE(review): every call re-runs getPartitionIndex, which scans the segment list from the
+ * start. Callers that copy several channels of the same row (e.g. the output loops in
+ * PatternPartitionExecutor) repeat that scan once per channel; resolving the PartitionIndex
+ * once per row would avoid it.
+ */
public void writeTo(ColumnBuilder builder, int channel, int rowIndex) {
PartitionIndex partitionIndex = getPartitionIndex(rowIndex);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
-
- Column column = tsBlocks.get(tsBlockIndex).getColumn(channel);
- builder.write(column, offsetInTsBlock);
+ int segmentIndex = partitionIndex.getSegmentIndex();
+ int offset = partitionIndex.getOffsetInSegment();
+ Column column = segments.get(segmentIndex)[channel];
+ builder.write(column, offset);
}
public static class PartitionIndex {
- private final int tsBlockIndex;
- private final int offsetInTsBlock;
+ private final int segmentIndex;
+ private final int offsetInSegment;
- PartitionIndex(int tsBlockIndex, int offsetInTsBlock) {
- this.tsBlockIndex = tsBlockIndex;
- this.offsetInTsBlock = offsetInTsBlock;
+ PartitionIndex(int segmentIndex, int offsetInSegment) {
+ this.segmentIndex = segmentIndex;
+ this.offsetInSegment = offsetInSegment;
}
- public int getTsBlockIndex() {
- return tsBlockIndex;
+ public int getSegmentIndex() {
+ return segmentIndex;
}
- public int getOffsetInTsBlock() {
- return offsetInTsBlock;
+ public int getOffsetInSegment() {
+ return offsetInSegment;
}
}
// start and end are indexes within partition
// Both of them are inclusive, i.e. [start, end]
+ // Returns a view over rows [start, end]: the boundary segments are trimmed with
+ // Column.getRegion, and whole middle segments are shared as-is (no copy). Either index outside
+ // the partition raises IndexOutOfBoundsException from getPartitionIndex.
+ // NOTE(review): start > end is not rejected; within one segment it passes a non-positive length
+ // to getRegion, and across segments it returns rows out of order.
public Partition getRegion(int start, int end) {
- PartitionIndex startPartitionIndex = getPartitionIndex(start);
- PartitionIndex endPartitionIndex = getPartitionIndex(end);
-
- List<TsBlock> tsBlockList = new ArrayList<>();
- int startTsBlockIndex = startPartitionIndex.getTsBlockIndex();
- int endTsBlockIndex = endPartitionIndex.getTsBlockIndex();
- for (int i = startTsBlockIndex; i <= endTsBlockIndex; i++) {
- tsBlockList.add(tsBlocks.get(i));
+ PartitionIndex startPI = getPartitionIndex(start);
+ PartitionIndex endPI = getPartitionIndex(end);
+
+ int startSeg = startPI.getSegmentIndex();
+ int endSeg = endPI.getSegmentIndex();
+ int columnCount = segments.get(0).length;
+
+ List<Column[]> regionSegments = new ArrayList<>();
+
+ if (startSeg == endSeg) {
+ // Whole region lies in one segment: a single trimmed copy of its column array.
+ int offset = startPI.getOffsetInSegment();
+ int length = endPI.getOffsetInSegment() - offset + 1;
+ Column[] cols = segments.get(startSeg);
+ Column[] region = new Column[columnCount];
+ for (int c = 0; c < columnCount; c++) {
+ region[c] = cols[c].getRegion(offset, length);
+ }
+ regionSegments.add(region);
+ } else {
+ // First segment
+ Column[] firstCols = segments.get(startSeg);
+ int firstOffset = startPI.getOffsetInSegment();
+ int firstLen = firstCols[0].getPositionCount() - firstOffset;
+ Column[] firstRegion = new Column[columnCount];
+ for (int c = 0; c < columnCount; c++) {
+ firstRegion[c] = firstCols[c].getRegion(firstOffset, firstLen);
+ }
+ regionSegments.add(firstRegion);
+
+ // Middle segments
+ for (int i = startSeg + 1; i < endSeg; i++) {
+ regionSegments.add(segments.get(i));
+ }
+
+ // Last segment
+ Column[] lastCols = segments.get(endSeg);
+ int lastLen = endPI.getOffsetInSegment() + 1;
+ Column[] lastRegion = new Column[columnCount];
+ for (int c = 0; c < columnCount; c++) {
+ lastRegion[c] = lastCols[c].getRegion(0, lastLen);
+ }
+ regionSegments.add(lastRegion);
}
- int startIndexInFirstBlock = startPartitionIndex.getOffsetInTsBlock();
- int endIndexInLastBlock = endPartitionIndex.getOffsetInTsBlock();
- return new Partition(tsBlockList, startIndexInFirstBlock,
endIndexInLastBlock + 1);
+ return new Partition(regionSegments, true);
}
- // rowIndex is index within partition
+ // Maps a 0-based row index within the partition to (segment index, offset within segment).
+ // NOTE(review): the lookup is linear in the number of segments, and a negative rowIndex is not
+ // rejected (it maps to segment 0 with a negative offset). A prefix-sum array plus binary search
+ // and an explicit range check would address both.
public PartitionIndex getPartitionIndex(int rowIndex) {
- int tsBlockIndex = 0;
- while (tsBlockIndex < tsBlocks.size()
- && rowIndex >= tsBlocks.get(tsBlockIndex).getPositionCount()) {
- rowIndex -= tsBlocks.get(tsBlockIndex).getPositionCount();
- // Enter next TsBlock
- tsBlockIndex++;
+ // Skip whole segments, subtracting their row counts, until rowIndex falls inside one.
+ int segmentIndex = 0;
+ while (segmentIndex < segments.size()
+ && rowIndex >= segments.get(segmentIndex)[0].getPositionCount()) {
+ rowIndex -= segments.get(segmentIndex)[0].getPositionCount();
+ segmentIndex++;
}
- if (tsBlockIndex != tsBlocks.size()) {
- return new PartitionIndex(tsBlockIndex, rowIndex);
+ if (segmentIndex != segments.size()) {
+ return new PartitionIndex(segmentIndex, rowIndex);
} else {
- // Unlikely
+ // Only reached when rowIndex >= getPositionCount().
throw new IndexOutOfBoundsException("Index out of Partition's bounds!");
}
}
@@ -215,8 +226,8 @@ public class Partition {
for (Integer sortedChannel : sortedChannels) {
List<Column> columns = new ArrayList<>();
- for (TsBlock tsBlock : tsBlocks) {
- columns.add(tsBlock.getColumn(sortedChannel));
+ for (Column[] segment : segments) {
+ columns.add(segment[sortedChannel]);
}
columnLists.add(new ColumnList(columns));
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
index 20167955d79..d5a68ea5796 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/window/partition/PartitionExecutor.java
@@ -30,7 +30,6 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.R
import
org.apache.iotdb.db.queryengine.execution.operator.process.window.utils.RowComparator;
import com.google.common.collect.ImmutableList;
-import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -69,17 +68,42 @@ public final class PartitionExecutor {
List<WindowFunction> windowFunctions,
List<FrameInfo> frameInfoList,
List<Integer> sortChannels) {
- // Partition
- this.partition = new Partition(tsBlocks, startIndexInFirstBlock,
endIndexInLastBlock);
- this.partitionStart = startIndexInFirstBlock;
- this.partitionEnd = startIndexInFirstBlock +
this.partition.getPositionCount();
- // Window functions and frames
+ this(
+ new Partition(tsBlocks, startIndexInFirstBlock, endIndexInLastBlock),
+ dataTypes,
+ startIndexInFirstBlock,
+ outputChannels,
+ windowFunctions,
+ frameInfoList,
+ sortChannels);
+ }
+
+ public PartitionExecutor(
+ Partition partition,
+ List<TSDataType> dataTypes,
+ List<Integer> outputChannels,
+ List<WindowFunction> windowFunctions,
+ List<FrameInfo> frameInfoList,
+ List<Integer> sortChannels) {
+ this(partition, dataTypes, 0, outputChannels, windowFunctions,
frameInfoList, sortChannels);
+ }
+
+ private PartitionExecutor(
+ Partition partition,
+ List<TSDataType> dataTypes,
+ int partitionStart,
+ List<Integer> outputChannels,
+ List<WindowFunction> windowFunctions,
+ List<FrameInfo> frameInfoList,
+ List<Integer> sortChannels) {
+ this.partition = partition;
+ this.partitionStart = partitionStart;
+ this.partitionEnd = partitionStart + this.partition.getPositionCount();
this.windowFunctions = ImmutableList.copyOf(windowFunctions);
this.frames = new ArrayList<>();
this.outputChannels = ImmutableList.copyOf(outputChannels);
- // Prepare for peer group comparing
List<TSDataType> sortDataTypes = new ArrayList<>();
for (int channel : sortChannels) {
TSDataType dataType = dataTypes.get(channel);
@@ -118,7 +142,6 @@ public final class PartitionExecutor {
peerGroupEnd - partitionStart - 1);
break;
default:
- // Unreachable
throw new UnsupportedOperationException("Unreachable!");
}
}
@@ -131,21 +154,15 @@ public final class PartitionExecutor {
}
public void processNextRow(TsBlockBuilder builder) {
- // Copy origin data
int index = currentPosition - partitionStart;
- Partition.PartitionIndex partitionIndex =
partition.getPartitionIndex(index);
- int tsBlockIndex = partitionIndex.getTsBlockIndex();
- int offsetInTsBlock = partitionIndex.getOffsetInTsBlock();
- TsBlock tsBlock = partition.getTsBlock(tsBlockIndex);
int channel = 0;
for (int i = 0; i < outputChannels.size(); i++) {
- Column column = tsBlock.getColumn(outputChannels.get(i));
ColumnBuilder columnBuilder = builder.getColumnBuilder(i);
- if (column.isNull(offsetInTsBlock)) {
+ if (partition.isNull(outputChannels.get(i), index)) {
columnBuilder.appendNull();
} else {
- columnBuilder.write(column, offsetInTsBlock);
+ partition.writeTo(columnBuilder, outputChannels.get(i), index);
}
channel++;
}