This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d8339257e7 Batch update inserted point number metric (#13910)
5d8339257e7 is described below

commit 5d8339257e726cf07f63421b6a703d7969aa5159
Author: Haonan <[email protected]>
AuthorDate: Fri Oct 25 09:34:35 2024 +0800

    Batch update inserted point number metric (#13910)
---
 .../dataregion/memtable/AbstractMemTable.java      | 140 +++++----------------
 .../dataregion/memtable/IMemTable.java             |  12 +-
 .../dataregion/memtable/TsFileProcessor.java       |  26 ++--
 .../wal/recover/file/TsFilePlanRedoer.java         |  22 ++--
 4 files changed, 68 insertions(+), 132 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 97db8205ce8..67f7d192053 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
@@ -205,7 +206,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
   }
 
   @Override
-  public void insert(InsertRowNode insertRowNode) {
+  public int insert(InsertRowNode insertRowNode) {
 
     String[] measurements = insertRowNode.getMeasurements();
     Object[] values = insertRowNode.getValues();
@@ -235,39 +236,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
             - nullPointsNumber;
 
     totalPointsNum += pointsInserted;
-
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId,
-            Tag.TYPE.toString(),
-            Metric.MEMTABLE_POINT_COUNT.toString());
-    if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-      MetricService.getInstance()
-          .count(
-              pointsInserted,
-              Metric.LEADER_QUANTITY.toString(),
-              MetricLevel.CORE,
-              Tag.NAME.toString(),
-              METRIC_POINT_IN,
-              Tag.DATABASE.toString(),
-              database,
-              Tag.REGION.toString(),
-              dataRegionId,
-              Tag.TYPE.toString(),
-              Metric.MEMTABLE_POINT_COUNT.toString());
-    }
+    return pointsInserted;
   }
 
   @Override
-  public void insertAlignedRow(InsertRowNode insertRowNode) {
+  public int insertAlignedRow(InsertRowNode insertRowNode) {
 
     String[] measurements = insertRowNode.getMeasurements();
     Object[] values = insertRowNode.getValues();
@@ -287,7 +260,7 @@ public abstract class AbstractMemTable implements IMemTable 
{
       dataTypes.add(schema.getType());
     }
     if (schemaList.isEmpty()) {
-      return;
+      return 0;
     }
     memSize +=
         MemUtils.getAlignedRowRecordSize(dataTypes, values, 
insertRowNode.getColumnCategories());
@@ -295,39 +268,11 @@ public abstract class AbstractMemTable implements 
IMemTable {
     int pointsInserted =
         insertRowNode.getMeasurementColumnCnt() - 
insertRowNode.getFailedMeasurementNumber();
     totalPointsNum += pointsInserted;
-
-    MetricService.getInstance()
-        .count(
-            pointsInserted,
-            Metric.QUANTITY.toString(),
-            MetricLevel.CORE,
-            Tag.NAME.toString(),
-            METRIC_POINT_IN,
-            Tag.DATABASE.toString(),
-            database,
-            Tag.REGION.toString(),
-            dataRegionId,
-            Tag.TYPE.toString(),
-            Metric.MEMTABLE_POINT_COUNT.toString());
-    if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
-      MetricService.getInstance()
-          .count(
-              pointsInserted,
-              Metric.LEADER_QUANTITY.toString(),
-              MetricLevel.CORE,
-              Tag.NAME.toString(),
-              METRIC_POINT_IN,
-              Tag.DATABASE.toString(),
-              database,
-              Tag.REGION.toString(),
-              dataRegionId,
-              Tag.TYPE.toString(),
-              Metric.MEMTABLE_POINT_COUNT.toString());
-    }
+    return pointsInserted;
   }
 
   @Override
-  public void insertTablet(InsertTabletNode insertTabletNode, int start, int 
end)
+  public int insertTablet(InsertTabletNode insertTabletNode, int start, int 
end)
       throws WriteProcessException {
     try {
       writeTabletNode(insertTabletNode, start, end);
@@ -336,41 +281,14 @@ public abstract class AbstractMemTable implements 
IMemTable {
           (insertTabletNode.getDataTypes().length - 
insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
       totalPointsNum += pointsInserted;
-      MetricService.getInstance()
-          .count(
-              pointsInserted,
-              Metric.QUANTITY.toString(),
-              MetricLevel.CORE,
-              Tag.NAME.toString(),
-              METRIC_POINT_IN,
-              Tag.DATABASE.toString(),
-              database,
-              Tag.REGION.toString(),
-              dataRegionId,
-              Tag.TYPE.toString(),
-              Metric.MEMTABLE_POINT_COUNT.toString());
-      if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
-        MetricService.getInstance()
-            .count(
-                pointsInserted,
-                Metric.LEADER_QUANTITY.toString(),
-                MetricLevel.CORE,
-                Tag.NAME.toString(),
-                METRIC_POINT_IN,
-                Tag.DATABASE.toString(),
-                database,
-                Tag.REGION.toString(),
-                dataRegionId,
-                Tag.TYPE.toString(),
-                Metric.MEMTABLE_POINT_COUNT.toString());
-      }
+      return pointsInserted;
     } catch (RuntimeException e) {
       throw new WriteProcessException(e);
     }
   }
 
   @Override
-  public void insertAlignedTablet(
+  public int insertAlignedTablet(
       InsertTabletNode insertTabletNode, int start, int end, TSStatus[] 
results)
       throws WriteProcessException {
     try {
@@ -382,10 +300,31 @@ public abstract class AbstractMemTable implements 
IMemTable {
                   - insertTabletNode.getFailedMeasurementNumber())
               * (end - start);
       totalPointsNum += pointsInserted;
+      return pointsInserted;
+    } catch (RuntimeException e) {
+      throw new WriteProcessException(e);
+    }
+  }
+
+  public void updateMemtablePointCountMetric(InsertNode insertNode, int 
pointsInserted) {
+    MetricService.getInstance()
+        .count(
+            pointsInserted,
+            Metric.QUANTITY.toString(),
+            MetricLevel.CORE,
+            Tag.NAME.toString(),
+            METRIC_POINT_IN,
+            Tag.DATABASE.toString(),
+            database,
+            Tag.REGION.toString(),
+            dataRegionId,
+            Tag.TYPE.toString(),
+            Metric.MEMTABLE_POINT_COUNT.toString());
+    if (!insertNode.isGeneratedByRemoteConsensusLeader()) {
       MetricService.getInstance()
           .count(
               pointsInserted,
-              Metric.QUANTITY.toString(),
+              Metric.LEADER_QUANTITY.toString(),
               MetricLevel.CORE,
               Tag.NAME.toString(),
               METRIC_POINT_IN,
@@ -395,23 +334,6 @@ public abstract class AbstractMemTable implements 
IMemTable {
               dataRegionId,
               Tag.TYPE.toString(),
               Metric.MEMTABLE_POINT_COUNT.toString());
-      if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
-        MetricService.getInstance()
-            .count(
-                pointsInserted,
-                Metric.LEADER_QUANTITY.toString(),
-                MetricLevel.CORE,
-                Tag.NAME.toString(),
-                METRIC_POINT_IN,
-                Tag.DATABASE.toString(),
-                database,
-                Tag.REGION.toString(),
-                dataRegionId,
-                Tag.TYPE.toString(),
-                Metric.MEMTABLE_POINT_COUNT.toString());
-      }
-    } catch (RuntimeException e) {
-      throw new WriteProcessException(e);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 958f9fdad9a..b3e8202224c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -95,9 +96,9 @@ public interface IMemTable extends WALEntryValue {
    *
    * @param insertRowNode insertRowNode
    */
-  void insert(InsertRowNode insertRowNode);
+  int insert(InsertRowNode insertRowNode);
 
-  void insertAlignedRow(InsertRowNode insertRowNode);
+  int insertAlignedRow(InsertRowNode insertRowNode);
 
   /**
    * insert tablet into this memtable. The rows to be inserted are in the 
range [start, end). Null
@@ -108,11 +109,10 @@ public interface IMemTable extends WALEntryValue {
    * @param start included
    * @param end excluded
    */
-  void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
+  int insertTablet(InsertTabletNode insertTabletNode, int start, int end)
       throws WriteProcessException;
 
-  void insertAlignedTablet(
-      InsertTabletNode insertTabletNode, int start, int end, TSStatus[] 
results)
+  int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int 
end, TSStatus[] results)
       throws WriteProcessException;
 
   ReadOnlyMemChunk query(
@@ -208,4 +208,6 @@ public interface IMemTable extends WALEntryValue {
   void markAsNotGeneratedByPipe();
 
   boolean isTotallyGeneratedByPipe();
+
+  void updateMemtablePointCountMetric(InsertNode insertNode, int 
pointsInserted);
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 56b2efc4c4c..f1dde34972f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -332,10 +332,11 @@ public class TsFileProcessor {
             insertRowNode,
             tsFileResource);
 
+    int pointInserted;
     if (insertRowNode.isAligned()) {
-      workMemTable.insertAlignedRow(insertRowNode);
+      pointInserted = workMemTable.insertAlignedRow(insertRowNode);
     } else {
-      workMemTable.insert(insertRowNode);
+      pointInserted = workMemTable.insert(insertRowNode);
     }
 
     // Update start time of this memtable
@@ -345,6 +346,7 @@ public class TsFileProcessor {
     if (!sequence) {
       tsFileResource.updateEndTime(insertRowNode.getDeviceID(), 
insertRowNode.getTime());
     }
+    workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted);
 
     tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
     // RecordScheduleMemTableCost
@@ -419,14 +421,14 @@ public class TsFileProcessor {
             walFlushListener.getWalEntryHandler(),
             insertRowsNode,
             tsFileResource);
-    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
 
+    int pointInserted = 0;
+    for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
       if (insertRowNode.isAligned()) {
-        workMemTable.insertAlignedRow(insertRowNode);
+        pointInserted += workMemTable.insertAlignedRow(insertRowNode);
       } else {
-        workMemTable.insert(insertRowNode);
+        pointInserted += workMemTable.insert(insertRowNode);
       }
-
       // update start time of this memtable
       tsFileResource.updateStartTime(insertRowNode.getDeviceID(), 
insertRowNode.getTime());
       // for sequence tsfile, we update the endTime only when the file is 
prepared to be closed.
@@ -435,6 +437,8 @@ public class TsFileProcessor {
         tsFileResource.updateEndTime(insertRowNode.getDeviceID(), 
insertRowNode.getTime());
       }
     }
+    workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted);
+
     tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex());
     // recordScheduleMemTableCost
     costsForMetrics[3] += System.nanoTime() - startTime;
@@ -584,15 +588,17 @@ public class TsFileProcessor {
             insertTabletNode,
             tsFileResource);
 
+    int pointInserted = 0;
     for (int[] rangePair : rangeList) {
       int start = rangePair[0];
       int end = rangePair[1];
       try {
         if (insertTabletNode.isAligned()) {
-          workMemTable.insertAlignedTablet(
-              insertTabletNode, start, end, noFailure ? null : results);
+          pointInserted +=
+              workMemTable.insertAlignedTablet(
+                  insertTabletNode, start, end, noFailure ? null : results);
         } else {
-          workMemTable.insertTablet(insertTabletNode, start, end);
+          pointInserted += workMemTable.insertTablet(insertTabletNode, start, 
end);
         }
       } catch (WriteProcessException e) {
         for (int i = start; i < end; i++) {
@@ -627,7 +633,7 @@ public class TsFileProcessor {
         }
       }
     }
-
+    workMemTable.updateMemtablePointCountMetric(insertTabletNode, 
pointInserted);
     tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
 
     // recordScheduleMemTableCost
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 9b98a3c5b79..be5bd356f5a 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -91,24 +91,29 @@ public class TsFilePlanRedoer {
       }
     }
 
+    int pointsInserted;
     if (node instanceof InsertRowNode) {
       if (node.isAligned()) {
-        recoveryMemTable.insertAlignedRow((InsertRowNode) node);
+        pointsInserted = recoveryMemTable.insertAlignedRow((InsertRowNode) 
node);
       } else {
-        recoveryMemTable.insert((InsertRowNode) node);
+        pointsInserted = recoveryMemTable.insert((InsertRowNode) node);
       }
     } else {
       if (node.isAligned()) {
-        recoveryMemTable.insertAlignedTablet(
-            (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount(), null);
+        pointsInserted =
+            recoveryMemTable.insertAlignedTablet(
+                (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount(), null);
       } else {
-        recoveryMemTable.insertTablet(
-            (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount());
+        pointsInserted =
+            recoveryMemTable.insertTablet(
+                (InsertTabletNode) node, 0, ((InsertTabletNode) 
node).getRowCount());
       }
     }
+    recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted);
   }
 
   void redoInsertRows(InsertRowsNode insertRowsNode) {
+    int pointsInserted = 0;
     for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) {
       if (!node.hasValidMeasurements()) {
         continue;
@@ -125,11 +130,12 @@ public class TsFilePlanRedoer {
         }
       }
       if (node.isAligned()) {
-        recoveryMemTable.insertAlignedRow(node);
+        pointsInserted += recoveryMemTable.insertAlignedRow(node);
       } else {
-        recoveryMemTable.insert(node);
+        pointsInserted += recoveryMemTable.insert(node);
       }
     }
+    recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode, 
pointsInserted);
   }
 
   void resetRecoveryMemTable(IMemTable memTable) {

Reply via email to