This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch preallocate_array_list
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3318788728cdd50e45c8ee8fb783723bd04d1826
Author: Tian Jiang <[email protected]>
AuthorDate: Thu Apr 3 17:49:20 2025 +0800

    add pre-allocation and metrics
---
 .../iotdb/db/service/metrics/WritingMetrics.java   |  34 +++---
 .../metrics/memory/StorageEngineMemoryMetrics.java | 126 +++++++++++++++++++++
 .../rescon/memory/PrimitiveArrayManager.java       |  27 +++++
 .../db/utils/datastructure/AlignedTVList.java      |  12 +-
 .../iotdb/db/utils/datastructure/TVList.java       |  24 +++-
 .../iotdb/commons/service/metric/enums/Metric.java |   4 +
 6 files changed, 200 insertions(+), 27 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
index 2e201e0c211..72c8abfc2a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/WritingMetrics.java
@@ -447,6 +447,8 @@ public class WritingMetrics implements IMetricSet {
   private Counter manualFlushMemtableCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
   private Counter memControlFlushMemtableCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
 
+  private Histogram avgPointHistogram = 
DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
+
   public void bindDataRegionMetrics() {
     List<DataRegion> allDataRegions = 
StorageEngine.getInstance().getAllDataRegions();
     List<DataRegionId> allDataRegionIds = 
StorageEngine.getInstance().getAllDataRegionIds();
@@ -563,13 +565,7 @@ public class WritingMetrics implements IMetricSet {
   }
 
   public void createFlushingMemTableStatusMetrics(DataRegionId dataRegionId) {
-    Arrays.asList(
-            MEM_TABLE_SIZE,
-            SERIES_NUM,
-            POINTS_NUM,
-            AVG_SERIES_POINT_NUM,
-            COMPRESSION_RATIO,
-            FLUSH_TSFILE_SIZE)
+    Arrays.asList(MEM_TABLE_SIZE, SERIES_NUM, POINTS_NUM, COMPRESSION_RATIO, 
FLUSH_TSFILE_SIZE)
         .forEach(
             name ->
                 MetricService.getInstance()
@@ -580,6 +576,15 @@ public class WritingMetrics implements IMetricSet {
                         name,
                         Tag.REGION.toString(),
                         dataRegionId.toString()));
+    avgPointHistogram =
+        MetricService.getInstance()
+            .getOrCreateHistogram(
+                Metric.FLUSHING_MEM_TABLE_STATUS.toString(),
+                MetricLevel.IMPORTANT,
+                Tag.NAME.toString(),
+                AVG_SERIES_POINT_NUM,
+                Tag.REGION.toString(),
+                dataRegionId.toString());
   }
 
   public Counter createWalFlushMemTableCounterMetrics() {
@@ -700,6 +705,7 @@ public class WritingMetrics implements IMetricSet {
                         name,
                         Tag.REGION.toString(),
                         dataRegionId.toString()));
+    avgPointHistogram = DoNothingMetricManager.DO_NOTHING_HISTOGRAM;
   }
 
   public void recordWALNodeEffectiveInfoRatio(String walNodeId, double ratio) {
@@ -785,15 +791,7 @@ public class WritingMetrics implements IMetricSet {
             POINTS_NUM,
             Tag.REGION.toString(),
             dataRegionId.toString());
-    MetricService.getInstance()
-        .histogram(
-            avgSeriesNum,
-            Metric.FLUSHING_MEM_TABLE_STATUS.toString(),
-            MetricLevel.IMPORTANT,
-            Tag.NAME.toString(),
-            AVG_SERIES_POINT_NUM,
-            Tag.REGION.toString(),
-            dataRegionId.toString());
+    avgPointHistogram.update(avgSeriesNum);
   }
 
   public void recordFlushTsFileSize(String storageGroup, long size) {
@@ -986,4 +984,8 @@ public class WritingMetrics implements IMetricSet {
   public static WritingMetrics getInstance() {
     return INSTANCE;
   }
+
+  public Histogram getAvgPointHistogram() {
+    return avgPointHistogram;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java
index 82a020de54f..59021fdad2d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/metrics/memory/StorageEngineMemoryMetrics.java
@@ -25,7 +25,9 @@ import org.apache.iotdb.commons.service.metric.enums.Tag;
 import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.metrics.AbstractMetricService;
+import org.apache.iotdb.metrics.impl.DoNothingMetricManager;
 import org.apache.iotdb.metrics.metricsets.IMetricSet;
+import org.apache.iotdb.metrics.type.Counter;
 import org.apache.iotdb.metrics.utils.MetricLevel;
 import org.apache.iotdb.metrics.utils.MetricType;
 
@@ -46,6 +48,17 @@ public class StorageEngineMemoryMetrics implements 
IMetricSet {
   private static final String STORAGE_ENGINE_WRITE_TIME_PARTITION_INFO =
       "StorageEngine-Write-TimePartitionInfo";
   private static final String STORAGE_ENGINE_COMPACTION = 
"StorageEngine-Compaction";
+  private static final String STORAGE_ENGINE_PAM_ALLOCATION = 
"StorageEngine-PamAllocation";
+  private static final String STORAGE_ENGINE_PAM_RELEASE = 
"StorageEngine-PamRelease";
+  private static final String STORAGE_ENGINE_PAM_ALLOCATION_FAILURE =
+      "StorageEngine-PamAllocationFailure";
+  private static final String STORAGE_ENGINE_PAM_RELEASE_FAILURE =
+      "StorageEngine-PamReleaseFailure";
+
+  private Counter pamAllocationCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  private Counter pamReleaseCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  private Counter pamAllocationFailureCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
+  private Counter pamReleaseFailureCounter = 
DoNothingMetricManager.DO_NOTHING_COUNTER;
 
   @Override
   public void bindTo(AbstractMetricService metricService) {
@@ -118,6 +131,47 @@ public class StorageEngineMemoryMetrics implements 
IMetricSet {
         GlobalMemoryMetrics.ON_HEAP,
         Tag.LEVEL.toString(),
         GlobalMemoryMetrics.LEVELS[2]);
+
+    pamAllocationCounter =
+        metricService.getOrCreateCounter(
+            Metric.PAM_ALLOCATED_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            STORAGE_ENGINE_PAM_ALLOCATION,
+            Tag.TYPE.toString(),
+            GlobalMemoryMetrics.ON_HEAP,
+            Tag.LEVEL.toString(),
+            GlobalMemoryMetrics.LEVELS[2]);
+    pamReleaseCounter =
+        metricService.getOrCreateCounter(
+            Metric.PAM_RELEASED_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            STORAGE_ENGINE_PAM_RELEASE,
+            Tag.TYPE.toString(),
+            GlobalMemoryMetrics.ON_HEAP,
+            Tag.LEVEL.toString(),
+            GlobalMemoryMetrics.LEVELS[2]);
+    pamAllocationFailureCounter =
+        metricService.getOrCreateCounter(
+            Metric.PAM_ALLOCATED_FAILURE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            STORAGE_ENGINE_PAM_ALLOCATION,
+            Tag.TYPE.toString(),
+            GlobalMemoryMetrics.ON_HEAP,
+            Tag.LEVEL.toString(),
+            GlobalMemoryMetrics.LEVELS[2]);
+    pamReleaseFailureCounter =
+        metricService.getOrCreateCounter(
+            Metric.PAM_RELEASED_FAILURE_COUNT.toString(),
+            MetricLevel.IMPORTANT,
+            Tag.NAME.toString(),
+            STORAGE_ENGINE_PAM_RELEASE,
+            Tag.TYPE.toString(),
+            GlobalMemoryMetrics.ON_HEAP,
+            Tag.LEVEL.toString(),
+            GlobalMemoryMetrics.LEVELS[2]);
   }
 
   private void unbindStorageEngineDividedMetrics(AbstractMetricService 
metricService) {
@@ -143,6 +197,46 @@ public class StorageEngineMemoryMetrics implements 
IMetricSet {
                   Tag.LEVEL.toString(),
                   GlobalMemoryMetrics.LEVELS[2]);
             });
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PAM_ALLOCATED_COUNT.toString(),
+        Tag.NAME.toString(),
+        STORAGE_ENGINE_PAM_ALLOCATION,
+        Tag.TYPE.toString(),
+        GlobalMemoryMetrics.ON_HEAP,
+        Tag.LEVEL.toString(),
+        GlobalMemoryMetrics.LEVELS[2]);
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PAM_RELEASED_COUNT.toString(),
+        Tag.NAME.toString(),
+        STORAGE_ENGINE_PAM_RELEASE,
+        Tag.TYPE.toString(),
+        GlobalMemoryMetrics.ON_HEAP,
+        Tag.LEVEL.toString(),
+        GlobalMemoryMetrics.LEVELS[2]);
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PAM_ALLOCATED_FAILURE_COUNT.toString(),
+        Tag.NAME.toString(),
+        STORAGE_ENGINE_PAM_ALLOCATION_FAILURE,
+        Tag.TYPE.toString(),
+        GlobalMemoryMetrics.ON_HEAP,
+        Tag.LEVEL.toString(),
+        GlobalMemoryMetrics.LEVELS[2]);
+    metricService.remove(
+        MetricType.COUNTER,
+        Metric.PAM_RELEASED_FAILURE_COUNT.toString(),
+        Tag.NAME.toString(),
+        STORAGE_ENGINE_PAM_RELEASE_FAILURE,
+        Tag.TYPE.toString(),
+        GlobalMemoryMetrics.ON_HEAP,
+        Tag.LEVEL.toString(),
+        GlobalMemoryMetrics.LEVELS[2]);
+    pamReleaseCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    pamAllocationCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    pamReleaseFailureCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
+    pamAllocationFailureCounter = DoNothingMetricManager.DO_NOTHING_COUNTER;
   }
 
   // endregion
@@ -304,6 +398,38 @@ public class StorageEngineMemoryMetrics implements 
IMetricSet {
             });
   }
 
+  public void incPamAllocation() {
+    pamAllocationCounter.inc();
+  }
+
+  public void incPamRelease() {
+    pamReleaseCounter.inc();
+  }
+
+  public void incPamAllocationFailure() {
+    pamAllocationFailureCounter.inc();
+  }
+
+  public void incPamReleaseFailure() {
+    pamReleaseFailureCounter.inc();
+  }
+
+  public long getPamAllocation() {
+    return pamAllocationCounter.getCount();
+  }
+
+  public long getPamRelease() {
+    return pamReleaseCounter.getCount();
+  }
+
+  public long getPamAllocationFailure() {
+    return pamAllocationFailureCounter.getCount();
+  }
+
+  public long getPamReleaseFailure() {
+    return pamReleaseFailureCounter.getCount();
+  }
+
   // endregion
 
   public static StorageEngineMemoryMetrics getInstance() {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
index f4bc88db2ef..0e2b77d9a3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/memory/PrimitiveArrayManager.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.memory.MemoryBlockType;
 import org.apache.iotdb.db.conf.DataNodeMemoryConfig;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.service.metrics.memory.StorageEngineMemoryMetrics;
 import org.apache.iotdb.db.utils.datastructure.TVListSortAlgorithm;
 
 import org.apache.tsfile.enums.TSDataType;
@@ -86,6 +87,9 @@ public class PrimitiveArrayManager {
 
   private static final AtomicLong TOTAL_ALLOCATION_REQUEST_COUNT = new 
AtomicLong(0);
 
+  // TODO remove
+  private static volatile long lastPrintTimeMs = 0;
+
   static {
     init();
   }
@@ -153,8 +157,14 @@ public class PrimitiveArrayManager {
     synchronized (POOLED_ARRAYS[order]) {
       array = POOLED_ARRAYS[order].poll();
     }
+    StorageEngineMemoryMetrics.getInstance().incPamAllocation();
     if (array == null) {
       array = createPrimitiveArray(dataType);
+      StorageEngineMemoryMetrics.getInstance().incPamAllocationFailure();
+    }
+
+    if (System.currentTimeMillis() - lastPrintTimeMs > 10_000L) {
+      printStatus();
     }
     return array;
   }
@@ -278,12 +288,19 @@ public class PrimitiveArrayManager {
       throw new UnSupportedDataTypeException(array.getClass().toString());
     }
 
+    StorageEngineMemoryMetrics.getInstance().incPamRelease();
     synchronized (POOLED_ARRAYS[order]) {
       ArrayDeque<Object> arrays = POOLED_ARRAYS[order];
       if (arrays.size() < LIMITS[order]) {
         arrays.add(array);
+      } else {
+        StorageEngineMemoryMetrics.getInstance().incPamReleaseFailure();
       }
     }
+
+    if (System.currentTimeMillis() - lastPrintTimeMs > 10_000L) {
+      printStatus();
+    }
   }
 
   public static void close() {
@@ -348,4 +365,14 @@ public class PrimitiveArrayManager {
   public static int getArrayRowCount(int size) {
     return size / ARRAY_SIZE + (size % ARRAY_SIZE == 0 ? 0 : 1);
   }
+
+  public static void printStatus() {
+    LOGGER.info(
+        "Allocation(failure): {}({}); Release(failure): {}({})",
+        StorageEngineMemoryMetrics.getInstance().getPamAllocation(),
+        StorageEngineMemoryMetrics.getInstance().getPamAllocationFailure(),
+        StorageEngineMemoryMetrics.getInstance().getPamRelease(),
+        StorageEngineMemoryMetrics.getInstance().getPamReleaseFailure());
+    lastPrintTimeMs = System.currentTimeMillis();
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 548fa4d76b7..a31bca030bf 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -94,7 +94,7 @@ public abstract class AlignedTVList extends TVList {
 
     values = new ArrayList<>(types.size());
     for (int i = 0; i < types.size(); i++) {
-      values.add(new ArrayList<>());
+      values.add(new ArrayList<>(getDefaultArrayNum()));
     }
   }
 
@@ -168,7 +168,7 @@ public abstract class AlignedTVList extends TVList {
           }
         }
         if (cloneList.bitMaps.get(i) == null) {
-          List<BitMap> cloneColumnBitMaps = new ArrayList<>();
+          List<BitMap> cloneColumnBitMaps = new 
ArrayList<>(columnBitMaps.size());
           for (BitMap bitMap : columnBitMaps) {
             cloneColumnBitMaps.add(bitMap == null ? null : bitMap.clone());
           }
@@ -352,8 +352,8 @@ public abstract class AlignedTVList extends TVList {
       }
       bitMaps = localBitMaps;
     }
-    List<Object> columnValue = new ArrayList<>();
-    List<BitMap> columnBitMaps = new ArrayList<>();
+    List<Object> columnValue = new ArrayList<>(timestamps.size());
+    List<BitMap> columnBitMaps = new ArrayList<>(timestamps.size());
     for (int i = 0; i < timestamps.size(); i++) {
       switch (dataType) {
         case TEXT:
@@ -619,7 +619,7 @@ public abstract class AlignedTVList extends TVList {
       bitMaps = localBitMaps;
     }
     if (bitMaps.get(columnIndex) == null) {
-      List<BitMap> columnBitMaps = new ArrayList<>();
+      List<BitMap> columnBitMaps = new 
ArrayList<>(values.get(columnIndex).size());
       for (int i = 0; i < values.get(columnIndex).size(); i++) {
         columnBitMaps.add(new BitMap(ARRAY_SIZE));
       }
@@ -881,7 +881,7 @@ public abstract class AlignedTVList extends TVList {
 
     // if the bitmap in columnIndex is null, init the bitmap of this column 
from the beginning
     if (bitMaps.get(columnIndex) == null) {
-      List<BitMap> columnBitMaps = new ArrayList<>();
+      List<BitMap> columnBitMaps = new 
ArrayList<>(values.get(columnIndex).size());
       for (int i = 0; i < values.get(columnIndex).size(); i++) {
         columnBitMaps.add(new BitMap(ARRAY_SIZE));
       }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index b4392e7866a..c6f4570df1c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.utils.datastructure;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.service.metrics.WritingMetrics;
 import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
 import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
 import org.apache.iotdb.db.utils.MathUtils;
@@ -90,8 +91,11 @@ public abstract class TVList implements WALEntryValue {
 
   private final TVList outer = this;
 
+  protected static int defaultArrayNum = 0;
+  protected static volatile long defaultArrayNumLastUpdatedTimeMs = 0;
+
   protected TVList() {
-    timestamps = new ArrayList<>();
+    timestamps = new ArrayList<>(getDefaultArrayNum());
     rowCount = 0;
     seqRowCount = 0;
     maxTime = Long.MIN_VALUE;
@@ -210,7 +214,7 @@ public abstract class TVList implements WALEntryValue {
     timestamps.get(arrayIndex)[elementIndex] = timestamp;
     // prepare indices for sorting
     if (indices == null) {
-      indices = new ArrayList<>();
+      indices = new ArrayList<>(getDefaultArrayNum());
       for (int i = 0; i < timestamps.size(); i++) {
         indices.add((int[]) getPrimitiveArraysByType(TSDataType.INT32));
         int offset = i * ARRAY_SIZE;
@@ -245,7 +249,7 @@ public abstract class TVList implements WALEntryValue {
   protected void markNullValue(int arrayIndex, int elementIndex) {
     // init bitMap if doesn't have
     if (bitMap == null) {
-      List<BitMap> localBitMap = new ArrayList<>();
+      List<BitMap> localBitMap = new ArrayList<>(getDefaultArrayNum());
       for (int i = 0; i < timestamps.size(); i++) {
         localBitMap.add(new BitMap(ARRAY_SIZE));
       }
@@ -280,7 +284,7 @@ public abstract class TVList implements WALEntryValue {
 
   protected void cloneBitMap(TVList cloneList) {
     if (bitMap != null) {
-      cloneList.bitMap = new ArrayList<>();
+      cloneList.bitMap = new ArrayList<>(bitMap.size());
       for (BitMap bm : bitMap) {
         cloneList.bitMap.add(bm == null ? null : bm.clone());
       }
@@ -429,7 +433,7 @@ public abstract class TVList implements WALEntryValue {
     }
     // clone indices
     if (indices != null) {
-      cloneList.indices = new ArrayList<>();
+      cloneList.indices = new ArrayList<>(indices.size());
       for (int[] indicesArray : indices) {
         cloneList.indices.add(cloneIndex(indicesArray));
       }
@@ -863,4 +867,14 @@ public abstract class TVList implements WALEntryValue {
       return outer;
     }
   }
+
+  protected static int getDefaultArrayNum() {
+    if (System.currentTimeMillis() - defaultArrayNumLastUpdatedTimeMs > 
10_000) {
+      defaultArrayNumLastUpdatedTimeMs = System.currentTimeMillis();
+      defaultArrayNum =
+          ((int) 
WritingMetrics.getInstance().getAvgPointHistogram().takeSnapshot().getMean()
+              / ARRAY_SIZE);
+    }
+    return defaultArrayNum;
+  }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
index 755ad8187df..8495bdff51d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/service/metric/enums/Metric.java
@@ -82,6 +82,10 @@ public enum Metric {
   ACTIVE_MEMTABLE_COUNT("active_memtable_count"),
   ACTIVE_TIME_PARTITION_COUNT("active_time_partition_count"),
   MEMTABLE_LIVE_DURATION("memtable_live_duration"),
+  PAM_ALLOCATED_COUNT("primitive_array_manager_allocated_count"),
+  PAM_RELEASED_COUNT("primitive_array_manager_released_count"),
+  
PAM_ALLOCATED_FAILURE_COUNT("primitive_array_manager_allocated_failure_count"),
+  PAM_RELEASED_FAILURE_COUNT("primitive_array_manager_released_failure_count"),
 
   // compaction related
   DATA_WRITTEN("data_written"),

Reply via email to