This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new aea54d1bab8 Pipe: Added manual performance test in
TsFileInsertionEventParserTest (#17641)
aea54d1bab8 is described below
commit aea54d1bab892f5cc09e2e5c6a25ed8b7e78dfff
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 14 10:37:23 2026 +0800
Pipe: Added manual performance test in TsFileInsertionEventParserTest
(#17641)
* Update TsFileInsertionEventParserTest.java
* Update TsFileInsertionEventParserTest.java
* Update TsFileInsertionEventParserTest.java
* chips
---
.../pipe/event/TsFileInsertionEventParserTest.java | 560 +++++++++++++++++++++
.../apache/iotdb/commons/conf/CommonConfig.java | 13 -
.../iotdb/commons/pipe/config/PipeConfig.java | 5 -
.../iotdb/commons/pipe/config/PipeDescriptor.java | 8 -
4 files changed, 560 insertions(+), 26 deletions(-)
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 8a19b25986a..cdde28bce38 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -24,27 +24,33 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixTreePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.AlignedSinglePageWholeChunkReader;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table.TsFileInsertionEventTableParser;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.pipe.api.access.Row;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
+import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
import org.apache.tsfile.file.metadata.ChunkMetadata;
import org.apache.tsfile.file.metadata.IChunkMetadata;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TsFileSequenceReader;
@@ -60,6 +66,7 @@ import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.junit.After;
import org.junit.Assert;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -73,6 +80,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -89,6 +97,12 @@ public class TsFileInsertionEventParserTest {
private static final String PREFIX_FORMAT = "prefix";
private static final String IOTDB_FORMAT = "iotdb";
+ private static final String MANUAL_SCAN_PARSER_PERFORMANCE_TEST =
+ "iotdb.scan.parser.performance.enabled";
+ private static final String MANUAL_QUERY_PARSER_PERFORMANCE_TEST =
+ "iotdb.query.parser.performance.enabled";
+ private static final String MANUAL_TABLE_PARSER_PERFORMANCE_TEST =
+ "iotdb.table.parser.performance.enabled";
private File alignedTsFile;
private File nonalignedTsFile;
@@ -262,6 +276,260 @@ public class TsFileInsertionEventParserTest {
}
}
+ @Test
+ public void manualTestScanParserSplitPerformance() throws Exception {
+ Assume.assumeTrue(
+ "Set -D" + MANUAL_SCAN_PARSER_PERFORMANCE_TEST + "=true to run this
manual test.",
+ Boolean.getBoolean(MANUAL_SCAN_PARSER_PERFORMANCE_TEST));
+
+ final int deviceCount =
+
getManualPerformanceIntProperty("iotdb.scan.parser.performance.device.count",
1);
+ final int measurementCount =
+
getManualPerformanceIntProperty("iotdb.scan.parser.performance.measurement.count",
256);
+ final int rowCountPerDevice =
+
getManualPerformanceIntProperty("iotdb.scan.parser.performance.row.count",
200_000);
+ final int tabletRowCount =
+
getManualPerformanceIntProperty("iotdb.scan.parser.performance.tablet.row.count",
1024);
+ final long pipeMaxReaderChunkSize =
+ getManualPerformanceLongProperty(
+ "iotdb.scan.parser.performance.reader.chunk.size", 1024 * 1024L);
+ final long expectedPointCount = (long) deviceCount * measurementCount *
rowCountPerDevice;
+ final long originalPipeMaxReaderChunkSize =
+ PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+
+
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize);
+
+ alignedTsFile = new File("scan-parser-split-performance.tsfile");
+ try {
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema(
+ "s" + i, TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ final long writeStartTime = System.nanoTime();
+ generateLargeAlignedTsFile(
+ alignedTsFile, schemaList, deviceCount, rowCountPerDevice,
tabletRowCount);
+ final long writeElapsedNanos = System.nanoTime() - writeStartTime;
+
+ long pointCount = 0;
+ long tabletRowCountSum = 0;
+ int tabletCount = 0;
+ int alignedTabletCount = 0;
+ int minMeasurementCountInTablet = Integer.MAX_VALUE;
+ int maxMeasurementCountInTablet = 0;
+
+ final long parseStartTime = System.nanoTime();
+ try (final TsFileInsertionEventScanParser parser =
+ new TsFileInsertionEventScanParser(
+ alignedTsFile,
+ new PrefixTreePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ for (final Pair<Tablet, Boolean> tabletWithIsAligned :
parser.toTabletWithIsAligneds()) {
+ final Tablet tablet = tabletWithIsAligned.getLeft();
+ ++tabletCount;
+ if (tabletWithIsAligned.getRight()) {
+ ++alignedTabletCount;
+ }
+ tabletRowCountSum += tablet.getRowSize();
+ // The generated performance file is dense and mod-free. Avoid
scanning every cell here,
+ // otherwise parseTime also includes the test-side validation cost.
+ pointCount += (long) tablet.getRowSize() *
tablet.getSchemas().size();
+ minMeasurementCountInTablet =
+ Math.min(minMeasurementCountInTablet,
tablet.getSchemas().size());
+ maxMeasurementCountInTablet =
+ Math.max(maxMeasurementCountInTablet,
tablet.getSchemas().size());
+ }
+ }
+ final long parseElapsedNanos = System.nanoTime() - parseStartTime;
+
+ Assert.assertEquals(expectedPointCount, pointCount);
+ Assert.assertTrue(
+ "Expected TsFileInsertionEventScanParser to split tablets.",
tabletCount > 1);
+ Assert.assertTrue(
+ "Expected measurement split by pipe max reader chunk size.",
+ maxMeasurementCountInTablet < measurementCount);
+
+ printScanParserPerformanceResult(
+ alignedTsFile.length(),
+ deviceCount,
+ measurementCount,
+ rowCountPerDevice,
+ tabletRowCount,
+ pipeMaxReaderChunkSize,
+ expectedPointCount,
+ writeElapsedNanos,
+ parseElapsedNanos,
+ tabletCount,
+ alignedTabletCount,
+ tabletRowCountSum,
+ pointCount,
+ minMeasurementCountInTablet,
+ maxMeasurementCountInTablet);
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ }
+ }
+
+ @Test
+ public void manualTestQueryParserPerformance() throws Exception {
+ Assume.assumeTrue(
+ "Set -D" + MANUAL_QUERY_PARSER_PERFORMANCE_TEST + "=true to run this
manual test.",
+ Boolean.getBoolean(MANUAL_QUERY_PARSER_PERFORMANCE_TEST));
+
+ final int deviceCount =
+
getManualPerformanceIntProperty("iotdb.query.parser.performance.device.count",
1);
+ final int measurementCount =
+
getManualPerformanceIntProperty("iotdb.query.parser.performance.measurement.count",
256);
+ final int rowCountPerDevice =
+
getManualPerformanceIntProperty("iotdb.query.parser.performance.row.count",
200_000);
+ final int tabletRowCount =
+
getManualPerformanceIntProperty("iotdb.query.parser.performance.tablet.row.count",
1024);
+ final long expectedPointCount = (long) deviceCount * measurementCount *
rowCountPerDevice;
+
+ alignedTsFile = new File("query-parser-performance.tsfile");
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ for (int i = 0; i < measurementCount; ++i) {
+ schemaList.add(
+ new MeasurementSchema("s" + i, TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.LZ4));
+ }
+
+ final long writeStartTime = System.nanoTime();
+ generateLargeAlignedTsFile(
+ alignedTsFile, schemaList, deviceCount, rowCountPerDevice,
tabletRowCount);
+ final long writeElapsedNanos = System.nanoTime() - writeStartTime;
+
+ final ParserPerformanceStats stats;
+ final long parseStartTime = System.nanoTime();
+ try (final TsFileInsertionEventQueryParser parser =
+ new TsFileInsertionEventQueryParser(
+ alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE,
Long.MAX_VALUE, null)) {
+ stats =
+ collectTabletInsertionEventParserPerformanceStats(
+ parser.toTabletInsertionEvents(), false);
+ }
+ final long parseElapsedNanos = System.nanoTime() - parseStartTime;
+
+ Assert.assertEquals(expectedPointCount, stats.pointCount);
+ Assert.assertTrue(
+ "Expected TsFileInsertionEventQueryParser to parse tablets.",
stats.tabletCount > 0);
+
+ printTabletInsertionEventParserPerformanceResult(
+ "TsFileInsertionEventQueryParser",
+ alignedTsFile.length(),
+ String.format(
+ Locale.ROOT,
+ "deviceCount=%d, measurementCount=%d, rowCountPerDevice=%d",
+ deviceCount,
+ measurementCount,
+ rowCountPerDevice),
+ tabletRowCount,
+ "",
+ expectedPointCount,
+ writeElapsedNanos,
+ parseElapsedNanos,
+ stats,
+ "measurement");
+ }
+
+ @Test
+ public void manualTestTableParserSplitPerformance() throws Exception {
+ Assume.assumeTrue(
+ "Set -D" + MANUAL_TABLE_PARSER_PERFORMANCE_TEST + "=true to run this
manual test.",
+ Boolean.getBoolean(MANUAL_TABLE_PARSER_PERFORMANCE_TEST));
+
+ final int tableCount =
+
getManualPerformanceIntProperty("iotdb.table.parser.performance.table.count",
1);
+ final int deviceCount =
+
getManualPerformanceIntProperty("iotdb.table.parser.performance.device.count",
1);
+ final int tagCount =
+
getManualPerformanceIntProperty("iotdb.table.parser.performance.tag.count", 1);
+ final int fieldCount =
+
getManualPerformanceIntProperty("iotdb.table.parser.performance.field.count",
256);
+ final int rowCountPerDevice =
+
getManualPerformanceIntProperty("iotdb.table.parser.performance.row.count",
200_000);
+ final int tabletRowCount =
+
getManualPerformanceIntProperty("iotdb.table.parser.performance.tablet.row.count",
1024);
+ final long pipeMaxReaderChunkSize =
+ getManualPerformanceLongProperty(
+ "iotdb.table.parser.performance.reader.chunk.size", 1024 * 1024L);
+ final long expectedPointCount =
+ (long) tableCount * deviceCount * fieldCount * rowCountPerDevice;
+ final long originalPipeMaxReaderChunkSize =
+ PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+
+
CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize);
+
+ alignedTsFile = new File("table-parser-split-performance.tsfile");
+ try {
+ final long writeStartTime = System.nanoTime();
+ generateLargeTableTsFile(
+ alignedTsFile,
+ tableCount,
+ deviceCount,
+ tagCount,
+ fieldCount,
+ rowCountPerDevice,
+ tabletRowCount);
+ final long writeElapsedNanos = System.nanoTime() - writeStartTime;
+
+ final ParserPerformanceStats stats;
+ final long parseStartTime = System.nanoTime();
+ try (final TsFileInsertionEventTableParser parser =
+ new TsFileInsertionEventTableParser(
+ alignedTsFile,
+ new TablePattern(true, null, null),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ null,
+ false)) {
+ stats =
+ collectTabletInsertionEventParserPerformanceStats(
+ parser.toTabletInsertionEvents(), true);
+ }
+ final long parseElapsedNanos = System.nanoTime() - parseStartTime;
+
+ Assert.assertEquals(expectedPointCount, stats.pointCount);
+ Assert.assertTrue(
+ "Expected TsFileInsertionEventTableParser to split tablets.",
stats.tabletCount > 1);
+ Assert.assertTrue(
+ "Expected field split by pipe max reader chunk size.",
+ fieldCount == 1 || stats.maxColumnCountInTablet < fieldCount);
+
+ printTabletInsertionEventParserPerformanceResult(
+ "TsFileInsertionEventTableParser split",
+ alignedTsFile.length(),
+ String.format(
+ Locale.ROOT,
+ "tableCount=%d, deviceCount=%d, tagCount=%d, fieldCount=%d,
rowCountPerDevice=%d",
+ tableCount,
+ deviceCount,
+ tagCount,
+ fieldCount,
+ rowCountPerDevice),
+ tabletRowCount,
+ ", pipeMaxReaderChunkSize=" + formatBytes(pipeMaxReaderChunkSize),
+ expectedPointCount,
+ writeElapsedNanos,
+ parseElapsedNanos,
+ stats,
+ "field");
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ }
+ }
+
public void testToTabletInsertionEvents(final boolean isQuery) throws
Exception {
// Test empty chunk
testMixedTsFileWithEmptyChunk(isQuery);
@@ -702,6 +970,289 @@ public class TsFileInsertionEventParserTest {
alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE,
Long.MAX_VALUE, isQuery, 4);
}
+ private void generateLargeAlignedTsFile(
+ final File tsFile,
+ final List<IMeasurementSchema> schemaList,
+ final int deviceCount,
+ final int rowCountPerDevice,
+ final int tabletRowCount)
+ throws Exception {
+ if (tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(tsFile)) {
+ for (int deviceIndex = 0; deviceIndex < deviceCount; ++deviceIndex) {
+ final String device = "root.sg.performance.d" + deviceIndex;
+ writer.registerAlignedTimeseries(new PartialPath(device), schemaList);
+
+ final Tablet tablet = new Tablet(device, schemaList, tabletRowCount);
+ for (int row = 0; row < rowCountPerDevice; ++row) {
+ int rowIndex = tablet.getRowSize();
+ if (rowIndex == tablet.getMaxRowNumber()) {
+ writer.writeAligned(tablet);
+ tablet.reset();
+ rowIndex = 0;
+ }
+
+ tablet.addTimestamp(rowIndex, row);
+ for (int measurementIndex = 0; measurementIndex < schemaList.size();
++measurementIndex) {
+ tablet.addValue(
+ rowIndex,
+ measurementIndex,
+ ((long) deviceIndex << 48) + (long) row * schemaList.size() +
measurementIndex);
+ }
+ }
+
+ if (tablet.getRowSize() > 0) {
+ writer.writeAligned(tablet);
+ }
+ }
+ }
+ }
+
+ private void generateLargeTableTsFile(
+ final File tsFile,
+ final int tableCount,
+ final int deviceCount,
+ final int tagCount,
+ final int fieldCount,
+ final int rowCountPerDevice,
+ final int tabletRowCount)
+ throws Exception {
+ if (tsFile.exists()) {
+ Assert.assertTrue(tsFile.delete());
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(tsFile)) {
+ for (int tableIndex = 0; tableIndex < tableCount; ++tableIndex) {
+ final String tableName = "performance_table_" + tableIndex;
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ final List<String> columnNameList = new ArrayList<>();
+ final List<TSDataType> dataTypeList = new ArrayList<>();
+ final List<ColumnCategory> columnCategoryList = new ArrayList<>();
+
+ for (int tagIndex = 0; tagIndex < tagCount; ++tagIndex) {
+ final String tagName = "tag" + tagIndex;
+ schemaList.add(
+ new MeasurementSchema(
+ tagName, TSDataType.STRING, TSEncoding.PLAIN,
CompressionType.LZ4));
+ columnNameList.add(tagName);
+ dataTypeList.add(TSDataType.STRING);
+ columnCategoryList.add(ColumnCategory.TAG);
+ }
+
+ for (int fieldIndex = 0; fieldIndex < fieldCount; ++fieldIndex) {
+ final String fieldName = "s" + fieldIndex;
+ schemaList.add(
+ new MeasurementSchema(
+ fieldName, TSDataType.INT64, TSEncoding.PLAIN,
CompressionType.LZ4));
+ columnNameList.add(fieldName);
+ dataTypeList.add(TSDataType.INT64);
+ columnCategoryList.add(ColumnCategory.FIELD);
+ }
+
+ writer.registerTableSchema(new TableSchema(tableName, schemaList,
columnCategoryList));
+
+ for (int deviceIndex = 0; deviceIndex < deviceCount; ++deviceIndex) {
+ final Tablet tablet =
+ new Tablet(
+ tableName, columnNameList, dataTypeList, columnCategoryList,
tabletRowCount);
+
+ for (int row = 0; row < rowCountPerDevice; ++row) {
+ int rowIndex = tablet.getRowSize();
+ if (rowIndex == tablet.getMaxRowNumber()) {
+ writer.writeTable(tablet);
+ tablet.reset();
+ rowIndex = 0;
+ }
+
+ tablet.addTimestamp(rowIndex, row);
+ for (int tagIndex = 0; tagIndex < tagCount; ++tagIndex) {
+ tablet.addValue(rowIndex, tagIndex, "d" + deviceIndex + "_tag" +
tagIndex);
+ }
+ for (int fieldIndex = 0; fieldIndex < fieldCount; ++fieldIndex) {
+ tablet.addValue(
+ rowIndex,
+ tagCount + fieldIndex,
+ ((long) tableIndex << 56)
+ + ((long) deviceIndex << 48)
+ + (long) row * fieldCount
+ + fieldIndex);
+ }
+ }
+
+ if (tablet.getRowSize() > 0) {
+ writer.writeTable(tablet);
+ }
+ }
+ }
+ }
+ }
+
+ private ParserPerformanceStats
collectTabletInsertionEventParserPerformanceStats(
+ final Iterable<TabletInsertionEvent> tabletInsertionEvents,
+ final boolean countFieldColumnsOnly) {
+ final ParserPerformanceStats stats = new ParserPerformanceStats();
+
+ for (final TabletInsertionEvent tabletInsertionEvent :
tabletInsertionEvents) {
+ Assert.assertTrue(
+ "Expected parser to generate PipeRawTabletInsertionEvent.",
+ tabletInsertionEvent instanceof PipeRawTabletInsertionEvent);
+
+ final Tablet tablet = ((PipeRawTabletInsertionEvent)
tabletInsertionEvent).convertToTablet();
+ final int columnCount =
+ countFieldColumnsOnly ? getFieldColumnCount(tablet) :
tablet.getSchemas().size();
+
+ ++stats.tabletCount;
+ stats.tabletRowCountSum += tablet.getRowSize();
+ stats.pointCount += (long) tablet.getRowSize() * columnCount;
+ stats.minColumnCountInTablet = Math.min(stats.minColumnCountInTablet,
columnCount);
+ stats.maxColumnCountInTablet = Math.max(stats.maxColumnCountInTablet,
columnCount);
+ }
+
+ return stats;
+ }
+
+ private int getFieldColumnCount(final Tablet tablet) {
+ if (Objects.isNull(tablet.getColumnTypes())) {
+ return tablet.getSchemas().size();
+ }
+
+ int fieldCount = 0;
+ for (final ColumnCategory columnCategory : tablet.getColumnTypes()) {
+ if (ColumnCategory.FIELD.equals(columnCategory)) {
+ ++fieldCount;
+ }
+ }
+ return fieldCount;
+ }
+
+ private int getManualPerformanceIntProperty(final String propertyName, final
int defaultValue) {
+ final int value = Integer.getInteger(propertyName, defaultValue);
+ Assert.assertTrue(propertyName + " should be positive.", value > 0);
+ return value;
+ }
+
+ private long getManualPerformanceLongProperty(
+ final String propertyName, final long defaultValue) {
+ final Long value = Long.getLong(propertyName, defaultValue);
+ Assert.assertTrue(propertyName + " should be positive.", value > 0);
+ return value;
+ }
+
+ private void printScanParserPerformanceResult(
+ final long tsFileSizeInBytes,
+ final int deviceCount,
+ final int measurementCount,
+ final int rowCountPerDevice,
+ final int inputTabletRowCount,
+ final long pipeMaxReaderChunkSize,
+ final long expectedPointCount,
+ final long writeElapsedNanos,
+ final long parseElapsedNanos,
+ final int tabletCount,
+ final int alignedTabletCount,
+ final long parsedTabletRowCount,
+ final long pointCount,
+ final int minMeasurementCountInTablet,
+ final int maxMeasurementCountInTablet) {
+ final double writeElapsedSeconds = nanosToSeconds(writeElapsedNanos);
+ final double parseElapsedSeconds = nanosToSeconds(parseElapsedNanos);
+ final double pointThroughput = pointCount / Math.max(parseElapsedSeconds,
1.0e-9);
+ final double fileThroughputInMiBPerSecond =
+ tsFileSizeInBytes / 1024.0 / 1024.0 / Math.max(parseElapsedSeconds,
1.0e-9);
+
+ System.out.printf(
+ Locale.ROOT,
+ "%nTsFileInsertionEventScanParser split performance:%n"
+ + " fileSize=%s%n"
+ + " deviceCount=%d, measurementCount=%d, rowCountPerDevice=%d,
expectedPoints=%d%n"
+ + " inputTabletRowCount=%d, pipeMaxReaderChunkSize=%s%n"
+ + " writeTime=%.3fs, parseTime=%.3fs%n"
+ + " tablets=%d, alignedTablets=%d, parsedTabletRows=%d,
points=%d%n"
+ + " measurementCountInTablet[min=%d, max=%d]%n"
+ + " pointThroughput=%.2f points/s, fileThroughput=%.2f MiB/s%n",
+ formatBytes(tsFileSizeInBytes),
+ deviceCount,
+ measurementCount,
+ rowCountPerDevice,
+ expectedPointCount,
+ inputTabletRowCount,
+ formatBytes(pipeMaxReaderChunkSize),
+ writeElapsedSeconds,
+ parseElapsedSeconds,
+ tabletCount,
+ alignedTabletCount,
+ parsedTabletRowCount,
+ pointCount,
+ minMeasurementCountInTablet,
+ maxMeasurementCountInTablet,
+ pointThroughput,
+ fileThroughputInMiBPerSecond);
+ }
+
+ private void printTabletInsertionEventParserPerformanceResult(
+ final String parserName,
+ final long tsFileSizeInBytes,
+ final String dataShape,
+ final int inputTabletRowCount,
+ final String extraConfig,
+ final long expectedPointCount,
+ final long writeElapsedNanos,
+ final long parseElapsedNanos,
+ final ParserPerformanceStats stats,
+ final String columnName) {
+ final double writeElapsedSeconds = nanosToSeconds(writeElapsedNanos);
+ final double parseElapsedSeconds = nanosToSeconds(parseElapsedNanos);
+ final double pointThroughput = stats.pointCount /
Math.max(parseElapsedSeconds, 1.0e-9);
+ final double fileThroughputInMiBPerSecond =
+ tsFileSizeInBytes / 1024.0 / 1024.0 / Math.max(parseElapsedSeconds,
1.0e-9);
+ final int minColumnCountInTablet = stats.tabletCount == 0 ? 0 :
stats.minColumnCountInTablet;
+
+ System.out.printf(
+ Locale.ROOT,
+ "%n%s performance:%n"
+ + " fileSize=%s%n"
+ + " %s, expectedPoints=%d%n"
+ + " inputTabletRowCount=%d%s%n"
+ + " writeTime=%.3fs, parseTime=%.3fs%n"
+ + " tablets=%d, parsedTabletRows=%d, points=%d%n"
+ + " %sCountInTablet[min=%d, max=%d]%n"
+ + " pointThroughput=%.2f points/s, fileThroughput=%.2f MiB/s%n",
+ parserName,
+ formatBytes(tsFileSizeInBytes),
+ dataShape,
+ expectedPointCount,
+ inputTabletRowCount,
+ extraConfig,
+ writeElapsedSeconds,
+ parseElapsedSeconds,
+ stats.tabletCount,
+ stats.tabletRowCountSum,
+ stats.pointCount,
+ columnName,
+ minColumnCountInTablet,
+ stats.maxColumnCountInTablet,
+ pointThroughput,
+ fileThroughputInMiBPerSecond);
+ }
+
+ private double nanosToSeconds(final long nanos) {
+ return nanos / 1_000_000_000.0;
+ }
+
+ private String formatBytes(final long bytes) {
+ double value = bytes;
+ final String[] units = {"B", "KiB", "MiB", "GiB"};
+ int unitIndex = 0;
+ while (value >= 1024 && unitIndex < units.length - 1) {
+ value /= 1024;
+ ++unitIndex;
+ }
+ return String.format(Locale.ROOT, "%.2f %s", value, units[unitIndex]);
+ }
+
private void testTsFilePointNum(
final File tsFile,
final TreePattern pattern,
@@ -857,4 +1408,13 @@ public class TsFileInsertionEventParserTest {
return chunkSizeLimit;
}
}
+
+ private static class ParserPerformanceStats {
+
+ private long pointCount;
+ private long tabletRowCountSum;
+ private int tabletCount;
+ private int minColumnCountInTablet = Integer.MAX_VALUE;
+ private int maxColumnCountInTablet;
+ }
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index e34d5804cac..158fee2c392 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -271,7 +271,6 @@ public class CommonConfig {
private int pipeSourceAssignerDisruptorRingBufferSize = 128;
private long pipeSourceAssignerDisruptorRingBufferEntrySizeInBytes = 72 * KB;
- private long pipeSourceMatcherCacheSize = 1024;
private int pipeSinkHandshakeTimeoutMs = 10 * 1000; // 10 seconds
private int pipeAirGapSinkTabletTimeoutMs = 60 * 1000; // 1 min
@@ -1073,18 +1072,6 @@ public class CommonConfig {
pipeSourceAssignerDisruptorRingBufferEntrySize);
}
- public long getPipeSourceMatcherCacheSize() {
- return pipeSourceMatcherCacheSize;
- }
-
- public void setPipeSourceMatcherCacheSize(long pipeSourceMatcherCacheSize) {
- if (this.pipeSourceMatcherCacheSize == pipeSourceMatcherCacheSize) {
- return;
- }
- this.pipeSourceMatcherCacheSize = pipeSourceMatcherCacheSize;
- logger.info("pipeSourceMatcherCacheSize is set to {}.",
pipeSourceMatcherCacheSize);
- }
-
public int getPipeSinkHandshakeTimeoutMs() {
return pipeSinkHandshakeTimeoutMs;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
index e2c94ac3cd1..2f507c7c055 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeConfig.java
@@ -169,10 +169,6 @@ public class PipeConfig {
return
COMMON_CONFIG.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes();
}
- public long getPipeSourceMatcherCacheSize() {
- return COMMON_CONFIG.getPipeSourceMatcherCacheSize();
- }
-
/////////////////////////////// Sink ///////////////////////////////
public int getPipeSinkHandshakeTimeoutMs() {
@@ -535,7 +531,6 @@ public class PipeConfig {
LOGGER.info(
"PipeSourceAssignerDisruptorRingBufferEntrySizeInBytes: {}",
getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes());
- LOGGER.info("PipeSourceMatcherCacheSize: {}",
getPipeSourceMatcherCacheSize());
LOGGER.info("PipeSinkHandshakeTimeoutMs: {}",
getPipeSinkHandshakeTimeoutMs());
LOGGER.info("PipeSinkTransferTimeoutMs: {}",
getPipeSinkTransferTimeoutMs());
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
index d0c37a50367..fb312fd4652 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/config/PipeDescriptor.java
@@ -345,14 +345,6 @@ public class PipeDescriptor {
String.valueOf(
config.getPipeSourceAssignerDisruptorRingBufferEntrySizeInBytes())))));
- config.setPipeSourceMatcherCacheSize(
- Integer.parseInt(
-
Optional.ofNullable(properties.getProperty("pipe_source_matcher_cache_size"))
- .orElse(
- properties.getProperty(
- "pipe_extractor_matcher_cache_size",
-
String.valueOf(config.getPipeSourceMatcherCacheSize())))));
-
config.setPipeSinkHandshakeTimeoutMs(
Long.parseLong(
Optional.ofNullable(properties.getProperty("pipe_sink_handshake_timeout_ms"))