JackieTien97 commented on code in PR #16103:
URL: https://github.com/apache/iotdb/pull/16103#discussion_r2256159390
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
Review Comment:
Won't this throw an NPE if you pass null?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -365,12 +566,84 @@ private void updateLastCacheIfPossible() {
}
}
+ private void updateLastCacheUseLastValuesIfPossible() {
+ int channel = 0;
+ List<String> updateMeasurementList = new ArrayList<>();
+ List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
+ boolean hasSetLastTime = false;
+ for (TableAggregator tableAggregator : tableAggregators) {
+ ColumnSchema schema =
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+
+ switch (schema.getColumnCategory()) {
+ case TIME:
+ case TAG:
+ case ATTRIBUTE:
+ if (!hasSetLastTime && tableAggregator.getAccumulator() instanceof
LastDescAccumulator) {
+ hasSetLastTime = true;
+ LastDescAccumulator lastAccumulator =
+ (LastDescAccumulator) tableAggregator.getAccumulator();
+ if (lastAccumulator.hasInitResult()) {
+ updateMeasurementList.add("");
+ updateTimeValuePairList.add(
+ new TimeValuePair(
+ lastAccumulator.getMaxTime(),
+ new
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
+ } else {
+ TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+ dbName,
+ currentDeviceEntry.getDeviceID(),
+ new String[] {""},
+ new TimeValuePair[] {EMPTY_TIME_VALUE_PAIR});
+ }
Review Comment:
Should the `last row` path be handled the same way as here?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ }
+ break;
+ case ATTRIBUTE:
+ Binary attribute =
+ cachedDeviceEntries.get(currentHitCacheIndex)
+ .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ }
+ break;
+ case TIME:
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ // for last(time) aggregation
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsLong(lastRowTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastRowTime));
+ }
+ } else {
+ // for aggregation like last_by(time,s1)
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel + 1)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ if (aggregator.getStep().isOutputPartial()) {
+ // output: xDataType, yLastTime, xIsNull, xResult
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsLong(lastTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastTime));
+ }
+ }
+ break;
+ case FIELD:
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+ TsPrimitiveType tsPrimitiveType =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
+
+ if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ // there is no data for this time series
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new
Binary(serializeTimeValue(getTSDataType(schema.getType()), lastTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
Review Comment:
Why is this not the same as `last row`?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java:
##########
@@ -63,14 +63,6 @@ public LastByAccumulator(
this.xResult = TsPrimitiveType.getByType(xDataType);
}
- public boolean xIsTimeColumn() {
- return xIsTimeColumn;
- }
-
- public boolean yIsTimeColumn() {
- return this.yIsTimeColumn;
- }
-
Review Comment:
It seems that these two will only be used in the subclass? Maybe we should
move them into the subclass.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
Review Comment:
```suggestion
// there is no problem when the cache result doesn't contain time
column, because in such case we will not
```
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java:
##########
@@ -43,7 +43,7 @@ public class LastAccumulator implements TableAccumulator {
protected TsPrimitiveType lastValue;
protected long maxTime = Long.MIN_VALUE;
protected boolean initResult = false;
- protected boolean isTimeColumn = false;
+ protected final boolean isTimeColumn;
Review Comment:
It seems that it will only be used in the subclass? Maybe we should move it
into the subclass.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ }
+ break;
+ case ATTRIBUTE:
+ Binary attribute =
+ cachedDeviceEntries.get(currentHitCacheIndex)
+ .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ }
+ break;
+ case TIME:
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ // for last(time) aggregation
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsLong(lastRowTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastRowTime));
+ }
+ } else {
+ // for aggregation like last_by(time,s1)
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel + 1)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ if (aggregator.getStep().isOutputPartial()) {
+ // output: xDataType, yLastTime, xIsNull, xResult
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsLong(lastTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastTime));
+ }
+ }
+ break;
+ case FIELD:
Review Comment:
Is there no difference between `LastDescAccumulator` and `LastBy` for FIELD?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
Review Comment:
same as `TAG`
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
Review Comment:
Is always appending null enough here?
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3024,63 +3040,137 @@ private LastQueryAggTableScanOperator
constructLastQueryAggTableScanOperator(
node.getQualifiedObjectName().getObjectName());
Filter updateTimeFilter =
updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(),
tableTTL);
- for (int i = 0; i < node.getDeviceEntries().size(); i++) {
- Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+ if (isLastRowOptimize) {
+ lastRowCacheResults = new ArrayList<>();
+ for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+ Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+ TableDeviceSchemaCache.getInstance()
+ .getLastRow(
+ node.getQualifiedObjectName().getDatabaseName(),
+ node.getDeviceEntries().get(i).getDeviceID(),
+ "",
+ parameter.getMeasurementColumnNames());
+ boolean allHitCache = true;
+ if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
+ for (int j = 0; j < lastByResult.get().getRight().length; j++) {
+ TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
+ if (tsPrimitiveType == null
+ || (updateTimeFilter != null
+ && !LastQueryUtil.satisfyFilter(
+ updateTimeFilter,
+ new TimeValuePair(
+ lastByResult.get().getLeft().getAsLong(),
tsPrimitiveType)))) {
+ // the process logic is different from tree model which examine
if
+ // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
+ // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
+ // but it should skip in table model
+ allHitCache = false;
+ break;
+ }
+ }
+ } else {
+ allHitCache = false;
+ }
+
+ if (!allHitCache) {
+ DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+ AlignedFullPath alignedPath =
+ constructAlignedPath(
+ deviceEntry,
+ parameter.getMeasurementColumnNames(),
+ parameter.getMeasurementSchemas(),
+ parameter.getAllSensors());
+ ((DataDriverContext)
context.getDriverContext()).addPath(alignedPath);
+ unCachedDeviceEntries.add(deviceEntry);
+
+ // last cache updateColumns need to put "" as time column
+ String[] updateColumns = new
String[parameter.getMeasurementColumnNames().size() + 1];
+ updateColumns[0] = "";
+ for (int j = 1; j < updateColumns.length; j++) {
+ updateColumns[j] = parameter.getMeasurementColumnNames().get(j -
1);
+ }
TableDeviceSchemaCache.getInstance()
- .getLastRow(
+ .initOrInvalidateLastCache(
node.getQualifiedObjectName().getDatabaseName(),
- node.getDeviceEntries().get(i).getDeviceID(),
- "",
- parameter.getMeasurementColumnNames());
- boolean allHitCache = true;
- if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
- for (int j = 0; j < lastByResult.get().getRight().length; j++) {
- TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
- if (tsPrimitiveType == null
- || (updateTimeFilter != null
- && !LastQueryUtil.satisfyFilter(
- updateTimeFilter,
- new TimeValuePair(
- lastByResult.get().getLeft().getAsLong(),
tsPrimitiveType)))) {
- // the process logic is different from tree model which examine if
- // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
- // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
- // but it should skip in table model
- allHitCache = false;
- break;
- }
+ deviceEntry.getDeviceID(),
+ updateColumns,
+ false);
+ } else {
+ hitCachesIndexes.add(i);
+ lastRowCacheResults.add(lastByResult.get());
+ cachedDeviceEntries.add(node.getDeviceEntries().get(i));
}
+ }
+ } else {
+ // LAST_VALUES optimize
+ lastValuesCacheResults = new ArrayList<>();
+ int measurementSize = parameter.getMeasurementColumnNames().size();
+ boolean needTime =
Review Comment:
Please add some comments about this `needTime` field: when do we need to set
it to `true`?
Actually, I'm a little bit confused by `parameter.getTimeColumnName() ==
null`, since my first impression is that if it's null, it should mean that we
don't need the time column.
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
Review Comment:
call the same method as last row
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3024,63 +3040,137 @@ private LastQueryAggTableScanOperator
constructLastQueryAggTableScanOperator(
node.getQualifiedObjectName().getObjectName());
Filter updateTimeFilter =
updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(),
tableTTL);
- for (int i = 0; i < node.getDeviceEntries().size(); i++) {
- Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+ if (isLastRowOptimize) {
+ lastRowCacheResults = new ArrayList<>();
+ for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+ Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+ TableDeviceSchemaCache.getInstance()
+ .getLastRow(
+ node.getQualifiedObjectName().getDatabaseName(),
+ node.getDeviceEntries().get(i).getDeviceID(),
+ "",
+ parameter.getMeasurementColumnNames());
+ boolean allHitCache = true;
+ if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
+ for (int j = 0; j < lastByResult.get().getRight().length; j++) {
+ TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
+ if (tsPrimitiveType == null
+ || (updateTimeFilter != null
+ && !LastQueryUtil.satisfyFilter(
+ updateTimeFilter,
+ new TimeValuePair(
+ lastByResult.get().getLeft().getAsLong(),
tsPrimitiveType)))) {
+ // the process logic is different from tree model which examine
if
+ // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
+ // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
+ // but it should skip in table model
+ allHitCache = false;
+ break;
+ }
+ }
+ } else {
+ allHitCache = false;
+ }
+
+ if (!allHitCache) {
+ DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+ AlignedFullPath alignedPath =
+ constructAlignedPath(
+ deviceEntry,
+ parameter.getMeasurementColumnNames(),
+ parameter.getMeasurementSchemas(),
+ parameter.getAllSensors());
+ ((DataDriverContext)
context.getDriverContext()).addPath(alignedPath);
+ unCachedDeviceEntries.add(deviceEntry);
+
+ // last cache updateColumns need to put "" as time column
+ String[] updateColumns = new
String[parameter.getMeasurementColumnNames().size() + 1];
+ updateColumns[0] = "";
+ for (int j = 1; j < updateColumns.length; j++) {
+ updateColumns[j] = parameter.getMeasurementColumnNames().get(j -
1);
+ }
TableDeviceSchemaCache.getInstance()
- .getLastRow(
+ .initOrInvalidateLastCache(
node.getQualifiedObjectName().getDatabaseName(),
- node.getDeviceEntries().get(i).getDeviceID(),
- "",
- parameter.getMeasurementColumnNames());
- boolean allHitCache = true;
- if (lastByResult.isPresent() &&
lastByResult.get().getLeft().isPresent()) {
- for (int j = 0; j < lastByResult.get().getRight().length; j++) {
- TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
- if (tsPrimitiveType == null
- || (updateTimeFilter != null
- && !LastQueryUtil.satisfyFilter(
- updateTimeFilter,
- new TimeValuePair(
- lastByResult.get().getLeft().getAsLong(),
tsPrimitiveType)))) {
- // the process logic is different from tree model which examine if
- // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
- // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
- // but it should skip in table model
- allHitCache = false;
- break;
- }
+ deviceEntry.getDeviceID(),
+ updateColumns,
+ false);
+ } else {
+ hitCachesIndexes.add(i);
+ lastRowCacheResults.add(lastByResult.get());
+ cachedDeviceEntries.add(node.getDeviceEntries().get(i));
}
+ }
+ } else {
+ // LAST_VALUES optimize
+ lastValuesCacheResults = new ArrayList<>();
+ int measurementSize = parameter.getMeasurementColumnNames().size();
+ boolean needTime =
+ parameter.getTimeColumnName() == null
+ || parameter.getTableAggregators().stream()
+ .anyMatch(
+ aggregator ->
+ aggregator.getAccumulator() instanceof
LastDescAccumulator
+ && !((LastDescAccumulator)
aggregator.getAccumulator())
+ .isMeasurementColumn());
+ String[] targetColumns;
+
+ if (needTime) {
+ targetColumns = new String[measurementSize + 1];
+ // put time column in the last for convenience of later processing
+ targetColumns[targetColumns.length - 1] = "";
} else {
- allHitCache = false;
+ targetColumns = new String[measurementSize];
}
- if (!allHitCache) {
- DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
- AlignedFullPath alignedPath =
- constructAlignedPath(
- deviceEntry,
- parameter.getMeasurementColumnNames(),
- parameter.getMeasurementSchemas(),
- parameter.getAllSensors());
- ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
- unCachedDeviceEntries.add(deviceEntry);
+ for (int j = 0; j < measurementSize; j++) {
+ targetColumns[j] = parameter.getMeasurementColumnNames().get(j);
+ }
+
+ for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+ TimeValuePair[] lastResult =
+ TableDeviceSchemaCache.getInstance()
+ .getLastEntries(
+ node.getQualifiedObjectName().getDatabaseName(),
+ node.getDeviceEntries().get(i).getDeviceID(),
+ targetColumns);
+ boolean allHitCache = true;
+ if (lastResult != null) {
+ for (TimeValuePair timeValuePair : lastResult) {
+ if (timeValuePair == null
+ || timeValuePair.getValue() == null
+ || (updateTimeFilter != null
+ && !LastQueryUtil.satisfyFilter(updateTimeFilter,
timeValuePair))) {
Review Comment:
// the process logic should be same with tree model which
examine if
// `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`,
set
// `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
// but it should skip in table model
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java:
##########
@@ -314,8 +314,7 @@ public TimeValuePair getLastEntry(
}
/**
- * Get the last {@link TimeValuePair}s of given measurements, the
measurements shall never be
- * "time".
+ * Get the last {@link TimeValuePair}s of given measurements, the
measurements shall never be "".
Review Comment:
Actually, you do pass an empty string ("") into this method.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]