This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch fix-partial-insert-null-columns in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 91dd4ebb994c2513132c9bb40a0a500654bbd89d Author: Caideyipi <[email protected]> AuthorDate: Tue Jun 9 15:54:32 2026 +0800 Fix partial insert handling for null measurements --- .../tablet/parser/TabletInsertionEventParser.java | 117 ++++++++++---- .../statement/PipeConvertedInsertRowStatement.java | 3 + .../resource/memory/InsertNodeMemoryEstimator.java | 15 +- .../pipe/sink/util/TabletStatementConverter.java | 2 +- .../plan/planner/plan/node/write/InsertNode.java | 29 ++-- .../planner/plan/node/write/InsertRowNode.java | 30 ++-- .../plan/node/write/InsertRowsOfOneDeviceNode.java | 3 + .../planner/plan/node/write/InsertTabletNode.java | 152 ++++++++++++++---- .../node/write/RelationalInsertTabletNode.java | 10 +- .../dataregion/memtable/AbstractMemTable.java | 77 ++++++--- .../dataregion/memtable/TsFileProcessor.java | 133 +++++++++------- .../dataregion/memtable/WritableMemChunkGroup.java | 2 +- .../java/org/apache/iotdb/db/utils/MemUtils.java | 42 +++-- .../pipe/event/PipeTabletInsertionEventTest.java | 81 ++++++++++ .../PipeConvertedInsertRowStatementTest.java | 55 +++++++ .../memory/InsertNodeMemoryEstimatorTest.java | 12 ++ .../sink/util/TabletStatementConverterTest.java | 36 +++++ .../planner/node/write/InsertRowNodeSerdeTest.java | 19 +++ .../write/InsertRowsOfOneDeviceNodeSerdeTest.java | 34 ++++ .../node/write/InsertTabletNodeSerdeTest.java | 63 ++++++++ .../planner/node/write/WritePlanNodeSplitTest.java | 41 +++++ .../write/InsertNodeIsMeasurementFailedTest.java | 135 ++++++++++++++++ .../AbstractMemTablePartialInsertTest.java | 113 ++++++++++++++ .../memtable/MemChunkDeserializeTest.java | 23 +++ .../dataregion/memtable/TsFileProcessorTest.java | 172 +++++++++++++++++++++ .../org/apache/iotdb/db/utils/MemUtilsTest.java | 78 ++++++++++ 26 files changed, 1298 insertions(+), 179 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java index 69035041293..a01dfd5222b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java @@ -54,6 +54,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.function.BiConsumer; +import java.util.function.IntPredicate; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -127,14 +128,26 @@ public abstract class TabletInsertionEventParser { final List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); + final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas(); + final String[] originColumnNameStringList = insertRowNode.getMeasurements(); + final TsTableColumnCategory[] originColumnCategories = insertRowNode.getColumnCategories(); + final TSDataType[] originValueDataTypes = insertRowNode.getDataTypes(); + final Object[] originValues = insertRowNode.getValues(); + generateColumnIndexMapper( - insertRowNode.getMeasurements(), originColumnIndex2FilteredColumnIndexMapperList); + originColumnNameStringList, originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = - Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) - .filter(Objects::nonNull) - .toArray() - .length; + compactColumnIndexMapper( + originColumnIndex2FilteredColumnIndexMapperList, + i -> + isValidOriginColumn( + originColumnNameStringList, + originMeasurementSchemaList, + originValueDataTypes, + i) + && originValues != null + && i < originValues.length); this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; @@ -143,19 +156,15 @@ public abstract class TabletInsertionEventParser { this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; - final MeasurementSchema[] originMeasurementSchemaList = insertRowNode.getMeasurementSchemas(); - final String[] originColumnNameStringList = insertRowNode.getMeasurements(); - final TsTableColumnCategory[] originColumnCategories = insertRowNode.getColumnCategories(); - final TSDataType[] originValueDataTypes = insertRowNode.getDataTypes(); - final Object[] originValues = insertRowNode.getValues(); - for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = - originColumnCategories != null && originColumnCategories[i] != null + originColumnCategories != null + && i < originColumnCategories.length + && originColumnCategories[i] != null ? originColumnCategories[i].toTsFileColumnType() : ColumnCategory.FIELD; this.valueColumnDataTypes[filteredColumnIndex] = originValueDataTypes[i]; @@ -202,14 +211,29 @@ public abstract class TabletInsertionEventParser { final List<Integer> rowIndexList = generateRowIndexList(originTimestampColumn); this.timestampColumn = rowIndexList.stream().mapToLong(i -> originTimestampColumn[i]).toArray(); + final MeasurementSchema[] originMeasurementSchemaList = + insertTabletNode.getMeasurementSchemas(); + final String[] originColumnNameStringList = insertTabletNode.getMeasurements(); + final TsTableColumnCategory[] originColumnCategories = insertTabletNode.getColumnCategories(); + final TSDataType[] originValueColumnDataTypes = insertTabletNode.getDataTypes(); + final Object[] originValueColumns = insertTabletNode.getColumns(); + final BitMap[] originBitMapList = insertTabletNode.getBitMaps(); + generateColumnIndexMapper( - insertTabletNode.getMeasurements(), originColumnIndex2FilteredColumnIndexMapperList); + originColumnNameStringList, originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = - Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) - .filter(Objects::nonNull) - .toArray() - .length; + compactColumnIndexMapper( + originColumnIndex2FilteredColumnIndexMapperList, + i -> + isValidOriginColumn( + originColumnNameStringList, + originMeasurementSchemaList, + originValueColumnDataTypes, + i) + && originValueColumns != null + && i < originValueColumns.length + && originValueColumns[i] != null); this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; @@ -218,21 +242,15 @@ public abstract class TabletInsertionEventParser { this.valueColumns = new Object[filteredColumnSize]; this.nullValueColumnBitmaps = new BitMap[filteredColumnSize]; - final MeasurementSchema[] originMeasurementSchemaList = - insertTabletNode.getMeasurementSchemas(); - final String[] originColumnNameStringList = insertTabletNode.getMeasurements(); - final TsTableColumnCategory[] originColumnCategories = insertTabletNode.getColumnCategories(); - final TSDataType[] originValueColumnDataTypes = insertTabletNode.getDataTypes(); - final Object[] originValueColumns = insertTabletNode.getColumns(); - final BitMap[] originBitMapList = insertTabletNode.getBitMaps(); - for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) { final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i]; this.measurementSchemaList[filteredColumnIndex] = originMeasurementSchemaList[i]; this.columnNameStringList[filteredColumnIndex] = originColumnNameStringList[i]; this.valueColumnTypes[filteredColumnIndex] = - originColumnCategories != null && originColumnCategories[i] != null + originColumnCategories != null + && i < originColumnCategories.length + && originColumnCategories[i] != null ? originColumnCategories[i].toTsFileColumnType() : ColumnCategory.FIELD; this.valueColumnDataTypes[filteredColumnIndex] = originValueColumnDataTypes[i]; @@ -298,10 +316,9 @@ public abstract class TabletInsertionEventParser { originMeasurementList, originColumnIndex2FilteredColumnIndexMapperList); final int filteredColumnSize = - Arrays.stream(originColumnIndex2FilteredColumnIndexMapperList) - .filter(Objects::nonNull) - .toArray() - .length; + compactColumnIndexMapper( + originColumnIndex2FilteredColumnIndexMapperList, + i -> isValidOriginColumn(originMeasurementSchemaList, i)); this.measurementSchemaList = new MeasurementSchema[filteredColumnSize]; this.columnNameStringList = new String[filteredColumnSize]; @@ -373,6 +390,46 @@ public abstract class TabletInsertionEventParser { final String[] originMeasurementList, final Integer[] originColumnIndex2FilteredColumnIndexMapperList); + private static int compactColumnIndexMapper( + final Integer[] originColumnIndex2FilteredColumnIndexMapperList, + final IntPredicate columnValidator) { + int filteredCount = 0; + for (int i = 0; i < originColumnIndex2FilteredColumnIndexMapperList.length; i++) { + if (originColumnIndex2FilteredColumnIndexMapperList[i] != null && columnValidator.test(i)) { + originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++; + } else { + originColumnIndex2FilteredColumnIndexMapperList[i] = null; + } + } + return filteredCount; + } + + private static boolean isValidOriginColumn( + final String[] originColumnNameStringList, + final MeasurementSchema[] originMeasurementSchemaList, + final TSDataType[] originValueDataTypes, + final int index) { + return originColumnNameStringList != null + && index < originColumnNameStringList.length + && originColumnNameStringList[index] != null + && originMeasurementSchemaList != null + && index < originMeasurementSchemaList.length + && originMeasurementSchemaList[index] != null + && originMeasurementSchemaList[index].getType() != null + && originValueDataTypes != null + && index < originValueDataTypes.length + && originValueDataTypes[index] != null; + } + + private static boolean isValidOriginColumn( + final List<IMeasurementSchema> originMeasurementSchemaList, final int index) { + return originMeasurementSchemaList != null + && index < originMeasurementSchemaList.size() + && originMeasurementSchemaList.get(index) != null + && originMeasurementSchemaList.get(index).getMeasurementName() != null + && originMeasurementSchemaList.get(index).getType() != null; + } + private List<Integer> generateRowIndexList(final long[] originTimestampColumn) { final int rowCount = originTimestampColumn.length; if (Objects.isNull(sourceEvent) || !sourceEvent.shouldParseTime()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java index c3db7954b40..742f5989553 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatement.java @@ -126,6 +126,9 @@ public class PipeConvertedInsertRowStatement extends InsertRowStatement { // parse string value to specific type dataTypes[i] = measurementSchemas[i].getType(); + if (values == null || i >= values.length || values[i] == null) { + continue; + } try { values[i] = ValueConverter.parse(values[i].toString(), dataTypes[i]); } catch (Exception e) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java index 677b404c217..54655704c1c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimator.java @@ -548,6 +548,9 @@ public class InsertNodeMemoryEstimator { public static long sizeOfColumns( final Object[] columns, final MeasurementSchema[] measurementSchemas) { + if (Objects.isNull(columns)) { + return 0L; + } // Directly calculate if measurementSchemas are absent if (Objects.isNull(measurementSchemas)) { return RamUsageEstimator.shallowSizeOf(columns) @@ -559,7 +562,10 @@ public class InsertNodeMemoryEstimator { RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns.length); for (int i = 0; i < columns.length; i++) { - if (measurementSchemas[i] == null || measurementSchemas[i].getType() == null) { + if (columns[i] == null + || i >= measurementSchemas.length + || measurementSchemas[i] == null + || measurementSchemas[i].getType() == null) { continue; } switch (measurementSchemas[i].getType()) { @@ -611,6 +617,9 @@ public class InsertNodeMemoryEstimator { public static long sizeOfValues( final Object[] values, final MeasurementSchema[] measurementSchemas) { + if (Objects.isNull(values)) { + return 0L; + } // Directly calculate if measurementSchemas are absent if (Objects.isNull(measurementSchemas)) { return RamUsageEstimator.shallowSizeOf(values) @@ -622,7 +631,9 @@ public class InsertNodeMemoryEstimator { RamUsageEstimator.alignObjectSize( NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * values.length); for (int i = 0; i < values.length; i++) { - if (measurementSchemas[i] == null || measurementSchemas[i].getType() == null) { + if (i >= measurementSchemas.length + || measurementSchemas[i] == null + || measurementSchemas[i].getType() == null) { size += NUM_BYTES_OBJECT_HEADER; continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java index 773d40e99d1..d7be9f548f5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java @@ -395,7 +395,7 @@ public class TabletStatementConverter { for (int i = 0; i < columns; i++) { final boolean isValueColumnsNotNull = BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer)); - if (isValueColumnsNotNull && types[i] == null) { + if (types[i] == null) { continue; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java index da5f2325acd..d9a202de22b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNode.java @@ -174,6 +174,7 @@ public abstract class InsertNode extends SearchNode { public void setMeasurementSchemas(MeasurementSchema[] measurementSchemas) { this.measurementSchemas = measurementSchemas; + measurementColumnCnt = -1; } public String[] getMeasurements() { @@ -189,13 +190,16 @@ public abstract class InsertNode extends SearchNode { } public boolean isValidMeasurement(int i) { - return measurementSchemas != null + return measurements != null + && measurements[i] != null + && measurementSchemas != null && measurementSchemas[i] != null && (columnCategories == null || columnCategories[i] == TsTableColumnCategory.FIELD); } public void setMeasurements(String[] measurements) { this.measurements = measurements; + measurementColumnCnt = -1; } public TSDataType[] getDataTypes() { @@ -327,8 +331,12 @@ public abstract class InsertNode extends SearchNode { } public boolean hasValidMeasurements() { - for (Object o : measurements) { - if (o != null) { + if (measurements == null) { + return false; + } + for (int i = 0; i < measurements.length; i++) { + if (measurements[i] != null + && (columnCategories == null || columnCategories[i] == TsTableColumnCategory.FIELD)) { return true; } } @@ -354,15 +362,16 @@ public abstract class InsertNode extends SearchNode { } public boolean isMeasurementFailed(int index) { - return measurements[index] == null; + return measurements == null || measurements[index] == null; + } + + protected boolean isWritableFieldMeasurement(int index) { + return !isMeasurementFailed(index) + && (columnCategories == null || columnCategories[index] == TsTableColumnCategory.FIELD); } public boolean allMeasurementFailed() { - if (measurements != null) { - return failedMeasurementNumber - >= measurements.length - (tagColumnIndices == null ? 0 : tagColumnIndices.size()); - } - return true; + return measurements == null || !hasValidMeasurements(); } // endregion @@ -418,6 +427,8 @@ public abstract class InsertNode extends SearchNode { public void setColumnCategories(TsTableColumnCategory[] columnCategories) { this.columnCategories = columnCategories; + measurementColumnCnt = -1; + tagColumnIndices = null; if (columnCategories != null) { tagColumnIndices = new ArrayList<>(); for (int i = 0; i < columnCategories.length; i++) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java index fcb96cf3f07..9d4d826687d 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowNode.java @@ -240,6 +240,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { measurements[index] = null; dataTypes[index] = null; values[index] = null; + measurementColumnCnt = -1; } @Override @@ -268,7 +269,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { /** Serialize measurements and values, ignoring failed time series. */ void serializeMeasurementsAndValues(ByteBuffer buffer) { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer); + ReadWriteIOUtils.write(getValidMeasurementNumber(), buffer); serializeMeasurementsOrSchemas(buffer); putDataTypesAndValues(buffer); ReadWriteIOUtils.write((byte) (isNeedInferType ? 1 : 0), buffer); @@ -282,7 +283,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { * @throws IOException - If an I/O error occurs. */ void serializeMeasurementsAndValues(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream); + ReadWriteIOUtils.write(getValidMeasurementNumber(), stream); serializeMeasurementsOrSchemas(stream); putDataTypesAndValues(stream); ReadWriteIOUtils.write((byte) (isNeedInferType ? 1 : 0), stream); @@ -637,7 +638,7 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { /** Serialize measurements and values, ignoring failed time series. */ private void serializeMeasurementsAndValues(IWALByteBufferView buffer) { - buffer.putInt(measurements.length - getFailedMeasurementNumber()); + buffer.putInt(getValidMeasurementNumber()); serializeMeasurementSchemasToWAL(buffer); putDataTypesAndValues(buffer); buffer.put((byte) (isAligned ? 1 : 0)); @@ -910,15 +911,26 @@ public class InsertRowNode extends InsertNode implements WALEntryValue { } public TimeValuePair composeTimeValuePair(int columnIndex) { - if (columnIndex >= values.length - || Objects.isNull(dataTypes[columnIndex]) - || dataTypes[columnIndex] == TSDataType.OBJECT) { + if (!canComposeTimeValuePair(columnIndex)) { return null; } Object value = values[columnIndex]; - return Objects.nonNull(value) - ? new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value)) - : null; + return new TimeValuePair(time, TsPrimitiveType.getByType(dataTypes[columnIndex], value)); + } + + private boolean canComposeTimeValuePair(final int columnIndex) { + return measurements != null + && columnIndex >= 0 + && columnIndex < measurements.length + && values != null + && columnIndex < values.length + && values[columnIndex] != null + && dataTypes != null + && columnIndex < dataTypes.length + && dataTypes[columnIndex] != null + && dataTypes[columnIndex] != TSDataType.OBJECT + && (columnCategories == null || columnIndex < columnCategories.length) + && isWritableFieldMeasurement(columnIndex); } public void updateLastCache(String databaseName) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java index 457ab81dbca..2acaeccffa6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertRowsOfOneDeviceNode.java @@ -241,6 +241,9 @@ public class InsertRowsOfOneDeviceNode extends InsertNode { String[] measurements = insertRowNode.getMeasurements(); TSDataType[] dataTypes = insertRowNode.getDataTypes(); for (int i = 0; i < measurements.length; i++) { + if (measurements[i] == null) { + continue; + } if (!measurementSet.contains(measurements[i])) { measurementList.add(measurements[i]); dataTypeList.add(dataTypes[i]); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java index 98d0eca98b0..9f1cb48708c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java @@ -332,7 +332,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { protected InsertTabletNode getEmptySplit(int count) { long[] subTimes = new long[count]; - Object[] values = initTabletValues(dataTypes.length, count, dataTypes); + Object[] values = initTabletValuesForSplit(dataTypes.length, count, dataTypes); BitMap[] newBitMaps = initBitmapsForSplit(dataTypes.length, count); return new InsertTabletNode( getPlanNodeId(), @@ -368,7 +368,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { System.arraycopy(times, start, subNode.times, destLoc, length); for (int k = 0; k < subNode.columns.length; k++) { - if (dataTypes[k] != null) { + if (hasColumnForSplit(k)) { System.arraycopy(columns[k], start, subNode.columns[k], destLoc, length); } if (subNode.bitMaps != null @@ -438,6 +438,16 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return values; } + protected Object[] initTabletValuesForSplit(int columnSize, int rowSize, TSDataType[] dataTypes) { + Object[] values = initTabletValues(columnSize, rowSize, dataTypes); + for (int i = 0; i < values.length; i++) { + if (!hasColumnForSplit(i)) { + values[i] = null; + } + } + return values; + } + protected BitMap[] initBitmaps(int columnSize, int rowSize) { BitMap[] bitMaps = new BitMap[columnSize]; for (int i = 0; i < columnSize; i++) { @@ -455,7 +465,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { final BitMap[] splitBitMaps = new BitMap[columnSize]; boolean hasBitMap = false; for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) { - if (this.bitMaps[i] != null + if (hasColumnForSplit(i) + && this.bitMaps[i] != null && !this.bitMaps[i].isAllUnmarked(Math.min(sourceRowCount, this.bitMaps[i].getSize()))) { splitBitMaps[i] = new BitMap(rowSize); hasBitMap = true; @@ -464,6 +475,18 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return hasBitMap ? splitBitMaps : null; } + protected boolean hasColumnForSplit(int index) { + return dataTypes != null + && index < dataTypes.length + && dataTypes[index] != null + && columns != null + && index < columns.length + && columns[index] != null + && (measurements == null || index < measurements.length && measurements[index] != null) + && (measurementSchemas == null + || index < measurementSchemas.length && measurementSchemas[index] != null); + } + @Override public void markFailedMeasurement(int index) { if (measurements[index] == null) { @@ -472,6 +495,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { measurements[index] = null; dataTypes[index] = null; columns[index] = null; + measurementColumnCnt = -1; } @Override @@ -513,12 +537,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize measurements or measurement schemas, ignoring failed time series */ private void writeMeasurementsOrSchemas(ByteBuffer buffer) { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), buffer); + ReadWriteIOUtils.write(getValidMeasurementNumber(), buffer); ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), buffer); for (int i = 0; i < measurements.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } // serialize measurement schemas when exist @@ -532,12 +556,12 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize measurements or measurement schemas, ignoring failed time series */ private void writeMeasurementsOrSchemas(DataOutputStream stream) throws IOException { - ReadWriteIOUtils.write(measurements.length - getFailedMeasurementNumber(), stream); + ReadWriteIOUtils.write(getValidMeasurementNumber(), stream); ReadWriteIOUtils.write((byte) (measurementSchemas != null ? 1 : 0), stream); for (int i = 0; i < measurements.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } // serialize measurement schemas when exist @@ -552,8 +576,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize data types, ignoring failed time series */ private void writeDataTypes(ByteBuffer buffer) { for (int i = 0; i < dataTypes.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } dataTypes[i].serializeTo(buffer); @@ -563,8 +587,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize data types, ignoring failed time series */ private void writeDataTypes(DataOutputStream stream) throws IOException { for (int i = 0; i < dataTypes.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } dataTypes[i].serializeTo(stream); @@ -590,8 +614,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), buffer); if (bitMaps != null) { for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } @@ -610,8 +634,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { ReadWriteIOUtils.write(BytesUtils.boolToByte(bitMaps != null), stream); if (bitMaps != null) { for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } @@ -628,8 +652,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize values, ignoring failed time series */ private void writeValues(ByteBuffer buffer) { for (int i = 0; i < columns.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } serializeColumn(dataTypes[i], columns[i], buffer); @@ -639,8 +663,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize values, ignoring failed time series */ private void writeValues(DataOutputStream stream) throws IOException { for (int i = 0; i < columns.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } serializeColumn(dataTypes[i], columns[i], stream); @@ -844,8 +868,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { size += Byte.BYTES; if (bitMaps != null) { for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } @@ -864,10 +888,11 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { } // values size for (int i = 0; i < dataTypes.length; i++) { - if (columns[i] != null) { - for (int[] range : rangeList) { - size += getColumnSize(dataTypes[i], columns[i], range[0], range[1]); - } + if (!shouldSerializeMeasurement(i)) { + continue; + } + for (int[] range : rangeList) { + size += getColumnSize(dataTypes[i], columns[i], range[0], range[1]); } } // isAlign @@ -958,7 +983,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize measurement schemas, ignoring failed time series */ protected void writeMeasurementSchemas(IWALByteBufferView buffer) { - buffer.putInt(measurements.length - getFailedMeasurementNumber()); + buffer.putInt(getValidMeasurementNumber()); serializeMeasurementSchemasToWAL(buffer); } @@ -976,8 +1001,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { buffer.put(BytesUtils.boolToByte(bitMaps != null)); if (bitMaps != null) { for (int i = 0; i < bitMaps.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } @@ -1001,8 +1026,8 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { /** Serialize values, ignoring failed time series */ protected void writeValues(IWALByteBufferView buffer, List<int[]> rangeList) { for (int i = 0; i < columns.length; i++) { - // ignore failed partial insert - if (measurements[i] == null) { + // ignore failed partial insert and null columns + if (!shouldSerializeMeasurement(i)) { continue; } for (int[] startEnd : rangeList) { @@ -1011,6 +1036,51 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { } } + @Override + protected int getValidMeasurementNumber() { + int validMeasurementNumber = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurement(i)) { + validMeasurementNumber++; + } + } + return validMeasurementNumber; + } + + @Override + protected int serializeMeasurementSchemasSize() { + int byteLen = 0; + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurement(i)) { + byteLen += WALWriteUtils.sizeToWrite(measurementSchemas[i]); + } + } + return byteLen; + } + + @Override + protected void serializeMeasurementSchemasToWAL(IWALByteBufferView buffer) { + for (int i = 0; measurements != null && i < measurements.length; i++) { + if (shouldSerializeMeasurement(i)) { + WALWriteUtils.write(measurementSchemas[i], buffer); + } + } + } + + protected boolean shouldSerializeMeasurement(int index) { + return measurements != null + && index < measurements.length + && measurements[index] != null + && (measurementSchemas == null + || index < measurementSchemas.length && measurementSchemas[index] != null) + && dataTypes != null + && index < dataTypes.length + && dataTypes[index] != null + && columns != null + && index < columns.length + && columns[index] != null; + } + private void serializeColumn( TSDataType dataType, Object column, IWALByteBufferView buffer, int start, int end) { switch (dataType) { @@ -1239,7 +1309,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { public TimeValuePair composeLastTimeValuePair( int measurementIndex, int startOffset, int endOffset) { - if (measurementIndex >= columns.length || Objects.isNull(dataTypes[measurementIndex])) { + if (!canComposeLastTimeValuePair(measurementIndex)) { return null; } @@ -1272,7 +1342,7 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { if (results == null) { return composeLastTimeValuePair(measurementIndex, startOffset, endOffset); } - if (measurementIndex >= columns.length || Objects.isNull(dataTypes[measurementIndex])) { + if (!canComposeLastTimeValuePair(measurementIndex)) { return null; } @@ -1293,6 +1363,20 @@ public class InsertTabletNode extends InsertNode implements WALEntryValue { return lastIdx < startOffset ? null : composeTimeValuePair(measurementIndex, lastIdx); } + private boolean canComposeLastTimeValuePair(final int measurementIndex) { + return measurements != null + && measurementIndex >= 0 + && measurementIndex < measurements.length + && columns != null + && measurementIndex < columns.length + && columns[measurementIndex] != null + && dataTypes != null + && measurementIndex < dataTypes.length + && dataTypes[measurementIndex] != null + && (columnCategories == null || measurementIndex < columnCategories.length) + && isWritableFieldMeasurement(measurementIndex); + } + private TimeValuePair composeTimeValuePair(final int measurementIndex, final int rowIndex) { TsPrimitiveType value; switch (dataTypes[measurementIndex]) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java index 839c3db5a7c..11694e6d5ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java @@ -180,7 +180,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { @Override protected InsertTabletNode getEmptySplit(int count) { long[] subTimes = new long[count]; - Object[] values = initTabletValues(dataTypes.length, count, dataTypes); + Object[] values = initTabletValuesForSplit(dataTypes.length, count, dataTypes); BitMap[] newBitMaps = initBitmapsForSplit(dataTypes.length, count); RelationalInsertTabletNode split = new RelationalInsertTabletNode( @@ -250,7 +250,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { protected void serializeAttributes(ByteBuffer byteBuffer) { super.serializeAttributes(byteBuffer); for (int i = 0; i < measurements.length; i++) { - if (measurements[i] != null) { + if (shouldSerializeMeasurement(i)) { columnCategories[i].serialize(byteBuffer); } } @@ -260,7 +260,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { protected void serializeAttributes(DataOutputStream stream) throws IOException { super.serializeAttributes(stream); for (int i = 0; i < measurements.length; i++) { - if (measurements[i] != null) { + if (shouldSerializeMeasurement(i)) { columnCategories[i].serialize(stream); } } @@ -285,7 +285,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { void subSerialize(IWALByteBufferView buffer, List<int[]> rangeList, long encodedSearchIndex) { super.subSerialize(buffer, rangeList, encodedSearchIndex); for (int i = 0; i < measurements.length; i++) { - if (measurements[i] != null) { + if (shouldSerializeMeasurement(i)) { buffer.put(columnCategories[i].getCategory()); } } @@ -450,7 +450,7 @@ public class RelationalInsertTabletNode extends InsertTabletNode { System.arraycopy(times, start, subNode.times, destLoc, length); for (int i = 0; i < subNode.columns.length; i++) { - if (dataTypes[i] != null) { + if (hasColumnForSplit(i)) { System.arraycopy(columns[i], start, subNode.columns[i], destLoc, length); } if (subNode.bitMaps != null diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index 6ba5f6307cb..866ef26d2d2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -176,14 +176,14 @@ public abstract class AbstractMemTable implements IMemTable { private IWritableMemChunkGroup createAlignedMemChunkGroupIfNotExistAndGet( IDeviceID deviceId, List<IMeasurementSchema> schemaList) { + List<IMeasurementSchema> filteredSchemaList = + schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()); IWritableMemChunkGroup memChunkGroup = memTableMap.computeIfAbsent( deviceId, k -> { - seriesNumber += schemaList.size(); - return new AlignedWritableMemChunkGroup( - schemaList.stream().filter(Objects::nonNull).collect(Collectors.toList()), - k.isTableModel()); + seriesNumber += filteredSchemaList.size(); + return new AlignedWritableMemChunkGroup(filteredSchemaList, k.isTableModel()); }); for (IMeasurementSchema schema : schemaList) { if (schema != null && !memChunkGroup.contains(schema.getMeasurementName())) { @@ -204,8 +204,8 @@ public abstract class AbstractMemTable implements IMemTable { int nullPointsNumber = 0; for (int i = 0; i < insertRowNode.getMeasurements().length; i++) { // Use measurements[i] to ignore failed partial insert - if (measurements[i] == null || values[i] == null) { - if (values[i] == null) { + if (measurements[i] == null || values[i] == null || !isFieldMeasurement(insertRowNode, i)) { + if (isValidMeasurement(insertRowNode, i, true) && values[i] == null) { nullPointsNumber++; } schemaList.add(null); @@ -215,12 +215,11 @@ public abstract class AbstractMemTable implements IMemTable { dataTypes.add(schema.getType()); } } - memSize += MemUtils.getRowRecordSize(dataTypes, values); + memSize += MemUtils.getRowRecordSize(dataTypes, values, insertRowNode.getColumnCategories()); write(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = - insertRowNode.getMeasurements().length - - insertRowNode.getFailedMeasurementNumber() + getValidMeasurementNumber(insertRowNode, true) - (IoTDBDescriptor.getInstance().getConfig().isIncludeNullValueInWriteThroughputMetric() ? 0 : nullPointsNumber); @@ -243,7 +242,7 @@ public abstract class AbstractMemTable implements IMemTable { || values[i] == null || insertRowNode.getColumnCategories() != null && insertRowNode.getColumnCategories()[i] != TsTableColumnCategory.FIELD) { - if (measurements[i] != null && values[i] == null) { + if (isValidMeasurement(insertRowNode, i, true) && values[i] == null) { // do not include failed measurement to avoid a negative pointsInserted nullPointsNumber++; } @@ -261,8 +260,7 @@ public abstract class AbstractMemTable implements IMemTable { MemUtils.getAlignedRowRecordSize(dataTypes, values, insertRowNode.getColumnCategories()); writeAlignedRow(insertRowNode.getDeviceID(), schemaList, insertRowNode.getTime(), values); int pointsInserted = - insertRowNode.getMeasurementColumnCnt() - - insertRowNode.getFailedMeasurementNumber() + getValidMeasurementNumber(insertRowNode, true) - (IoTDBDescriptor.getInstance().getConfig().isIncludeNullValueInWriteThroughputMetric() ? 0 : nullPointsNumber); @@ -274,12 +272,11 @@ public abstract class AbstractMemTable implements IMemTable { public int insertTablet(InsertTabletNode insertTabletNode, int start, int end) throws WriteProcessException { try { - int nullPointsNumber = computeTabletNullPointsNumber(insertTabletNode, start, end); + int nullPointsNumber = computeTabletNullPointsNumber(insertTabletNode, start, end, true); writeTabletNode(insertTabletNode, start, end); memSize += MemUtils.getTabletSize(insertTabletNode, start, end); int pointsInserted = - ((insertTabletNode.getDataTypes().length - insertTabletNode.getFailedMeasurementNumber()) - * (end - start)) + (getValidMeasurementNumber(insertTabletNode, true) * (end - start)) - (IoTDBDescriptor.getInstance() .getConfig() .isIncludeNullValueInWriteThroughputMetric() @@ -297,14 +294,12 @@ public abstract class AbstractMemTable implements IMemTable { InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results) throws WriteProcessException { try { - int nullPointsNumber = computeTabletNullPointsNumber(insertTabletNode, start, end); + int nullPointsNumber = computeTabletNullPointsNumber(insertTabletNode, start, end, true); writeAlignedTablet(insertTabletNode, start, end, results); // TODO-Table: what is the relation between this and TsFileProcessor.checkMemCost memSize += MemUtils.getAlignedTabletSize(insertTabletNode, start, end, results); int pointsInserted = - ((insertTabletNode.getMeasurementColumnCnt() - - insertTabletNode.getFailedMeasurementNumber()) - * (end - start)) + (getValidMeasurementNumber(insertTabletNode, true) * (end - start)) - (IoTDBDescriptor.getInstance() .getConfig() .isIncludeNullValueInWriteThroughputMetric() @@ -318,12 +313,13 @@ public abstract class AbstractMemTable implements IMemTable { } private static int computeTabletNullPointsNumber( - InsertTabletNode insertTabletNode, int start, int end) { + InsertTabletNode insertTabletNode, int start, int end, boolean countFieldOnly) { Object[] values = insertTabletNode.getBitMaps(); int nullPointsNumber = 0; if (values != null) { for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) { - if (insertTabletNode.isMeasurementFailed(i)) { + if (!isValidMeasurement(insertTabletNode, i, countFieldOnly) + || insertTabletNode.getColumns()[i] == null) { // do not include failed measurement to avoid a negative pointsInserted continue; } @@ -341,6 +337,38 @@ public abstract class AbstractMemTable implements IMemTable { return nullPointsNumber; } + private static int getValidMeasurementNumber(InsertRowNode insertNode, boolean countFieldOnly) { + int count = 0; + for (int i = 0; i < insertNode.getMeasurements().length; i++) { + if (isValidMeasurement(insertNode, i, countFieldOnly)) { + count++; + } + } + return count; + } + + private static int getValidMeasurementNumber( + InsertTabletNode insertNode, boolean countFieldOnly) { + int count = 0; + for (int i = 0; i < insertNode.getMeasurements().length; i++) { + if (isValidMeasurement(insertNode, i, countFieldOnly) && insertNode.getColumns()[i] != null) { + count++; + } + } + return count; + } + + private static boolean isValidMeasurement( + InsertNode insertNode, int index, boolean countFieldOnly) { + return insertNode.getMeasurements()[index] != null + && (!countFieldOnly || isFieldMeasurement(insertNode, index)); + } + + private static boolean isFieldMeasurement(InsertNode insertNode, int index) { + return insertNode.getColumnCategories() == null + || insertNode.getColumnCategories()[index] == TsTableColumnCategory.FIELD; + } + @Override public void write( IDeviceID deviceId, @@ -366,7 +394,9 @@ public abstract class AbstractMemTable implements IMemTable { public void writeTabletNode(InsertTabletNode insertTabletNode, int start, int end) { List<IMeasurementSchema> schemaList = new ArrayList<>(); for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) { - if (insertTabletNode.getColumns()[i] == null) { + if (insertTabletNode.getMeasurements()[i] == null + || insertTabletNode.getColumns()[i] == null + || !isFieldMeasurement(insertTabletNode, i)) { schemaList.add(null); } else { schemaList.add(insertTabletNode.getMeasurementSchemas()[i]); @@ -388,7 +418,8 @@ public abstract class AbstractMemTable implements IMemTable { InsertTabletNode insertTabletNode, int start, int end, TSStatus[] results) { List<IMeasurementSchema> schemaList = new ArrayList<>(); for (int i = 0; i < insertTabletNode.getMeasurementSchemas().length; i++) { - if (insertTabletNode.getColumns()[i] == null + if (insertTabletNode.getMeasurements()[i] == null + || insertTabletNode.getColumns()[i] == null || (insertTabletNode.getColumnCategories() != null && insertTabletNode.getColumnCategories()[i] != TsTableColumnCategory.FIELD)) { schemaList.add(null); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index c4e488f8486..54adec65b97 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -306,8 +306,11 @@ public class TsFileProcessor { } else { memIncrements = checkMemCostAndAddToTspInfoForRow( - insertRowNode.getDeviceID(), insertRowNode.getMeasurements(), - insertRowNode.getDataTypes(), insertRowNode.getValues()); + insertRowNode.getDeviceID(), + insertRowNode.getMeasurements(), + insertRowNode.getDataTypes(), + insertRowNode.getValues(), + insertRowNode.getColumnCategories()); } // recordScheduleMemoryBlockCost infoForMetrics[1] += System.nanoTime() - memControlStartTime; @@ -523,6 +526,7 @@ public class TsFileProcessor { insertTabletNode.getMeasurements(), insertTabletNode.getDataTypes(), insertTabletNode.getColumns(), + insertTabletNode.getColumnCategories(), start, end); } @@ -676,7 +680,11 @@ public class TsFileProcessor { @SuppressWarnings("squid:S3776") // High Cognitive Complexity private long[] checkMemCostAndAddToTspInfoForRow( - IDeviceID deviceId, String[] measurements, TSDataType[] dataTypes, Object[] values) + IDeviceID deviceId, + String[] measurements, + TSDataType[] dataTypes, + Object[] values, + TsTableColumnCategory[] columnCategories) throws WriteProcessException { // Memory of increased PrimitiveArray and TEXT values, e.g., add a long[128], add 128*8 long memTableIncrement = 0L; @@ -685,7 +693,7 @@ public class TsFileProcessor { for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableFieldMeasurement(measurements, dataTypes, values, columnCategories, i)) { continue; } IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]); @@ -725,7 +733,8 @@ public class TsFileProcessor { String[] measurements = insertRowNode.getMeasurements(); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || measurements[i] == null) { + if (!isWritableFieldMeasurement( + measurements, dataTypes, values, insertRowNode.getColumnCategories(), i)) { continue; } IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, measurements[i]); @@ -779,21 +788,21 @@ public class TsFileProcessor { IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, AlignedPath.VECTOR_PLACEHOLDER); if (memChunk == null) { + TSDataType[] writableFieldDataTypes = + getWritableFieldDataTypes(measurements, dataTypes, values, columnCategories); // For new device of this mem table // ChunkMetadataIncrement chunkMetadataIncrement += ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) - * dataTypes.length; - memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories); + * writableFieldDataTypes.length; + memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(writableFieldDataTypes, null); } else { // For existed device of this mem table AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; List<TSDataType> dataTypesInTVList = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null - || measurements[i] == null - || (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD)) { + if (!isWritableFieldMeasurement(measurements, dataTypes, values, columnCategories, i)) { continue; } @@ -817,7 +826,8 @@ public class TsFileProcessor { for (int i = 0; i < dataTypes.length; i++) { // TEXT data mem size - if (dataTypes[i] != null && dataTypes[i].isBinary() && values[i] != null) { + if (isWritableFieldMeasurement(measurements, dataTypes, values, columnCategories, i) + && dataTypes[i].isBinary()) { textDataIncrement += MemUtils.getBinarySize((Binary) values[i]); } } @@ -843,25 +853,26 @@ public class TsFileProcessor { IWritableMemChunk memChunk = workMemTable.getWritableMemChunk(deviceId, AlignedPath.VECTOR_PLACEHOLDER); if (memChunk == null && !increasingMemTableInfo.containsKey(deviceId)) { + TSDataType[] writableFieldDataTypes = + getWritableFieldDataTypes( + measurements, dataTypes, values, insertRowNode.getColumnCategories()); + Pair<Map<String, TSDataType>, Integer> addingPointNumInfo = + increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0)); // For new device of this mem table // ChunkMetadataIncrement chunkMetadataIncrement += ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR) - * dataTypes.length; - memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(dataTypes, null); + * writableFieldDataTypes.length; + memTableIncrement += AlignedTVList.alignedTvListArrayMemCost(writableFieldDataTypes, null); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null - || measurements[i] == null - || (insertRowNode.getColumnCategories() != null - && insertRowNode.getColumnCategories()[i] != TsTableColumnCategory.FIELD)) { + if (!isWritableFieldMeasurement( + measurements, dataTypes, values, insertRowNode.getColumnCategories(), i)) { continue; } - increasingMemTableInfo - .computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 1)) - .left - .put(measurements[i], dataTypes[i]); + addingPointNumInfo.left.put(measurements[i], dataTypes[i]); } + addingPointNumInfo.setRight(1); } else { // For existed device of this mem table @@ -872,10 +883,8 @@ public class TsFileProcessor { increasingMemTableInfo.computeIfAbsent(deviceId, k -> new Pair<>(new HashMap<>(), 0)); for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null - || measurements[i] == null - || (insertRowNode.getColumnCategories() != null - && insertRowNode.getColumnCategories()[i] != TsTableColumnCategory.FIELD)) { + if (!isWritableFieldMeasurement( + measurements, dataTypes, values, insertRowNode.getColumnCategories(), i)) { continue; } @@ -914,10 +923,8 @@ public class TsFileProcessor { for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null - || measurements[i] == null - || (insertRowNode.getColumnCategories() != null - && insertRowNode.getColumnCategories()[i] != TsTableColumnCategory.FIELD)) { + if (!isWritableFieldMeasurement( + measurements, dataTypes, values, insertRowNode.getColumnCategories(), i)) { continue; } // TEXT data mem size @@ -935,6 +942,7 @@ public class TsFileProcessor { String[] measurements, TSDataType[] dataTypes, Object[] columns, + TsTableColumnCategory[] columnCategories, int start, int end) throws WriteProcessException { @@ -945,7 +953,7 @@ public class TsFileProcessor { for (int i = 0; i < dataTypes.length; i++) { // Skip failed Measurements - if (dataTypes[i] == null || columns[i] == null || measurements[i] == null) { + if (!isWritableFieldMeasurement(measurements, dataTypes, columns, columnCategories, i)) { continue; } updateMemCost(dataTypes[i], measurements[i], deviceId, start, end, memIncrements, columns[i]); @@ -1053,16 +1061,8 @@ public class TsFileProcessor { } } - int measurementColumnNum = 0; - if (columnCategories == null) { - measurementColumnNum = dataTypes.length; - } else { - for (TsTableColumnCategory columnCategory : columnCategories) { - if (columnCategory == TsTableColumnCategory.FIELD) { - measurementColumnNum++; - } - } - } + TSDataType[] writableFieldDataTypes = + getWritableFieldDataTypes(measurementIds, dataTypes, columns, columnCategories); // memIncrements = [memTable, text, chunk metadata] respectively IWritableMemChunk memChunk = @@ -1071,7 +1071,7 @@ public class TsFileProcessor { // new devices introduce new ChunkMetadata // ChunkMetadata memory Increment memIncrements[2] += - measurementColumnNum + writableFieldDataTypes.length * ChunkMetadata.calculateRamSize(AlignedPath.VECTOR_PLACEHOLDER, TSDataType.VECTOR); // TVList memory @@ -1079,7 +1079,7 @@ public class TsFileProcessor { incomingPointNum / PrimitiveArrayManager.ARRAY_SIZE + (incomingPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); memIncrements[0] += - numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(dataTypes, columnCategories); + numArraysToAdd * AlignedTVList.alignedTvListArrayMemCost(writableFieldDataTypes, null); } else { AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; List<TSDataType> dataTypesInTVList = new ArrayList<>(); @@ -1087,12 +1087,7 @@ public class TsFileProcessor { int newPointNum = currentPointNum + incomingPointNum; for (int i = 0; i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; - String measurement = measurementIds[i]; - Object column = columns[i]; - if (dataType == null - || column == null - || measurement == null - || (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD)) { + if (!isWritableFieldMeasurement(measurementIds, dataTypes, columns, columnCategories, i)) { continue; } @@ -1125,12 +1120,7 @@ public class TsFileProcessor { // flexible-length data size for (int i = 0; i < dataTypes.length; i++) { TSDataType dataType = dataTypes[i]; - String measurement = measurementIds[i]; - Object column = columns[i]; - if (dataType == null - || column == null - || measurement == null - || (columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD)) { + if (!isWritableFieldMeasurement(measurementIds, dataTypes, columns, columnCategories, i)) { continue; } @@ -1141,6 +1131,41 @@ public class TsFileProcessor { } } + private static TSDataType[] getWritableFieldDataTypes( + String[] measurementIds, + TSDataType[] dataTypes, + Object[] valuesOrColumns, + TsTableColumnCategory[] columnCategories) { + List<TSDataType> writableFieldDataTypes = new ArrayList<>(); + for (int i = 0; i < dataTypes.length; i++) { + if (isWritableFieldMeasurement( + measurementIds, dataTypes, valuesOrColumns, columnCategories, i)) { + writableFieldDataTypes.add(dataTypes[i]); + } + } + return writableFieldDataTypes.toArray(new TSDataType[0]); + } + + private static boolean isWritableFieldMeasurement( + String[] measurementIds, + TSDataType[] dataTypes, + Object[] valuesOrColumns, + TsTableColumnCategory[] columnCategories, + int index) { + return isFieldMeasurement(measurementIds, dataTypes, columnCategories, index) + && valuesOrColumns[index] != null; + } + + private static boolean isFieldMeasurement( + String[] measurementIds, + TSDataType[] dataTypes, + TsTableColumnCategory[] columnCategories, + int index) { + return dataTypes[index] != null + && measurementIds[index] != null + && (columnCategories == null || columnCategories[index] == TsTableColumnCategory.FIELD); + } + private void updateMemoryInfo( long memTableIncrement, long chunkMetadataIncrement, long textDataIncrement) throws WriteProcessRejectException { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java index 89f28726c98..6f5c63824c4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunkGroup.java @@ -179,7 +179,7 @@ public class WritableMemChunkGroup implements IWritableMemChunkGroup { int size = 0; size += Integer.BYTES; for (Map.Entry<String, IWritableMemChunk> entry : memChunkMap.entrySet()) { - size += ReadWriteIOUtils.sizeToWrite(entry.getKey()); + size += WALWriteUtils.sizeToWrite(entry.getKey()); size += entry.getValue().serializedSize(); } return size; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java index 3c4d24b5796..09b836db3cc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/MemUtils.java @@ -67,14 +67,18 @@ public class MemUtils { * memory before insertion */ public static long getRowRecordSize(List<TSDataType> dataTypes, Object[] value) { - int emptyRecordCount = 0; + return getRowRecordSize(dataTypes, value, null); + } + + public static long getRowRecordSize( + List<TSDataType> dataTypes, Object[] value, TsTableColumnCategory[] columnCategories) { + int dataTypeIndex = 0; long memSize = 0L; for (int i = 0; i < value.length; i++) { - if (value[i] == null) { - emptyRecordCount++; + if (value[i] == null || !isFieldColumn(columnCategories, i)) { continue; } - memSize += getRecordSize(dataTypes.get(i - emptyRecordCount), value[i], false); + memSize += getRecordSize(dataTypes.get(dataTypeIndex++), value[i], false); } return memSize; } @@ -87,13 +91,15 @@ public class MemUtils { List<TSDataType> dataTypes, Object[] value, TsTableColumnCategory[] columnCategories) { // time and index size long memSize = 8L + 4L; - for (int i = 0; i < dataTypes.size(); i++) { - if (value[i] == null - || dataTypes.get(i).isBinary() - || columnCategories != null && columnCategories[i] != TsTableColumnCategory.FIELD) { + int dataTypeIndex = 0; + for (int i = 0; i < value.length; i++) { + if (value[i] == null || !isFieldColumn(columnCategories, i)) { continue; } - memSize += dataTypes.get(i).getDataTypeSize(); + TSDataType dataType = dataTypes.get(dataTypeIndex++); + if (!dataType.isBinary()) { + memSize += dataType.getDataTypeSize(); + } } return memSize; } @@ -127,7 +133,7 @@ public class MemUtils { } long memSize = 0; for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) { - if (insertTabletNode.getMeasurements()[i] == null) { + if (!isWritableTabletMeasurement(insertTabletNode, i, true)) { continue; } // Time column memSize @@ -144,7 +150,7 @@ public class MemUtils { } long memSize = 0; for (int i = 0; i < insertTabletNode.getMeasurements().length; i++) { - if (!insertTabletNode.isValidMeasurement(i)) { + if (!isWritableTabletMeasurement(insertTabletNode, i, true)) { continue; } if (results == null) { @@ -163,6 +169,20 @@ public class MemUtils { return memSize; } + private static boolean isWritableTabletMeasurement( + InsertTabletNode insertTabletNode, int index, boolean fieldOnly) { + return insertTabletNode.getMeasurements()[index] != null + && insertTabletNode.getColumns()[index] != null + && insertTabletNode.getDataTypes()[index] != null + && insertTabletNode.getMeasurementSchemas() != null + && insertTabletNode.getMeasurementSchemas()[index] != null + && (!fieldOnly || isFieldColumn(insertTabletNode.getColumnCategories(), index)); + } + + private static boolean isFieldColumn(TsTableColumnCategory[] columnCategories, int columnIndex) { + return columnCategories == null || columnCategories[columnIndex] == TsTableColumnCategory.FIELD; + } + /** Calculate how much memory will be used if the given record is written to sequence file. */ public static long getTsRecordMem(TSRecord tsRecord) { long memUsed = 8; // time diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java index da3dee91caa..6b1f9e3d3d3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java @@ -33,10 +33,12 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.auth.AuthorityChecker; import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; +import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTablePatternParser; import org.apache.iotdb.db.pipe.event.common.tablet.parser.TabletInsertionEventTreePatternParser; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; import org.apache.tsfile.enums.TSDataType; @@ -350,6 +352,75 @@ public class PipeTabletInsertionEventTest { Assert.assertTrue(tablet.isNull(1, 1)); } + @Test + public void convertToTabletSkipsFailedMeasurementsForCoveredTreePattern() throws Exception { + final InsertRowNode rowNode = + new InsertRowNode( + new PlanNodeId("plan node failed row"), + new PartialPath(deviceId), + false, + Arrays.copyOf(measurementIds, measurementIds.length), + Arrays.copyOf(dataTypes, dataTypes.length), + Arrays.copyOf(schemas, schemas.length), + times[0], + Arrays.copyOf(insertRowNode.getValues(), schemas.length), + false); + rowNode.markFailedMeasurement(1); + + final Tablet rowTablet = + new TabletInsertionEventTreePatternParser(rowNode, new PrefixTreePattern(pattern)) + .convertToTablet(); + assertTabletDoesNotContainMeasurement(rowTablet, "s2", schemas.length - 1); + + final InsertTabletNode tabletNode = + new InsertTabletNode( + new PlanNodeId("plan node failed tablet"), + new PartialPath(deviceId), + false, + Arrays.copyOf(measurementIds, measurementIds.length), + Arrays.copyOf(dataTypes, dataTypes.length), + Arrays.copyOf(schemas, schemas.length), + times, + null, + Arrays.copyOf(insertTabletNode.getColumns(), schemas.length), + times.length); + tabletNode.markFailedMeasurement(1); + + final Tablet tablet = + new TabletInsertionEventTreePatternParser(tabletNode, new PrefixTreePattern(pattern)) + .convertToTablet(); + assertTabletDoesNotContainMeasurement(tablet, "s2", schemas.length - 1); + } + + @Test + public void convertToTabletSkipsFailedMeasurementsForTablePattern() throws Exception { + final TsTableColumnCategory[] columnCategories = new TsTableColumnCategory[schemas.length]; + Arrays.fill(columnCategories, TsTableColumnCategory.FIELD); + columnCategories[0] = TsTableColumnCategory.TAG; + columnCategories[2] = TsTableColumnCategory.ATTRIBUTE; + + final RelationalInsertTabletNode tabletNode = + new RelationalInsertTabletNode( + new PlanNodeId("plan node failed relational tablet"), + new PartialPath("table1", false), + false, + Arrays.copyOf(measurementIds, measurementIds.length), + Arrays.copyOf(dataTypes, dataTypes.length), + times, + null, + Arrays.copyOf(insertTabletNode.getColumns(), schemas.length), + times.length, + columnCategories); + tabletNode.setMeasurementSchemas(Arrays.copyOf(schemas, schemas.length)); + tabletNode.markFailedMeasurement(1); + + final Tablet tablet = + new TabletInsertionEventTablePatternParser( + null, null, tabletNode, new TablePattern(true, null, null)) + .convertToTablet(); + assertTabletDoesNotContainMeasurement(tablet, "s2", schemas.length - 1); + } + @Test public void convertToTabletWithFilteredRowsForTest() throws Exception { TabletInsertionEventTreePatternParser container1 = @@ -538,4 +609,14 @@ public class PipeTabletInsertionEventTest { : new TSStatus(org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS.getStatusCode()); } } + + private static void assertTabletDoesNotContainMeasurement( + final Tablet tablet, final String measurement, final int expectedSchemaSize) { + Assert.assertEquals(expectedSchemaSize, tablet.getSchemas().size()); + for (int i = 0; i < tablet.getSchemas().size(); i++) { + Assert.assertNotNull(tablet.getSchemas().get(i)); + Assert.assertNotEquals(measurement, tablet.getSchemas().get(i).getMeasurementName()); + Assert.assertNotNull(tablet.getValues()[i]); + } + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatementTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatementTest.java new file mode 100644 index 00000000000..8c563a84b1d --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/receiver/transform/statement/PipeConvertedInsertRowStatementTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.receiver.transform.statement; + +import org.apache.iotdb.commons.path.PartialPath; +import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; + +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.junit.Assert; +import org.junit.Test; + +import java.time.ZoneId; + +public class PipeConvertedInsertRowStatementTest { + + @Test + public void testTransferTypeKeepsNullValue() throws Exception { + final InsertRowStatement statement = new InsertRowStatement(); + statement.setDevicePath(new PartialPath("root.sg.d1")); + statement.setMeasurements(new String[] {"s0"}); + statement.setMeasurementSchemas( + new MeasurementSchema[] {new MeasurementSchema("s0", TSDataType.INT32)}); + statement.setDataTypes(new TSDataType[] {null}); + statement.setTime(1L); + statement.setValues(new Object[] {null}); + statement.setNeedInferType(true); + + final PipeConvertedInsertRowStatement convertedStatement = + new PipeConvertedInsertRowStatement(statement); + convertedStatement.transferType(ZoneId.systemDefault()); + + Assert.assertEquals(TSDataType.INT32, convertedStatement.getDataTypes()[0]); + Assert.assertEquals("s0", convertedStatement.getMeasurements()[0]); + Assert.assertNull(convertedStatement.getValues()[0]); + Assert.assertFalse(convertedStatement.isNeedInferType()); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java index cefc2377dc7..06db08aaf57 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/resource/memory/InsertNodeMemoryEstimatorTest.java @@ -134,6 +134,18 @@ public class InsertNodeMemoryEstimatorTest { Assert.assertTrue(largerNodeSize > baselineSize); } + @Test + public void testInsertTabletNodeWithNullColumnIsEstimated() throws IllegalPathException { + InsertTabletNode tablet = createTextInsertTabletNode("tablet", "root.sg.d1", 2, 4, 8); + + long fullSize = InsertNodeMemoryEstimator.sizeOf(tablet); + tablet.getColumns()[1] = null; + long sizeWithNullColumn = InsertNodeMemoryEstimator.sizeOf(tablet); + + Assert.assertTrue(sizeWithNullColumn > 0); + Assert.assertTrue(sizeWithNullColumn < fullSize); + } + @Test public void testPlanNodeIdIsEstimated() throws IllegalPathException { InsertRowNode shortPlanNodeIdRow = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java index 410afc76130..8eff4a59191 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverterTest.java @@ -25,6 +25,7 @@ import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement import org.apache.tsfile.enums.ColumnCategory; import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.utils.Binary; +import org.apache.tsfile.utils.BytesUtils; import org.apache.tsfile.utils.DateUtils; import org.apache.tsfile.utils.PublicBAOS; import org.apache.tsfile.utils.ReadWriteIOUtils; @@ -172,6 +173,41 @@ public class TabletStatementConverterTest { assertTabletsEqual(originalTablet, convertedTablet); } + @Test + public void testDeserializeStatementFromTabletFormatWithNullSchemaAndNullColumn() + throws IOException, MetadataException { + final PublicBAOS byteArrayOutputStream = new PublicBAOS(); + final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream); + + ReadWriteIOUtils.write("root.sg.device", outputStream); + ReadWriteIOUtils.write(1, outputStream); + + ReadWriteIOUtils.write(BytesUtils.boolToByte(true), outputStream); + ReadWriteIOUtils.write(1, outputStream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(false), outputStream); + + ReadWriteIOUtils.write(BytesUtils.boolToByte(true), outputStream); + ReadWriteIOUtils.write(1L, outputStream); + + ReadWriteIOUtils.write(BytesUtils.boolToByte(false), outputStream); + + ReadWriteIOUtils.write(BytesUtils.boolToByte(true), outputStream); + ReadWriteIOUtils.write(BytesUtils.boolToByte(false), outputStream); + + ReadWriteIOUtils.write(true, outputStream); + + final ByteBuffer buffer = + ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()); + + final InsertTabletStatement statement = + TabletStatementConverter.deserializeStatementFromTabletFormat(buffer); + + Assert.assertArrayEquals(new String[] {null}, statement.getMeasurements()); + Assert.assertArrayEquals(new TSDataType[] {null}, statement.getDataTypes()); + Assert.assertNull(statement.getColumns()[0]); + Assert.assertTrue(statement.isAligned()); + } + /** * Generate a Tablet for tree model with all data types and specified number of columns and rows. * diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java index f0681736329..a2d28762103 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowNodeSerdeTest.java @@ -176,6 +176,25 @@ public class InsertRowNodeSerdeTest { Assert.assertEquals(insertRowNode.serializedSize(), byteBuffer.position()); } + @Test + public void testDeserializeFromWALWithMarkedFailedMeasurementOnly() + throws IllegalPathException, IOException { + InsertRowNode insertRowNode = getInsertRowNodeWithMeasurementSchemas(); + insertRowNode.markFailedMeasurement(1); + + byte[] bytes = new byte[insertRowNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertRowNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_ROW.getNodeType(), dataInputStream.readShort()); + + InsertRowNode tmpNode = InsertRowNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + private InsertRowNode getInsertRowNode() throws IllegalPathException { long time = 110L; TSDataType[] dataTypes = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java index 2cffd65e41e..35204133bb8 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertRowsOfOneDeviceNodeSerdeTest.java @@ -79,4 +79,38 @@ public class InsertRowsOfOneDeviceNodeSerdeTest { Assert.assertEquals(node, InsertRowsOfOneDeviceNode.deserialize(byteBuffer)); } + + @Test + public void testStoreMeasurementsSkipsFailedMeasurements() throws IllegalPathException { + PartialPath device = new PartialPath("root.sg.d"); + InsertRowsOfOneDeviceNode node = new InsertRowsOfOneDeviceNode(new PlanNodeId("plan node 1")); + + List<InsertRowNode> insertRowNodeList = new ArrayList<>(); + InsertRowNode firstRow = + new InsertRowNode( + new PlanNodeId("plan node 1"), + device, + false, + new String[] {"s1", "failed"}, + new TSDataType[] {TSDataType.DOUBLE, TSDataType.FLOAT}, + 1000L, + new Object[] {1.0, 2f}, + false); + firstRow.markFailedMeasurement(1); + insertRowNodeList.add(firstRow); + insertRowNodeList.add( + new InsertRowNode( + new PlanNodeId("plan node 1"), + device, + false, + new String[] {"s2"}, + new TSDataType[] {TSDataType.INT64}, + 2000L, + new Object[] {300L}, + false)); + + node.setInsertRowNodeList(insertRowNodeList); + + Assert.assertArrayEquals(new String[] {"s1", "s2"}, node.getMeasurements()); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java index 8cf0ce6e36e..27a3b8d1ce5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/InsertTabletNodeSerdeTest.java @@ -235,6 +235,69 @@ public class InsertTabletNodeSerdeTest { Assert.assertEquals(insertTabletNode.serializedSize(), byteBuffer.position()); } + @Test + public void testSerializedSizeWithClearedMeasurementAndRetainedColumn() + throws IllegalPathException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.getMeasurements()[1] = null; + insertTabletNode.getMeasurementSchemas()[1] = null; + insertTabletNode.getDataTypes()[1] = null; + + ByteBuffer byteBuffer = ByteBuffer.allocate(insertTabletNode.serializedSize()); + insertTabletNode.serializeToWAL(new WALByteBufferForTest(byteBuffer)); + + Assert.assertEquals(insertTabletNode.serializedSize(), byteBuffer.position()); + } + + @Test + public void testSerializedSizeWithRetainedMeasurementAndNullColumn() + throws IllegalPathException, IOException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.getColumns()[1] = null; + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), dataInputStream.readShort()); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + + @Test + public void testRelationalSerializedSizeWithRetainedMeasurementAndNullColumn() { + RelationalInsertTabletNode insertTabletNode = getRelationalInsertTabletNodeWithSchema("table1"); + insertTabletNode.getColumns()[1] = null; + + ByteBuffer byteBuffer = ByteBuffer.allocate(insertTabletNode.serializedSize()); + insertTabletNode.serializeToWAL(new WALByteBufferForTest(byteBuffer)); + + Assert.assertEquals(insertTabletNode.serializedSize(), byteBuffer.position()); + } + + @Test + public void testDeserializeFromWALWithMarkedFailedMeasurementOnly() + throws IllegalPathException, IOException { + InsertTabletNode insertTabletNode = getInsertTabletNodeWithSchema(); + insertTabletNode.markFailedMeasurement(1); + + byte[] bytes = new byte[insertTabletNode.serializedSize()]; + WALByteBufferForTest walBuffer = new WALByteBufferForTest(ByteBuffer.wrap(bytes)); + insertTabletNode.serializeToWAL(walBuffer); + Assert.assertFalse(walBuffer.getBuffer().hasRemaining()); + + DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(bytes)); + Assert.assertEquals(PlanNodeType.INSERT_TABLET.getNodeType(), dataInputStream.readShort()); + + InsertTabletNode tmpNode = InsertTabletNode.deserializeFromWAL(dataInputStream); + Assert.assertArrayEquals( + new String[] {"\u6e29\u5ea6", "s3", "s4", "s5"}, tmpNode.getMeasurements()); + } + @Test public void testInitTabletValuesWithAllTypes() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java index 01857cb5f8a..6d5185af4ba 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java @@ -260,6 +260,47 @@ public class WritePlanNodeSplitTest { } } + @Test + public void testSplitInsertTabletSkipsClearedMeasurementWithRetainedColumn() + throws IllegalPathException { + InsertTabletNode insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 1")); + + insertTabletNode.setTargetPath(new PartialPath("root.sg1.d1")); + insertTabletNode.setMeasurements(new String[] {"s0", null}); + insertTabletNode.setTimes( + new long[] {-200, -101, 1, 60, 120, 180, 270, 290, 360, 375, 440, 470}); + insertTabletNode.setDataTypes(new TSDataType[] {TSDataType.INT32, TSDataType.INT32}); + insertTabletNode.setColumns( + new Object[] { + new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90, 100}, + new int[] {-2, -1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + }); + insertTabletNode.setRowCount(insertTabletNode.getTimes().length); + final BitMap[] bitMaps = new BitMap[] {null, new BitMap(insertTabletNode.getRowCount())}; + bitMaps[1].mark(2); + insertTabletNode.setBitMaps(bitMaps); + + DataPartitionQueryParam dataPartitionQueryParam = new DataPartitionQueryParam(); + dataPartitionQueryParam.setDeviceID( + insertTabletNode.getTargetPath().getIDeviceIDAsFullDevice()); + dataPartitionQueryParam.setTimePartitionSlotList(insertTabletNode.getTimePartitionSlots()); + + DataPartition dataPartition = + getDataPartition(Collections.singletonList(dataPartitionQueryParam)); + Analysis analysis = new Analysis(); + analysis.setDataPartitionInfo(dataPartition); + + List<WritePlanNode> insertTabletNodeList = insertTabletNode.splitByPartition(analysis); + + Assert.assertEquals(6, insertTabletNodeList.size()); + for (WritePlanNode insertNode : insertTabletNodeList) { + InsertTabletNode tabletNode = (InsertTabletNode) insertNode; + Assert.assertNotNull(tabletNode.getColumns()[0]); + Assert.assertNull(tabletNode.getColumns()[1]); + Assert.assertNull(tabletNode.getBitMaps()); + } + } + @Test public void testSplitRelationalInsertTablet() throws IllegalPathException { RelationalInsertTabletNode relationalInsertTabletNode = diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java index e5ed818323b..8853348719b 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertNodeIsMeasurementFailedTest.java @@ -22,12 +22,18 @@ package org.apache.iotdb.db.queryengine.plan.planner.plan.node.write; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; +import java.nio.charset.StandardCharsets; + import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; /** @@ -82,6 +88,46 @@ public class InsertNodeIsMeasurementFailedTest { assertFalse(node.isMeasurementFailed(1)); } + @Test + public void testInsertRowNode_clearedMeasurementWithRetainedValue_isFailed() + throws IllegalPathException { + InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1"}); + node.getMeasurements()[0] = null; + + assertTrue(node.isMeasurementFailed(0)); + assertNull(node.composeTimeValuePair(0)); + } + + @Test + public void testInsertRowNode_retainedMeasurementWithNullValueDoesNotComposeLastCacheValue() + throws IllegalPathException { + InsertRowNode node = buildInsertRowNode(new String[] {"s0", "s1"}); + node.getValues()[0] = null; + + assertFalse(node.isMeasurementFailed(0)); + assertNull(node.composeTimeValuePair(0)); + } + + @Test + public void testInsertRowNode_nullMeasurements_nullSafe() throws IllegalPathException { + InsertRowNode node = buildInsertRowNode(new String[] {"s0"}); + node.setMeasurements(null); + + assertTrue(node.isMeasurementFailed(0)); + assertFalse(node.hasValidMeasurements()); + assertTrue(node.allMeasurementFailed()); + } + + @Test + public void testRelationalInsertRowNode_nonFieldColumnsDoNotComposeLastCacheValue() + throws IllegalPathException { + RelationalInsertRowNode node = buildRelationalInsertRowNode(); + + assertNull(node.composeTimeValuePair(0)); + assertNull(node.composeTimeValuePair(1)); + assertNotNull(node.composeTimeValuePair(2)); + } + // ----------------------------------------------------------------------- // InsertTabletNode // ----------------------------------------------------------------------- @@ -115,6 +161,36 @@ public class InsertNodeIsMeasurementFailedTest { assertTrue(node.isMeasurementFailed(1)); } + @Test + public void testInsertTabletNode_clearedMeasurementWithRetainedColumn_isFailed() + throws IllegalPathException { + InsertTabletNode node = buildInsertTabletNode(new String[] {"s0", "s1"}); + node.getMeasurements()[0] = null; + + assertTrue(node.isMeasurementFailed(0)); + assertNull(node.composeLastTimeValuePair(0)); + } + + @Test + public void testInsertTabletNode_retainedMeasurementWithNullColumnDoesNotComposeLastCacheValue() + throws IllegalPathException { + InsertTabletNode node = buildInsertTabletNode(new String[] {"s0", "s1"}); + node.getColumns()[0] = null; + + assertFalse(node.isMeasurementFailed(0)); + assertNull(node.composeLastTimeValuePair(0)); + } + + @Test + public void testRelationalInsertTabletNode_nonFieldColumnsDoNotComposeLastCacheValue() + throws IllegalPathException { + RelationalInsertTabletNode node = buildRelationalInsertTabletNode(); + + assertNull(node.composeLastTimeValuePair(0)); + assertNull(node.composeLastTimeValuePair(1)); + assertNotNull(node.composeLastTimeValuePair(2)); + } + // ----------------------------------------------------------------------- // Helpers // ----------------------------------------------------------------------- @@ -144,6 +220,35 @@ public class InsertNodeIsMeasurementFailedTest { return node; } + private static RelationalInsertRowNode buildRelationalInsertRowNode() + throws IllegalPathException { + String[] measurements = {"tag0", "attr0", "field0"}; + TSDataType[] dataTypes = {TSDataType.STRING, TSDataType.STRING, TSDataType.INT32}; + MeasurementSchema[] schemas = { + new MeasurementSchema("tag0", TSDataType.STRING), + new MeasurementSchema("attr0", TSDataType.STRING), + new MeasurementSchema("field0", TSDataType.INT32) + }; + Object[] values = { + new Binary("tag".getBytes(StandardCharsets.UTF_8)), + new Binary("attr".getBytes(StandardCharsets.UTF_8)), + 1 + }; + return new RelationalInsertRowNode( + new PlanNodeId("test"), + new PartialPath("table1", false), + true, + measurements, + dataTypes, + schemas, + 1L, + values, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.FIELD + }); + } + private static InsertTabletNode buildInsertTabletNode(String[] measurementNames) throws IllegalPathException { int n = measurementNames.length; @@ -171,4 +276,34 @@ public class InsertNodeIsMeasurementFailedTest { rowCount); return node; } + + private static RelationalInsertTabletNode buildRelationalInsertTabletNode() + throws IllegalPathException { + String[] measurements = {"tag0", "attr0", "field0"}; + TSDataType[] dataTypes = {TSDataType.STRING, TSDataType.STRING, TSDataType.INT32}; + MeasurementSchema[] schemas = { + new MeasurementSchema("tag0", TSDataType.STRING), + new MeasurementSchema("attr0", TSDataType.STRING), + new MeasurementSchema("field0", TSDataType.INT32) + }; + Object[] columns = { + new Binary[] {new Binary("tag".getBytes(StandardCharsets.UTF_8))}, + new Binary[] {new Binary("attr".getBytes(StandardCharsets.UTF_8))}, + new int[] {1} + }; + return new RelationalInsertTabletNode( + new PlanNodeId("test"), + new PartialPath("table1", false), + true, + measurements, + dataTypes, + schemas, + new long[] {1L}, + null, + columns, + 1, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, TsTableColumnCategory.ATTRIBUTE, TsTableColumnCategory.FIELD + }); + } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java index 2f4a85d5840..bdad14c20e3 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTablePartialInsertTest.java @@ -22,6 +22,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.WriteProcessException; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; @@ -112,6 +113,20 @@ public class AbstractMemTablePartialInsertTest { assertEquals(1, memTable.getTotalPointsNum()); } + @Test + public void testInsertAlignedRow_markedFailedMeasurementOnly_pointsInsertedMatchesWrittenPoints() + throws IllegalPathException { + InsertRowNode node = + buildAlignedInsertRowNode( + new String[] {"s0", "s1"}, new Object[] {1, 2}, -1 /* no failure */); + node.markFailedMeasurement(0); + + int points = memTable.insertAlignedRow(node); + + assertEquals(1, points); + assertEquals(1, memTable.getTotalPointsNum()); + } + /** All measurements fail → insertAlignedRow returns 0 early (schemaList is empty). */ @Test public void testInsertAlignedRow_allMeasurementsFailed_pointsInsertedIsZero() @@ -132,6 +147,67 @@ public class AbstractMemTablePartialInsertTest { assertEquals(0, memTable.getTotalPointsNum()); } + @Test + public void testInsertAlignedRow_tableNonFieldAndFailedMeasurementsNotCountedAsSeries() + throws IllegalPathException { + InsertRowNode node = + buildAlignedInsertRowNode( + new String[] {"tag1", "attr1", "s0", "s1"}, + new Object[] {1, 2, 3, 4}, + -1 /* no failure */); + node.setColumnCategories( + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }); + node.markFailedMeasurement(3); + + int points = memTable.insertAlignedRow(node); + + assertEquals(1, points); + assertEquals(1, memTable.getTotalPointsNum()); + assertEquals(1, memTable.getSeriesNumber()); + } + + @Test + public void testInsertRow_tableNonFieldAndFailedMeasurementsNotCountedAsSeries() + throws IllegalPathException { + InsertRowNode node = + new InsertRowNode( + new PlanNodeId("test"), + new PartialPath("root.sg.d1"), + false, + new String[] {"tag1", "attr1", "s0", "s1"}, + new TSDataType[] { + TSDataType.INT32, TSDataType.INT32, TSDataType.INT32, TSDataType.INT32 + }, + new MeasurementSchema[] { + new MeasurementSchema("tag1", TSDataType.INT32), + new MeasurementSchema("attr1", TSDataType.INT32), + new MeasurementSchema("s0", TSDataType.INT32), + new MeasurementSchema("s1", TSDataType.INT32) + }, + 1L, + new Object[] {1, 2, 3, 4}, + false); + node.setColumnCategories( + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }); + node.markFailedMeasurement(3); + + int points = memTable.insert(node); + + assertEquals(1, points); + assertEquals(1, memTable.getTotalPointsNum()); + assertEquals(1, memTable.getSeriesNumber()); + } + // ========================================================================= // insertTablet – failed measurement must be skipped in null-point counting // ========================================================================= @@ -179,6 +255,43 @@ public class AbstractMemTablePartialInsertTest { assertEquals(3, memTable.getTotalPointsNum()); } + @Test + public void testInsertTablet_markedFailedMeasurementOnly_pointsInsertedMatchesWrittenPoints() + throws IllegalPathException, WriteProcessException { + int rowCount = 3; + InsertTabletNode node = + buildInsertTabletNode(new String[] {"s0", "s1"}, rowCount, null, -1 /* no failure */); + node.markFailedMeasurement(0); + + int points = memTable.insertTablet(node, 0, rowCount); + + assertEquals(3, points); + assertEquals(3, memTable.getTotalPointsNum()); + } + + @Test + public void testInsertTablet_tableNonFieldAndFailedMeasurementsNotCountedAsSeries() + throws IllegalPathException, WriteProcessException { + int rowCount = 3; + InsertTabletNode node = + buildInsertTabletNode( + new String[] {"tag1", "attr1", "s0", "s1"}, rowCount, null, -1 /* no failure */); + node.setColumnCategories( + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }); + node.markFailedMeasurement(3); + + int points = memTable.insertTablet(node, 0, rowCount); + + assertEquals(rowCount, points); + assertEquals(rowCount, memTable.getTotalPointsNum()); + assertEquals(1, memTable.getSeriesNumber()); + } + /** All measurements fail → pointsInserted == 0. formula: (2-2)*3 - 0 = 0 */ @Test public void testInsertTablet_allMeasurementsFailed_pointsInsertedIsZero() diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java index 8e03776231e..5595390752a 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/MemChunkDeserializeTest.java @@ -283,6 +283,29 @@ public class MemChunkDeserializeTest { } } + @Test + public void testNonAlignedMemChunkGroupSerializedSizeWithNonAsciiMeasurement() + throws IOException { + String measurement = "\u6e29\u5ea6"; + WritableMemChunk series = + new WritableMemChunk( + new MeasurementSchema(measurement, TSDataType.INT32, TSEncoding.PLAIN)); + series.writeNonAlignedPoint(1, 1); + + WritableMemChunkGroup group = new WritableMemChunkGroup(); + group.getMemChunkMap().put(measurement, series); + + WALByteBufferForTest walBuffer = + new WALByteBufferForTest(ByteBuffer.allocate(group.serializedSize())); + group.serializeToWAL(walBuffer); + Assert.assertEquals(group.serializedSize(), walBuffer.getBuffer().position()); + + DataInputStream inputStream = + new DataInputStream(new ByteArrayInputStream(walBuffer.getBuffer().array())); + WritableMemChunkGroup deserialized = WritableMemChunkGroup.deserialize(inputStream); + Assert.assertTrue(deserialized.getMemChunkMap().containsKey(measurement)); + } + private WritableMemChunk createWritableMemChunkFromBytes(WritableMemChunk series) throws IOException { int serializedSize = series.serializedSize(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java index 3cc50496fb6..2609cb1acb5 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessorTest.java @@ -26,6 +26,7 @@ import org.apache.iotdb.commons.path.AlignedFullPath; import org.apache.iotdb.commons.path.NonAlignedFullPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.exception.DataRegionException; @@ -36,6 +37,7 @@ import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.storageengine.dataregion.DataRegionInfo; import org.apache.iotdb.db.storageengine.dataregion.DataRegionTest; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; @@ -56,6 +58,7 @@ import org.apache.tsfile.read.common.RowRecord; import org.apache.tsfile.read.expression.QueryExpression; import org.apache.tsfile.read.query.dataset.QueryDataSet; import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.utils.Binary; import org.apache.tsfile.write.record.TSRecord; import org.apache.tsfile.write.record.datapoint.DataPoint; import org.apache.tsfile.write.schema.MeasurementSchema; @@ -68,6 +71,7 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -898,6 +902,174 @@ public class TsFileProcessorTest { Assert.assertEquals(memTable1.memSize(), memTable2.memSize()); } + @Test + public void testAlignedRamCostIgnoresRelationalNonFieldAndNullFieldColumns() + throws IllegalPathException, WriteProcessException, IOException { + TsFileProcessor relationalProcessor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo relationalInfo = new TsFileProcessorInfo(sgInfo); + relationalProcessor.setTsFileProcessorInfo(relationalInfo); + this.sgInfo.initTsFileProcessorInfo(relationalProcessor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, relationalProcessor); + + RelationalInsertRowNode relationalNode = + new RelationalInsertRowNode( + new PlanNodeId("relational"), + new PartialPath("table1", false), + true, + new String[] {"tag1", "attr1", "s1", "s2"}, + new TSDataType[] {TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32, TSDataType.INT64}, + new MeasurementSchema[] { + new MeasurementSchema("tag1", TSDataType.TEXT), + new MeasurementSchema("attr1", TSDataType.TEXT), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }, + 1L, + new Object[] { + new Binary("tag-value".getBytes(StandardCharsets.UTF_8)), + new Binary("attr-value".getBytes(StandardCharsets.UTF_8)), + 1, + null + }, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }); + relationalProcessor.insert(relationalNode, new long[5]); + + TsFileProcessor fieldOnlyProcessor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo fieldOnlyInfo = new TsFileProcessorInfo(sgInfo); + fieldOnlyProcessor.setTsFileProcessorInfo(fieldOnlyInfo); + this.sgInfo.initTsFileProcessorInfo(fieldOnlyProcessor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, fieldOnlyProcessor); + + InsertRowNode fieldOnlyNode = + new InsertRowNode( + new PlanNodeId("field-only"), + new PartialPath(deviceId), + true, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT32, TSDataType.INT64}, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }, + 1L, + new Object[] {1, null}, + false); + fieldOnlyProcessor.insert(fieldOnlyNode, new long[5]); + + IMemTable relationalMemTable = relationalProcessor.getWorkMemTable(); + IMemTable fieldOnlyMemTable = fieldOnlyProcessor.getWorkMemTable(); + Assert.assertEquals( + fieldOnlyMemTable.getTVListsRamCost(), relationalMemTable.getTVListsRamCost()); + Assert.assertEquals(fieldOnlyInfo.getMemCost(), relationalInfo.getMemCost()); + Assert.assertEquals(fieldOnlyMemTable.memSize(), relationalMemTable.memSize()); + Assert.assertEquals(1, relationalMemTable.getTotalPointsNum()); + Assert.assertEquals(1, relationalMemTable.getSeriesNumber()); + } + + @Test + public void testNonAlignedRamCostIgnoresRelationalNonFieldAndNullFieldColumns() + throws IllegalPathException, WriteProcessException, IOException { + TsFileProcessor relationalProcessor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo relationalInfo = new TsFileProcessorInfo(sgInfo); + relationalProcessor.setTsFileProcessorInfo(relationalInfo); + this.sgInfo.initTsFileProcessorInfo(relationalProcessor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, relationalProcessor); + + RelationalInsertRowNode relationalNode = + new RelationalInsertRowNode( + new PlanNodeId("relational"), + new PartialPath("table1", false), + false, + new String[] {"tag1", "attr1", "s1", "s2"}, + new TSDataType[] {TSDataType.TEXT, TSDataType.TEXT, TSDataType.INT32, TSDataType.INT64}, + new MeasurementSchema[] { + new MeasurementSchema("tag1", TSDataType.TEXT), + new MeasurementSchema("attr1", TSDataType.TEXT), + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }, + 1L, + new Object[] { + new Binary("tag-value".getBytes(StandardCharsets.UTF_8)), + new Binary("attr-value".getBytes(StandardCharsets.UTF_8)), + 1, + null + }, + false, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + }); + relationalProcessor.insert(relationalNode, new long[5]); + + TsFileProcessor fieldOnlyProcessor = + new TsFileProcessor( + storageGroup, + SystemFileFactory.INSTANCE.getFile(filePath), + sgInfo, + this::closeTsFileProcessor, + (tsFileProcessor, updateMap, systemFlushTime) -> {}, + true); + TsFileProcessorInfo fieldOnlyInfo = new TsFileProcessorInfo(sgInfo); + fieldOnlyProcessor.setTsFileProcessorInfo(fieldOnlyInfo); + this.sgInfo.initTsFileProcessorInfo(fieldOnlyProcessor); + SystemInfo.getInstance().reportStorageGroupStatus(sgInfo, fieldOnlyProcessor); + + InsertRowNode fieldOnlyNode = + new InsertRowNode( + new PlanNodeId("field-only"), + new PartialPath(deviceId), + false, + new String[] {"s1", "s2"}, + new TSDataType[] {TSDataType.INT32, TSDataType.INT64}, + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }, + 1L, + new Object[] {1, null}, + false); + fieldOnlyProcessor.insert(fieldOnlyNode, new long[5]); + + IMemTable relationalMemTable = relationalProcessor.getWorkMemTable(); + IMemTable fieldOnlyMemTable = fieldOnlyProcessor.getWorkMemTable(); + Assert.assertEquals( + fieldOnlyMemTable.getTVListsRamCost(), relationalMemTable.getTVListsRamCost()); + Assert.assertEquals(fieldOnlyInfo.getMemCost(), relationalInfo.getMemCost()); + Assert.assertEquals(fieldOnlyMemTable.memSize(), relationalMemTable.memSize()); + Assert.assertEquals(1, relationalMemTable.getTotalPointsNum()); + Assert.assertEquals(1, relationalMemTable.getSeriesNumber()); + } + @Test public void testRamCostInsertSameDataBy2Ways() throws MetadataException, WriteProcessException, IOException { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java index 7af2a9775fb..1c1c1fca122 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/utils/MemUtilsTest.java @@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils; import org.apache.iotdb.commons.exception.IllegalPathException; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.tsfile.common.conf.TSFileConfig; @@ -91,6 +92,46 @@ public class MemUtilsTest { Assert.assertEquals(sizeSum, MemUtils.getAlignedRowRecordSize(dataTypes, row, null)); } + @Test + public void getAlignedRowRecordSizeWithSkippedSlotsTest() { + Object[] row = {null, 1, 2L}; + List<TSDataType> dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.INT32); + + int sizeSum = 8 + 4; + sizeSum += TSDataType.INT32.getDataTypeSize(); + + Assert.assertEquals( + sizeSum, + MemUtils.getAlignedRowRecordSize( + dataTypes, + row, + new TsTableColumnCategory[] { + TsTableColumnCategory.FIELD, TsTableColumnCategory.FIELD, TsTableColumnCategory.TAG + })); + } + + @Test + public void getRowRecordSizeWithSkippedSlotsTest() { + Object[] row = {1, 2, 3L, null}; + List<TSDataType> dataTypes = new ArrayList<>(); + dataTypes.add(TSDataType.INT64); + + int sizeSum = 8 + TSDataType.INT64.getDataTypeSize(); + + Assert.assertEquals( + sizeSum, + MemUtils.getRowRecordSize( + dataTypes, + row, + new TsTableColumnCategory[] { + TsTableColumnCategory.TAG, + TsTableColumnCategory.ATTRIBUTE, + TsTableColumnCategory.FIELD, + TsTableColumnCategory.FIELD + })); + } + @Test public void getRecordSizeWithInsertTableNodeTest() throws IllegalPathException { PartialPath device = new PartialPath("root.sg.d1"); @@ -125,6 +166,14 @@ public class MemUtilsTest { null, columns, 1); + insertNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64), + new MeasurementSchema("s3", TSDataType.FLOAT), + new MeasurementSchema("s4", TSDataType.DOUBLE), + new MeasurementSchema("s5", TSDataType.TEXT) + }); Assert.assertEquals(sizeSum, MemUtils.getTabletSize(insertNode, 0, 1)); } @@ -175,6 +224,35 @@ public class MemUtilsTest { Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0, 1, null)); } + @Test + public void getAlignedTabletSizeWithMarkedFailedMeasurementTest() throws IllegalPathException { + PartialPath device = new PartialPath("root.sg.d1"); + String[] measurements = {"s1", "s2"}; + Object[] columns = {new int[] {1, 2}, new long[] {3, 4}}; + TSDataType[] dataTypes = {TSDataType.INT32, TSDataType.INT64}; + InsertTabletNode insertNode = + new InsertTabletNode( + new PlanNodeId(""), + device, + true, + measurements, + dataTypes, + new long[] {1, 2}, + null, + columns, + 2); + insertNode.setMeasurementSchemas( + new MeasurementSchema[] { + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT64) + }); + insertNode.markFailedMeasurement(0); + + long sizeSum = 2L * TSDataType.INT64.getDataTypeSize(); + sizeSum += 2L * (8L + 4L); + Assert.assertEquals(sizeSum, MemUtils.getAlignedTabletSize(insertNode, 0, 2, null)); + } + /** This method tests MemUtils.getStringMem() and MemUtils.getDataPointMem() */ @Test public void getMemSizeTest() {
