This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch last_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 6479ad912d3b7a9a1a671081526dfc1b9a600bf0 Author: Beyyes <[email protected]> AuthorDate: Fri Oct 25 16:06:11 2024 +0800 fix it --- .../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 9 +- .../TableAggregationTableScanOperator.java | 117 +++++++++++---------- .../aggregation/TableModeAccumulator.java | 8 +- .../plan/planner/TableOperatorGenerator.java | 28 ----- 4 files changed, 68 insertions(+), 94 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index cc87922666d..c97c0fcab50 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -615,7 +615,7 @@ public class IoTDBMultiIDsWithAttributesTableIT { retArray, DATABASE_NAME); - expectedHeader = buildHeaders(2); + expectedHeader = buildHeaders(3); sql = "select count(*),count(t1),sum(t1) from (select avg(num+1) as t1 from table0 where time < 0)"; retArray = @@ -908,15 +908,16 @@ public class IoTDBMultiIDsWithAttributesTableIT { tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); // flush multi times, generated multi tsfile - expectedHeader = buildHeaders(1); - sql = "select date_bin(40ms,time), first(time) from table1 where device='d11' group by 1"; + expectedHeader = buildHeaders(2); + sql = "select date_bin(30ms,time), first(time) from table1 where device='d11' group by 1"; retArray = new String[] { "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z,", - "1970-01-01T00:00:00.000Z,1970-01-01T00:00:00.000Z," + "1970-01-01T00:00:00.030Z,1970-01-01T00:00:00.030Z," }; tableResultSetEqualTest(sql, expectedHeader, retArray, DATABASE_NAME); + expectedHeader = buildHeaders(1); sql = "select count(*) from (" + "select device, level, 
date_bin(1d, time) as bin, " diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java index e5b3e2b1b3f..fe086fd9e13 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/TableAggregationTableScanOperator.java @@ -24,7 +24,7 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper; import org.apache.iotdb.db.queryengine.execution.aggregation.timerangeiterator.ITableTimeRangeIterator; import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext; -import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSeriesAggregationScanOperator; +import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractDataSourceOperator; import org.apache.iotdb.db.queryengine.execution.operator.source.AlignedSeriesScanUtil; import org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.TableAggregator; import org.apache.iotdb.db.queryengine.execution.operator.window.IWindow; @@ -39,6 +39,7 @@ import org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource; import org.apache.tsfile.block.column.Column; import org.apache.tsfile.block.column.ColumnBuilder; +import org.apache.tsfile.common.conf.TSFileDescriptor; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.file.metadata.StringArrayDeviceID; import org.apache.tsfile.file.metadata.statistics.Statistics; @@ -67,53 +68,54 @@ import static org.apache.iotdb.db.queryengine.execution.operator.AggregationUtil import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator.constructAlignedPath; import static org.apache.tsfile.read.common.block.TsBlockUtil.skipPointsOutOfTimeRange; -public class TableAggregationTableScanOperator extends AbstractSeriesAggregationScanOperator { +public class TableAggregationTableScanOperator extends AbstractDataSourceOperator { + + public static final LongColumn TIME_COLUMN_TEMPLATE = + new LongColumn(1, Optional.empty(), new long[] {0}); private static final long INSTANCE_SIZE = RamUsageEstimator.shallowSizeOfInstance(TableAggregationTableScanOperator.class); + // can not calc maxTsBlockLineNum using date_bin + private final int maxTsBlockLineNum = + TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(); + private final long cachedRawDataSize; - private final List<TableAggregator> tableAggregators; + private boolean finished = false; + private final List<TableAggregator> tableAggregators; private final List<ColumnSchema> groupingKeySchemas; private final int[] groupingKeyIndex; - - public static final LongColumn TIME_COLUMN_TEMPLATE = - new LongColumn(1, Optional.empty(), new long[] {0}); + // for different aggregations aiming to same column, use this variable to point to same column + private final List<Integer> aggArguments; private final List<ColumnSchema> columnSchemas; - private final int[] columnsIndexArray; private final List<DeviceEntry> deviceEntries; - private final int deviceCount; - - private final Ordering scanOrder; - private final SeriesScanOptions seriesScanOptions; + private final int measurementCount; + private int currentDeviceIndex; private final List<String> measurementColumnNames; - private final List<IMeasurementSchema> measurementSchemas; - private final List<TSDataType> measurementColumnTSDataTypes; - // TODO calc maxTsBlockLineNum using date_bin - private final int maxTsBlockLineNum; - - // for different aggregations aiming to same column, use this variable to point to same 
column - private final List<Integer> aggArguments; - + private final Ordering scanOrder; + private final SeriesScanOptions seriesScanOptions; private QueryDataSource queryDataSource; - private int currentDeviceIndex; - - ITableTimeRangeIterator timeIterator; + private final ITableTimeRangeIterator timeIterator; + private final boolean canUseStatistics; + private final boolean ascending; private boolean allAggregatorsHasFinalResult = false; + private long leftRuntimeOfOneNextCall; + + private TsBlock inputTsBlock; public TableAggregationTableScanOperator( PlanNodeId sourceId, - OperatorContext context, + OperatorContext operatorContext, List<ColumnSchema> columnSchemas, int[] columnsIndexArray, List<DeviceEntry> deviceEntries, @@ -121,36 +123,25 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation SeriesScanOptions seriesScanOptions, List<String> measurementColumnNames, List<IMeasurementSchema> measurementSchemas, - int maxTsBlockLineNum, int measurementCount, List<TableAggregator> tableAggregators, List<ColumnSchema> groupingKeySchemas, int[] groupingKeyIndex, ITableTimeRangeIterator tableTimeRangeIterator, boolean ascending, - long maxReturnSize, boolean canUseStatistics, List<Integer> aggArguments) { - super( - sourceId, - context, - null, - measurementCount, - null, - null, - ascending, - false, - null, - maxReturnSize, - canUseStatistics); + this.sourceId = sourceId; + this.operatorContext = operatorContext; + this.canUseStatistics = canUseStatistics; + this.ascending = ascending; + this.measurementCount = measurementCount; this.tableAggregators = tableAggregators; this.groupingKeySchemas = groupingKeySchemas; this.groupingKeyIndex = groupingKeyIndex; - this.sourceId = sourceId; - this.operatorContext = context; this.columnSchemas = columnSchemas; this.columnsIndexArray = columnsIndexArray; this.deviceEntries = deviceEntries; @@ -164,13 +155,10 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation 
this.currentDeviceIndex = 0; this.aggArguments = aggArguments; this.timeIterator = tableTimeRangeIterator; - if (tableTimeRangeIterator.getType() - == ITableTimeRangeIterator.TimeIteratorType.SINGLE_TIME_ITERATOR) { - curTimeRange = new TimeRange(Long.MIN_VALUE, Long.MAX_VALUE); - } - this.maxReturnSize = maxReturnSize; - this.maxTsBlockLineNum = maxTsBlockLineNum; + this.maxReturnSize = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); + this.cachedRawDataSize = + (1L + measurementCount) * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte(); constructAlignedSeriesScanUtil(); } @@ -279,7 +267,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } /** Return true if we have the result of this timeRange. */ - @Override + // @Override protected boolean calculateAggregationResultForCurrentTimeRange() { try { if (calcFromCachedData()) { @@ -342,14 +330,14 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } } - @Override + // @Override protected void updateResultTsBlock() { appendAggregationResult(resultTsBlockBuilder, tableAggregators); // after appendAggregationResult invoked, aggregators must be cleared resetTableAggregators(); } - @Override + // @Override protected boolean calcFromCachedData() { return calcUsingRawData(inputTsBlock); } @@ -552,7 +540,7 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation } @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) - @Override + // @Override public boolean readAndCalcFromFile() throws IOException { // start stopwatch long start = System.nanoTime(); @@ -575,8 +563,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation if (timeIterator .getCurTimeRange() .contains(fileTimeStatistics.getStartTime(), fileTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { + Statistics[] 
statisticsList = new Statistics[measurementCount]; + for (int i = 0; i < measurementCount; i++) { statisticsList[i] = seriesScanUtil.currentFileStatistics(i); } calcFromStatistics(fileTimeStatistics, statisticsList); @@ -622,8 +610,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation .getCurTimeRange() .contains(chunkTimeStatistics.getStartTime(), chunkTimeStatistics.getEndTime())) { // calc from chunkMetaData - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { + Statistics[] statisticsList = new Statistics[measurementCount]; + for (int i = 0; i < measurementCount; i++) { statisticsList[i] = seriesScanUtil.currentChunkStatistics(i); } calcFromStatistics(chunkTimeStatistics, statisticsList); @@ -644,8 +632,6 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation return false; } - long leftRuntimeOfOneNextCall = Long.MAX_VALUE; - @SuppressWarnings({"squid:S3776", "squid:S135", "squid:S3740"}) protected boolean readAndCalcFromPage() throws IOException { long start = System.nanoTime(); @@ -671,8 +657,8 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation if (timeIterator .getCurTimeRange() .contains(pageTimeStatistics.getStartTime(), pageTimeStatistics.getEndTime())) { - Statistics[] statisticsList = new Statistics[subSensorSize]; - for (int i = 0; i < subSensorSize; i++) { + Statistics[] statisticsList = new Statistics[measurementCount]; + for (int i = 0; i < measurementCount; i++) { statisticsList[i] = seriesScanUtil.currentPageStatistics(i); } calcFromStatistics(pageTimeStatistics, statisticsList); @@ -863,4 +849,21 @@ public class TableAggregationTableScanOperator extends AbstractSeriesAggregation + (resultTsBlockBuilder == null ? 
0 : resultTsBlockBuilder.getRetainedSizeInBytes()) + RamUsageEstimator.sizeOfCollection(deviceEntries); } + + @Override + public long calculateMaxPeekMemory() { + return 0; + } + + @Override + public long calculateMaxReturnSize() { + return 0; + } + + @Override + public long calculateRetainedSizeAfterCallingNext() { + return timeIterator.getType() == ITableTimeRangeIterator.TimeIteratorType.DATE_BIN_TIME_ITERATOR + ? cachedRawDataSize + : 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java index 1ad01095fac..38d7c2372c2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/TableModeAccumulator.java @@ -466,9 +466,7 @@ public class TableModeAccumulator implements TableAccumulator { for (int i = 0; i < column.getPositionCount(); i++) { if (!column.isNull(i)) { booleanCountMap.compute(column.getBoolean(i), (k, v) -> v == null ? 
1 : v + 1); - if (booleanCountMap.size() > MAP_SIZE_THRESHOLD) { - checkMapSize(booleanCountMap.size()); - } + checkMapSize(booleanCountMap.size()); } else { nullCount++; } @@ -531,11 +529,11 @@ public class TableModeAccumulator implements TableAccumulator { } private void checkMapSize(int size) { - if (size > MAP_SIZE_THRESHOLD) { + if (size > IoTDBDescriptor.getInstance().getConfig().getModeMapSizeThreshold()) { throw new RuntimeException( String.format( "distinct values has exceeded the threshold %s when calculate Mode", - MAP_SIZE_THRESHOLD)); + IoTDBDescriptor.getInstance().getConfig().getModeMapSizeThreshold())); } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 56d6868a24b..bcf320fe76d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -1643,14 +1643,12 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution scanOptionsBuilder.build(), measurementColumnNames, measurementSchemas, - TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), measurementColumnCount, aggregators, groupingKeySchemas, groupingKeyIndex, timeRangeIterator, scanAscending, - calculateMaxAggregationResultSize(), canUseStatistic, aggColumnIndexes); @@ -1734,30 +1732,4 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution } return new boolean[] {canUseStatistic, isAscending}; } - - public static long calculateMaxAggregationResultSize( - // List<? 
extends AggregationDescriptor> aggregationDescriptors, - // ITimeRangeIterator timeRangeIterator - ) { - // TODO perfect max aggregation result size logic - return TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(); - - // long timeValueColumnsSizePerLine = TimeColumn.SIZE_IN_BYTES_PER_POSITION; - // for (AggregationDescriptor descriptor : aggregationDescriptors) { - // List<TSDataType> outPutDataTypes = - // descriptor.getOutputColumnNames().stream() - // .map(typeProvider::getTableModelType) - // .collect(Collectors.toList()); - // for (TSDataType tsDataType : outPutDataTypes) { - // timeValueColumnsSizePerLine += getOutputColumnSizePerLine(tsDataType); - // } - // } - // - // return Math.min( - // TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes(), - // Math.min( - // TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber(), - // timeRangeIterator.getTotalIntervalNum()) - // * timeValueColumnsSizePerLine); - } }
