JackieTien97 commented on code in PR #16103:
URL: https://github.com/apache/iotdb/pull/16103#discussion_r2256159390


##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not
+    // use lastRowTime in later process
+    long lastRowTime = currentHitResult[currentHitResult.length - 
1].getTimestamp();
+    int channel = 0;
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      TableAggregator aggregator = tableAggregators.get(i);
+      ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+      int columnIdx = aggregatorInputChannels.get(channel);
+      ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+      TsTableColumnCategory category = schema.getColumnCategory();
+      switch (category) {
+        case TAG:
+          String id =
+              getNthIdColumnValue(
+                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));

Review Comment:
   Won't this throw an NPE if you pass null?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -365,12 +566,84 @@ private void updateLastCacheIfPossible() {
     }
   }
 
+  private void updateLastCacheUseLastValuesIfPossible() {
+    int channel = 0;
+    List<String> updateMeasurementList = new ArrayList<>();
+    List<TimeValuePair> updateTimeValuePairList = new ArrayList<>();
+    boolean hasSetLastTime = false;
+    for (TableAggregator tableAggregator : tableAggregators) {
+      ColumnSchema schema = 
aggColumnSchemas.get(aggregatorInputChannels.get(channel));
+
+      switch (schema.getColumnCategory()) {
+        case TIME:
+        case TAG:
+        case ATTRIBUTE:
+          if (!hasSetLastTime && tableAggregator.getAccumulator() instanceof 
LastDescAccumulator) {
+            hasSetLastTime = true;
+            LastDescAccumulator lastAccumulator =
+                (LastDescAccumulator) tableAggregator.getAccumulator();
+            if (lastAccumulator.hasInitResult()) {
+              updateMeasurementList.add("");
+              updateTimeValuePairList.add(
+                  new TimeValuePair(
+                      lastAccumulator.getMaxTime(),
+                      new 
TsPrimitiveType.TsLong(lastAccumulator.getMaxTime())));
+            } else {
+              TABLE_DEVICE_SCHEMA_CACHE.updateLastCacheIfExists(
+                  dbName,
+                  currentDeviceEntry.getDeviceID(),
+                  new String[] {""},
+                  new TimeValuePair[] {EMPTY_TIME_VALUE_PAIR});
+            }

Review Comment:
   Should the `last row` path be the same as here?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not
+    // use lastRowTime in later process
+    long lastRowTime = currentHitResult[currentHitResult.length - 
1].getTimestamp();
+    int channel = 0;
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      TableAggregator aggregator = tableAggregators.get(i);
+      ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+      int columnIdx = aggregatorInputChannels.get(channel);
+      ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+      TsTableColumnCategory category = schema.getColumnCategory();
+      switch (category) {
+        case TAG:
+          String id =
+              getNthIdColumnValue(
+                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastRowTime,
+                            new TsPrimitiveType.TsBinary(
+                                new Binary(id, 
TSFileConfig.STRING_CHARSET)))));
+              } else {
+                columnBuilder.writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+              }
+            }
+          } else {
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            // last_by
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastTime,
+                            false,
+                            new TsPrimitiveType.TsBinary(
+                                new Binary(id, 
TSFileConfig.STRING_CHARSET)))));
+              } else {
+                columnBuilder.writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+              }
+            }
+          }
+          break;
+        case ATTRIBUTE:
+          Binary attribute =
+              cachedDeviceEntries.get(currentHitCacheIndex)
+                  .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (attribute == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastRowTime,
+                            new TsPrimitiveType.TsBinary(attribute))));
+              } else {
+                columnBuilder.writeBinary(attribute);
+              }
+            }
+          } else {
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            // last_by
+            if (attribute == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastTime,
+                            false,
+                            new TsPrimitiveType.TsBinary(attribute))));
+              } else {
+                columnBuilder.writeBinary(attribute);
+              }
+            }
+          }
+          break;
+        case TIME:
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            // for last(time) aggregation
+            if (aggregator.getStep().isOutputPartial()) {
+              columnBuilder.writeBinary(
+                  new Binary(
+                      serializeTimeValue(
+                          getTSDataType(schema.getType()),
+                          lastRowTime,
+                          new TsPrimitiveType.TsLong(lastRowTime))));
+            } else {
+              columnBuilder.writeTsPrimitiveType(new 
TsPrimitiveType.TsLong(lastRowTime));
+            }
+          } else {
+            // for aggregation like last_by(time,s1)
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel + 1)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            if (aggregator.getStep().isOutputPartial()) {
+              // output: xDataType, yLastTime, xIsNull, xResult
+              columnBuilder.writeBinary(
+                  new Binary(
+                      serializeTimeValue(
+                          getTSDataType(schema.getType()),
+                          lastTime,
+                          false,
+                          new TsPrimitiveType.TsLong(lastTime))));
+            } else {
+              columnBuilder.writeTsPrimitiveType(new 
TsPrimitiveType.TsLong(lastTime));
+            }
+          }
+          break;
+        case FIELD:
+          int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+          long lastTime =
+              
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+          TsPrimitiveType tsPrimitiveType =
+              
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getValue();
+
+          if (tsPrimitiveType == EMPTY_PRIMITIVE_TYPE) {
+            // there is no data for this time series
+            if (aggregator.getStep().isOutputPartial()) {
+              columnBuilder.writeBinary(
+                  new 
Binary(serializeTimeValue(getTSDataType(schema.getType()), lastTime, null)));
+            } else {
+              columnBuilder.appendNull();
+            }
+          } else {
+            if (aggregator.getStep().isOutputPartial()) {
+              columnBuilder.writeBinary(
+                  new Binary(
+                      serializeTimeValue(

Review Comment:
   Why is this not the same as in `last row`?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastByAccumulator.java:
##########
@@ -63,14 +63,6 @@ public LastByAccumulator(
     this.xResult = TsPrimitiveType.getByType(xDataType);
   }
 
-  public boolean xIsTimeColumn() {
-    return xIsTimeColumn;
-  }
-
-  public boolean yIsTimeColumn() {
-    return this.yIsTimeColumn;
-  }
-

Review Comment:
   It seems that these two will only be used in the subclass? Maybe we should 
move them into the subclass.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not

Review Comment:
   ```suggestion
       // there is no problem when the cache result doesn't contain time 
column, because in such a case we will not
   ```



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/LastAccumulator.java:
##########
@@ -43,7 +43,7 @@ public class LastAccumulator implements TableAccumulator {
   protected TsPrimitiveType lastValue;
   protected long maxTime = Long.MIN_VALUE;
   protected boolean initResult = false;
-  protected boolean isTimeColumn = false;
+  protected final boolean isTimeColumn;

Review Comment:
   It seems that it will only be used in the subclass? Maybe we should move it 
into the subclass.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not
+    // use lastRowTime in later process
+    long lastRowTime = currentHitResult[currentHitResult.length - 
1].getTimestamp();
+    int channel = 0;
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      TableAggregator aggregator = tableAggregators.get(i);
+      ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+      int columnIdx = aggregatorInputChannels.get(channel);
+      ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+      TsTableColumnCategory category = schema.getColumnCategory();
+      switch (category) {
+        case TAG:
+          String id =
+              getNthIdColumnValue(
+                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastRowTime,
+                            new TsPrimitiveType.TsBinary(
+                                new Binary(id, 
TSFileConfig.STRING_CHARSET)))));
+              } else {
+                columnBuilder.writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+              }
+            }
+          } else {
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            // last_by
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastTime,
+                            false,
+                            new TsPrimitiveType.TsBinary(
+                                new Binary(id, 
TSFileConfig.STRING_CHARSET)))));
+              } else {
+                columnBuilder.writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+              }
+            }
+          }
+          break;
+        case ATTRIBUTE:
+          Binary attribute =
+              cachedDeviceEntries.get(currentHitCacheIndex)
+                  .getAttributeColumnValues()[aggColumnsIndexArray[columnIdx]];
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (attribute == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastRowTime,
+                            new TsPrimitiveType.TsBinary(attribute))));
+              } else {
+                columnBuilder.writeBinary(attribute);
+              }
+            }
+          } else {
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            // last_by
+            if (attribute == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastTime,
+                            false,
+                            new TsPrimitiveType.TsBinary(attribute))));
+              } else {
+                columnBuilder.writeBinary(attribute);
+              }
+            }
+          }
+          break;
+        case TIME:
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            // for last(time) aggregation
+            if (aggregator.getStep().isOutputPartial()) {
+              columnBuilder.writeBinary(
+                  new Binary(
+                      serializeTimeValue(
+                          getTSDataType(schema.getType()),
+                          lastRowTime,
+                          new TsPrimitiveType.TsLong(lastRowTime))));
+            } else {
+              columnBuilder.writeTsPrimitiveType(new 
TsPrimitiveType.TsLong(lastRowTime));
+            }
+          } else {
+            // for aggregation like last_by(time,s1)
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel + 1)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            if (aggregator.getStep().isOutputPartial()) {
+              // output: xDataType, yLastTime, xIsNull, xResult
+              columnBuilder.writeBinary(
+                  new Binary(
+                      serializeTimeValue(
+                          getTSDataType(schema.getType()),
+                          lastTime,
+                          false,
+                          new TsPrimitiveType.TsLong(lastTime))));
+            } else {
+              columnBuilder.writeTsPrimitiveType(new 
TsPrimitiveType.TsLong(lastTime));
+            }
+          }
+          break;
+        case FIELD:

Review Comment:
   Is there no difference between `LastDescAccumulator` and `LastBy` for FIELD?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not
+    // use lastRowTime in later process
+    long lastRowTime = currentHitResult[currentHitResult.length - 
1].getTimestamp();
+    int channel = 0;
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      TableAggregator aggregator = tableAggregators.get(i);
+      ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+      int columnIdx = aggregatorInputChannels.get(channel);
+      ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+      TsTableColumnCategory category = schema.getColumnCategory();
+      switch (category) {
+        case TAG:
+          String id =
+              getNthIdColumnValue(
+                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(
+                            getTSDataType(schema.getType()),
+                            lastRowTime,
+                            new TsPrimitiveType.TsBinary(
+                                new Binary(id, 
TSFileConfig.STRING_CHARSET)))));
+              } else {
+                columnBuilder.writeBinary(new Binary(id, 
TSFileConfig.STRING_CHARSET));
+              }
+            }
+          } else {
+            int measurementIdx = 
aggColumnsIndexArray[aggregatorInputChannels.get(channel)];
+            long lastTime =
+                
lastValuesCacheResults.get(currentHitCacheIndex)[measurementIdx].getTimestamp();
+
+            // last_by
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastTime, true, null)));

Review Comment:
   same as `TAG`



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not
+    // use lastRowTime in later process
+    long lastRowTime = currentHitResult[currentHitResult.length - 
1].getTimestamp();
+    int channel = 0;
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      TableAggregator aggregator = tableAggregators.get(i);
+      ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+      int columnIdx = aggregatorInputChannels.get(channel);
+      ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+      TsTableColumnCategory category = schema.getColumnCategory();
+      switch (category) {
+        case TAG:
+          String id =
+              getNthIdColumnValue(
+                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));

Review Comment:
   Is always appending null enough here?



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3024,63 +3040,137 @@ private LastQueryAggTableScanOperator 
constructLastQueryAggTableScanOperator(
                 node.getQualifiedObjectName().getObjectName());
     Filter updateTimeFilter =
         
updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(), 
tableTTL);
-    for (int i = 0; i < node.getDeviceEntries().size(); i++) {
-      Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+    if (isLastRowOptimize) {
+      lastRowCacheResults = new ArrayList<>();
+      for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+        Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+            TableDeviceSchemaCache.getInstance()
+                .getLastRow(
+                    node.getQualifiedObjectName().getDatabaseName(),
+                    node.getDeviceEntries().get(i).getDeviceID(),
+                    "",
+                    parameter.getMeasurementColumnNames());
+        boolean allHitCache = true;
+        if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
+          for (int j = 0; j < lastByResult.get().getRight().length; j++) {
+            TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
+            if (tsPrimitiveType == null
+                || (updateTimeFilter != null
+                    && !LastQueryUtil.satisfyFilter(
+                        updateTimeFilter,
+                        new TimeValuePair(
+                            lastByResult.get().getLeft().getAsLong(), 
tsPrimitiveType)))) {
+              // the process logic is different from tree model which examine 
if
+              // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
+              // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
+              // but it should skip in table model
+              allHitCache = false;
+              break;
+            }
+          }
+        } else {
+          allHitCache = false;
+        }
+
+        if (!allHitCache) {
+          DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+          AlignedFullPath alignedPath =
+              constructAlignedPath(
+                  deviceEntry,
+                  parameter.getMeasurementColumnNames(),
+                  parameter.getMeasurementSchemas(),
+                  parameter.getAllSensors());
+          ((DataDriverContext) 
context.getDriverContext()).addPath(alignedPath);
+          unCachedDeviceEntries.add(deviceEntry);
+
+          // last cache updateColumns need to put "" as time column
+          String[] updateColumns = new 
String[parameter.getMeasurementColumnNames().size() + 1];
+          updateColumns[0] = "";
+          for (int j = 1; j < updateColumns.length; j++) {
+            updateColumns[j] = parameter.getMeasurementColumnNames().get(j - 
1);
+          }
           TableDeviceSchemaCache.getInstance()
-              .getLastRow(
+              .initOrInvalidateLastCache(
                   node.getQualifiedObjectName().getDatabaseName(),
-                  node.getDeviceEntries().get(i).getDeviceID(),
-                  "",
-                  parameter.getMeasurementColumnNames());
-      boolean allHitCache = true;
-      if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
-        for (int j = 0; j < lastByResult.get().getRight().length; j++) {
-          TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
-          if (tsPrimitiveType == null
-              || (updateTimeFilter != null
-                  && !LastQueryUtil.satisfyFilter(
-                      updateTimeFilter,
-                      new TimeValuePair(
-                          lastByResult.get().getLeft().getAsLong(), 
tsPrimitiveType)))) {
-            // the process logic is different from tree model which examine if
-            // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
-            // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
-            // but it should skip in table model
-            allHitCache = false;
-            break;
-          }
+                  deviceEntry.getDeviceID(),
+                  updateColumns,
+                  false);
+        } else {
+          hitCachesIndexes.add(i);
+          lastRowCacheResults.add(lastByResult.get());
+          cachedDeviceEntries.add(node.getDeviceEntries().get(i));
         }
+      }
+    } else {
+      // LAST_VALUES optimize
+      lastValuesCacheResults = new ArrayList<>();
+      int measurementSize = parameter.getMeasurementColumnNames().size();
+      boolean needTime =

Review Comment:
   Please add some comments about this `needTime` field: when do we need to set 
it to `true`?
   
   Actually, I'm a little confused by `parameter.getTimeColumnName() == 
null`: my first impression is that if it's null, it should mean that we don't 
need the time column.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/LastQueryAggTableScanOperator.java:
##########
@@ -285,11 +295,202 @@ private void buildResultUseLastCache() {
     currentHitCacheIndex++;
   }
 
-  private void updateLastCacheIfPossible() {
-    if (!needUpdateCache) {
-      return;
+  private void buildResultUseLastValuesCache() {
+    appendGroupKeysToResult(cachedDeviceEntries, currentHitCacheIndex);
+    TimeValuePair[] currentHitResult = 
lastValuesCacheResults.get(currentHitCacheIndex);
+    // there is no problem when the cache result doesn't contain time column, 
because we will not
+    // use lastRowTime in later process
+    long lastRowTime = currentHitResult[currentHitResult.length - 
1].getTimestamp();
+    int channel = 0;
+    for (int i = 0; i < tableAggregators.size(); i++) {
+      TableAggregator aggregator = tableAggregators.get(i);
+      ColumnBuilder columnBuilder = 
resultTsBlockBuilder.getColumnBuilder(groupingKeySize + i);
+      int columnIdx = aggregatorInputChannels.get(channel);
+      ColumnSchema schema = aggColumnSchemas.get(columnIdx);
+      TsTableColumnCategory category = schema.getColumnCategory();
+      switch (category) {
+        case TAG:
+          String id =
+              getNthIdColumnValue(
+                  cachedDeviceEntries.get(currentHitCacheIndex), 
aggColumnsIndexArray[columnIdx]);
+          if (aggregator.getAccumulator() instanceof LastDescAccumulator) {
+            if (id == null) {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(getTSDataType(schema.getType()), 
lastRowTime, null)));
+              } else {
+                columnBuilder.appendNull();
+              }
+            } else {
+              if (aggregator.getStep().isOutputPartial()) {
+                columnBuilder.writeBinary(
+                    new Binary(
+                        serializeTimeValue(

Review Comment:
   Call the same method that the last-row path uses.



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java:
##########
@@ -3024,63 +3040,137 @@ private LastQueryAggTableScanOperator 
constructLastQueryAggTableScanOperator(
                 node.getQualifiedObjectName().getObjectName());
     Filter updateTimeFilter =
         
updateFilterUsingTTL(parameter.getSeriesScanOptions().getGlobalTimeFilter(), 
tableTTL);
-    for (int i = 0; i < node.getDeviceEntries().size(); i++) {
-      Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+    if (isLastRowOptimize) {
+      lastRowCacheResults = new ArrayList<>();
+      for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+        Optional<Pair<OptionalLong, TsPrimitiveType[]>> lastByResult =
+            TableDeviceSchemaCache.getInstance()
+                .getLastRow(
+                    node.getQualifiedObjectName().getDatabaseName(),
+                    node.getDeviceEntries().get(i).getDeviceID(),
+                    "",
+                    parameter.getMeasurementColumnNames());
+        boolean allHitCache = true;
+        if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
+          for (int j = 0; j < lastByResult.get().getRight().length; j++) {
+            TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
+            if (tsPrimitiveType == null
+                || (updateTimeFilter != null
+                    && !LastQueryUtil.satisfyFilter(
+                        updateTimeFilter,
+                        new TimeValuePair(
+                            lastByResult.get().getLeft().getAsLong(), 
tsPrimitiveType)))) {
+              // the process logic is different from tree model which examine 
if
+              // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
+              // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
+              // but it should skip in table model
+              allHitCache = false;
+              break;
+            }
+          }
+        } else {
+          allHitCache = false;
+        }
+
+        if (!allHitCache) {
+          DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
+          AlignedFullPath alignedPath =
+              constructAlignedPath(
+                  deviceEntry,
+                  parameter.getMeasurementColumnNames(),
+                  parameter.getMeasurementSchemas(),
+                  parameter.getAllSensors());
+          ((DataDriverContext) 
context.getDriverContext()).addPath(alignedPath);
+          unCachedDeviceEntries.add(deviceEntry);
+
+          // last cache updateColumns need to put "" as time column
+          String[] updateColumns = new 
String[parameter.getMeasurementColumnNames().size() + 1];
+          updateColumns[0] = "";
+          for (int j = 1; j < updateColumns.length; j++) {
+            updateColumns[j] = parameter.getMeasurementColumnNames().get(j - 
1);
+          }
           TableDeviceSchemaCache.getInstance()
-              .getLastRow(
+              .initOrInvalidateLastCache(
                   node.getQualifiedObjectName().getDatabaseName(),
-                  node.getDeviceEntries().get(i).getDeviceID(),
-                  "",
-                  parameter.getMeasurementColumnNames());
-      boolean allHitCache = true;
-      if (lastByResult.isPresent() && 
lastByResult.get().getLeft().isPresent()) {
-        for (int j = 0; j < lastByResult.get().getRight().length; j++) {
-          TsPrimitiveType tsPrimitiveType = lastByResult.get().getRight()[j];
-          if (tsPrimitiveType == null
-              || (updateTimeFilter != null
-                  && !LastQueryUtil.satisfyFilter(
-                      updateTimeFilter,
-                      new TimeValuePair(
-                          lastByResult.get().getLeft().getAsLong(), 
tsPrimitiveType)))) {
-            // the process logic is different from tree model which examine if
-            // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, set
-            // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
-            // but it should skip in table model
-            allHitCache = false;
-            break;
-          }
+                  deviceEntry.getDeviceID(),
+                  updateColumns,
+                  false);
+        } else {
+          hitCachesIndexes.add(i);
+          lastRowCacheResults.add(lastByResult.get());
+          cachedDeviceEntries.add(node.getDeviceEntries().get(i));
         }
+      }
+    } else {
+      // LAST_VALUES optimize
+      lastValuesCacheResults = new ArrayList<>();
+      int measurementSize = parameter.getMeasurementColumnNames().size();
+      boolean needTime =
+          parameter.getTimeColumnName() == null
+              || parameter.getTableAggregators().stream()
+                  .anyMatch(
+                      aggregator ->
+                          aggregator.getAccumulator() instanceof 
LastDescAccumulator
+                              && !((LastDescAccumulator) 
aggregator.getAccumulator())
+                                  .isMeasurementColumn());
+      String[] targetColumns;
+
+      if (needTime) {
+        targetColumns = new String[measurementSize + 1];
+        // put time column in the last for convenience of later processing
+        targetColumns[targetColumns.length - 1] = "";
       } else {
-        allHitCache = false;
+        targetColumns = new String[measurementSize];
       }
 
-      if (!allHitCache) {
-        DeviceEntry deviceEntry = node.getDeviceEntries().get(i);
-        AlignedFullPath alignedPath =
-            constructAlignedPath(
-                deviceEntry,
-                parameter.getMeasurementColumnNames(),
-                parameter.getMeasurementSchemas(),
-                parameter.getAllSensors());
-        ((DataDriverContext) context.getDriverContext()).addPath(alignedPath);
-        unCachedDeviceEntries.add(deviceEntry);
+      for (int j = 0; j < measurementSize; j++) {
+        targetColumns[j] = parameter.getMeasurementColumnNames().get(j);
+      }
+
+      for (int i = 0; i < node.getDeviceEntries().size(); i++) {
+        TimeValuePair[] lastResult =
+            TableDeviceSchemaCache.getInstance()
+                .getLastEntries(
+                    node.getQualifiedObjectName().getDatabaseName(),
+                    node.getDeviceEntries().get(i).getDeviceID(),
+                    targetColumns);
+        boolean allHitCache = true;
+        if (lastResult != null) {
+          for (TimeValuePair timeValuePair : lastResult) {
+            if (timeValuePair == null
+                || timeValuePair.getValue() == null
+                || (updateTimeFilter != null
+                    && !LastQueryUtil.satisfyFilter(updateTimeFilter, 
timeValuePair))) {

Review Comment:
                 // the process logic should be same with tree model which 
examine if
                 // `isFilterGtOrGe(seriesScanOptions.getGlobalTimeFilter())`, 
set
                 // `lastByResult.get().getRight()[j] = EMPTY_PRIMITIVE_TYPE`,
                 // but it should skip in table model



##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/metadata/fetcher/cache/TableDeviceSchemaCache.java:
##########
@@ -314,8 +314,7 @@ public TimeValuePair getLastEntry(
   }
 
   /**
-   * Get the last {@link TimeValuePair}s of given measurements, the 
measurements shall never be
-   * "time".
+   * Get the last {@link TimeValuePair}s of given measurements, the 
measurements shall never be "".

Review Comment:
   Actually, you do pass "" (the empty string) into this method.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to