This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch preallocate_array_list in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3318788728cdd50e45c8ee8fb783723bd04d1826 Author: Tian Jiang <[email protected]> AuthorDate: Thu Apr 3 17:49:20 2025 +0800 add pre-allocation and metrics --- .../iotdb/db/service/metrics/WritingMetrics.java | 25 +++- .../metrics/memory/StorageEngineMemoryMetrics.java | 126 +++++++++++++++++++++ .../rescon/memory/PrimitiveArrayManager.java | 27 +++++ .../db/utils/datastructure/AlignedTVList.java | 12 +- .../iotdb/db/utils/datastructure/TVList.java | 24 +++- .../iotdb/commons/service/metric/enums/Metric.java | 4 + 6 files changed, 200 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java index 2e201e0c211..72c8abfc2a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java @@ -447,6 +447,8 @@ public class WritingMetrics implements IMetricSet { private Counter manualFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; private Counter memControlFlushMemtableCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + private Histogram avgPointHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; + public void bindDataRegionMetrics() { List<DataRegion> allDataRegions = StorageEngine.getInstance().getAllDataRegions(); List<DataRegionId> allDataRegionIds = StorageEngine.getInstance().getAllDataRegionIds(); @@ -563,13 +565,7 @@ public class WritingMetrics implements IMetricSet { } public void createFlushingMemTableStatusMetrics(DataRegionId dataRegionId) { - Arrays.asList( - MEM_TABLE_SIZE, - SERIES_NUM, - POINTS_NUM, - AVG_SERIES_POINT_NUM, - COMPRESSION_RATIO, - FLUSH_TSFILE_SIZE) + Arrays.asList(MEM_TABLE_SIZE, SERIES_NUM, POINTS_NUM, COMPRESSION_RATIO, FLUSH_TSFILE_SIZE) .forEach( name -> MetricService.getInstance() @@ -580,6 +576,15 @@ 
public class WritingMetrics implements IMetricSet { name, Tag.REGION.toString(), dataRegionId.toString())); + avgPointHistogram = + MetricService.getInstance() + .getOrCreateHistogram( + Metric.FLUSHING_MEM_TABLE_STATUS.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + AVG_SERIES_POINT_NUM, + Tag.REGION.toString(), + dataRegionId.toString()); } public Counter createWalFlushMemTableCounterMetrics() { @@ -700,6 +705,7 @@ public class WritingMetrics implements IMetricSet { name, Tag.REGION.toString(), dataRegionId.toString())); + avgPointHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM; } public void recordWALNodeEffectiveInfoRatio(String walNodeId, double ratio) { @@ -986,4 +992,9 @@ public class WritingMetrics implements IMetricSet { public static WritingMetrics getInstance() { return INSTANCE; } + + // Histogram of the region whose metrics were created last; only a sizing hint for TVList. + public Histogram getAvgPointHistogram() { + return avgPointHistogram; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java index 82a020de54f..59021fdad2d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java @@ -25,7 +25,9 @@ import org.apache.iotdb.commons.service.metric.enums.Tag; import
org.apache.iotdb.db.conf.DataNodeMemoryConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.metrics.AbstractMetricService; +import org.apache.iotdb.metrics.impl.DoNothingMetricManager; import org.apache.iotdb.metrics.metricsets.IMetricSet; +import org.apache.iotdb.metrics.type.Counter; import org.apache.iotdb.metrics.utils.MetricLevel; import org.apache.iotdb.metrics.utils.MetricType; @@ -46,6 +48,17 @@ public class StorageEngineMemoryMetrics implements IMetricSet { private static final String STORAGE_ENGINE_WRITE_TIME_PARTITION_INFO = "StorageEngine-Write-TimePartitionInfo"; private static final String STORAGE_ENGINE_COMPACTION = "StorageEngine-Compaction"; + private static final String STORAGE_ENGINE_PAM_ALLOCATION = "StorageEngine-PamAllocation"; + private static final String STORAGE_ENGINE_PAM_RELEASE = "StorageEngine-PamRelease"; + private static final String STORAGE_ENGINE_PAM_ALLOCATION_FAILURE = + "StorageEngine-PamAllocationFailure"; + private static final String STORAGE_ENGINE_PAM_RELEASE_FAILURE = + "StorageEngine-PamReleaseFailure"; + + private Counter pamAllocationCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + private Counter pamReleaseCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + private Counter pamAllocationFailureCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + private Counter pamReleaseFailureCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; @Override public void bindTo(AbstractMetricService metricService) { @@ -118,6 +131,47 @@ public class StorageEngineMemoryMetrics implements IMetricSet { GlobalMemoryMetrics.ON_HEAP, Tag.LEVEL.toString(), GlobalMemoryMetrics.LEVELS[2]); + + pamAllocationCounter = + metricService.getOrCreateCounter( + Metric.PAM_ALLOCATED_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_ALLOCATION, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + pamReleaseCounter =
metricService.getOrCreateCounter( + Metric.PAM_RELEASED_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_RELEASE, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + pamAllocationFailureCounter = + metricService.getOrCreateCounter( + Metric.PAM_ALLOCATED_FAILURE_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_ALLOCATION_FAILURE, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + pamReleaseFailureCounter = + metricService.getOrCreateCounter( + Metric.PAM_RELEASED_FAILURE_COUNT.toString(), + MetricLevel.IMPORTANT, + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_RELEASE_FAILURE, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); } private void unbindStorageEngineDividedMetrics(AbstractMetricService metricService) { @@ -143,6 +197,46 @@ public class StorageEngineMemoryMetrics implements IMetricSet { Tag.LEVEL.toString(), GlobalMemoryMetrics.LEVELS[2]); }); + metricService.remove( + MetricType.COUNTER, + Metric.PAM_ALLOCATED_COUNT.toString(), + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_ALLOCATION, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + metricService.remove( + MetricType.COUNTER, + Metric.PAM_RELEASED_COUNT.toString(), + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_RELEASE, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + metricService.remove( + MetricType.COUNTER, + Metric.PAM_ALLOCATED_FAILURE_COUNT.toString(), + Tag.NAME.toString(), + STORAGE_ENGINE_PAM_ALLOCATION_FAILURE, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + metricService.remove( + MetricType.COUNTER, + Metric.PAM_RELEASED_FAILURE_COUNT.toString(),
Tag.NAME.toString(), + STORAGE_ENGINE_PAM_RELEASE_FAILURE, + Tag.TYPE.toString(), + GlobalMemoryMetrics.ON_HEAP, + Tag.LEVEL.toString(), + GlobalMemoryMetrics.LEVELS[2]); + pamReleaseCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + pamAllocationCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + pamReleaseFailureCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; + pamAllocationFailureCounter = DoNothingMetricManager.DO_NOTHING_COUNTER; } // endregion @@ -304,6 +398,38 @@ public class StorageEngineMemoryMetrics implements IMetricSet { }); } + public void incPamAllocation() { + pamAllocationCounter.inc(); + } + + public void incPamRelease() { + pamReleaseCounter.inc(); + } + + public void incPamAllocationFailure() { + pamAllocationFailureCounter.inc(); + } + + public void incPamReleaseFailure() { + pamReleaseFailureCounter.inc(); + } + + public long getPamAllocation() { + return pamAllocationCounter.getCount(); + } + + public long getPamRelease() { + return pamReleaseCounter.getCount(); + } + + public long getPamAllocationFailure() { + return pamAllocationFailureCounter.getCount(); + } + + public long getPamReleaseFailure() { + return pamReleaseFailureCounter.getCount(); + } + // endregion public static StorageEngineMemoryMetrics getInstance() { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java index f4bc88db2ef..0e2b77d9a3e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java @@ -23,6 +23,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType; import org.apache.iotdb.db.conf.DataNodeMemoryConfig; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.metrics.memory.StorageEngineMemoryMetrics; import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm; import org.apache.tsfile.enums.TSDataType; @@ -86,6 +87,9 @@ public class PrimitiveArrayManager { private static final AtomicLong TOTAL_ALLOCATION_REQUEST_COUNT = new AtomicLong(0); + // TODO remove + private static volatile long lastPrintTimeMs = 0; + static { init(); } @@ -153,8 +157,14 @@ public class PrimitiveArrayManager { synchronized (POOLED_ARRAYS[order]) { array = POOLED_ARRAYS[order].poll(); } + StorageEngineMemoryMetrics.getInstance().incPamAllocation(); if (array == null) { array = createPrimitiveArray(dataType); + StorageEngineMemoryMetrics.getInstance().incPamAllocationFailure(); + } + + if (System.currentTimeMillis() - lastPrintTimeMs > 10_000L) { + printStatus(); } return array; } @@ -278,12 +288,19 @@ public class PrimitiveArrayManager { throw new UnSupportedDataTypeException(array.getClass().toString()); } + StorageEngineMemoryMetrics.getInstance().incPamRelease(); synchronized (POOLED_ARRAYS[order]) { ArrayDeque<Object> arrays = POOLED_ARRAYS[order]; if (arrays.size() < LIMITS[order]) { arrays.add(array); + } else { + StorageEngineMemoryMetrics.getInstance().incPamReleaseFailure(); } } + + if (System.currentTimeMillis() - lastPrintTimeMs > 10_000L) { + printStatus(); + } } public static void close() { @@ -348,4 +365,14 @@ public class PrimitiveArrayManager { public static int getArrayRowCount(int size) { return size / ARRAY_SIZE + (size % ARRAY_SIZE == 0 ?
0 : 1); } + + public static void printStatus() { + lastPrintTimeMs = System.currentTimeMillis(); + LOGGER.info( + "Allocation(failure): {}({}); Release(failure): {}({})", + StorageEngineMemoryMetrics.getInstance().getPamAllocation(), + StorageEngineMemoryMetrics.getInstance().getPamAllocationFailure(), + StorageEngineMemoryMetrics.getInstance().getPamRelease(), + StorageEngineMemoryMetrics.getInstance().getPamReleaseFailure()); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 548fa4d76b7..a31bca030bf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -94,7 +94,7 @@ public abstract class AlignedTVList extends TVList { values = new ArrayList<>(types.size()); for (int i = 0; i < types.size(); i++) { - values.add(new ArrayList<>()); + values.add(new ArrayList<>(getDefaultArrayNum())); } } @@ -168,7 +168,7 @@ public abstract class AlignedTVList extends TVList { } } if (cloneList.bitMaps.get(i) == null) { - List<BitMap> cloneColumnBitMaps = new ArrayList<>(); + List<BitMap> cloneColumnBitMaps = new ArrayList<>(columnBitMaps.size()); for (BitMap bitMap : columnBitMaps) { cloneColumnBitMaps.add(bitMap == null ?
null : bitMap.clone()); } @@ -352,8 +352,8 @@ public abstract class AlignedTVList extends TVList { } bitMaps = localBitMaps; } - List<Object> columnValue = new ArrayList<>(); - List<BitMap> columnBitMaps = new ArrayList<>(); + List<Object> columnValue = new ArrayList<>(timestamps.size()); + List<BitMap> columnBitMaps = new ArrayList<>(timestamps.size()); for (int i = 0; i < timestamps.size(); i++) { switch (dataType) { case TEXT: @@ -619,7 +619,7 @@ public abstract class AlignedTVList extends TVList { bitMaps = localBitMaps; } if (bitMaps.get(columnIndex) == null) { - List<BitMap> columnBitMaps = new ArrayList<>(); + List<BitMap> columnBitMaps = new ArrayList<>(values.get(columnIndex).size()); for (int i = 0; i < values.get(columnIndex).size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE)); } @@ -881,7 +881,7 @@ public abstract class AlignedTVList extends TVList { // if the bitmap in columnIndex is null, init the bitmap of this column from the beginning if (bitMaps.get(columnIndex) == null) { - List<BitMap> columnBitMaps = new ArrayList<>(); + List<BitMap> columnBitMaps = new ArrayList<>(values.get(columnIndex).size()); for (int i = 0; i < values.get(columnIndex).size(); i++) { columnBitMaps.add(new BitMap(ARRAY_SIZE)); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index b4392e7866a..c6f4570df1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils.datastructure; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.service.metrics.WritingMetrics; import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager; import org.apache.iotdb.db.utils.MathUtils; @@ -90,8 +91,11 @@ public abstract class TVList implements WALEntryValue { private final TVList outer = this; + protected static volatile int defaultArrayNum = 0; + protected static volatile long defaultArrayNumLastUpdatedTimeMs = 0; + protected TVList() { - timestamps = new ArrayList<>(); + timestamps = new ArrayList<>(getDefaultArrayNum()); rowCount = 0; seqRowCount = 0; maxTime = Long.MIN_VALUE; @@ -210,7 +214,7 @@ public abstract class TVList implements WALEntryValue { timestamps.get(arrayIndex)[elementIndex] = timestamp; // prepare indices for sorting if (indices == null) { - indices = new ArrayList<>(); + indices = new ArrayList<>(Math.max(timestamps.size(), getDefaultArrayNum())); for (int i = 0; i < timestamps.size(); i++) { indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32)); int offset = i * ARRAY_SIZE; @@ -245,7 +249,7 @@ public abstract class TVList implements WALEntryValue { protected void markNullValue(int arrayIndex, int elementIndex) { // init bitMap if doesn't have if (bitMap == null) { - List<BitMap> localBitMap = new ArrayList<>(); + List<BitMap> localBitMap = new ArrayList<>(Math.max(timestamps.size(), getDefaultArrayNum())); for (int i = 0; i < timestamps.size(); i++) { localBitMap.add(new BitMap(ARRAY_SIZE)); } @@ -280,7 +284,7 @@ public abstract class TVList implements WALEntryValue { protected void cloneBitMap(TVList cloneList) { if (bitMap != null) { - cloneList.bitMap = new ArrayList<>(); + cloneList.bitMap = new ArrayList<>(bitMap.size()); for (BitMap bm : bitMap) { cloneList.bitMap.add(bm == null ?
null : bm.clone()); } @@ -429,7 +433,7 @@ public abstract class TVList implements WALEntryValue { } // clone indices if (indices != null) { - cloneList.indices = new ArrayList<>(); + cloneList.indices = new ArrayList<>(indices.size()); for (int[] indicesArray : indices) { cloneList.indices.add(cloneIndex(indicesArray)); } @@ -863,4 +867,14 @@ public abstract class TVList implements WALEntryValue { return outer; } } + + protected static int getDefaultArrayNum() { + if (System.currentTimeMillis() - defaultArrayNumLastUpdatedTimeMs > 10_000) { + defaultArrayNumLastUpdatedTimeMs = System.currentTimeMillis(); + defaultArrayNum = + (int) Math.ceil( + WritingMetrics.getInstance().getAvgPointHistogram().takeSnapshot().getMean() / ARRAY_SIZE); + } + return defaultArrayNum; + } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java index 755ad8187df..8495bdff51d 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java @@ -82,6 +82,10 @@ public enum Metric { ACTIVE_MEMTABLE_COUNT("active_memtable_count"), ACTIVE_TIME_PARTITION_COUNT("active_time_partition_count"), MEMTABLE_LIVE_DURATION("memtable_live_duration"), + PAM_ALLOCATED_COUNT("primitive_array_manager_allocated_count"), + PAM_RELEASED_COUNT("primitive_array_manager_released_count"), + PAM_ALLOCATED_FAILURE_COUNT("primitive_array_manager_allocated_failure_count"), + PAM_RELEASED_FAILURE_COUNT("primitive_array_manager_released_failure_count"), // compaction related DATA_WRITTEN("data_written"),
