This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch batch_update_pointInserted_metric in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit aeac35d033280eabeb9aab4208552867b0dbaa31 Author: HTHou <[email protected]> AuthorDate: Thu Oct 24 22:19:54 2024 +0800 Batch update inserted point number metric --- .../dataregion/memtable/AbstractMemTable.java | 140 +++++---------------- .../dataregion/memtable/IMemTable.java | 12 +- .../dataregion/memtable/TsFileProcessor.java | 26 ++-- .../wal/recover/file/TsFilePlanRedoer.java | 22 ++-- 4 files changed, 68 insertions(+), 132 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 97db8205ce8..67f7d192053 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -35,6 +35,7 @@ import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils; @@ -205,7 +206,7 @@ public abstract class AbstractMemTable implements IMemTable { } @Override - public void insert(InsertRowNode insertRowNode) { + public int insert(InsertRowNode insertRowNode) { String[] measurements = insertRowNode.getMeasurements(); Object[] values = insertRowNode.getValues(); @@ -235,39 +236,11 @@ public abstract class AbstractMemTable implements IMemTable { - nullPointsNumber; totalPointsNum += pointsInserted; - - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } + return pointsInserted; } @Override - public void insertAlignedRow(InsertRowNode insertRowNode) { + public int insertAlignedRow(InsertRowNode insertRowNode) { String[] measurements = insertRowNode.getMeasurements(); Object[] values = insertRowNode.getValues(); @@ -287,7 +260,7 @@ public abstract class AbstractMemTable implements IMemTable { dataTypes.add(schema.getType()); } if (schemaList.isEmpty()) { - return; + return 0; } memSize += MemUtils.getAlignedRowRecordSize(dataTypes, values, insertRowNode.getColumnCategories()); @@ -295,39 +268,11 @@ public abstract class AbstractMemTable implements IMemTable { int pointsInserted = insertRowNode.getMeasurementColumnCnt() - insertRowNode.getFailedMeasurementNumber(); totalPointsNum += pointsInserted; - - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertRowNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } + return pointsInserted; } @Override - public void insertTablet(InsertTabletNode insertTabletNode, int start, int end) + public int insertTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException { try { writeTabletNode(insertTabletNode, start, end); @@ -336,41 +281,14 @@ public abstract class AbstractMemTable implements IMemTable { (insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) * (end - start); totalPointsNum += pointsInserted; - MetricService.getInstance() - .count( - pointsInserted, - Metric.QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } + return pointsInserted; } catch (RuntimeException e) { throw new WriteProcessException(e); } } @Override - public void insertAlignedTablet( + public int insertAlignedTablet( InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results) throws WriteProcessException { try { @@ -382,10 +300,31 @@ public abstract class AbstractMemTable implements IMemTable { - insertTabletNode.getFailedMeasurementNumber()) * (end - start); totalPointsNum += pointsInserted; + return pointsInserted; + } catch (RuntimeException e) { + throw new WriteProcessException(e); + } + } + + public void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted) { + MetricService.getInstance() + .count( + pointsInserted, + Metric.QUANTITY.toString(), + MetricLevel.CORE, + Tag.NAME.toString(), + METRIC_POINT_IN, + Tag.DATABASE.toString(), + database, + Tag.REGION.toString(), + dataRegionId, + Tag.TYPE.toString(), + Metric.MEMTABLE_POINT_COUNT.toString()); + if (!insertNode.isGeneratedByRemoteConsensusLeader()) { MetricService.getInstance() .count( pointsInserted, - Metric.QUANTITY.toString(), + Metric.LEADER_QUANTITY.toString(), MetricLevel.CORE, Tag.NAME.toString(), METRIC_POINT_IN, @@ -395,23 +334,6 @@ public abstract class AbstractMemTable implements IMemTable { dataRegionId, Tag.TYPE.toString(), Metric.MEMTABLE_POINT_COUNT.toString()); - if (!insertTabletNode.isGeneratedByRemoteConsensusLeader()) { - MetricService.getInstance() - .count( - pointsInserted, - Metric.LEADER_QUANTITY.toString(), - MetricLevel.CORE, - Tag.NAME.toString(), - METRIC_POINT_IN, - Tag.DATABASE.toString(), - database, - Tag.REGION.toString(), - dataRegionId, - Tag.TYPE.toString(), - Metric.MEMTABLE_POINT_COUNT.toString()); - } - } catch (RuntimeException e) { - throw new WriteProcessException(e); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java index 958f9fdad9a..b3e8202224c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IMemTable.java @@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.storageengine.dataregion.flush.FlushStatus; @@ -95,9 +96,9 @@ public interface IMemTable extends WALEntryValue { * * @param insertRowNode insertRowNode */ - void insert(InsertRowNode insertRowNode); + int insert(InsertRowNode insertRowNode); - void insertAlignedRow(InsertRowNode insertRowNode); + int insertAlignedRow(InsertRowNode insertRowNode); /** * insert tablet into this memtable. The rows to be inserted are in the range [start, end). Null @@ -108,11 +109,10 @@ public interface IMemTable extends WALEntryValue { * @param start included * @param end excluded */ - void insertTablet(InsertTabletNode insertTabletNode, int start, int end) + int insertTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException; - void insertAlignedTablet( - InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results) + int insertAlignedTablet(InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results) throws WriteProcessException; ReadOnlyMemChunk query( @@ -208,4 +208,6 @@ public interface IMemTable extends WALEntryValue { void markAsNotGeneratedByPipe(); boolean isTotallyGeneratedByPipe(); + + void updateMemtablePointCountMetric(InsertNode insertNode, int pointsInserted); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index 56b2efc4c4c..f1dde34972f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -332,10 +332,11 @@ public class TsFileProcessor { insertRowNode, tsFileResource); + int pointInserted; if (insertRowNode.isAligned()) { - workMemTable.insertAlignedRow(insertRowNode); + pointInserted = workMemTable.insertAlignedRow(insertRowNode); } else { - workMemTable.insert(insertRowNode); + pointInserted = workMemTable.insert(insertRowNode); } // Update start time of this memtable @@ -345,6 +346,7 @@ public class TsFileProcessor { if (!sequence) { tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); } + workMemTable.updateMemtablePointCountMetric(insertRowNode, pointInserted); tsFileResource.updateProgressIndex(insertRowNode.getProgressIndex()); // RecordScheduleMemTableCost @@ -419,14 +421,14 @@ public class TsFileProcessor { walFlushListener.getWalEntryHandler(), insertRowsNode, tsFileResource); - for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { + int pointInserted = 0; + for (InsertRowNode insertRowNode : insertRowsNode.getInsertRowNodeList()) { if (insertRowNode.isAligned()) { - workMemTable.insertAlignedRow(insertRowNode); + pointInserted += workMemTable.insertAlignedRow(insertRowNode); } else { - workMemTable.insert(insertRowNode); + pointInserted += workMemTable.insert(insertRowNode); } - // update start time of this memtable tsFileResource.updateStartTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); // for sequence tsfile, we update the endTime only when the file is prepared to be closed. @@ -435,6 +437,8 @@ public class TsFileProcessor { tsFileResource.updateEndTime(insertRowNode.getDeviceID(), insertRowNode.getTime()); } } + workMemTable.updateMemtablePointCountMetric(insertRowsNode, pointInserted); + tsFileResource.updateProgressIndex(insertRowsNode.getProgressIndex()); // recordScheduleMemTableCost costsForMetrics[3] += System.nanoTime() - startTime; @@ -584,15 +588,17 @@ public class TsFileProcessor { insertTabletNode, tsFileResource); + int pointInserted = 0; for (int[] rangePair : rangeList) { int start = rangePair[0]; int end = rangePair[1]; try { if (insertTabletNode.isAligned()) { - workMemTable.insertAlignedTablet( - insertTabletNode, start, end, noFailure ? null : results); + pointInserted += + workMemTable.insertAlignedTablet( + insertTabletNode, start, end, noFailure ? null : results); } else { - workMemTable.insertTablet(insertTabletNode, start, end); + pointInserted += workMemTable.insertTablet(insertTabletNode, start, end); } } catch (WriteProcessException e) { for (int i = start; i < end; i++) { @@ -627,7 +633,7 @@ public class TsFileProcessor { } } } - + workMemTable.updateMemtablePointCountMetric(insertTabletNode, pointInserted); tsFileResource.updateProgressIndex(insertTabletNode.getProgressIndex()); // recordScheduleMemTableCost diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java index 9b98a3c5b79..be5bd356f5a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/TsFilePlanRedoer.java @@ -91,24 +91,29 @@ public class TsFilePlanRedoer { } } + int pointsInserted; if (node instanceof InsertRowNode) { if (node.isAligned()) { - recoveryMemTable.insertAlignedRow((InsertRowNode) node); + pointsInserted = recoveryMemTable.insertAlignedRow((InsertRowNode) node); } else { - recoveryMemTable.insert((InsertRowNode) node); + pointsInserted = recoveryMemTable.insert((InsertRowNode) node); } } else { if (node.isAligned()) { - recoveryMemTable.insertAlignedTablet( - (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount(), null); + pointsInserted = + recoveryMemTable.insertAlignedTablet( + (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount(), null); } else { - recoveryMemTable.insertTablet( - (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); + pointsInserted = + recoveryMemTable.insertTablet( + (InsertTabletNode) node, 0, ((InsertTabletNode) node).getRowCount()); } } + recoveryMemTable.updateMemtablePointCountMetric(node, pointsInserted); } void redoInsertRows(InsertRowsNode insertRowsNode) { + int pointsInserted = 0; for (InsertRowNode node : insertRowsNode.getInsertRowNodeList()) { if (!node.hasValidMeasurements()) { continue; @@ -125,11 +130,12 @@ public class TsFilePlanRedoer { } } if (node.isAligned()) { - recoveryMemTable.insertAlignedRow(node); + pointsInserted += recoveryMemTable.insertAlignedRow(node); } else { - recoveryMemTable.insert(node); + pointsInserted += recoveryMemTable.insert(node); } } + recoveryMemTable.updateMemtablePointCountMetric(insertRowsNode, pointsInserted); } void resetRecoveryMemTable(IMemTable memTable) {
