This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 26fdfebb2cb Fix tree ttl in table view scan
26fdfebb2cb is described below
commit 26fdfebb2cba40a49223a79610124eced73ee004
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jun 6 02:46:43 2025 +0800
Fix tree ttl in table view scan
---
.../recent/IoTDBTableViewWithTreeTTLQueryIT.java | 112 +++++++++++++++++++++
.../execution/operator/EmptyDataOperator.java | 81 +++++++++++++++
.../execution/operator/source/SeriesScanUtil.java | 20 ++--
.../relational/AbstractAggTableScanOperator.java | 4 +-
.../plan/planner/TableOperatorGenerator.java | 24 ++++-
.../planner/plan/parameter/SeriesScanOptions.java | 22 +++-
6 files changed, 250 insertions(+), 13 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewWithTreeTTLQueryIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewWithTreeTTLQueryIT.java
new file mode 100644
index 00000000000..10ae276b40f
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/view/recent/IoTDBTableViewWithTreeTTLQueryIT.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.view.recent;
+
+import org.apache.iotdb.isession.ITableSession;
+import org.apache.iotdb.isession.SessionDataSet;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+import org.apache.iotdb.rpc.IoTDBConnectionException;
+import org.apache.iotdb.rpc.StatementExecutionException;
+
+import org.apache.tsfile.read.common.RowRecord;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBTableViewWithTreeTTLQueryIT {
+
+ protected static final String DATABASE_NAME = "test";
+
+ protected static String[] createTreeDataSqls = {
+ "CREATE ALIGNED TIMESERIES root.db.battery.b1(voltage INT32, current
FLOAT)",
+ "INSERT INTO root.db.battery.b1(time, voltage, current) aligned values (1,
1, 1)",
+ "INSERT INTO root.db.battery.b1(time, voltage, current) aligned values (2,
1, 1)",
+ "INSERT INTO root.db.battery.b1(time, voltage, current) aligned values (3,
1, 1)",
+ "INSERT INTO root.db.battery.b1(time, voltage, current) aligned values (4,
1, 1)",
+ "INSERT INTO root.db.battery.b1(time, voltage, current) aligned values ("
+ + System.currentTimeMillis()
+ + ", 1, 1)",
+ "CREATE TIMESERIES root.db.battery.b2.voltage INT32",
+ "CREATE TIMESERIES root.db.battery.b2.current FLOAT",
+ "INSERT INTO root.db.battery.b2(time, voltage, current) values (1, 1, 1)",
+ "INSERT INTO root.db.battery.b2(time, voltage, current) values (2, 1, 1)",
+ "INSERT INTO root.db.battery.b2(time, voltage, current) values (3, 1, 1)",
+ "INSERT INTO root.db.battery.b2(time, voltage, current) values (4, 1, 1)",
+ "INSERT INTO root.db.battery.b2(time, voltage, current) values ("
+ + System.currentTimeMillis()
+ + ", 1, 1)",
+ "flush",
+ "set ttl to root.db.battery.** 100000"
+ };
+
+ protected static String[] createTableSqls = {
+ "CREATE DATABASE " + DATABASE_NAME,
+ "USE " + DATABASE_NAME,
+ "CREATE VIEW view1 (battery TAG, voltage INT32 FIELD, current FLOAT FIELD)
as root.db.battery.**",
+ };
+
+ @Before
+ public void setUp() throws Exception {
+ EnvFactory.getEnv().getConfig().getCommonConfig().setSortBufferSize(128 *
1024);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setMaxTsBlockSizeInByte(4 *
1024);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData(createTreeDataSqls);
+ prepareTableData(createTableSqls);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void test() throws IoTDBConnectionException,
StatementExecutionException {
+ try (ITableSession session =
EnvFactory.getEnv().getTableSessionConnection()) {
+ session.executeNonQueryStatement("use " + DATABASE_NAME);
+ SessionDataSet sessionDataSet =
+ session.executeQueryStatement("select count(*) from view1 where
battery = 'b1'");
+ Assert.assertTrue(sessionDataSet.hasNext());
+ RowRecord record = sessionDataSet.next();
+ Assert.assertEquals(1, record.getField(0).getLongV());
+ Assert.assertFalse(sessionDataSet.hasNext());
+ sessionDataSet.close();
+
+ sessionDataSet = session.executeQueryStatement("select * from view1");
+ int count = 0;
+ while (sessionDataSet.hasNext()) {
+ sessionDataSet.next();
+ count++;
+ }
+ sessionDataSet.close();
+ Assert.assertEquals(2, count);
+ }
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/EmptyDataOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/EmptyDataOperator.java
new file mode 100644
index 00000000000..c36aa745bc4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/EmptyDataOperator.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator;
+
+import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
+
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+public class EmptyDataOperator implements Operator {
+
+ private static final long INSTANCE_SIZE =
+ RamUsageEstimator.shallowSizeOfInstance(EmptyDataOperator.class);
+
+ private final OperatorContext operatorContext;
+
+ public EmptyDataOperator(OperatorContext operatorContext) {
+ this.operatorContext = operatorContext;
+ }
+
+ @Override
+ public OperatorContext getOperatorContext() {
+ return operatorContext;
+ }
+
+ @Override
+ public TsBlock next() throws Exception {
+ return null;
+ }
+
+ @Override
+ public boolean hasNext() throws Exception {
+ return false;
+ }
+
+ @Override
+ public void close() throws Exception {}
+
+ @Override
+ public boolean isFinished() throws Exception {
+ return true;
+ }
+
+ @Override
+ public long calculateMaxPeekMemory() {
+ return 0;
+ }
+
+ @Override
+ public long calculateMaxReturnSize() {
+ return 0;
+ }
+
+ @Override
+ public long calculateRetainedSizeAfterCallingNext() {
+ return 0;
+ }
+
+ @Override
+ public long ramBytesUsed() {
+ return INSTANCE_SIZE
+ +
MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
index cf7e633ec26..3de6fc253b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java
@@ -40,6 +40,7 @@ import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.IMetadata;
import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.read.TimeValuePair;
import org.apache.tsfile.read.common.block.TsBlock;
@@ -73,6 +74,7 @@ import static
org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet.BUI
public class SeriesScanUtil implements Accountable {
+ public static final StringArrayDeviceID EMPTY_DEVICE_ID = new
StringArrayDeviceID("");
protected final FragmentInstanceContext context;
// The path of the target series which will be scanned.
@@ -179,17 +181,19 @@ public class SeriesScanUtil implements Accountable {
this.dataSource = dataSource;
// updated filter concerning TTL
- long ttl;
- // Only the data in the table model needs to retain rows where all value
- // columns are null values, so we can use isIgnoreAllNullRows to
- // differentiate the data of tree model and table model.
- if (context.isIgnoreAllNullRows()) {
- ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
- scanOptions.setTTL(ttl);
+ // IgnoreAllNullRows is false indicating that the current query is a table
model query.
+ // In most cases, We can use this condition to determine from which model
to obtain the ttl
+ // of the current device. However, it should be noted that for tree model
data queried using
+ // table view, ttl also needs to be obtained from the tree model.
+ if (context.isIgnoreAllNullRows() ||
scanOptions.isTableViewForTreeModel()) {
+ if (deviceID != EMPTY_DEVICE_ID) {
+ long ttl = DataNodeTTLCache.getInstance().getTTLForTree(deviceID);
+ scanOptions.setTTL(ttl);
+ }
} else {
if (scanOptions.timeFilterNeedUpdatedByTll()) {
String databaseName = dataSource.getDatabaseName();
- ttl =
+ long ttl =
databaseName == null
? Long.MAX_VALUE
: DataNodeTTLCache.getInstance()
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
index 8570817ce7c..dd96578ab92 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/AbstractAggTableScanOperator.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.I
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanUtil;
import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator;
import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow;
import org.apache.iotdb.db.queryengine.execution.operator.window.TimeWindow;
@@ -42,7 +43,6 @@ import org.apache.tsfile.block.column.ColumnBuilder;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.file.metadata.StringArrayDeviceID;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.file.metadata.statistics.StringStatistics;
import org.apache.tsfile.read.common.TimeRange;
@@ -174,7 +174,7 @@ public abstract class AbstractAggTableScanOperator extends
AbstractDataSourceOpe
if (this.deviceEntries.isEmpty() ||
this.deviceEntries.get(this.currentDeviceIndex) == null) {
// for device which is not exist
- deviceEntry = new AlignedDeviceEntry(new StringArrayDeviceID(""), new
Binary[0]);
+ deviceEntry = new AlignedDeviceEntry(SeriesScanUtil.EMPTY_DEVICE_ID, new
Binary[0]);
} else {
deviceEntry = this.deviceEntries.get(this.currentDeviceIndex);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 9b976f7f9df..82a7adc131e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannel
import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISinkHandle;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.ShuffleSinkHandle;
import org.apache.iotdb.db.queryengine.execution.exchange.source.ISourceHandle;
+import org.apache.iotdb.db.queryengine.execution.operator.EmptyDataOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.ExplainAnalyzeOperator;
import org.apache.iotdb.db.queryengine.execution.operator.Operator;
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
@@ -198,6 +199,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctio
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TopKNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeAlignedDeviceViewScanNode;
+import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TreeNonAlignedDeviceViewScanNode;
import
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.WindowNode;
@@ -610,7 +612,9 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getTimePredicate()
.map(expression -> getSeriesScanOptionsBuilder(context,
expression))
.orElseGet(SeriesScanOptions.Builder::new);
- builder.withAllSensors(new HashSet<>(measurementColumnNames));
+ builder
+ .withIsTableViewForTreeModel(true)
+ .withAllSensors(new HashSet<>(measurementColumnNames));
if (pushDownPredicateForCurrentMeasurement != null) {
builder.withPushDownFilter(
convertPredicateToFilter(
@@ -1075,6 +1079,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.isPushLimitToEachDevice(),
node.getPushDownPredicate());
seriesScanOptions.setTTLForTableView(viewTTL);
+ seriesScanOptions.setIsTableViewForTreeModel(node instanceof
TreeDeviceViewScanNode);
OperatorContext operatorContext =
context
@@ -1119,6 +1124,22 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
Long.MAX_VALUE);
}
+ @Override
+ public Operator visitTreeDeviceViewScan(
+ TreeDeviceViewScanNode node, LocalExecutionPlanContext context) {
+ if (node.getDeviceEntries().isEmpty() || node.getTreeDBName() == null) {
+ OperatorContext operatorContext =
+ context
+ .getDriverContext()
+ .addOperatorContext(
+ context.getNextOperatorId(),
+ node.getPlanNodeId(),
+ EmptyDataOperator.class.getSimpleName());
+ return new EmptyDataOperator(operatorContext);
+ }
+ throw new IllegalArgumentException("Valid TreeDeviceViewScanNode is not
expected here.");
+ }
+
@Override
public Operator visitDeviceTableScan(
DeviceTableScanNode node, LocalExecutionPlanContext context) {
@@ -2872,6 +2893,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.isPushLimitToEachDevice(),
node.getPushDownPredicate());
seriesScanOptions.setTTLForTableView(tableViewTTL);
+ seriesScanOptions.setIsTableViewForTreeModel(node instanceof
AggregationTreeDeviceViewScanNode);
Set<String> allSensors = new HashSet<>(measurementColumnNames);
allSensors.add(""); // for time column
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
index 319ae5a214c..dc74df6d2f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/parameter/SeriesScanOptions.java
@@ -49,6 +49,7 @@ public class SeriesScanOptions {
private final boolean pushLimitToEachDevice;
private PaginationController paginationController;
+ private boolean isTableViewForTreeModel;
private long ttlForTableView = Long.MAX_VALUE;
public SeriesScanOptions(
@@ -57,13 +58,15 @@ public class SeriesScanOptions {
long pushDownLimit,
long pushDownOffset,
Set<String> allSensors,
- boolean pushLimitToEachDevice) {
+ boolean pushLimitToEachDevice,
+ boolean isTableViewForTreeModel) {
this.globalTimeFilter = globalTimeFilter;
this.pushDownFilter = pushDownFilter;
this.pushDownLimit = pushDownLimit;
this.pushDownOffset = pushDownOffset;
this.allSensors = allSensors;
this.pushLimitToEachDevice = pushLimitToEachDevice;
+ this.isTableViewForTreeModel = isTableViewForTreeModel;
}
public static SeriesScanOptions getDefaultSeriesScanOptions(IFullPath
seriesPath) {
@@ -145,6 +148,14 @@ public class SeriesScanOptions {
return filter;
}
+ public boolean isTableViewForTreeModel() {
+ return isTableViewForTreeModel;
+ }
+
+ public void setIsTableViewForTreeModel(boolean isTableViewForTreeModel) {
+ this.isTableViewForTreeModel = isTableViewForTreeModel;
+ }
+
/**
* pushLimitToEachDevice==false means that all devices return total limit
rows.
*
@@ -166,6 +177,7 @@ public class SeriesScanOptions {
private Set<String> allSensors;
private boolean pushLimitToEachDevice = true;
+ private boolean isTableViewForTreeModel = false;
public Builder withGlobalTimeFilter(Filter globalTimeFilter) {
this.globalTimeFilter = globalTimeFilter;
@@ -192,6 +204,11 @@ public class SeriesScanOptions {
return this;
}
+ public Builder withIsTableViewForTreeModel(boolean
isTableViewForTreeModel) {
+ this.isTableViewForTreeModel = isTableViewForTreeModel;
+ return this;
+ }
+
public void withAllSensors(Set<String> allSensors) {
this.allSensors = allSensors;
}
@@ -203,7 +220,8 @@ public class SeriesScanOptions {
pushDownLimit,
pushDownOffset,
allSensors,
- pushLimitToEachDevice);
+ pushLimitToEachDevice,
+ isTableViewForTreeModel);
}
}
}