This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new ea7e2a68b92 Fix cast of TimeColumn after deserialize
ea7e2a68b92 is described below
commit ea7e2a68b92cce1a16e14e4dfc66d7870ea1f9f9
Author: Weihao Li <[email protected]>
AuthorDate: Mon Oct 13 10:10:17 2025 +0800
Fix cast of TimeColumn after deserialize
---
.../it/udf/IoTDBUDFIntermediateBlockSerdeIT.java | 74 ++++++++++++++++++++++
.../org/apache/iotdb/db/conf/IoTDBDescriptor.java | 44 +++++++------
.../execution/operator/window/SessionWindow.java | 13 ++--
.../execution/operator/window/TimeWindow.java | 9 ++-
.../dag/intermediate/MultiInputLayer.java | 35 +++++-----
.../SingleInputMultiReferenceLayer.java | 27 ++++----
.../SingleInputSingleReferenceLayer.java | 27 ++++----
.../transformation/dag/udf/UDTFExecutor.java | 2 +-
.../transformation/dag/util/LayerCacheUtils.java | 5 +-
.../transformation/datastructure/TVColumns.java | 7 +-
.../tv/ElasticSerializableBinaryTVList.java | 3 +-
.../datastructure/tv/SerializableTVList.java | 6 +-
12 files changed, 160 insertions(+), 92 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFIntermediateBlockSerdeIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFIntermediateBlockSerdeIT.java
new file mode 100644
index 00000000000..06464debd9e
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/udf/IoTDBUDFIntermediateBlockSerdeIT.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.udf;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualWithDescOrderTest;
+import static org.apache.iotdb.itbase.constant.TestConstant.TIMESTAMP_STR;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBUDFIntermediateBlockSerdeIT {
+ private static final String[] SQLs =
+ new String[] {
+ "insert into root.sg.d1(time, s1) values (1,1)",
+ "insert into root.sg.d1(time, s1) values (2,2)",
+ "insert into root.sg.d1(time, s1) values (3,3)",
+ "insert into root.sg.d1(time, s1) values (4,4)",
+ "insert into root.sg.d1(time, s1) values (5,5)",
+ "insert into root.sg.d1(time, s1) values (6,6)"
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setUdfMemoryBudgetInMB(0.0001f);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData(SQLs);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testM4() {
+ String[] expectedHeader =
+ new String[] {
+ TIMESTAMP_STR, "EQUAL_SIZE_BUCKET_M4_SAMPLE(root.sg.d1.s1, \"proportion\"=\"1\")"
+ };
+ String[] retArray = new String[] {"1,1.0,", "2,2.0,", "3,3.0,", "4,4.0,", "5,5.0,", "6,6.0,"};
+ resultSetEqualWithDescOrderTest(
+ "select EQUAL_SIZE_BUCKET_M4_SAMPLE(s1,'proportion'='1') from root.sg.d1",
+ expectedHeader,
+ retArray);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c5adbf2512f..d04d24f722c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2468,27 +2468,31 @@ public class IoTDBDescriptor {
String readerTransformerCollectorMemoryProportion =
properties.getProperty("udf_reader_transformer_collector_memory_proportion");
+ String[] proportions;
if (readerTransformerCollectorMemoryProportion != null) {
- String[] proportions =
readerTransformerCollectorMemoryProportion.split(":");
- int proportionSum = 0;
- for (String proportion : proportions) {
- proportionSum += Integer.parseInt(proportion.trim());
- }
- float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
- try {
- conf.setUdfReaderMemoryBudgetInMB(
- maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) /
proportionSum);
- conf.setUdfTransformerMemoryBudgetInMB(
- maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) /
proportionSum);
- conf.setUdfCollectorMemoryBudgetInMB(
- maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) /
proportionSum);
- } catch (Exception e) {
- throw new RuntimeException(
- "Each subsection of configuration item udf_reader_transformer_collector_memory_proportion"
- + " should be an integer, which is "
- + readerTransformerCollectorMemoryProportion,
- e);
- }
+ proportions = readerTransformerCollectorMemoryProportion.split(":");
+ } else {
+ // Make the default proportion is 1:1:1
+ proportions = new String[] {"1", "1", "1"};
+ }
+ int proportionSum = 0;
+ for (String proportion : proportions) {
+ proportionSum += Integer.parseInt(proportion.trim());
+ }
+ float maxMemoryAvailable = conf.getUdfMemoryBudgetInMB();
+ try {
+ conf.setUdfReaderMemoryBudgetInMB(
+ maxMemoryAvailable * Integer.parseInt(proportions[0].trim()) /
proportionSum);
+ conf.setUdfTransformerMemoryBudgetInMB(
+ maxMemoryAvailable * Integer.parseInt(proportions[1].trim()) /
proportionSum);
+ conf.setUdfCollectorMemoryBudgetInMB(
+ maxMemoryAvailable * Integer.parseInt(proportions[2].trim()) /
proportionSum);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Each subsection of configuration item udf_reader_transformer_collector_memory_proportion"
+ + " should be an integer, which is "
+ + readerTransformerCollectorMemoryProportion,
+ e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
index 5c28c05b3ec..4c5e004c60d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/SessionWindow.java
@@ -21,7 +21,6 @@ package
org.apache.iotdb.db.queryengine.execution.operator.window;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
public class SessionWindow implements IWindow {
@@ -87,19 +86,19 @@ public class SessionWindow implements IWindow {
@Override
public boolean contains(Column column) {
- TimeColumn timeColumn = (TimeColumn) column;
-
- long minTime = Math.min(timeColumn.getStartTime(),
timeColumn.getEndTime());
- long maxTime = Math.max(timeColumn.getStartTime(),
timeColumn.getEndTime());
+ long columnStartTime = column.getLong(0);
+ long columnEndTime = column.getLong(column.getPositionCount() - 1);
+ long minTime = Math.min(columnStartTime, columnEndTime);
+ long maxTime = Math.max(columnStartTime, columnEndTime);
boolean contains =
- Math.abs(column.getLong(0) - lastTsBlockTime) < timeInterval
+ Math.abs(columnStartTime - lastTsBlockTime) < timeInterval
&& maxTime - minTime <= timeInterval;
if (contains) {
if (!initializedTimeValue) {
startTime = Long.MAX_VALUE;
endTime = Long.MIN_VALUE;
- lastTsBlockTime = column.getLong(0);
+ lastTsBlockTime = columnStartTime;
timeValue = ascending ? maxTime : minTime;
initializedTimeValue = true;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/TimeWindow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/TimeWindow.java
index 0a867e69e0d..5500d23d730 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/TimeWindow.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/window/TimeWindow.java
@@ -22,7 +22,6 @@ package
org.apache.iotdb.db.queryengine.execution.operator.window;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.read.common.block.TsBlock;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
public class TimeWindow implements IWindow {
@@ -62,10 +61,10 @@ public class TimeWindow implements IWindow {
@Override
public boolean contains(Column column) {
- TimeColumn timeColumn = (TimeColumn) column;
-
- long minTime = Math.min(timeColumn.getStartTime(),
timeColumn.getEndTime());
- long maxTime = Math.max(timeColumn.getStartTime(),
timeColumn.getEndTime());
+ long startTime = column.getLong(0);
+ long endTime = column.getLong(column.getPositionCount() - 1);
+ long minTime = Math.min(startTime, endTime);
+ long maxTime = Math.max(startTime, endTime);
return curTimeRange.contains(minTime, maxTime);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/MultiInputLayer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/MultiInputLayer.java
index 930d7d622a4..78e0c7451b7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/MultiInputLayer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/MultiInputLayer.java
@@ -42,7 +42,6 @@ import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.TsBlockBuilder;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -143,8 +142,8 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
if (layerReaders[i].isConstantPointReader()) {
inputTVColumnsList[i] = new TVColumns(columns[0]);
} else {
- inputTVColumnsList[i] = new TVColumns((TimeColumn) columns[1],
columns[0]);
- timeHeap.add(((TimeColumn) columns[1]).getStartTime());
+ inputTVColumnsList[i] = new TVColumns(columns[1], columns[0]);
+ timeHeap.add(columns[1].getLong(0));
}
currentConsumedIndexes[i] = 0;
@@ -354,10 +353,10 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
private long currentEndTime = Long.MAX_VALUE;
private final RowListForwardIterator beginIterator =
rowRecordList.constructIterator();
- private TimeColumn cachedBeginTimeColumn;
+ private Column cachedBeginTimeColumn;
private int cachedBeginConsumed;
- private TimeColumn cachedEndTimeColumn;
+ private Column cachedEndTimeColumn;
private int cachedEndConsumed;
@Override
@@ -370,7 +369,7 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
}
Column[] columns = udfInputDataSet.currentBlock();
- TimeColumn times = (TimeColumn) columns[columns.length - 1];
+ Column times = columns[columns.length - 1];
rowRecordList.put(columns);
@@ -379,11 +378,12 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
if (nextWindowTimeBegin == Long.MIN_VALUE) {
// display window begin should be set to the same as the min
timestamp of the query
// result set
- nextWindowTimeBegin = cachedEndTimeColumn.getStartTime();
+ nextWindowTimeBegin = cachedEndTimeColumn.getLong(0);
}
hasAtLeastOneRow = rowRecordList.size() != 0;
if (hasAtLeastOneRow) {
- currentEndTime = cachedEndTimeColumn.getEndTime();
+ currentEndTime =
+
cachedEndTimeColumn.getLong(cachedEndTimeColumn.getPositionCount() - 1);
}
isFirstIteration = false;
}
@@ -407,10 +407,10 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
}
// Generate data
Column[] columns = udfInputDataSet.currentBlock();
- TimeColumn times = (TimeColumn) columns[columns.length - 1];
+ Column times = columns[columns.length - 1];
// Put data into container
rowRecordList.put(columns);
- currentEndTime = times.getEndTime();
+ currentEndTime = times.getLong(times.getPositionCount() - 1);
// Increase nextIndexEnd
nextIndexEnd += cachedEndTimeColumn.getPositionCount() -
cachedEndConsumed;
// Update cache
@@ -446,7 +446,7 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
cachedBeginConsumed = 0;
Column[] columns = beginIterator.currentBlock();
- cachedBeginTimeColumn = (TimeColumn) columns[columns.length - 1];
+ cachedBeginTimeColumn = columns[columns.length - 1];
} else {
// No more data
// Set nextIndexBegin to list's size
@@ -456,7 +456,8 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
}
if ((nextIndexEnd == nextIndexBegin)
- && nextWindowTimeEnd < cachedEndTimeColumn.getEndTime()) {
+ && nextWindowTimeEnd
+ <
cachedEndTimeColumn.getLong(cachedEndTimeColumn.getPositionCount() - 1)) {
window.setEmptyWindow(nextWindowTimeBegin, nextWindowTimeEnd);
return YieldableState.YIELDABLE;
}
@@ -514,7 +515,7 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
private int nextIndexBegin = 0;
private int nextIndexEnd = 0;
- private TimeColumn cachedTimes;
+ private Column cachedTimes;
private int cachedConsumed;
@Override
@@ -557,7 +558,7 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
}
if (!findWindow) {
- if (cachedTimes.getEndTime() < displayWindowEnd) {
+ if (cachedTimes.getLong(cachedTimes.getPositionCount() - 1) <
displayWindowEnd) {
YieldableState state = yieldAndCache();
if (state == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
@@ -586,12 +587,12 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
}
}
// Initialize essential information
- nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getStartTime());
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getLong(0));
hasAtLeastOneRow = rowRecordList.size() != 0;
isFirstIteration = false;
// Set initial nextIndexBegin
- long currentEndTime = cachedTimes.getEndTime();
+ long currentEndTime =
cachedTimes.getLong(cachedTimes.getPositionCount() - 1);
// Find corresponding block
while (currentEndTime < nextWindowTimeBegin) {
// Consume all data
@@ -625,7 +626,7 @@ public class MultiInputLayer extends IntermediateLayer
implements IUDFInputDataS
return state;
}
Column[] columns = udfInputDataSet.currentBlock();
- TimeColumn times = (TimeColumn) columns[columns.length - 1];
+ Column times = columns[columns.length - 1];
rowRecordList.put(columns);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputMultiReferenceLayer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputMultiReferenceLayer.java
index 99814831b96..2bf5f2b3eb8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputMultiReferenceLayer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputMultiReferenceLayer.java
@@ -39,7 +39,6 @@ import
org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -258,7 +257,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
}
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
tvList.putColumn(times, values);
@@ -298,12 +297,12 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
}
// Generate data
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
// Put data into container
tvList.putColumn(times, values);
parentLayerReader.consumedAll();
- currentEndTime = times.getEndTime();
+ currentEndTime = times.getLong(times.getPositionCount() - 1);
// Increase nextIndexEnd
nextIndexEnd += cachedEndTimeColumn.getPositionCount() -
cachedEndConsumed;
// Update cache
@@ -406,7 +405,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
private int nextIndexBegin = 0;
private int nextIndexEnd = 0;
- private TimeColumn cachedTimes;
+ private Column cachedTimes;
private int cachedConsumed;
@Override
@@ -449,7 +448,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
}
if (!findWindow) {
- if (cachedTimes.getEndTime() < displayWindowEnd) {
+ if (cachedTimes.getLong(cachedTimes.getPositionCount() - 1) <
displayWindowEnd) {
YieldableState state = yieldAndCache();
if (state == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
@@ -478,12 +477,12 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
}
}
// Initialize essential information
- nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getStartTime());
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getLong(0));
hasAtLeastOneRow = tvList.size() != 0;
isFirstIteration = false;
// Set initial nextIndexBegin
- long currentEndTime = cachedTimes.getEndTime();
+ long currentEndTime =
cachedTimes.getLong(cachedTimes.getPositionCount() - 1);
// Find corresponding block
while (currentEndTime < nextWindowTimeBegin) {
// Consume all data
@@ -517,7 +516,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
return state;
}
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
tvList.putColumn(times, values);
@@ -572,7 +571,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
private int nextIndexBegin = 0;
private int nextIndexEnd = 0;
- private TimeColumn cachedTimes;
+ private Column cachedTimes;
private Column cachedValues;
private int cachedConsumed;
@@ -617,7 +616,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
}
if (!findWindow) {
- if (cachedTimes.getEndTime() < displayWindowEnd) {
+ if (cachedTimes.getLong(cachedTimes.getPositionCount() - 1) <
displayWindowEnd) {
YieldableState state = yieldAndCache();
if (state == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
@@ -646,12 +645,12 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
}
}
// Initialize essential information
- nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getStartTime());
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getLong(0));
hasAtLeastOneRow = tvList.size() != 0;
isFirstIteration = false;
// Set initial nextIndexBegin
- long currentEndTime = cachedTimes.getEndTime();
+ long currentEndTime =
cachedTimes.getLong(cachedTimes.getPositionCount() - 1);
// Find corresponding block
while (currentEndTime < nextWindowTimeBegin) {
// Consume all data
@@ -685,7 +684,7 @@ public class SingleInputMultiReferenceLayer extends
IntermediateLayer {
return state;
}
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
tvList.putColumn(times, values);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputSingleReferenceLayer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputSingleReferenceLayer.java
index dadcb8a2271..e7d67c36349 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputSingleReferenceLayer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/intermediate/SingleInputSingleReferenceLayer.java
@@ -37,7 +37,6 @@ import
org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -181,7 +180,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
}
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
tvList.putColumn(times, values);
@@ -221,12 +220,12 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
}
// Generate data
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
// Put data into container
tvList.putColumn(times, values);
parentLayerReader.consumedAll();
- currentEndTime = times.getEndTime();
+ currentEndTime = times.getLong(times.getPositionCount() - 1);
// Increase nextIndexEnd
nextIndexEnd += cachedEndTimeColumn.getPositionCount() -
cachedEndConsumed;
// Update cache
@@ -330,7 +329,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
private int nextIndexBegin = 0;
private int nextIndexEnd = 0;
- private TimeColumn cachedTimes;
+ private Column cachedTimes;
private int cachedConsumed;
@Override
@@ -373,7 +372,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
}
if (!findWindow) {
- if (cachedTimes.getEndTime() < displayWindowEnd) {
+ if (cachedTimes.getLong(cachedTimes.getPositionCount() - 1) <
displayWindowEnd) {
YieldableState state = yieldAndCache();
if (state == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
@@ -402,12 +401,12 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
}
}
// Initialize essential information
- nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getStartTime());
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getLong(0));
hasAtLeastOneRow = tvList.size() != 0;
isFirstIteration = false;
// Set initial nextIndexBegin
- long currentEndTime = cachedTimes.getEndTime();
+ long currentEndTime =
cachedTimes.getLong(cachedTimes.getPositionCount() - 1);
// Find corresponding block
while (currentEndTime < nextWindowTimeBegin) {
// Consume all data
@@ -441,7 +440,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
return state;
}
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
tvList.putColumn(times, values);
@@ -497,7 +496,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
private int nextIndexBegin = 0;
private int nextIndexEnd = 0;
- private TimeColumn cachedTimes;
+ private Column cachedTimes;
private Column cachedValues;
private int cachedConsumed;
@@ -542,7 +541,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
}
if (!findWindow) {
- if (cachedTimes.getEndTime() < displayWindowEnd) {
+ if (cachedTimes.getLong(cachedTimes.getPositionCount() - 1) <
displayWindowEnd) {
YieldableState state = yieldAndCache();
if (state == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
return YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA;
@@ -571,12 +570,12 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
}
}
// Initialize essential information
- nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getStartTime());
+ nextWindowTimeBegin = Math.max(displayWindowBegin,
cachedTimes.getLong(0));
hasAtLeastOneRow = tvList.size() != 0;
isFirstIteration = false;
// Set initial nextIndexBegin
- long currentEndTime = cachedTimes.getEndTime();
+ long currentEndTime =
cachedTimes.getLong(cachedTimes.getPositionCount() - 1);
// Find corresponding block
while (currentEndTime < nextWindowTimeBegin) {
// Consume all data
@@ -610,7 +609,7 @@ public class SingleInputSingleReferenceLayer extends
IntermediateLayer {
return state;
}
Column[] columns = parentLayerReader.current();
- TimeColumn times = (TimeColumn) columns[1];
+ Column times = columns[1];
Column values = columns[0];
tvList.putColumn(times, values);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java
index 00d91ccd696..f7a31fc00f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/udf/UDTFExecutor.java
@@ -217,7 +217,7 @@ public class UDTFExecutor {
// Some UDTF only generate data in terminate method
if (timeColumn.getPositionCount() != 0) {
cachedColumns = new Column[] {valueColumn, timeColumn};
- outputStorage.putColumn((TimeColumn) timeColumn, valueColumn);
+ outputStorage.putColumn(timeColumn, valueColumn);
} else {
cachedColumns = null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/util/LayerCacheUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/util/LayerCacheUtils.java
index 56b2a81c802..e63b01403c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/util/LayerCacheUtils.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/dag/util/LayerCacheUtils.java
@@ -26,7 +26,6 @@ import
org.apache.iotdb.db.queryengine.transformation.datastructure.row.ElasticS
import
org.apache.iotdb.db.queryengine.transformation.datastructure.tv.ElasticSerializableTVList;
import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
public class LayerCacheUtils {
@@ -43,7 +42,7 @@ public class LayerCacheUtils {
// First column is the value column;
// Second column is always the time column.
Column[] columns = source.current();
- target.putColumn((TimeColumn) columns[1], columns[0]);
+ target.putColumn(columns[1], columns[0]);
source.consumedAll();
return YieldableState.YIELDABLE;
@@ -58,7 +57,7 @@ public class LayerCacheUtils {
}
Column[] columns = source.current();
- target.putColumn((TimeColumn) columns[1], columns[0]);
+ target.putColumn(columns[1], columns[0]);
source.consumedAll();
int size = columns[0].getPositionCount();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/TVColumns.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/TVColumns.java
index 7d1ef5b7e96..b4e2d21f6fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/TVColumns.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/TVColumns.java
@@ -20,14 +20,13 @@
package org.apache.iotdb.db.queryengine.transformation.datastructure;
import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
public class TVColumns {
private boolean isConstant;
- private TimeColumn timeColumn;
+ private Column timeColumn;
private Column valueColumn;
- public TVColumns(TimeColumn timeColumn, Column valueColumn) {
+ public TVColumns(Column timeColumn, Column valueColumn) {
this.timeColumn = timeColumn;
this.valueColumn = valueColumn;
isConstant = false;
@@ -54,7 +53,7 @@ public class TVColumns {
if (isConstant) {
throw new UnsupportedOperationException();
}
- return timeColumn.getEndTime();
+ return timeColumn.getLong(timeColumn.getPositionCount() - 1);
}
public Column getValueColumn() {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
index e596a3662af..098997fe0e7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/ElasticSerializableBinaryTVList.java
@@ -27,7 +27,6 @@ import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.column.BinaryColumnBuilder;
-import org.apache.tsfile.read.common.block.column.TimeColumn;
import org.apache.tsfile.read.common.block.column.TimeColumnBuilder;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BytesUtils;
@@ -140,7 +139,7 @@ public class ElasticSerializableBinaryTVList extends
ElasticSerializableTVList {
timeColumnBuilder.writeLong(i);
valueColumnBuilder.writeBinary(empty);
}
- TimeColumn timeColumn = (TimeColumn) timeColumnBuilder.build();
+ Column timeColumn = timeColumnBuilder.build();
Column valueColumn = valueColumnBuilder.build();
newESTVList.putColumn(timeColumn, valueColumn);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/SerializableTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/SerializableTVList.java
index d29926947fe..5f76cd46c3d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/SerializableTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/transformation/datastructure/tv/SerializableTVList.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.queryengine.transformation.datastructure.SerializableList;
import org.apache.tsfile.block.column.Column;
-import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.TsBlockSerde;
@@ -103,10 +102,7 @@ public class SerializableTVList implements
SerializableList {
}
rowLength += ReadWriteIOUtils.BIT_LEN;
- int capacity =
- TSFileConfig.ARRAY_CAPACITY_THRESHOLD
- * (int)
- (memoryLimitInMB * MB / 2 / (rowLength *
TSFileConfig.ARRAY_CAPACITY_THRESHOLD));
+ int capacity = (int) (memoryLimitInMB * MB / 2 / (rowLength));
if (capacity <= 0) {
throw new RuntimeException("Memory is not enough for current query.");
}