This is an automated email from the ASF dual-hosted git repository. hui pushed a commit to branch lmh/AggOpMemoryControl in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit eb0ff4978c70109c9c80fde8fd46071ccd63f913 Author: Minghui Liu <[email protected]> AuthorDate: Wed Aug 10 23:14:41 2022 +0800 getMaxBinarySizeInBytes by empty stats impl --- .../iotdb/db/metadata/LocalSchemaProcessor.java | 6 +- .../db/metadata/rescon/SchemaResourceManager.java | 4 +- ...tatistics.java => SchemaStatisticsManager.java} | 12 ++-- .../schemaregion/SchemaRegionMemoryImpl.java | 12 ++-- .../schemaregion/SchemaRegionSchemaFileImpl.java | 12 ++-- .../iotdb/db/mpp/statistics/StatisticsManager.java | 46 +++++++++++++++ .../iotdb/db/mpp/statistics/TimeseriesStats.java | 24 ++++++++ .../mpp/execution/operator/OperatorMemoryTest.java | 65 ++++++++++++++++++++++ 8 files changed, 158 insertions(+), 23 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java index 32518a6239..fafd13ddd9 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/LocalSchemaProcessor.java @@ -37,7 +37,7 @@ import org.apache.iotdb.db.metadata.mnode.IMNode; import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode; import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode; import org.apache.iotdb.db.metadata.path.MeasurementPath; -import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics; +import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager; import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion; import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine; import org.apache.iotdb.db.metadata.template.Template; @@ -466,7 +466,7 @@ public class LocalSchemaProcessor { // todo this is for test assistance, refactor this to support massive timeseries if (pathPattern.getFullPath().equals("root.**") && TemplateManager.getInstance().getAllTemplateName().isEmpty()) { - return (int) TimeseriesStatistics.getInstance().getTotalSeriesNumber(); + return (int) SchemaStatisticsManager.getInstance().getTotalSeriesNumber(); } int count = 0; for (ISchemaRegion schemaRegion : getInvolvedSchemaRegions(pathPattern, isPrefixMatch)) { @@ -1380,7 +1380,7 @@ public class LocalSchemaProcessor { @TestOnly public long getTotalSeriesNumber() { - return TimeseriesStatistics.getInstance().getTotalSeriesNumber(); + return SchemaStatisticsManager.getInstance().getTotalSeriesNumber(); } /** diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java index 0fe18ac955..c5df8ff9a7 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaResourceManager.java @@ -30,7 +30,7 @@ public class SchemaResourceManager { private SchemaResourceManager() {} public static void initSchemaResource() { - TimeseriesStatistics.getInstance().init(); + SchemaStatisticsManager.getInstance().init(); MemoryStatistics.getInstance().init(); if (IoTDBDescriptor.getInstance() .getConfig() @@ -41,7 +41,7 @@ public class SchemaResourceManager { } public static void clearSchemaResource() { - TimeseriesStatistics.getInstance().clear(); + SchemaStatisticsManager.getInstance().clear(); MemoryStatistics.getInstance().clear(); if (IoTDBDescriptor.getInstance() .getConfig() diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java similarity index 87% rename from server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java rename to server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java index 097d1accce..f79e3f7a20 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/rescon/TimeseriesStatistics.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/rescon/SchemaStatisticsManager.java @@ -26,22 +26,22 @@ import org.apache.iotdb.metrics.utils.MetricLevel; import java.util.concurrent.atomic.AtomicLong; -public class TimeseriesStatistics { +public class SchemaStatisticsManager { private final AtomicLong totalSeriesNumber = new AtomicLong(); - private static class TimeseriesStatisticsHolder { + private static class SchemaStatisticsHolder { - private TimeseriesStatisticsHolder() { + private SchemaStatisticsHolder() { // allowed to do nothing } - private static final TimeseriesStatistics INSTANCE = new TimeseriesStatistics(); + private static final SchemaStatisticsManager INSTANCE = new SchemaStatisticsManager(); } /** we should not use this function in other place, but only in IoTDB class */ - public static TimeseriesStatistics getInstance() { - return TimeseriesStatisticsHolder.INSTANCE; + public static SchemaStatisticsManager getInstance() { + return SchemaStatisticsHolder.INSTANCE; } public void init() { diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java index 78754414eb..1cef2e826e 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionMemoryImpl.java @@ -52,7 +52,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGMemoryImpl; import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.rescon.MemoryStatistics; -import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics; +import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager; import org.apache.iotdb.db.metadata.tag.TagManager; import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.metadata.template.TemplateManager; @@ -165,7 +165,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { private boolean usingMLog = true; private MLogWriter logWriter; - private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance(); + private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance(); private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance(); private final IStorageGroupMNode storageGroupMNode; @@ -451,7 +451,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { // collect all the LeafMNode in this schema region List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode(); - timeseriesStatistics.deleteTimeseries(leafMNodes.size()); + schemaStatisticsManager.deleteTimeseries(leafMNodes.size()); // drop triggers with no exceptions TriggerEngine.drop(leafMNodes); @@ -598,7 +598,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { mNodeCache.invalidate(path.getDevicePath()); // update statistics and schemaDataTypeNumMap - timeseriesStatistics.addTimeseries(1); + schemaStatisticsManager.addTimeseries(1); // update tag index if (offset != -1 && isRecovering) { @@ -715,7 +715,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { mNodeCache.invalidate(prefixPath); // update statistics and schemaDataTypeNumMap - timeseriesStatistics.addTimeseries(plan.getMeasurements().size()); + schemaStatisticsManager.addTimeseries(plan.getMeasurements().size()); List<Long> tagOffsets = plan.getTagOffsets(); for (int i = 0; i < measurements.size(); i++) { @@ -857,7 +857,7 @@ public class SchemaRegionMemoryImpl implements ISchemaRegion { mNodeCache.invalidate(node.getPartialPath()); - timeseriesStatistics.deleteTimeseries(1); + schemaStatisticsManager.deleteTimeseries(1); return storageGroupPath; } // endregion diff --git a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java index a49e3767de..0602891c67 100644 --- a/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java +++ b/server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/SchemaRegionSchemaFileImpl.java @@ -50,7 +50,7 @@ import org.apache.iotdb.db.metadata.mnode.MeasurementMNode; import org.apache.iotdb.db.metadata.mtree.MTreeBelowSGCachedImpl; import org.apache.iotdb.db.metadata.path.MeasurementPath; import org.apache.iotdb.db.metadata.rescon.MemoryStatistics; -import org.apache.iotdb.db.metadata.rescon.TimeseriesStatistics; +import org.apache.iotdb.db.metadata.rescon.SchemaStatisticsManager; import org.apache.iotdb.db.metadata.tag.TagManager; import org.apache.iotdb.db.metadata.template.Template; import org.apache.iotdb.db.metadata.template.TemplateManager; @@ -161,7 +161,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion { private File logFile; private MLogWriter logWriter; - private TimeseriesStatistics timeseriesStatistics = TimeseriesStatistics.getInstance(); + private SchemaStatisticsManager schemaStatisticsManager = SchemaStatisticsManager.getInstance(); private MemoryStatistics memoryStatistics = MemoryStatistics.getInstance(); private final IStorageGroupMNode storageGroupMNode; @@ -412,7 +412,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion { // collect all the LeafMNode in this schema region List<IMeasurementMNode> leafMNodes = mtree.getAllMeasurementMNode(); - timeseriesStatistics.deleteTimeseries(leafMNodes.size()); + schemaStatisticsManager.deleteTimeseries(leafMNodes.size()); // drop triggers with no exceptions TriggerEngine.drop(leafMNodes); @@ -494,7 +494,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion { mNodeCache.invalidate(path.getDevicePath()); // update statistics and schemaDataTypeNumMap - timeseriesStatistics.addTimeseries(1); + schemaStatisticsManager.addTimeseries(1); // update tag index if (offset != -1 && isRecovering) { @@ -636,7 +636,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion { mNodeCache.invalidate(prefixPath); // update statistics and schemaDataTypeNumMap - timeseriesStatistics.addTimeseries(plan.getMeasurements().size()); + schemaStatisticsManager.addTimeseries(plan.getMeasurements().size()); List<Long> tagOffsets = plan.getTagOffsets(); for (int i = 0; i < measurements.size(); i++) { @@ -784,7 +784,7 @@ public class SchemaRegionSchemaFileImpl implements ISchemaRegion { mNodeCache.invalidate(node.getPartialPath()); - timeseriesStatistics.deleteTimeseries(1); + schemaStatisticsManager.deleteTimeseries(1); return storageGroupPath; } // endregion diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java new file mode 100644 index 0000000000..44d5fc1c66 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/StatisticsManager.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.statistics; + +import org.apache.iotdb.commons.path.PartialPath; + +import com.google.common.collect.Maps; + +import java.util.Map; + +public class StatisticsManager { + + private final Map<PartialPath, TimeseriesStats> seriesToStatsMap = Maps.newConcurrentMap(); + + public long getMaxBinarySizeInBytes(PartialPath path) { + return 512 * Byte.BYTES; + } + + public static StatisticsManager getInstance() { + return StatisticsManager.StatisticsManagerHelper.INSTANCE; + } + + private static class StatisticsManagerHelper { + + private static final StatisticsManager INSTANCE = new StatisticsManager(); + + private StatisticsManagerHelper() {} + } +} diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java new file mode 100644 index 0000000000..509341d3d5 --- /dev/null +++ b/server/src/main/java/org/apache/iotdb/db/mpp/statistics/TimeseriesStats.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.mpp.statistics; + +public class TimeseriesStats { + // TODO collect time series statistics +} diff --git a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java index b57ee14dcf..abc5ab3a4b 100644 --- a/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java +++ b/server/src/test/java/org/apache/iotdb/db/mpp/execution/operator/OperatorMemoryTest.java @@ -22,6 +22,8 @@ import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.db.metadata.path.AlignedPath; import org.apache.iotdb.db.metadata.path.MeasurementPath; +import org.apache.iotdb.db.mpp.aggregation.AccumulatorFactory; +import org.apache.iotdb.db.mpp.aggregation.Aggregator; import org.apache.iotdb.db.mpp.common.FragmentInstanceId; import org.apache.iotdb.db.mpp.common.PlanFragmentId; import org.apache.iotdb.db.mpp.common.QueryId; @@ -44,9 +46,12 @@ import org.apache.iotdb.db.mpp.execution.operator.process.last.UpdateLastCacheOp import org.apache.iotdb.db.mpp.execution.operator.source.AlignedSeriesScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.ExchangeOperator; import org.apache.iotdb.db.mpp.execution.operator.source.LastCacheScanOperator; +import org.apache.iotdb.db.mpp.execution.operator.source.SeriesAggregationScanOperator; import org.apache.iotdb.db.mpp.execution.operator.source.SeriesScanOperator; import org.apache.iotdb.db.mpp.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.mpp.plan.planner.plan.parameter.AggregationStep; import org.apache.iotdb.db.mpp.plan.statement.component.Ordering; +import org.apache.iotdb.db.query.aggregation.AggregationType; import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor; import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType; import org.apache.iotdb.tsfile.read.common.block.TsBlock; @@ -421,4 +426,64 @@ public class OperatorMemoryTest { assertEquals(2048 * 3, linearFillOperator.calculateMaxPeekMemory()); assertEquals(1024, linearFillOperator.calculateMaxReturnSize()); } + + @Test + public void seriesAggregationScanOperatorTest() { + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + try { + MeasurementPath measurementPath = + new MeasurementPath( + "root.SeriesAggregationScanOperatorTest.device0.sensor0", TSDataType.INT32); + Set<String> allSensors = Sets.newHashSet("sensor0"); + + QueryId queryId = new QueryId("stub_query"); + FragmentInstanceId instanceId = + new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance"); + FragmentInstanceStateMachine stateMachine = + new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext = + createFragmentInstanceContext(instanceId, stateMachine); + PlanNodeId planNodeId = new PlanNodeId("1"); + fragmentInstanceContext.addOperatorContext( + 1, planNodeId, SeriesAggregationScanOperatorTest.class.getSimpleName()); + + SeriesAggregationScanOperator seriesAggregationScanOperator = + new SeriesAggregationScanOperator( + planNodeId, + measurementPath, + allSensors, + fragmentInstanceContext.getOperatorContexts().get(0), + Arrays.asList( + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.COUNT, TSDataType.INT32, true), + AggregationStep.SINGLE), + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.MAX_VALUE, TSDataType.INT32, true), + AggregationStep.SINGLE), + new Aggregator( + AccumulatorFactory.createAccumulator( + AggregationType.MIN_TIME, TSDataType.INT32, true), + AggregationStep.SINGLE)), + null, + true, + null); + + assertEquals( + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + seriesAggregationScanOperator.calculateMaxPeekMemory()); + assertEquals( + 2L * TSFileDescriptor.getInstance().getConfig().getPageSizeInByte() + + DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES, + seriesAggregationScanOperator.calculateMaxReturnSize()); + } catch (IllegalPathException e) { + e.printStackTrace(); + fail(); + } finally { + instanceNotificationExecutor.shutdown(); + } + } }
