This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5d8339257e7 Batch update inserted point number metric (#13910)
5d8339257e7 is described below
commit 5d8339257e726cf07f63421b6a703d7969aa5159
Author: Haonan <[email protected]>
AuthorDate: Fri Oct 25 09:34:35 2024 +0800
Batch update inserted point number metric (#13910)
---
.../dataregion/memtable/AbstractMemTable.java | 140 +++++----------------
.../dataregion/memtable/IMemTable.java | 12 +-
.../dataregion/memtable/TsFileProcessor.java | 26 ++--
.../wal/recover/file/TsFilePlanRedoer.java | 22 ++--
4 files changed, 68 insertions(+), 132 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
index 97db8205ce8..67f7d192053 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java
@@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
@@ -205,7 +206,7 @@ public abstract class AbstractMemTable implements IMemTable
{
}
@Override
- public void insert(InsertRowNode insertRowNode) {
+ public int insert(InsertRowNode insertRowNode) {
String[] measurements = insertRowNode.getMeasurements();
Object[] values = insertRowNode.getValues();
@@ -235,39 +236,11 @@ public abstract class AbstractMemTable implements
IMemTable {
- nullPointsNumber;
totalPointsNum += pointsInserted;
-
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- }
+ return pointsInserted;
}
@Override
- public void insertAlignedRow(InsertRowNode insertRowNode) {
+ public int insertAlignedRow(InsertRowNode insertRowNode) {
String[] measurements = insertRowNode.getMeasurements();
Object[] values = insertRowNode.getValues();
@@ -287,7 +260,7 @@ public abstract class AbstractMemTable implements IMemTable
{
dataTypes.add(schema.getType());
}
if (schemaList.isEmpty()) {
- return;
+ return 0;
}
memSize +=
MemUtils.getAlignedRowRecordSize(dataTypes, values,
insertRowNode.getColumnCategories());
@@ -295,39 +268,11 @@ public abstract class AbstractMemTable implements
IMemTable {
int pointsInserted =
insertRowNode.getMeasurementColumnCnt() -
insertRowNode.getFailedMeasurementNumber();
totalPointsNum += pointsInserted;
-
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) {
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- }
+ return pointsInserted;
}
@Override
- public void insertTablet(InsertTabletNode insertTabletNode, int start, int
end)
+ public int insertTablet(InsertTabletNode insertTabletNode, int start, int
end)
throws WriteProcessException {
try {
writeTabletNode(insertTabletNode, start, end);
@@ -336,41 +281,14 @@ public abstract class AbstractMemTable implements
IMemTable {
(insertTabletNode.getDataTypes().length -
insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- }
+ return pointsInserted;
} catch (RuntimeException e) {
throw new WriteProcessException(e);
}
}
@Override
- public void insertAlignedTablet(
+ public int insertAlignedTablet(
InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results)
throws WriteProcessException {
try {
@@ -382,10 +300,31 @@ public abstract class AbstractMemTable implements
IMemTable {
- insertTabletNode.getFailedMeasurementNumber())
* (end - start);
totalPointsNum += pointsInserted;
+ return pointsInserted;
+ } catch (RuntimeException e) {
+ throw new WriteProcessException(e);
+ }
+ }
+
+ public void updateMemtablePointCountMetric(InsertNode insertNode, int
pointsInserted) {
+ MetricService.getInstance()
+ .count(
+ pointsInserted,
+ Metric.QUANTITY.toString(),
+ MetricLevel.CORE,
+ Tag.NAME.toString(),
+ METRIC_POINT_IN,
+ Tag.DATABASE.toString(),
+ database,
+ Tag.REGION.toString(),
+ dataRegionId,
+ Tag.TYPE.toString(),
+ Metric.MEMTABLE_POINT_COUNT.toString());
+ if (!insertNode.isGeneratedByRemoteConsensusLeader()) {
MetricService.getInstance()
.count(
pointsInserted,
- Metric.QUANTITY.toString(),
+ Metric.LEADER_QUANTITY.toString(),
MetricLevel.CORE,
Tag.NAME.toString(),
METRIC_POINT_IN,
@@ -395,23 +334,6 @@ public abstract class AbstractMemTable implements
IMemTable {
dataRegionId,
Tag.TYPE.toString(),
Metric.MEMTABLE_POINT_COUNT.toString());
- if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) {
- MetricService.getInstance()
- .count(
- pointsInserted,
- Metric.LEADER_QUANTITY.toString(),
- MetricLevel.CORE,
- Tag.NAME.toString(),
- METRIC_POINT_IN,
- Tag.DATABASE.toString(),
- database,
- Tag.REGION.toString(),
- dataRegionId,
- Tag.TYPE.toString(),
- Metric.MEMTABLE_POINT_COUNT.toString());
- }
- } catch (RuntimeException e) {
- throw new WriteProcessException(e);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
index 958f9fdad9a..b3e8202224c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus;
@@ -95,9 +96,9 @@ public interface IMemTable extends WALEntryValue {
*
* @param insertRowNode insertRowNode
*/
- void insert(InsertRowNode insertRowNode);
+ int insert(InsertRowNode insertRowNode);
- void insertAlignedRow(InsertRowNode insertRowNode);
+ int insertAlignedRow(InsertRowNode insertRowNode);
/**
* insert tablet into this memtable. The rows to be inserted are in the
range [start, end). Null
@@ -108,11 +109,10 @@ public interface IMemTable extends WALEntryValue {
* @param start included
* @param end excluded
*/
- void insertTablet(InsertTabletNode insertTabletNode, int start, int end)
+ int insertTablet(InsertTabletNode insertTabletNode, int start, int end)
throws WriteProcessException;
- void insertAlignedTablet(
- InsertTabletNode insertTabletNode, int start, int end, TSStatus[]
results)
+ int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int
end, TSStatus[] results)
throws WriteProcessException;
ReadOnlyMemChunk query(
@@ -208,4 +208,6 @@ public interface IMemTable extends WALEntryValue {
void markAsNotGeneratedByPipe();
boolean isTotallyGeneratedByPipe();
+
+ void updateMemtablePointCountMetric(InsertNode insertNode, int
pointsInserted);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
index 56b2efc4c4c..f1dde34972f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java
@@ -332,10 +332,11 @@ public class TsFileProcessor {
insertRowNode,
tsFileResource);
+ int pointInserted;
if (insertRowNode.isAligned()) {
- workMemTable.insertAlignedRow(insertRowNode);
+ pointInserted = workMemTable.insertAlignedRow(insertRowNode);
} else {
- workMemTable.insert(insertRowNode);
+ pointInserted = workMemTable.insert(insertRowNode);
}
// Update start time of this memtable
@@ -345,6 +346,7 @@ public class TsFileProcessor {
if (!sequence) {
tsFileResource.updateEndTime(insertRowNode.getDeviceID(),
insertRowNode.getTime());
}
+ workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted);
tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex());
// RecordScheduleMemTableCost
@@ -419,14 +421,14 @@ public class TsFileProcessor {
walFlushListener.getWalEntryHandler(),
insertRowsNode,
tsFileResource);
- for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
+ int pointInserted = 0;
+ for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) {
if (insertRowNode.isAligned()) {
- workMemTable.insertAlignedRow(insertRowNode);
+ pointInserted += workMemTable.insertAlignedRow(insertRowNode);
} else {
- workMemTable.insert(insertRowNode);
+ pointInserted += workMemTable.insert(insertRowNode);
}
-
// update start time of this memtable
tsFileResource.updateStartTime(insertRowNode.getDeviceID(),
insertRowNode.getTime());
// for sequence tsfile, we update the endTime only when the file is
prepared to be closed.
@@ -435,6 +437,8 @@ public class TsFileProcessor {
tsFileResource.updateEndTime(insertRowNode.getDeviceID(),
insertRowNode.getTime());
}
}
+ workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted);
+
tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex());
// recordScheduleMemTableCost
costsForMetrics[3] += System.nanoTime() - startTime;
@@ -584,15 +588,17 @@ public class TsFileProcessor {
insertTabletNode,
tsFileResource);
+ int pointInserted = 0;
for (int[] rangePair : rangeList) {
int start = rangePair[0];
int end = rangePair[1];
try {
if (insertTabletNode.isAligned()) {
- workMemTable.insertAlignedTablet(
- insertTabletNode, start, end, noFailure ? null : results);
+ pointInserted +=
+ workMemTable.insertAlignedTablet(
+ insertTabletNode, start, end, noFailure ? null : results);
} else {
- workMemTable.insertTablet(insertTabletNode, start, end);
+ pointInserted += workMemTable.insertTablet(insertTabletNode, start,
end);
}
} catch (WriteProcessException e) {
for (int i = start; i < end; i++) {
@@ -627,7 +633,7 @@ public class TsFileProcessor {
}
}
}
-
+ workMemTable.updateMemtablePointCountMetric(insertTabletNode,
pointInserted);
tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex());
// recordScheduleMemTableCost
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
index 9b98a3c5b79..be5bd356f5a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java
@@ -91,24 +91,29 @@ public class TsFilePlanRedoer {
}
}
+ int pointsInserted;
if (node instanceof InsertRowNode) {
if (node.isAligned()) {
- recoveryMemTable.insertAlignedRow((InsertRowNode) node);
+ pointsInserted = recoveryMemTable.insertAlignedRow((InsertRowNode)
node);
} else {
- recoveryMemTable.insert((InsertRowNode) node);
+ pointsInserted = recoveryMemTable.insert((InsertRowNode) node);
}
} else {
if (node.isAligned()) {
- recoveryMemTable.insertAlignedTablet(
- (InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount(), null);
+ pointsInserted =
+ recoveryMemTable.insertAlignedTablet(
+ (InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount(), null);
} else {
- recoveryMemTable.insertTablet(
- (InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount());
+ pointsInserted =
+ recoveryMemTable.insertTablet(
+ (InsertTabletNode) node, 0, ((InsertTabletNode)
node).getRowCount());
}
}
+ recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted);
}
void redoInsertRows(InsertRowsNode insertRowsNode) {
+ int pointsInserted = 0;
for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) {
if (!node.hasValidMeasurements()) {
continue;
@@ -125,11 +130,12 @@ public class TsFilePlanRedoer {
}
}
if (node.isAligned()) {
- recoveryMemTable.insertAlignedRow(node);
+ pointsInserted += recoveryMemTable.insertAlignedRow(node);
} else {
- recoveryMemTable.insert(node);
+ pointsInserted += recoveryMemTable.insert(node);
}
}
+ recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode,
pointsInserted);
}
void resetRecoveryMemTable(IMemTable memTable) {