JackieTien97 commented on code in PR #16103:
URL: https://github.com/apache/iotdb/pull/16103#discussion_r2256207749
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
currentHitCacheIndex++;
}
- private void updateLastCacheIfPossible() {
- if (!needUpdateCache) {
- return;
+ private void buildResultUseLastValuesCache() {
+ appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+ TimeValuePair[] currentHitResult =
lastValuesCacheResults.get(currentHitCacheIndex);
+ // there is no problem when the cache result doesn't contain time column,
because we will not
+ // use lastRowTime in later process
+ long lastRowTime = currentHitResult[currentHitResult.length -
1].getTimestamp();
+ int channel = 0;
+ for (int i = 0; i < tableAggregators.size(); i++) {
+ TableAggregator aggregator = tableAggregators.get(i);
+ ColumnBuilder columnBuilder =
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+ int columnIdx = aggregatorInputChannels.get(channel);
+ ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+ TsTableColumnCategory category = schema.getColumnCategory();
+ switch (category) {
+ case TAG:
+ String id =
+ getNthIdColumnValue(
+ cachedDeviceEntries.get(currentHitCacheIndex),
aggColumnsIndexArray[columnIdx]);
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (id == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(
+ new Binary(id,
TSFileConfig.STRING_CHARSET)))));
+ } else {
+ columnBuilder.writeBinary(new Binary(id,
TSFileConfig.STRING_CHARSET));
+ }
+ }
+ }
+ break;
+ case ATTRIBUTE:
+ Binary attribute =
+ cachedDeviceEntries.get(currentHitCacheIndex)
+ .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ if (attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastRowTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ } else {
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ // last_by
+ if (attribute == null) {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(getTSDataType(schema.getType()),
lastTime, true, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsBinary(attribute))));
+ } else {
+ columnBuilder.writeBinary(attribute);
+ }
+ }
+ }
+ break;
+ case TIME:
+ if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+ // for last(time) aggregation
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastRowTime,
+ new TsPrimitiveType.TsLong(lastRowTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastRowTime));
+ }
+ } else {
+ // for aggregation like last_by(time,s1)
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel + 1)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+ if (aggregator.getStep().isOutputPartial()) {
+ // output: xDataType, yLastTime, xIsNull, xResult
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
+ getTSDataType(schema.getType()),
+ lastTime,
+ false,
+ new TsPrimitiveType.TsLong(lastTime))));
+ } else {
+ columnBuilder.writeTsPrimitiveType(new
TsPrimitiveType.TsLong(lastTime));
+ }
+ }
+ break;
+ case FIELD:
+ int measurementIdx =
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+ long lastTime =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+ TsPrimitiveType tsPrimitiveType =
+
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
+
+ if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+ // there is no data for this time series
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new
Binary(serializeTimeValue(getTSDataType(schema.getType()), lastTime, null)));
+ } else {
+ columnBuilder.appendNull();
+ }
+ } else {
+ if (aggregator.getStep().isOutputPartial()) {
+ columnBuilder.writeBinary(
+ new Binary(
+ serializeTimeValue(
Review Comment:
Why is this not handled the same way as in the `last row` case?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]