This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 829e12ac34f Implement last cache optimize for query like select
last(s1), last(s1, time), last(s2), last(s2, time)
829e12ac34f is described below
commit 829e12ac34f159f11eb2c26b68be151bf87a2b42
Author: Weihao Li <[email protected]>
AuthorDate: Fri Aug 8 14:04:10 2025 +0800
Implement last cache optimize for query like select last(s1), last(s1,
time), last(s2), last(s2, time)
---
.../db/it/IoTDBMultiTAGsWithAttributesTableIT.java | 52 +++-
.../it/query/recent/IoTDBTableAggregationIT.java | 26 +-
.../relational/LastQueryAggTableScanOperator.java | 322 ++++++++++++++++++++-
.../relational/aggregation/AccumulatorFactory.java | 34 ++-
.../relational/aggregation/LastAccumulator.java | 10 +-
.../relational/aggregation/LastByAccumulator.java | 8 -
.../aggregation/LastByDescAccumulator.java | 35 ++-
.../aggregation/LastDescAccumulator.java | 19 +-
.../plan/planner/TableOperatorGenerator.java | 264 +++++++++++++----
.../fetcher/cache/TableDeviceSchemaCache.java | 2 +-
.../ir/GlobalTimePredicateExtractVisitor.java | 6 +
11 files changed, 661 insertions(+), 117 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
index 403077e3436..8dd8c8a343a 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiTAGsWithAttributesTableIT.java
@@ -160,6 +160,13 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
"insert into tableD(time,device,value) values('2020-01-01
00:00:07.000', 'd2', '1970-01-01 00:00:00.000')"
};
+ private static final String[] sql6 =
+ new String[] {
+ "create table table6(device STRING TAG, s1 INT32 FIELD)",
+ "insert into table6 values(2,'d1',2)",
+ "delete from table6 where time >= 1"
+ };
+
String[] expectedHeader;
String[] retArray;
static String sql;
@@ -184,7 +191,7 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
private static void insertData() {
try (Connection connection = EnvFactory.getEnv().getTableConnection();
Statement statement = connection.createStatement()) {
- for (String[] sqlList : Arrays.asList(sql1, sql2, sql3, sql4, sql5)) {
+ for (String[] sqlList : Arrays.asList(sql1, sql2, sql3, sql4, sql5,
sql6)) {
for (String sql : sqlList) {
statement.execute(sql);
}
@@ -2509,6 +2516,49 @@ public class IoTDBMultiTAGsWithAttributesTableIT {
"select level, attr1, device, attr2,
last_by(time,time),last_by(device,time),last_by(level,time),last_by(attr1,time),last_by(attr2,time),last(time),"
+
"last_by(num,time),last_by(bignum,time),last_by(floatnum,time),last_by(time,time),last_by(device,time),last_by(num,time)
from table0 where time>1971-04-26T17:46:40.000 group by attr1, device, attr2,
level order by device,level,attr1,attr2";
repeatTest(sql, expectedHeader, retArray, DATABASE_NAME, 3);
+
+ expectedHeader =
+ new String[] {"_col0", "_col1", "_col2", "_col3", "_col4", "_col5",
"_col6", "_col7"};
+ retArray =
+ new String[] {
+
"1971-08-20T11:33:20.000Z,d2,l5,null,null,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,"
+ };
+ sql =
+ "select last(time),last(device),last(level),last(attr1),last(attr2),"
+ + "last_by(time,num),last_by(time,bignum),last_by(time,floatnum)
from table0 where device='d2' and time>1971-04-26T17:46:40.000";
+ repeatTest(sql, expectedHeader, retArray, DATABASE_NAME, 2);
+
+ expectedHeader =
+ new String[] {
+ "level", "attr1", "device", "attr2", "_col4", "_col5", "_col6",
"_col7", "_col8", "_col9",
+ "_col10", "_col11"
+ };
+ retArray =
+ new String[] {
+
"l3,t,d1,a,1971-04-26T17:46:40.020Z,d1,l3,t,a,1971-04-26T17:46:40.020Z,1971-04-26T17:46:40.020Z,1971-04-26T17:46:40.020Z,",
+
"l4,null,d1,null,1971-04-26T18:01:40.000Z,d1,l4,null,null,1971-04-26T18:01:40.000Z,1971-04-26T18:01:40.000Z,1971-04-26T18:01:40.000Z,",
+
"l5,null,d1,null,1971-08-20T11:33:20.000Z,d1,l5,null,null,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,",
+
"l3,null,d2,null,1971-04-26T17:46:40.020Z,d2,l3,null,null,1971-04-26T17:46:40.020Z,1971-04-26T17:46:40.020Z,1971-04-26T17:46:40.020Z,",
+
"l4,null,d2,null,1971-04-26T18:01:40.000Z,d2,l4,null,null,1971-04-26T18:01:40.000Z,1971-04-26T18:01:40.000Z,1971-04-26T18:01:40.000Z,",
+
"l5,null,d2,null,1971-08-20T11:33:20.000Z,d2,l5,null,null,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,1971-08-20T11:33:20.000Z,",
+ };
+ sql =
+ "select level, attr1, device, attr2,
last(time),last(device),last(level),last(attr1),last(attr2),"
+ + "last_by(time,num),last_by(time,bignum),last_by(time,floatnum)
from table0 where time>1971-04-26T17:46:40.000 group by attr1, device, attr2,
level order by device,level,attr1,attr2";
+ repeatTest(sql, expectedHeader, retArray, DATABASE_NAME, 3);
+
+ expectedHeader = new String[] {"_col0", "_col1"};
+ retArray = new String[] {"null,null,"};
+ repeatTest(
+ "select last(time), last(s1) from table6", expectedHeader, retArray,
DATABASE_NAME, 3);
+
+ retArray = new String[] {};
+ repeatTest(
+ "select last(time), last(s1) from table6 group by device",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME,
+ 3);
}
@Test
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
index ec13fa95bbb..8d29a7f1f32 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBTableAggregationIT.java
@@ -2491,11 +2491,28 @@ public class IoTDBTableAggregationIT {
new String[] {
"2024-09-24T06:15:55.000Z,55,50000,40.0,55.0,false,shanghai_huangpu_red_A_d01_35,shanghai_huangpu_red_A_d01_40,0xcafebabe55,2024-09-24T06:15:55.000Z,2024-09-24,",
};
- tableResultSetEqualTest(
+ repeatTest(
"select
last(time),last(s1),last(s2),last(s3),last(s4),last(s5),last(s6),last(s7),last(s8),last(s9),last(s10)
from table1 where device_id = 'd01'",
expectedHeader,
retArray,
- DATABASE_NAME);
+ DATABASE_NAME,
+ 2);
+
+ expectedHeader =
+ new String[] {
+ "_col0", "_col1", "_col2", "_col3", "_col4", "_col5", "_col6",
"_col7", "_col8", "_col9",
+ "_col10", "_col11", "_col12"
+ };
+ retArray =
+ new String[] {
+
"2024-09-24T06:15:55.000Z,d01,red,55,2024-09-24T06:15:55.000Z,50000,2024-09-24T06:15:50.000Z,40.0,2024-09-24T06:15:40.000Z,55.0,2024-09-24T06:15:55.000Z,false,2024-09-24T06:15:50.000Z,",
+ };
+ repeatTest(
+ "select last(time),last(device_id),last(color),last(s1),last_by(time,
s1),last(s2),last_by(time, s2),last(s3),last_by(time,
s3),last(s4),last_by(time, s4),last(s5),last_by(time, s5) from table1 where
device_id = 'd01'",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME,
+ 2);
expectedHeader =
new String[] {
@@ -2656,11 +2673,12 @@ public class IoTDBTableAggregationIT {
"shanghai,shanghai,pudong,d07,2024-09-24T06:15:51.000Z,41,46000,51.0,46.0,false,shanghai_pudong_yellow_A_d07_51,shanghai_pudong_yellow_A_d07_46,0xcafebabe41,2024-09-24T06:15:51.000Z,2024-09-24,",
"shanghai,shanghai,pudong,d08,2024-09-24T06:15:55.000Z,55,40000,30.0,55.0,true,shanghai_pudong_yellow_B_d08_55,shanghai_pudong_yellow_B_d08_30,0xcafebabe55,2024-09-24T06:15:55.000Z,2024-09-24,",
};
- tableResultSetEqualTest(
+ repeatTest(
"select province,city,region,device_id,
last(time),last(s1),last(s2),last(s3),last(s4),last(s5),last(s6),last(s7),last(s8),last(s9),last(s10)
from table1 group by 1,2,3,4 order by 1,2,3,4",
expectedHeader,
retArray,
- DATABASE_NAME);
+ DATABASE_NAME,
+ 2);
}
@Test
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
index df487f06c60..e734145e470 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java
@@ -48,8 +48,10 @@ import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
+import static com.google.common.base.Preconditions.checkState;
import static
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.Utils.serializeTimeValue;
import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_TIME_VALUE_PAIR;
import static
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager.getTSDataType;
/**
@@ -72,7 +74,8 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
private final boolean needUpdateCache;
private final boolean needUpdateNullEntry;
private final List<Integer> hitCachesIndexes;
- private final List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults;
+ private final List<Pair<OptionalLong, TsPrimitiveType[]>>
lastRowCacheResults;
+ private final List<TimeValuePair[]> lastValuesCacheResults;
private int currentHitCacheIndex = 0;
// indicates the index of last(time) aggregation
@@ -83,7 +86,8 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
List<DeviceEntry> cachedDeviceEntries,
QualifiedObjectName qualifiedObjectName,
List<Integer> hitCachesIndexes,
- List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults) {
+ List<Pair<OptionalLong, TsPrimitiveType[]>> lastRowCacheResults,
+ List<TimeValuePair[]> lastValuesCacheResults) {
super(parameter);
@@ -95,7 +99,8 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
this.needUpdateNullEntry =
LastQueryUtil.needUpdateNullEntry(parameter.seriesScanOptions.getGlobalTimeFilter());
this.hitCachesIndexes = hitCachesIndexes;
- this.hitCachedResults = hitCachedResults;
+ this.lastRowCacheResults = lastRowCacheResults;
+ this.lastValuesCacheResults = lastValuesCacheResults;
this.dbName = qualifiedObjectName.getDatabaseName();
this.operatorContext.recordSpecifiedInfo(
@@ -144,7 +149,12 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
if (currentHitCacheIndex < hitCachesIndexes.size()
&& outputDeviceIndex == hitCachesIndexes.get(currentHitCacheIndex)) {
currentDeviceEntry = cachedDeviceEntries.get(currentHitCacheIndex);
- buildResultUseLastCache();
+ if (lastRowCacheResults != null) {
+ buildResultUseLastRowCache();
+ } else {
+ checkState(lastValuesCacheResults != null, "lastValuesCacheResults
shouldn't be null here");
+ buildResultUseLastValuesCache();
+ }
return;
}
@@ -154,10 +164,10 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
}
}
- private void buildResultUseLastCache() {
+ private void buildResultUseLastRowCache() {
appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
Pair<OptionalLong, TsPrimitiveType[]> currentHitResult =
- hitCachedResults.get(currentHitCacheIndex);
+ lastRowCacheResults.get(currentHitCacheIndex);
long lastTime = currentHitResult.getLeft().getAsLong();
int channel = 0;
for (int i = 0; i < tableAggregators.size(); i++) {
@@ -251,8 +261,8 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
case FIELD:
int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
TsPrimitiveType tsPrimitiveType =
-
hitCachedResults.get(currentHitCacheIndex).getRight()[measurementIdx];
- long lastByTime =
hitCachedResults.get(currentHitCacheIndex).getLeft().getAsLong();
+
lastRowCacheResults.get(currentHitCacheIndex).getRight()[measurementIdx];
+ long lastByTime =
lastRowCacheResults.get(currentHitCacheIndex).getLeft().getAsLong();
if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
// there is no data for this time series
if (aggregator.getStep().isOutputPartial()) {
@@ -285,11 +295,217 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
+ private void buildResultUseLastValuesCache() {
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // if it is EMPTY_PRIMITIVE_TYPE, means there is no data in device
+ TsPrimitiveType timeLastValue = currentHitResult[currentHitResult.length -
1].getValue();
+ // when there is no data, no need to append result if the query is GROUP
BY or output of
+ // aggregator is partial (final operator will produce NULL result)
+ if (timeLastValue == EMPTY_PRIMITIVE_TYPE
+ && (groupingKeySize != 0 ||
tableAggregators.get(0).getStep().isOutputPartial())) {
+ outputDeviceIndex++;
+ currentHitCacheIndex++;
return;
}
+ // there is no problem when the cache result doesn't contain time column,
because if in such
+ // case we will not use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) {
+ columnBuilder.appendNull();
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ } else {
+ // last_by
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ if (timeLastValue == EMPTY_PRIMITIVE_TYPE || id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ }
+ break;
+ case ATTRIBUTE:
+ Binary attribute =
+ cachedDeviceEntries.get(currentHitCacheIndex)
+ .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) {
+ columnBuilder.appendNull();
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (timeLastValue == EMPTY_PRIMITIVE_TYPE || attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ }
+ break;
+ case TIME:
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ // for last(time) aggregation
+ if (timeLastValue == EMPTY_PRIMITIVE_TYPE) {
+ columnBuilder.appendNull();
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsLong(lastRowTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastRowTime));
+ }
+ }
+ } else {
+ // for aggregation like last_by(time,s1)
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel + 1)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+ TsPrimitiveType tsPrimitiveType =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
+
+ if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ // there is no data
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ // output: xDataType, yLastTime, xIsNull, xResult
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsLong(lastTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastTime));
+ }
+ }
+ }
+ break;
+ case FIELD:
+ checkState(
+ aggregator.getAccumulator() instanceof LastDescAccumulator,
+ "Accumulator should be LastDescAccumulator when reach here");
+
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+ TsPrimitiveType tsPrimitiveType =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
+
+ if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ // there is no data for this time series
+ columnBuilder.appendNull();
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()), lastTime,
tsPrimitiveType)));
+ } else {
+ columnBuilder.writeTsPrimitiveType(tsPrimitiveType);
+ }
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported category: " + category);
+ }
+
+ channel += aggregator.getChannelCount();
+ }
+
+ resultTsBlockBuilder.declarePosition();
+ outputDeviceIndex++;
+ currentHitCacheIndex++;
+ }
+
+ private void updateLastCacheUseLastRowIfPossible() {
int channel = 0;
List<String> updateMeasurementList = new ArrayList<>();
List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
@@ -311,6 +527,13 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
new TimeValuePair(
lastAccumulator.getMaxTime(),
new
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
+ } else {
+ currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
+ TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+ dbName,
+ currentDeviceEntry.getDeviceID(),
+ new String[] {""},
+ new TimeValuePair[] {EMPTY_TIME_VALUE_PAIR});
}
} else {
LastByDescAccumulator lastByAccumulator =
@@ -365,12 +588,85 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
}
}
+ private void updateLastCacheUseLastValuesIfPossible() {
+ int channel = 0;
+ List<String> updateMeasurementList = new ArrayList<>();
+ List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
+ boolean hasSetLastTime = false;
+ for (TableAggregator tableAggregator : tableAggregators) {
+ ColumnSchema schema =
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+
+ switch (schema.getColumnCategory()) {
+ case TIME:
+ case TAG:
+ case ATTRIBUTE:
+ if (!hasSetLastTime && tableAggregator.getAccumulator() instanceof
LastDescAccumulator) {
+ hasSetLastTime = true;
+ LastDescAccumulator lastAccumulator =
+ (LastDescAccumulator) tableAggregator.getAccumulator();
+ if (lastAccumulator.hasInitResult()) {
+ updateMeasurementList.add("");
+ updateTimeValuePairList.add(
+ new TimeValuePair(
+ lastAccumulator.getMaxTime(),
+ new
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
+ } else {
+ currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
+ TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+ dbName,
+ currentDeviceEntry.getDeviceID(),
+ new String[] {""},
+ new TimeValuePair[] {EMPTY_TIME_VALUE_PAIR});
+ }
+ }
+ break;
+ case FIELD:
+ checkState(
+ tableAggregator.getAccumulator() instanceof LastDescAccumulator,
+ "Accumulator should be LastDescAccumulator when reach here");
+ LastDescAccumulator lastAccumulator =
+ (LastDescAccumulator) tableAggregator.getAccumulator();
+ updateMeasurementList.add(schema.getName());
+ if (lastAccumulator.hasInitResult()) {
+ updateTimeValuePairList.add(
+ new TimeValuePair(
+ lastAccumulator.getMaxTime(),
+ cloneTsPrimitiveType(lastAccumulator.getLastValue())));
+ } else {
+ updateTimeValuePairList.add(EMPTY_TIME_VALUE_PAIR);
+ }
+ break;
+ default:
+ break;
+ }
+
+ channel += tableAggregator.getChannelCount();
+ }
+
+ if (!updateMeasurementList.isEmpty()) {
+ String[] updateMeasurementArray = updateMeasurementList.toArray(new
String[0]);
+ TimeValuePair[] updateTimeValuePairArray =
+ updateTimeValuePairList.toArray(new TimeValuePair[0]);
+ currentDeviceEntry = deviceEntries.get(currentDeviceIndex);
+ TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+ dbName,
+ currentDeviceEntry.getDeviceID(),
+ updateMeasurementArray,
+ updateTimeValuePairArray);
+ }
+ }
+
@Override
protected void updateResultTsBlock() {
appendAggregationResult();
- if (timeIterator.hasCachedTimeRange()) {
- updateLastCacheIfPossible();
+ if (needUpdateCache && timeIterator.hasCachedTimeRange()) {
+ if (lastRowCacheResults != null) {
+ updateLastCacheUseLastRowIfPossible();
+ } else {
+ checkState(lastValuesCacheResults != null, "lastValuesCacheResults
shouldn't be null here");
+ updateLastCacheUseLastValuesIfPossible();
+ }
}
outputDeviceIndex++;
@@ -420,6 +716,6 @@ public class LastQueryAggTableScanOperator extends
AbstractAggTableScanOperator
+ (resultTsBlockBuilder == null ? 0 :
resultTsBlockBuilder.getRetainedSizeInBytes())
+ RamUsageEstimator.sizeOfCollection(deviceEntries)
+ RamUsageEstimator.sizeOfCollection(cachedDeviceEntries)
- + RamUsageEstimator.sizeOfCollection(hitCachedResults);
+ + RamUsageEstimator.sizeOfCollection(lastRowCacheResults);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
index 3ff20974168..2050eba426b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/AccumulatorFactory.java
@@ -70,6 +70,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -78,6 +79,7 @@ import static java.util.Objects.requireNonNull;
import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST_BY;
import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST;
import static
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST_BY;
+import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isMeasurementColumn;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.GlobalTimePredicateExtractVisitor.isTimeColumn;
import static org.apache.tsfile.read.common.type.IntType.INT32;
@@ -91,6 +93,7 @@ public class AccumulatorFactory {
Map<String, String> inputAttributes,
boolean ascending,
String timeColumnName,
+ Set<String> measurementColumnNames,
boolean distinct) {
TableAccumulator result;
@@ -100,20 +103,20 @@ public class AccumulatorFactory {
} else if ((LAST_BY.getFunctionName().equals(functionName)
|| FIRST_BY.getFunctionName().equals(functionName))
&& inputExpressions.size() > 1) {
- boolean xIsTimeColumn = false;
- boolean yIsTimeColumn = false;
- if (isTimeColumn(inputExpressions.get(1), timeColumnName)) {
- yIsTimeColumn = true;
- } else if (isTimeColumn(inputExpressions.get(0), timeColumnName)) {
- xIsTimeColumn = true;
- }
+ boolean xIsTimeColumn = isTimeColumn(inputExpressions.get(0),
timeColumnName);
+ boolean yIsTimeColumn = isTimeColumn(inputExpressions.get(1),
timeColumnName);
if (LAST_BY.getFunctionName().equals(functionName)) {
result =
ascending
? new LastByAccumulator(
inputDataTypes.get(0), inputDataTypes.get(1),
xIsTimeColumn, yIsTimeColumn)
: new LastByDescAccumulator(
- inputDataTypes.get(0), inputDataTypes.get(1),
xIsTimeColumn, yIsTimeColumn);
+ inputDataTypes.get(0),
+ inputDataTypes.get(1),
+ xIsTimeColumn,
+ yIsTimeColumn,
+ isMeasurementColumn(inputExpressions.get(0),
measurementColumnNames),
+ isMeasurementColumn(inputExpressions.get(1),
measurementColumnNames));
} else {
result =
ascending
@@ -123,10 +126,12 @@ public class AccumulatorFactory {
inputDataTypes.get(0), inputDataTypes.get(1),
xIsTimeColumn, yIsTimeColumn);
}
} else if (LAST.getFunctionName().equals(functionName)) {
- boolean isTimeColumn = isTimeColumn(inputExpressions.get(0),
timeColumnName);
return ascending
- ? new LastAccumulator(inputDataTypes.get(0), isTimeColumn)
- : new LastDescAccumulator(inputDataTypes.get(0), isTimeColumn);
+ ? new LastAccumulator(inputDataTypes.get(0))
+ : new LastDescAccumulator(
+ inputDataTypes.get(0),
+ isTimeColumn(inputExpressions.get(0), timeColumnName),
+ isMeasurementColumn(inputExpressions.get(0),
measurementColumnNames));
} else {
result =
createBuiltinAccumulator(
@@ -280,8 +285,8 @@ public class AccumulatorFactory {
return new SumAccumulator(inputDataTypes.get(0));
case LAST:
return ascending
- ? new LastAccumulator(inputDataTypes.get(0), false)
- : new LastDescAccumulator(inputDataTypes.get(0), false);
+ ? new LastAccumulator(inputDataTypes.get(0))
+ : new LastDescAccumulator(inputDataTypes.get(0), false, false);
case FIRST:
return ascending
? new FirstAccumulator(inputDataTypes.get(0))
@@ -293,7 +298,8 @@ public class AccumulatorFactory {
case LAST_BY:
return ascending
? new LastByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false)
- : new LastByDescAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false);
+ : new LastByDescAccumulator(
+ inputDataTypes.get(0), inputDataTypes.get(1), false, false,
false, false);
case FIRST_BY:
return ascending
? new FirstByAccumulator(inputDataTypes.get(0),
inputDataTypes.get(1), false, false)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
index 14e652d675a..d805500b118 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java
@@ -43,11 +43,9 @@ public class LastAccumulator implements TableAccumulator {
protected TsPrimitiveType lastValue;
protected long maxTime = Long.MIN_VALUE;
protected boolean initResult = false;
- protected boolean isTimeColumn = false;
- public LastAccumulator(TSDataType seriesDataType, boolean isTimeColumn) {
+ public LastAccumulator(TSDataType seriesDataType) {
this.seriesDataType = seriesDataType;
- this.isTimeColumn = isTimeColumn;
lastValue = TsPrimitiveType.getByType(seriesDataType);
}
@@ -55,10 +53,6 @@ public class LastAccumulator implements TableAccumulator {
return this.initResult;
}
- public boolean isTimeColumn() {
- return this.isTimeColumn;
- }
-
public long getMaxTime() {
return this.maxTime;
}
@@ -74,7 +68,7 @@ public class LastAccumulator implements TableAccumulator {
@Override
public TableAccumulator copy() {
- return new LastAccumulator(seriesDataType, isTimeColumn);
+ return new LastAccumulator(seriesDataType);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
index 9428e83e8d3..70024273157 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java
@@ -63,14 +63,6 @@ public class LastByAccumulator implements TableAccumulator {
this.xResult = TsPrimitiveType.getByType(xDataType);
}
- public boolean xIsTimeColumn() {
- return xIsTimeColumn;
- }
-
- public boolean yIsTimeColumn() {
- return this.yIsTimeColumn;
- }
-
public boolean hasInitResult() {
return this.initResult;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
index 1e49886092a..04bf1213f61 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByDescAccumulator.java
@@ -23,15 +23,46 @@ import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.enums.TSDataType;
public class LastByDescAccumulator extends LastByAccumulator {
+ private final boolean xIsMeasurementColumn;
+ private final boolean yIsMeasurementColumn;
public LastByDescAccumulator(
- TSDataType xDataType, TSDataType yDataType, boolean xIsTimeColumn,
boolean yIsTimeColumn) {
+ TSDataType xDataType,
+ TSDataType yDataType,
+ boolean xIsTimeColumn,
+ boolean yIsTimeColumn,
+ boolean xIsMeasurementColumn,
+ boolean yIsMeasurementColumn) {
super(xDataType, yDataType, xIsTimeColumn, yIsTimeColumn);
+ this.xIsMeasurementColumn = xIsMeasurementColumn;
+ this.yIsMeasurementColumn = yIsMeasurementColumn;
+ }
+
+ public boolean xIsTimeColumn() {
+ return xIsTimeColumn;
+ }
+
+ public boolean yIsTimeColumn() {
+ return this.yIsTimeColumn;
+ }
+
+ public boolean xIsMeasurementColumn() {
+ return xIsMeasurementColumn;
+ }
+
+ public boolean yIsMeasurementColumn() {
+ return yIsMeasurementColumn;
}
@Override
public TableAccumulator copy() {
- return new LastByDescAccumulator(xDataType, yDataType, xIsTimeColumn,
yIsTimeColumn);
+ return new LastByDescAccumulator(
+ xDataType,
+ yDataType,
+ xIsTimeColumn,
+ yIsTimeColumn,
+ xIsMeasurementColumn,
+ yIsMeasurementColumn);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
index cfe126cd25e..23ee0d21054 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastDescAccumulator.java
@@ -23,14 +23,27 @@ import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.enums.TSDataType;
public class LastDescAccumulator extends LastAccumulator {
+ private final boolean isTimeColumn;
+ private final boolean isMeasurementColumn;
+
+ public LastDescAccumulator(
+ TSDataType seriesDataType, boolean isTimeColumn, boolean
isMeasurementColumn) {
+ super(seriesDataType);
+ this.isTimeColumn = isTimeColumn;
+ this.isMeasurementColumn = isMeasurementColumn;
+ }
+
+ public boolean isTimeColumn() {
+ return this.isTimeColumn;
+ }
- public LastDescAccumulator(TSDataType seriesDataType, boolean isTimeColumn) {
- super(seriesDataType, isTimeColumn);
+ public boolean isMeasurementColumn() {
+ return this.isMeasurementColumn;
}
@Override
public TableAccumulator copy() {
- return new LastDescAccumulator(seriesDataType, isTimeColumn);
+ return new LastDescAccumulator(seriesDataType, isTimeColumn,
isMeasurementColumn);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 7dfa67e016f..52e1afe754a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -317,7 +317,9 @@ import static
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator
import static
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.UNKNOWN_DATATYPE;
import static
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.getLinearFill;
import static
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.getPreviousFill;
+import static
org.apache.iotdb.db.queryengine.plan.planner.OperatorTreeGenerator.isFilterGtOrGe;
import static
org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.SeriesScanOptions.updateFilterUsingTTL;
+import static
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceLastCache.EMPTY_PRIMITIVE_TYPE;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_FIRST;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.ASC_NULLS_LAST;
import static
org.apache.iotdb.db.queryengine.plan.relational.planner.SortOrder.DESC_NULLS_FIRST;
@@ -2524,11 +2526,12 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getStep(),
typeProvider,
true,
- null)));
+ null,
+ Collections.emptySet())));
return new AggregationOperator(operatorContext, child,
aggregatorBuilder.build());
}
- // timeColumnName will only be set for AggTableScan.
+ // timeColumnName and measurementColumnNames will only be set for
AggTableScan.
private TableAggregator buildAggregator(
Map<Symbol, Integer> childLayout,
Symbol symbol,
@@ -2536,7 +2539,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
AggregationNode.Step step,
TypeProvider typeProvider,
boolean scanAscending,
- String timeColumnName) {
+ String timeColumnName,
+ Set<String> measurementColumnNames) {
List<Integer> argumentChannels = new ArrayList<>();
for (Expression argument : aggregation.getArguments()) {
Symbol argumentSymbol = Symbol.from(argument);
@@ -2557,6 +2561,7 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
Collections.emptyMap(),
scanAscending,
timeColumnName,
+ measurementColumnNames,
aggregation.isDistinct());
OptionalInt maskChannel = OptionalInt.empty();
@@ -2594,7 +2599,14 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
(k, v) ->
aggregatorBuilder.add(
buildAggregator(
- childLayout, k, v, node.getStep(), typeProvider,
true, null)));
+ childLayout,
+ k,
+ v,
+ node.getStep(),
+ typeProvider,
+ true,
+ null,
+ Collections.emptySet())));
OperatorContext operatorContext =
context
@@ -2888,7 +2900,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getStep(),
context.getTypeProvider(),
scanAscending,
- timeColumnName));
+ timeColumnName,
+ measurementColumnsIndexMap.keySet()));
}
ITableTimeRangeIterator timeRangeIterator = null;
@@ -2999,9 +3012,12 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter =
constructAbstractAggTableScanOperatorParameter(node, context);
- if (canUseLastCacheOptimize(
- parameter.getTableAggregators(), node, parameter.getTimeColumnName()))
{
- return constructLastQueryAggTableScanOperator(node, parameter, context);
+ OptimizeType optimizeType =
+ canUseLastCacheOptimize(
+ parameter.getTableAggregators(), node,
parameter.getTimeColumnName());
+ if (optimizeType != OptimizeType.NOOP) {
+ return constructLastQueryAggTableScanOperator(
+ node, parameter, optimizeType == OptimizeType.LAST_ROW, context);
} else {
DefaultAggTableScanOperator aggTableScanOperator = new
DefaultAggTableScanOperator(parameter);
@@ -3020,9 +3036,11 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
private LastQueryAggTableScanOperator constructLastQueryAggTableScanOperator(
AggregationTableScanNode node,
AbstractAggTableScanOperator.AbstractAggTableScanOperatorParameter
parameter,
+ boolean isLastRowOptimize,
LocalExecutionPlanContext context) {
List<Integer> hitCachesIndexes = new ArrayList<>();
- List<Pair<OptionalLong, TsPrimitiveType[]>> hitCachedResults = new
ArrayList<>();
+ List<Pair<OptionalLong, TsPrimitiveType[]>> lastRowCacheResults = null;
+ List<TimeValuePair[]> lastValuesCacheResults = null;
List<DeviceEntry> cachedDeviceEntries = new ArrayList<>();
List<DeviceEntry> unCachedDeviceEntries = new ArrayList<>();
long tableTTL =
@@ -3032,63 +3050,150 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
node.getQualifiedObjectName().getObjectName());
Filter updateTimeFilter =
updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(),
tableTTL);
- for (int i = 0; i < node.getDeviceEntries().size(); i++) {
- Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+ if (isLastRowOptimize) {
+ lastRowCacheResults = new ArrayList<>();
+ for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+ Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+ TableDeviceSchemaCache.getInstance()
+ .getLastRow(
+ node.getQualifiedObjectName().getDatabaseName(),
+ node.getDeviceEntries().get(i).getDeviceID(),
+ "",
+ parameter.getMeasurementColumnNames());
+ boolean allHitCache = true;
+ if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
+ for (int j = 0; j < lastByResult.get().getRight().length; j++) {
+ TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
+ if (tsPrimitiveType == null
+ || (updateTimeFilter != null
+ && !LastQueryUtil.satisfyFilter(
+ updateTimeFilter,
+ new TimeValuePair(
+ lastByResult.get().getLeft().getAsLong(),
tsPrimitiveType)))) {
+ // the process logic is different from tree model which examine
if
+ // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
+ // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
+ // but it should skip in table model
+ allHitCache = false;
+ break;
+ }
+ }
+ } else {
+ allHitCache = false;
+ }
+
+ if (!allHitCache) {
+ DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+ AlignedFullPath alignedPath =
+ constructAlignedPath(
+ deviceEntry,
+ parameter.getMeasurementColumnNames(),
+ parameter.getMeasurementSchemas(),
+ parameter.getAllSensors());
+ ((DataDriverContext)
context.getDriverContext()).addPath(alignedPath);
+ unCachedDeviceEntries.add(deviceEntry);
+
+ // last cache updateColumns need to put "" as time column
+ String[] updateColumns = new
String[parameter.getMeasurementColumnNames().size() + 1];
+ updateColumns[0] = "";
+ for (int j = 1; j < updateColumns.length; j++) {
+ updateColumns[j] = parameter.getMeasurementColumnNames().get(j -
1);
+ }
TableDeviceSchemaCache.getInstance()
- .getLastRow(
+ .initOrInvalidateLastCache(
node.getQualifiedObjectName().getDatabaseName(),
- node.getDeviceEntries().get(i).getDeviceID(),
- "",
- parameter.getMeasurementColumnNames());
- boolean allHitCache = true;
- if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
- for (int j = 0; j < lastByResult.get().getRight().length; j++) {
- TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
- if (tsPrimitiveType == null
- || (updateTimeFilter != null
- && !LastQueryUtil.satisfyFilter(
- updateTimeFilter,
- new TimeValuePair(
- lastByResult.get().getLeft().getAsLong(),
tsPrimitiveType)))) {
- // the process logic is different from tree model which examine if
- // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
- // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
- // but it should skip in table model
- allHitCache = false;
- break;
- }
+ deviceEntry.getDeviceID(),
+ updateColumns,
+ false);
+ } else {
+ hitCachesIndexes.add(i);
+ lastRowCacheResults.add(lastByResult.get());
+ cachedDeviceEntries.add(node.getDeviceEntries().get(i));
}
+ }
+ } else {
+ // LAST_VALUES optimize
+ lastValuesCacheResults = new ArrayList<>();
+ int measurementSize = parameter.getMeasurementColumnNames().size();
+ // When we need last cache of Time column if:
+ // 1. query is group by (we need last cache of Time to help judge if
there is no data in
+ // device)
+ // 2. last(time), last(device) or last(attribute) occurs
+ boolean needTime =
+ !node.getGroupingKeys().isEmpty()
+ || parameter.getTableAggregators().stream()
+ .anyMatch(
+ aggregator ->
+ aggregator.getAccumulator() instanceof
LastDescAccumulator
+ && !((LastDescAccumulator)
aggregator.getAccumulator())
+ .isMeasurementColumn());
+ String[] targetColumns;
+
+ if (needTime) {
+ targetColumns = new String[measurementSize + 1];
+ // put time column in the last for convenience of later processing
+ targetColumns[targetColumns.length - 1] = "";
} else {
- allHitCache = false;
+ targetColumns = new String[measurementSize];
}
- if (!allHitCache) {
- DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
- AlignedFullPath alignedPath =
- constructAlignedPath(
- deviceEntry,
- parameter.getMeasurementColumnNames(),
- parameter.getMeasurementSchemas(),
- parameter.getAllSensors());
- ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
- unCachedDeviceEntries.add(deviceEntry);
+ for (int j = 0; j < measurementSize; j++) {
+ targetColumns[j] = parameter.getMeasurementColumnNames().get(j);
+ }
+
+ for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+ TimeValuePair[] lastResult =
+ TableDeviceSchemaCache.getInstance()
+ .getLastEntries(
+ node.getQualifiedObjectName().getDatabaseName(),
+ node.getDeviceEntries().get(i).getDeviceID(),
+ targetColumns);
+ boolean allHitCache = true;
+ if (lastResult != null) {
+ for (TimeValuePair timeValuePair : lastResult) {
+ if (timeValuePair == null || timeValuePair.getValue() == null) {
+ allHitCache = false;
+ break;
+ }
+
+ if (updateTimeFilter != null
+ && !LastQueryUtil.satisfyFilter(
+ parameter.getSeriesScanOptions().getGlobalTimeFilter(),
timeValuePair)) {
+ if (isFilterGtOrGe(updateTimeFilter)) {
+ // it means there is no data meets Filter
+ timeValuePair.setValue(EMPTY_PRIMITIVE_TYPE);
+ } else {
+ allHitCache = false;
+ break;
+ }
+ }
+ }
+ } else {
+ allHitCache = false;
+ }
- // last cache updateColumns need put "" as time column
- String[] updateColumns = new
String[parameter.getMeasurementColumnNames().size() + 1];
- updateColumns[0] = "";
- for (int j = 1; j < updateColumns.length; j++) {
- updateColumns[j] = parameter.getMeasurementColumnNames().get(j - 1);
+ if (!allHitCache) {
+ DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+ AlignedFullPath alignedPath =
+ constructAlignedPath(
+ deviceEntry,
+ parameter.getMeasurementColumnNames(),
+ parameter.getMeasurementSchemas(),
+ parameter.getAllSensors());
+ ((DataDriverContext)
context.getDriverContext()).addPath(alignedPath);
+ unCachedDeviceEntries.add(deviceEntry);
+
+ TableDeviceSchemaCache.getInstance()
+ .initOrInvalidateLastCache(
+ node.getQualifiedObjectName().getDatabaseName(),
+ deviceEntry.getDeviceID(),
+ targetColumns,
+ false);
+ } else {
+ hitCachesIndexes.add(i);
+ lastValuesCacheResults.add(lastResult);
+ cachedDeviceEntries.add(node.getDeviceEntries().get(i));
}
- TableDeviceSchemaCache.getInstance()
- .initOrInvalidateLastCache(
- node.getQualifiedObjectName().getDatabaseName(),
- deviceEntry.getDeviceID(),
- updateColumns,
- false);
- } else {
- hitCachesIndexes.add(i);
- hitCachedResults.add(lastByResult.get());
- cachedDeviceEntries.add(node.getDeviceEntries().get(i));
}
}
@@ -3101,7 +3206,8 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
cachedDeviceEntries,
node.getQualifiedObjectName(),
hitCachesIndexes,
- hitCachedResults);
+ lastRowCacheResults,
+ lastValuesCacheResults);
((DataDriverContext)
context.getDriverContext()).addSourceOperator(lastQueryOperator);
parameter
@@ -3689,24 +3795,36 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
return new boolean[] {canUseStatistic, isAscending};
}
- private boolean canUseLastCacheOptimize(
+ private OptimizeType canUseLastCacheOptimize(
List<TableAggregator> aggregators, AggregationTableScanNode node, String
timeColumnName) {
if (!CommonDescriptor.getInstance().getConfig().isLastCacheEnable() ||
aggregators.isEmpty()) {
- return false;
+ return OptimizeType.NOOP;
}
// has value filter, can not optimize
if (node.getPushDownPredicate() != null) {
- return false;
+ return OptimizeType.NOOP;
}
// has date_bin, can not optimize
if (!node.getGroupingKeys().isEmpty()
&& node.getProjection() != null
&& !node.getProjection().getMap().isEmpty()) {
- return false;
+ return OptimizeType.NOOP;
+ }
+
+ if (canUseLastRowOptimize(aggregators)) {
+ return OptimizeType.LAST_ROW;
+ }
+
+ if (canUseLastValuesOptimize(aggregators)) {
+ return OptimizeType.LAST_VALUES;
}
+ return OptimizeType.NOOP;
+ }
+
+ private boolean canUseLastRowOptimize(List<TableAggregator> aggregators) {
for (TableAggregator aggregator : aggregators) {
if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
if (!((LastDescAccumulator)
aggregator.getAccumulator()).isTimeColumn()) {
@@ -3720,10 +3838,30 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
return false;
}
}
+ return true;
+ }
+ private boolean canUseLastValuesOptimize(List<TableAggregator> aggregators) {
+ for (TableAggregator aggregator : aggregators) {
+ if (aggregator.getAccumulator() instanceof LastByDescAccumulator) {
+ // cannot optimize when x is Measurement or y is not Measurement
+ if (((LastByDescAccumulator)
aggregator.getAccumulator()).xIsMeasurementColumn()
+ || !((LastByDescAccumulator)
aggregator.getAccumulator()).yIsMeasurementColumn()) {
+ return false;
+ }
+ } else if (!(aggregator.getAccumulator() instanceof
LastDescAccumulator)) {
+ return false;
+ }
+ }
return true;
}
+ private enum OptimizeType {
+ LAST_ROW,
+ LAST_VALUES,
+ NOOP
+ }
+
@Override
public Operator visitMarkDistinct(MarkDistinctNode node,
LocalExecutionPlanContext context) {
Operator child = node.getChild().accept(this, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
index b30413baa88..63401d59c0c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java
@@ -315,7 +315,7 @@ public class TableDeviceSchemaCache {
/**
* Get the last {@link TimeValuePair}s of given measurements, the
measurements shall never be
- * "time".
+ * "time". If you want to get the last of "time", use "" to represent.
*
* @param database the device's database, without "root", {@code null} for
tree model
* @param deviceId {@link IDeviceID}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java
index 09d040b12f3..2ab6cac846e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/GlobalTimePredicateExtractVisitor.java
@@ -45,6 +45,7 @@ import org.apache.tsfile.utils.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.BooleanLiteral.TRUE_LITERAL;
import static
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression.Operator.AND;
@@ -271,6 +272,11 @@ public class GlobalTimePredicateExtractVisitor
&& ((SymbolReference) e).getName().equalsIgnoreCase(timeColumnName);
}
+ public static boolean isMeasurementColumn(Expression e, Set<String>
measurementColumns) {
+ return e instanceof SymbolReference
+ && measurementColumns.contains(((SymbolReference) e).getName());
+ }
+
public static boolean isExtractTimeColumn(Expression e, String
timeColumnName) {
return e instanceof Extract
&& ((Extract) e).getExpression() instanceof SymbolReference