This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 16919029ef3 [Pipe] Optimize memory usage (#17770)
16919029ef3 is described below
commit 16919029ef32023868e7c660521e5a8144d5b325
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 09:32:17 2026 +0800
[Pipe] Optimize memory usage (#17770)
---
.../iotdb/db/pipe/event/common/row/PipeRow.java | 5 +-
.../db/pipe/event/common/row/PipeRowCollector.java | 21 +-
.../common/tablet/PipeRawTabletInsertionEvent.java | 1 +
.../pipe/event/common/tablet/PipeTabletUtils.java | 249 +++++++++++++++++++++
.../tablet/parser/TabletInsertionEventParser.java | 66 +++---
.../TabletInsertionEventTablePatternParser.java | 6 +-
.../TabletInsertionEventTreePatternParser.java | 6 +-
.../query/TsFileInsertionEventQueryParser.java | 5 +-
...ileInsertionEventQueryParserTabletIterator.java | 41 ++--
.../scan/TsFileInsertionEventScanParser.java | 121 +++++++---
...ileInsertionEventTableParserTabletIterator.java | 76 +++++--
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 12 +-
.../request/PipeTransferTabletBatchReq.java | 6 +-
.../request/PipeTransferTabletBatchReqV2.java | 8 +-
.../request/PipeTransferTabletRawReq.java | 41 ++--
.../request/PipeTransferTabletRawReqV2.java | 26 ++-
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 2 +-
.../pipe/sink/util/TabletStatementConverter.java | 69 ++++--
.../util/builder/PipeTableModelTsFileBuilder.java | 3 +-
.../builder/PipeTableModelTsFileBuilderV2.java | 3 +-
.../util/builder/PipeTreeModelTsFileBuilder.java | 3 +-
.../util/builder/PipeTreeModelTsFileBuilderV2.java | 3 +-
.../sink/util/sorter/PipeInsertEventSorter.java | 3 +-
.../planner/plan/node/write/InsertTabletNode.java | 27 ++-
.../node/write/RelationalInsertTabletNode.java | 9 +-
.../plan/statement/crud/InsertTabletStatement.java | 5 +-
.../rescon/quotas/DefaultOperationQuota.java | 28 ++-
.../org/apache/iotdb/db/utils/BitMapUtils.java | 47 ++++
.../pipe/event/PipeTabletInsertionEventTest.java | 57 +++--
.../pipe/event/TsFileInsertionEventParserTest.java | 152 +++++++++++++
.../event/common/tablet/PipeTabletUtilsTest.java | 52 +++++
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 44 ++++
.../planner/node/write/WritePlanNodeSplitTest.java | 20 ++
.../rescon/quotas/DefaultOperationQuotaTest.java | 64 ++++++
34 files changed, 1060 insertions(+), 221 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 6d4d25b9542..b0897ed396a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -152,7 +152,10 @@ public class PipeRow implements Row {
@Override
public boolean isNull(final int columnIndex) {
- return bitMaps[columnIndex].isMarked(rowIndex);
+ return bitMaps != null
+ && columnIndex < bitMaps.length
+ && bitMaps[columnIndex] != null
+ && bitMaps[columnIndex].isMarked(rowIndex);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 97b21695c17..326d9c7d31e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -77,7 +78,6 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
Pair<Integer, Integer> rowCountAndMemorySize =
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
tablet = new Tablet(deviceId, measurementSchemaList,
rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
isAligned = pipeRow.isAligned();
}
@@ -85,16 +85,16 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
tablet.addTimestamp(rowIndex, row.getTime());
for (int i = 0; i < row.size(); i++) {
final Object value = row.getObject(i);
- if (value instanceof Binary) {
- tablet.addValue(
- measurementSchemaArray[i].getMeasurementName(),
- rowIndex,
- PipeBinaryTransformer.transformToBinary((Binary) value));
- } else {
- tablet.addValue(measurementSchemaArray[i].getMeasurementName(),
rowIndex, value);
- }
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ measurementSchemaArray[i].getType(),
+ value instanceof Binary
+ ? PipeBinaryTransformer.transformToBinary((Binary) value)
+ : value);
if (row.isNull(i)) {
- tablet.getBitMaps()[i].mark(rowIndex);
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
}
}
@@ -105,6 +105,7 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
private void collectTabletInsertionEvent() {
if (tablet != null) {
+ PipeTabletUtils.compactBitMaps(tablet);
// TODO: non-PipeInsertionEvent sourceEvent is not supported?
final PipeInsertionEvent pipeInsertionEvent =
sourceEvent instanceof PipeInsertionEvent ? ((PipeInsertionEvent)
sourceEvent) : null;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index 59a1a87b25f..6829f099b47 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -455,6 +455,7 @@ public class PipeRawTabletInsertionEvent extends
PipeInsertionEvent
public Tablet convertToTablet() {
if (!shouldParseTimeOrPattern()) {
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
}
return initEventParser().convertToTablet();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
new file mode 100644
index 00000000000..097d936176d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tablet;
+
+import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.utils.BitMapUtils;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public final class PipeTabletUtils {
+
+ private PipeTabletUtils() {}
+
+ public static final class TabletStringInternPool {
+
+ private final Map<String, String> internedStrings = new HashMap<>();
+
+ public String intern(final String value) {
+ if (Objects.isNull(value)) {
+ return null;
+ }
+
+ final String internedValue = internedStrings.get(value);
+ if (Objects.nonNull(internedValue)) {
+ return internedValue;
+ }
+
+ internedStrings.put(value, value);
+ return value;
+ }
+
+ public void intern(final String[] values) {
+ if (Objects.isNull(values)) {
+ return;
+ }
+
+ for (int i = 0; i < values.length; ++i) {
+ values[i] = intern(values[i]);
+ }
+ }
+
+ public void intern(final List<String> values) {
+ if (Objects.isNull(values)) {
+ return;
+ }
+
+ for (int i = 0; i < values.size(); ++i) {
+ values.set(i, intern(values.get(i)));
+ }
+ }
+
+ public Tablet intern(final Tablet tablet) {
+ if (Objects.isNull(tablet)) {
+ return null;
+ }
+
+ tablet.setInsertTargetName(intern(tablet.getDeviceId()));
+ internMeasurementSchemas(tablet.getSchemas());
+ return tablet;
+ }
+
+ public void internMeasurementSchemas(final List<IMeasurementSchema>
schemas) {
+ if (Objects.isNull(schemas)) {
+ return;
+ }
+
+ for (final IMeasurementSchema schema : schemas) {
+ if (schema instanceof MeasurementSchema) {
+ intern((MeasurementSchema) schema);
+ }
+ }
+ }
+
+ public MeasurementSchema intern(final MeasurementSchema schema) {
+ if (Objects.isNull(schema)) {
+ return null;
+ }
+
+ schema.setMeasurementName(intern(schema.getMeasurementName()));
+ schema.setProps(intern(schema.getProps()));
+ return schema;
+ }
+
+ private Map<String, String> intern(final Map<String, String> props) {
+ if (Objects.isNull(props) || props.isEmpty()) {
+ return props;
+ }
+
+ final Map<String, String> internedProps = new HashMap<>(props.size());
+ for (final Map.Entry<String, String> entry : props.entrySet()) {
+ internedProps.put(intern(entry.getKey()), intern(entry.getValue()));
+ }
+ return internedProps;
+ }
+ }
+
+ public static Tablet internTablet(
+ final Tablet tablet, final TabletStringInternPool
tabletStringInternPool) {
+ return Objects.nonNull(tabletStringInternPool) ?
tabletStringInternPool.intern(tablet) : tablet;
+ }
+
+ public static void compactBitMaps(final Tablet tablet) {
+ if (Objects.isNull(tablet)) {
+ return;
+ }
+ tablet.setBitMaps(compactBitMaps(tablet.getBitMaps(),
tablet.getRowSize()));
+ }
+
+ public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int
rowCount) {
+ return BitMapUtils.compactBitMaps(bitMaps, rowCount);
+ }
+
+ public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) {
+ final BitMap[] bitMaps = tablet.getBitMaps();
+ return Objects.nonNull(bitMaps)
+ ? Arrays.copyOf(bitMaps, bitMaps.length)
+ : new BitMap[getColumnCount(tablet)];
+ }
+
+ public static void markNullValue(final Tablet tablet, final int rowIndex,
final int columnIndex) {
+ final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1);
+ if (Objects.isNull(bitMaps[columnIndex])) {
+ bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber());
+ }
+ bitMaps[columnIndex].mark(rowIndex);
+ }
+
+ public static void putTimestamp(final Tablet tablet, final int rowIndex,
final long timestamp) {
+ tablet.getTimestamps()[rowIndex] = timestamp;
+ tablet.setRowSize(Math.max(tablet.getRowSize(), rowIndex + 1));
+ }
+
+ public static void putValue(
+ final Tablet tablet,
+ final int rowIndex,
+ final int columnIndex,
+ final TSDataType dataType,
+ final Object value) {
+ switch (dataType) {
+ case BOOLEAN:
+ ((boolean[]) tablet.getValues()[columnIndex])[rowIndex] = (Boolean)
value;
+ break;
+ case INT32:
+ ((int[]) tablet.getValues()[columnIndex])[rowIndex] = (Integer) value;
+ break;
+ case DATE:
+ ((LocalDate[]) tablet.getValues()[columnIndex])[rowIndex] =
(LocalDate) value;
+ break;
+ case INT64:
+ case TIMESTAMP:
+ ((long[]) tablet.getValues()[columnIndex])[rowIndex] = (Long) value;
+ break;
+ case FLOAT:
+ ((float[]) tablet.getValues()[columnIndex])[rowIndex] = (Float) value;
+ break;
+ case DOUBLE:
+ ((double[]) tablet.getValues()[columnIndex])[rowIndex] = (Double)
value;
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ ((Binary[]) tablet.getValues()[columnIndex])[rowIndex] =
toBinary(value);
+ break;
+ default:
+ throw new
UnSupportedDataTypeException(DataNodePipeMessages.UNSUPPORTED + dataType);
+ }
+ unmarkNullValue(tablet, rowIndex, columnIndex);
+ }
+
+ private static void unmarkNullValue(
+ final Tablet tablet, final int rowIndex, final int columnIndex) {
+ final BitMap[] bitMaps = tablet.getBitMaps();
+ if (Objects.nonNull(bitMaps)
+ && columnIndex < bitMaps.length
+ && Objects.nonNull(bitMaps[columnIndex])) {
+ bitMaps[columnIndex].unmark(rowIndex);
+ }
+ }
+
+ private static BitMap[] ensureBitMaps(final Tablet tablet, final int
minColumnCount) {
+ final int columnCount = Math.max(getColumnCount(tablet), minColumnCount);
+ BitMap[] bitMaps = tablet.getBitMaps();
+ if (Objects.isNull(bitMaps)) {
+ bitMaps = new BitMap[columnCount];
+ tablet.setBitMaps(bitMaps);
+ } else if (bitMaps.length < columnCount) {
+ final BitMap[] expandedBitMaps = new BitMap[columnCount];
+ System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length);
+ bitMaps = expandedBitMaps;
+ tablet.setBitMaps(bitMaps);
+ }
+ return bitMaps;
+ }
+
+ private static int getColumnCount(final Tablet tablet) {
+ if (Objects.nonNull(tablet.getSchemas())) {
+ return tablet.getSchemas().size();
+ }
+ return Objects.nonNull(tablet.getValues()) ? tablet.getValues().length : 0;
+ }
+
+ private static Binary toBinary(final Object value) {
+ if (Objects.isNull(value)) {
+ return Binary.EMPTY_VALUE;
+ }
+ if (value instanceof Binary) {
+ return (Binary) value;
+ }
+ if (value instanceof byte[]) {
+ return new Binary((byte[]) value);
+ }
+ if (value instanceof String) {
+ return new Binary(((String)
value).getBytes(TSFileConfig.STRING_CHARSET));
+ }
+ throw new IllegalArgumentException(
+ String.format("Expected Binary, byte[] or String, but was %s.",
value.getClass()));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
index 15fca0ef0b7..69035041293 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventParser.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.pipe.api.access.Row;
@@ -169,18 +170,14 @@ public abstract class TabletInsertionEventParser {
} else {
this.valueColumns[filteredColumnIndex] =
filterValueColumnsByRowIndexList(
- originValueDataTypes[i],
- originValues[i],
- rowIndexList,
- true,
- bitMap, // use the output bitmap since there is no bitmap in
InsertRowNode
- bitMap);
+ originValueDataTypes[i], originValues[i], rowIndexList,
true, null, bitMap);
}
this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
}
}
this.rowCount = this.timestampColumn.length;
+ this.nullValueColumnBitmaps =
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
DataNodePipeMessages.INSERTROWNODE_IS_PARSED_TO_ZERO_ROWS_ACCORDING,
@@ -202,7 +199,6 @@ public abstract class TabletInsertionEventParser {
this.isAligned = insertTabletNode.isAligned();
final long[] originTimestampColumn = insertTabletNode.getTimes();
- final int originRowSize = originTimestampColumn.length;
final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
@@ -228,18 +224,7 @@ public abstract class TabletInsertionEventParser {
final TsTableColumnCategory[] originColumnCategories =
insertTabletNode.getColumnCategories();
final TSDataType[] originValueColumnDataTypes =
insertTabletNode.getDataTypes();
final Object[] originValueColumns = insertTabletNode.getColumns();
- final BitMap[] originBitMapList =
- (insertTabletNode.getBitMaps() == null
- ? IntStream.range(0, originColumnSize)
- .boxed()
- .map(o -> new BitMap(originRowSize))
- .toArray(BitMap[]::new)
- : insertTabletNode.getBitMaps());
- for (int i = 0; i < originBitMapList.length; i++) {
- if (originBitMapList[i] == null) {
- originBitMapList[i] = new BitMap(originRowSize);
- }
- }
+ final BitMap[] originBitMapList = insertTabletNode.getBitMaps();
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -267,7 +252,7 @@ public abstract class TabletInsertionEventParser {
originValueColumns[i],
rowIndexList,
false,
- originBitMapList[i],
+ getBitMap(originBitMapList, i),
bitMap);
}
this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
@@ -275,6 +260,7 @@ public abstract class TabletInsertionEventParser {
}
this.rowCount = this.timestampColumn.length;
+ this.nullValueColumnBitmaps =
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
if (rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
DataNodePipeMessages.INSERTTABLETNODE_IS_PARSED_TO_ZERO_ROWS_ACCORDING,
@@ -337,18 +323,7 @@ public abstract class TabletInsertionEventParser {
}
final Object[] originValueColumns =
tablet.getValues(); // we do not reduce value columns here by origin
row size
- final BitMap[] originBitMapList =
- tablet.getBitMaps() == null
- ? IntStream.range(0, originColumnSize)
- .boxed()
- .map(o -> new BitMap(tablet.getMaxRowNumber()))
- .toArray(BitMap[]::new)
- : tablet.getBitMaps(); // We do not reduce bitmaps here by origin
row size
- for (int i = 0; i < originBitMapList.length; i++) {
- if (originBitMapList[i] == null) {
- originBitMapList[i] = new BitMap(tablet.getMaxRowNumber());
- }
- }
+ final BitMap[] originBitMapList = tablet.getBitMaps();
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -374,7 +349,7 @@ public abstract class TabletInsertionEventParser {
originValueColumns[i],
rowIndexList,
false,
- originBitMapList[i],
+ getBitMap(originBitMapList, i),
bitMap);
}
this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
@@ -382,6 +357,7 @@ public abstract class TabletInsertionEventParser {
}
this.rowCount = this.timestampColumn.length;
+ this.nullValueColumnBitmaps =
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
DataNodePipeMessages.TABLET_IS_PARSED_TO_ZERO_ROWS_ACCORDING,
@@ -443,7 +419,7 @@ public abstract class TabletInsertionEventParser {
: (int[]) originValueColumn;
final int[] valueColumns = new int[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0;
nullValueColumnBitmap.mark(i);
} else {
@@ -465,7 +441,7 @@ public abstract class TabletInsertionEventParser {
: (LocalDate[]) originValueColumn;
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap,
rowIndexList.get(i))) {
valueColumns[i] = EMPTY_LOCALDATE;
nullValueColumnBitmap.mark(i);
} else {
@@ -479,7 +455,7 @@ public abstract class TabletInsertionEventParser {
? new int[] {(int) originValueColumn}
: (int[]) originValueColumn;
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap,
rowIndexList.get(i))) {
valueColumns[i] = EMPTY_LOCALDATE;
nullValueColumnBitmap.mark(i);
} else {
@@ -499,7 +475,7 @@ public abstract class TabletInsertionEventParser {
: (long[]) originValueColumn;
final long[] valueColumns = new long[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0L;
nullValueColumnBitmap.mark(i);
} else {
@@ -516,7 +492,7 @@ public abstract class TabletInsertionEventParser {
: (float[]) originValueColumn;
final float[] valueColumns = new float[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0F;
nullValueColumnBitmap.mark(i);
} else {
@@ -533,7 +509,7 @@ public abstract class TabletInsertionEventParser {
: (double[]) originValueColumn;
final double[] valueColumns = new double[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0D;
nullValueColumnBitmap.mark(i);
} else {
@@ -550,7 +526,7 @@ public abstract class TabletInsertionEventParser {
: (boolean[]) originValueColumn;
final boolean[] valueColumns = new boolean[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = false;
nullValueColumnBitmap.mark(i);
} else {
@@ -571,7 +547,7 @@ public abstract class TabletInsertionEventParser {
for (int i = 0; i < rowIndexList.size(); ++i) {
if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)])
||
Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues())
- || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ || isNullValue(originNullValueColumnBitmap,
rowIndexList.get(i))) {
valueColumns[i] = Binary.EMPTY_VALUE;
nullValueColumnBitmap.mark(i);
} else {
@@ -631,6 +607,14 @@ public abstract class TabletInsertionEventParser {
}
}
+ private static BitMap getBitMap(final BitMap[] bitMaps, final int index) {
+ return Objects.nonNull(bitMaps) && index < bitMaps.length ? bitMaps[index]
: null;
+ }
+
+ private static boolean isNullValue(final BitMap bitMap, final int rowIndex) {
+ return Objects.nonNull(bitMap) && bitMap.isMarked(rowIndex);
+ }
+
//////////////////////////// process ////////////////////////////
public abstract List<TabletInsertionEvent> processRowByRow(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
index f234045007a..4b2a34964e8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTablePatternParser.java
@@ -24,6 +24,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
@@ -141,7 +142,10 @@ public class TabletInsertionEventTablePatternParser
extends TabletInsertionEvent
Arrays.asList(valueColumnTypes),
timestampColumn,
valueColumns,
- nullValueColumnBitmaps,
+ nullValueColumnBitmaps == null
+ ? null
+ : PipeTabletUtils.compactBitMaps(
+ Arrays.copyOf(nullValueColumnBitmaps,
nullValueColumnBitmaps.length), rowCount),
rowCount);
tablet = newTablet;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
index 4e2cc3102e2..9655175759e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/parser/TabletInsertionEventTreePatternParser.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
import org.apache.iotdb.db.pipe.event.common.row.PipeRowCollector;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletCollector;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
@@ -202,7 +203,10 @@ public class TabletInsertionEventTreePatternParser extends
TabletInsertionEventP
Arrays.asList(measurementSchemaList),
timestampColumn,
valueColumns,
- nullValueColumnBitmaps,
+ nullValueColumnBitmaps == null
+ ? null
+ : PipeTabletUtils.compactBitMaps(
+ Arrays.copyOf(nullValueColumnBitmaps,
nullValueColumnBitmaps.length), rowCount),
rowCount);
return tablet;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index 9069a99cbd7..2656ec7d72d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -33,6 +33,7 @@ import org.apache.iotdb.db.auth.AuthorityChecker;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -79,6 +80,7 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
private final Iterator<Map.Entry<IDeviceID, List<String>>>
deviceMeasurementsMapIterator;
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
+ private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
@TestOnly
public TsFileInsertionEventQueryParser(
@@ -418,7 +420,8 @@ public class TsFileInsertionEventQueryParser extends
TsFileInsertionEventParser
entry.getValue(),
timeFilterExpression,
allocatedMemoryBlockForTablet,
- currentModifications);
+ currentModifications,
+ tabletStringInternPool);
} catch (final Exception e) {
close();
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
index 20ba62496ca..5c7c06089fe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParserTabletIterator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -39,6 +41,7 @@ import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.expression.IExpression;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
@@ -59,7 +62,9 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
private final Map<String, TSDataType> measurementDataTypeMap;
private final IDeviceID deviceId;
+ private final String deviceIdString;
private final List<String> measurements;
+ private final List<IMeasurementSchema> schemas;
private final IExpression timeFilterExpression;
@@ -79,20 +84,29 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
final List<String> measurements,
final IExpression timeFilterExpression,
final PipeMemoryBlock allocatedBlockForTablet,
- final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
currentModifications)
+ final PatternTreeMap<ModEntry, PatternTreeMapFactory.ModsSerializer>
currentModifications,
+ final TabletStringInternPool tabletStringInternPool)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
this.deviceId = deviceId;
+ this.deviceIdString = tabletStringInternPool.intern(deviceId.toString());
this.measurements =
measurements.stream()
.filter(
measurement ->
// time column in aligned time-series should not be a
query column
measurement != null && !measurement.isEmpty())
+ .map(tabletStringInternPool::intern)
.sorted()
.collect(Collectors.toList());
+ this.schemas = new ArrayList<>();
+ for (final String measurement : this.measurements) {
+ final TSDataType dataType =
+ measurementDataTypeMap.get(deviceIdString +
TsFileConstant.PATH_SEPARATOR + measurement);
+ schemas.add(new MeasurementSchema(measurement, dataType));
+ }
this.timeFilterExpression = timeFilterExpression;
@@ -136,20 +150,12 @@ public class
TsFileInsertionEventQueryParserTabletIterator implements Iterator<T
}
private Tablet buildNextTablet() throws IOException {
- final List<IMeasurementSchema> schemas = new ArrayList<>();
- for (final String measurement : measurements) {
- final TSDataType dataType =
- measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR
+ measurement);
- schemas.add(new MeasurementSchema(measurement, dataType));
- }
-
Tablet tablet = null;
if (!queryDataSet.hasNext()) {
tablet =
new Tablet(
// Used for tree model
- deviceId.toString(), schemas, 1);
- tablet.initBitMaps();
+ deviceIdString, schemas, 1);
return tablet;
}
@@ -164,8 +170,7 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
tablet =
new Tablet(
// Used for tree model
- deviceId.toString(), schemas, rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
+ deviceIdString, schemas, rowCountAndMemorySize.getLeft());
if (allocatedBlockForTablet.getMemoryUsageInBytes() <
rowCountAndMemorySize.getRight()) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedBlockForTablet,
rowCountAndMemorySize.getRight());
@@ -182,17 +187,22 @@ public class
TsFileInsertionEventQueryParserTabletIterator implements Iterator<T
for (int i = 0; i < fieldSize; i++) {
final Field field = fields.get(i);
final String measurement = measurements.get(i);
+ final TSDataType dataType = schemas.get(i).getType();
// Check if this value is deleted by mods
if (field == null
|| ModsOperationUtil.isDelete(rowRecord.getTimestamp(),
measurementModsList.get(i))) {
- tablet.getBitMaps()[i].mark(rowIndex);
+ if (dataType.isBinary()) {
+ PipeTabletUtils.putValue(tablet, rowIndex, i, dataType,
Binary.EMPTY_VALUE);
+ }
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
} else {
- tablet.addValue(measurement, rowIndex,
field.getObjectValue(schemas.get(i).getType()));
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataType,
field.getObjectValue(schemas.get(i).getType()));
isNeedFillTime = true;
}
}
if (isNeedFillTime) {
- tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
+ PipeTabletUtils.putTimestamp(tablet, rowIndex,
rowRecord.getTimestamp());
}
if (tablet.getRowSize() == tablet.getMaxRowNumber()) {
@@ -200,6 +210,7 @@ public class TsFileInsertionEventQueryParserTabletIterator
implements Iterator<T
}
}
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index b1f3a5a4c14..b18bf6255ab 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -33,6 +33,8 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.TsFileInsertionEventParser;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -92,8 +94,10 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
private boolean currentIsMultiPage;
private IDeviceID currentDevice;
+ private String currentDeviceString;
private boolean currentIsAligned;
private final List<IMeasurementSchema> currentMeasurements = new
ArrayList<>();
+ private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
private final List<Chunk> timeChunkList = new ArrayList<>();
@@ -304,8 +308,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
Tablet tablet = null;
if (!data.hasCurrent()) {
- tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
- tablet.initBitMaps();
+ tablet = new Tablet(currentDeviceString, currentMeasurements, 1);
return tablet;
}
@@ -319,8 +322,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data);
tablet =
new Tablet(
- currentDevice.toString(), currentMeasurements,
rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
+ currentDeviceString, currentMeasurements,
rowCountAndMemorySize.getLeft());
if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
< rowCountAndMemorySize.getRight()) {
PipeDataNodeResourceManager.memory()
@@ -332,7 +334,7 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
final int rowIndex = tablet.getRowSize();
if (putValueToColumns(data, tablet, rowIndex)) {
- tablet.addTimestamp(rowIndex, data.currentTime());
+ PipeTabletUtils.putTimestamp(tablet, rowIndex, data.currentTime());
}
}
@@ -347,14 +349,14 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
if (tablet == null) {
- tablet = new Tablet(currentDevice.toString(), currentMeasurements, 1);
- tablet.initBitMaps();
+ tablet = new Tablet(currentDeviceString, currentMeasurements, 1);
}
// Switch chunk reader iff current chunk is all consumed
if (!data.hasCurrent()) {
prepareData();
}
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
} catch (final Exception e) {
close();
@@ -412,37 +414,68 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
case TEXT:
case BLOB:
case STRING:
- tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, tablet.getSchemas().get(i).getType(),
Binary.EMPTY_VALUE);
}
- tablet.getBitMaps()[i].mark(rowIndex);
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
continue;
}
isNeedFillTime = true;
switch (tablet.getSchemas().get(i).getType()) {
case BOOLEAN:
- tablet.addValue(rowIndex, i, primitiveType.getBoolean());
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ tablet.getSchemas().get(i).getType(),
+ primitiveType.getBoolean());
break;
case INT32:
- tablet.addValue(rowIndex, i, primitiveType.getInt());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, tablet.getSchemas().get(i).getType(),
primitiveType.getInt());
break;
case DATE:
- tablet.addValue(rowIndex, i,
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ tablet.getSchemas().get(i).getType(),
+ DateUtils.parseIntToLocalDate(primitiveType.getInt()));
break;
case INT64:
case TIMESTAMP:
- tablet.addValue(rowIndex, i, primitiveType.getLong());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, tablet.getSchemas().get(i).getType(),
primitiveType.getLong());
break;
case FLOAT:
- tablet.addValue(rowIndex, i, primitiveType.getFloat());
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ tablet.getSchemas().get(i).getType(),
+ primitiveType.getFloat());
break;
case DOUBLE:
- tablet.addValue(rowIndex, i, primitiveType.getDouble());
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ tablet.getSchemas().get(i).getType(),
+ primitiveType.getDouble());
break;
case TEXT:
case BLOB:
case STRING:
- tablet.addValue(rowIndex, i,
primitiveType.getBinary().getValues());
+ final Binary binary = primitiveType.getBinary();
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ tablet.getSchemas().get(i).getType(),
+ Objects.isNull(binary) || Objects.isNull(binary.getValues())
+ ? Binary.EMPTY_VALUE
+ : binary);
break;
default:
throw new UnSupportedDataTypeException(
@@ -458,28 +491,46 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
isNeedFillTime = true;
switch (tablet.getSchemas().get(0).getType()) {
case BOOLEAN:
- tablet.addValue(rowIndex, 0, data.getBoolean());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(),
data.getBoolean());
break;
case INT32:
- tablet.addValue(rowIndex, 0, data.getInt());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(),
data.getInt());
break;
case DATE:
- tablet.addValue(rowIndex, 0,
DateUtils.parseIntToLocalDate(data.getInt()));
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ 0,
+ tablet.getSchemas().get(0).getType(),
+ DateUtils.parseIntToLocalDate(data.getInt()));
break;
case INT64:
case TIMESTAMP:
- tablet.addValue(rowIndex, 0, data.getLong());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(),
data.getLong());
break;
case FLOAT:
- tablet.addValue(rowIndex, 0, data.getFloat());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(),
data.getFloat());
break;
case DOUBLE:
- tablet.addValue(rowIndex, 0, data.getDouble());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, 0, tablet.getSchemas().get(0).getType(),
data.getDouble());
break;
case TEXT:
case BLOB:
case STRING:
- tablet.addValue(rowIndex, 0, data.getBinary().getValues());
+ final Binary binary = data.getBinary();
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ 0,
+ tablet.getSchemas().get(0).getType(),
+ Objects.isNull(binary) || Objects.isNull(binary.getValues())
+ ? Binary.EMPTY_VALUE
+ : binary);
break;
default:
throw new UnSupportedDataTypeException(
@@ -542,17 +593,17 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
? new ChunkReader(chunk, filter)
: new SinglePageWholeChunkReader(chunk);
currentIsAligned = false;
+ final String measurementID =
+ tabletStringInternPool.intern(chunkHeader.getMeasurementID());
currentMeasurements.add(
new MeasurementSchema(
- chunkHeader.getMeasurementID(),
+ measurementID,
chunkHeader.getDataType(),
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType()));
modsInfos.addAll(
ModsOperationUtil.initializeMeasurementMods(
- currentDevice,
- Collections.singletonList(chunkHeader.getMeasurementID()),
- currentModifications));
+ currentDevice, Collections.singletonList(measurementID),
currentModifications));
return;
}
case MetaMarker.VALUE_CHUNK_HEADER:
@@ -569,9 +620,11 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
}
// Increase value index
+ final String measurementID =
+
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
final int valueIndex =
measurementIndexMap.compute(
- chunkHeader.getMeasurementID(),
+ measurementID,
(measurement, index) -> Objects.nonNull(index) ? index +
1 : 0);
// Emit when encountered non-sequential value chunk, or the
chunk size exceeds
@@ -631,17 +684,17 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
valueChunkSize += chunkHeader.getDataSize();
valueChunkPageMemorySize += currentValueChunkPageMemorySize;
valueChunkList.add(chunk);
+ final String measurementID =
+ tabletStringInternPool.intern(chunkHeader.getMeasurementID());
currentMeasurements.add(
new MeasurementSchema(
- chunkHeader.getMeasurementID(),
+ measurementID,
chunkHeader.getDataType(),
chunkHeader.getEncodingType(),
chunkHeader.getCompressionType()));
modsInfos.addAll(
ModsOperationUtil.initializeMeasurementMods(
- currentDevice,
- Collections.singletonList(chunkHeader.getMeasurementID()),
- currentModifications));
+ currentDevice, Collections.singletonList(measurementID),
currentModifications));
break;
}
case MetaMarker.CHUNK_GROUP_HEADER:
@@ -658,6 +711,10 @@ public class TsFileInsertionEventScanParser extends
TsFileInsertionEventParser {
measurementIndexMap.clear();
final IDeviceID deviceID =
tsFileSequenceReader.readChunkGroupHeader().getDeviceID();
currentDevice = treePattern.mayOverlapWithDevice(deviceID) ?
deviceID : null;
+ currentDeviceString =
+ Objects.nonNull(currentDevice)
+ ? tabletStringInternPool.intern(currentDevice.toString())
+ : null;
break;
}
case MetaMarker.OPERATION_INDEX_RANGE:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
index e3caecd144d..5b50eb166be 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/table/TsFileInsertionEventTableParserTabletIterator.java
@@ -21,6 +21,8 @@ package
org.apache.iotdb.db.pipe.event.common.tsfile.parser.table;
import org.apache.iotdb.commons.path.PatternTreeMap;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -52,6 +54,7 @@ import org.apache.tsfile.utils.TsPrimitiveType;
import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
import java.io.IOException;
import java.util.ArrayList;
@@ -72,6 +75,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
private final IMetadataQuerier metadataQuerier;
private final TsFileMetadata fileMetadata;
private final Iterator<Map.Entry<String, TableSchema>>
filteredTableSchemaIterator;
+ private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
// For memory control
private final PipeMemoryBlock allocatedMemoryBlockForTablet;
@@ -237,7 +241,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
case INIT_DEVICE_META:
if (filteredTableSchemaIterator != null &&
filteredTableSchemaIterator.hasNext()) {
final Map.Entry<String, TableSchema> entry =
filteredTableSchemaIterator.next();
- tableName = entry.getKey();
+ tableName = tabletStringInternPool.intern(entry.getKey());
final TableSchema tableSchema = entry.getValue();
// The table name has changed, set to false
isSameTableName = false;
@@ -257,7 +261,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
if (schema != null
&& schema.getMeasurementName() != null
&& !schema.getMeasurementName().isEmpty()) {
- final String measurementName = schema.getMeasurementName();
+ final String measurementName = internMeasurementName(schema);
if (ColumnCategory.TAG.equals(columnCategory)) {
columnTypes.add(ColumnCategory.TAG);
measurementList.add(measurementName);
@@ -321,7 +325,6 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
new ArrayList<>(dataTypeList),
new ArrayList<>(columnTypes),
rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
isFirstRow = false;
}
final int rowIndex = tablet.getRowSize();
@@ -331,7 +334,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
if (fillMeasurementValueColumns(batchData, tablet, rowIndex)) {
fillDeviceIdColumns(deviceID, tablet, rowIndex);
- tablet.addTimestamp(rowIndex, batchData.currentTime());
+ PipeTabletUtils.putTimestamp(tablet, rowIndex,
batchData.currentTime());
}
}
@@ -342,9 +345,9 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
if (isFirstRow) {
tablet = new Tablet(tableName, measurementList, dataTypeList,
columnTypes, 0);
- tablet.initBitMaps();
}
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
}
@@ -396,14 +399,15 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
boolean hasSelectedNonNullChunk = false;
for (; offset < fieldSchemaList.size(); ++offset) {
final IMeasurementSchema schema = fieldSchemaList.get(offset);
+ final String measurementName = internMeasurementName(schema);
if (isFieldDeletedByMods(
- schema.getMeasurementName(),
+ measurementName,
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime())) {
continue;
}
- final IChunkMetadata metadata =
valueChunkMetadataMap.get(schema.getMeasurementName());
+ final IChunkMetadata metadata =
valueChunkMetadataMap.get(measurementName);
Chunk chunk = null;
if (metadata != null) {
chunk = reader.readMemChunk((ChunkMetadata) metadata);
@@ -422,7 +426,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
hasSelectedNonNullChunk = true;
}
columnTypes.add(ColumnCategory.FIELD);
- measurementList.add(schema.getMeasurementName());
+ measurementList.add(measurementName);
dataTypeList.add(schema.getType());
valueChunkList.add(chunk);
hasSelectedField = true;
@@ -452,7 +456,7 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
for (final IMeasurementSchema schema : fieldSchemaList) {
if (!ModsOperationUtil.isAllDeletedByMods(
currentDeviceID,
- schema.getMeasurementName(),
+ internMeasurementName(schema),
alignedChunkMetadata.getStartTime(),
alignedChunkMetadata.getEndTime(),
modifications)) {
@@ -469,6 +473,13 @@ public class TsFileInsertionEventTableParserTabletIterator
implements Iterator<T
deviceID, measurementID, startTime, endTime, modifications);
}
+ private String internMeasurementName(final IMeasurementSchema schema) {
+ if (schema instanceof MeasurementSchema) {
+ tabletStringInternPool.intern((MeasurementSchema) schema);
+ }
+ return tabletStringInternPool.intern(schema.getMeasurementName());
+ }
+
private boolean fillMeasurementValueColumns(
final BatchData data, final Tablet tablet, final int rowIndex) {
final TsPrimitiveType[] primitiveTypes =
@@ -488,41 +499,55 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
case TEXT:
case BLOB:
case STRING:
- tablet.addValue(rowIndex, i, Binary.EMPTY_VALUE.getValues());
+ PipeTabletUtils.putValue(tablet, rowIndex, i, dataTypeList.get(i),
Binary.EMPTY_VALUE);
}
- tablet.getBitMaps()[i].mark(rowIndex);
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
continue;
}
needFillTime = true;
switch (dataTypeList.get(i)) {
case BOOLEAN:
- tablet.addValue(rowIndex, i, primitiveType.getBoolean());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataTypeList.get(i),
primitiveType.getBoolean());
break;
case INT32:
- tablet.addValue(rowIndex, i, primitiveType.getInt());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataTypeList.get(i),
primitiveType.getInt());
break;
case DATE:
- tablet.addValue(rowIndex, i,
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ dataTypeList.get(i),
+ DateUtils.parseIntToLocalDate(primitiveType.getInt()));
break;
case INT64:
case TIMESTAMP:
- tablet.addValue(rowIndex, i, primitiveType.getLong());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataTypeList.get(i),
primitiveType.getLong());
break;
case FLOAT:
- tablet.addValue(rowIndex, i, primitiveType.getFloat());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataTypeList.get(i),
primitiveType.getFloat());
break;
case DOUBLE:
- tablet.addValue(rowIndex, i, primitiveType.getDouble());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataTypeList.get(i),
primitiveType.getDouble());
break;
case TEXT:
case BLOB:
case STRING:
Binary binary = primitiveType.getBinary();
- tablet.addValue(
+ PipeTabletUtils.putValue(
+ tablet,
rowIndex,
i,
- binary.getValues() == null ? Binary.EMPTY_VALUE.getValues() :
binary.getValues());
+ dataTypeList.get(i),
+ Objects.isNull(binary) || Objects.isNull(binary.getValues())
+ ? Binary.EMPTY_VALUE
+ : binary);
break;
default:
throw new UnSupportedDataTypeException(
@@ -538,16 +563,19 @@ public class
TsFileInsertionEventTableParserTabletIterator implements Iterator<T
int i = 1;
for (int totalColumns = deviceIdSegments.length; i < totalColumns; i++) {
if (deviceIdSegments[i] == null) {
- tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
- tablet.getBitMaps()[i - 1].mark(rowIndex);
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i - 1, dataTypeList.get(i - 1),
Binary.EMPTY_VALUE);
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i - 1);
continue;
}
- tablet.addValue(rowIndex, i - 1, deviceIdSegments[i]);
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i - 1, dataTypeList.get(i - 1),
deviceIdSegments[i]);
}
while (i <= deviceIdSize) {
- tablet.addValue(rowIndex, i - 1, Binary.EMPTY_VALUE.getValues());
- tablet.getBitMaps()[i - 1].mark(rowIndex);
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i - 1, dataTypeList.get(i - 1),
Binary.EMPTY_VALUE);
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i - 1);
i++;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index 7693c8ff512..04eff0067e5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -191,14 +191,14 @@ public class PipeMemoryWeightUtil {
return new Pair<>(1, 0);
}
- // Calculate row number according to the max size of a pipe tablet.
- // "-100" is the estimated size of other data structures in a pipe tablet.
+ // Calculate row number according to the max size of a pipe tablet. "100"
is the estimated size
+ // of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per
schema.
- // Here we estimate the max use of
int sizeLimit =
- Math.min(
-
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
- (int) (inputNum * rowBytesUsed * 1.2));
+ (int)
+ Math.min(
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+ Math.min(Integer.MAX_VALUE, 100 + inputNum * (double)
rowBytesUsed * 1.2));
int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 8d75e9864bb..ede3370f5b0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
@@ -34,7 +35,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.record.Tablet;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -130,6 +130,7 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
public static PipeTransferTabletBatchReq fromTPipeTransferReq(
final TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
+ final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
// Binary size, for rolling upgrade
ReadWriteIOUtils.readInt(transferReq.body);
@@ -143,8 +144,7 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.tabletReqs.add(
- PipeTransferTabletRawReq.toTPipeTransferRawReq(
- Tablet.deserialize(transferReq.body),
ReadWriteIOUtils.readBool(transferReq.body)));
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body,
tabletStringInternPool));
}
batchReq.version = transferReq.version;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
index 80550b6350f..e7278158876 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReqV2.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
@@ -211,6 +212,7 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
public static PipeTransferTabletBatchReqV2 fromTPipeTransferReq(
final org.apache.iotdb.service.rpc.thrift.TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReqV2 batchReq = new
PipeTransferTabletBatchReqV2();
+ final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
// Binary req, for rolling upgrade
ReadWriteIOUtils.readInt(transferReq.body);
@@ -220,12 +222,14 @@ public class PipeTransferTabletBatchReqV2 extends
TPipeTransferReq {
batchReq.insertNodeReqs.add(
PipeTransferTabletInsertNodeReqV2.toTabletInsertNodeReq(
(InsertNode) PlanFragment.deserializeHelper(transferReq.body,
null),
- ReadWriteIOUtils.readString(transferReq.body)));
+
tabletStringInternPool.intern(ReadWriteIOUtils.readString(transferReq.body))));
}
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
-
batchReq.tabletReqs.add(PipeTransferTabletRawReqV2.toTPipeTransferRawReq(transferReq.body));
+ batchReq.tabletReqs.add(
+ PipeTransferTabletRawReqV2.toTPipeTransferRawReq(
+ transferReq.body, tabletStringInternPool));
}
batchReq.version = transferReq.version;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 1504b3eadb9..98ea83b6d76 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -23,6 +23,8 @@ import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -112,6 +114,27 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return tabletReq;
}
+ public static PipeTransferTabletRawReq toTPipeTransferRawReq(
+ final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
+ final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
+
+ final int startPosition = buffer.position();
+ try {
+ final InsertTabletStatement insertTabletStatement =
+ TabletStatementConverter.deserializeStatementFromTabletFormat(
+ buffer, false, tabletStringInternPool);
+ tabletReq.isAligned = insertTabletStatement.isAligned();
+ tabletReq.statement = insertTabletStatement;
+ } catch (final Exception e) {
+ buffer.position(startPosition);
+ tabletReq.tablet =
+ PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
tabletStringInternPool);
+ tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
+ }
+
+ return tabletReq;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletRawReq toTPipeTransferReq(
@@ -135,22 +158,8 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
}
public static PipeTransferTabletRawReq fromTPipeTransferReq(final
TPipeTransferReq transferReq) {
- final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
-
- final ByteBuffer buffer = transferReq.body;
- final int startPosition = buffer.position();
- try {
- // V1: no databaseName, readDatabaseName = false
- final InsertTabletStatement insertTabletStatement =
-
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, false);
- tabletReq.isAligned = insertTabletStatement.isAligned();
- // devicePath is already set in deserializeStatementFromTabletFormat for
V1 format
- tabletReq.statement = insertTabletStatement;
- } catch (final Exception e) {
- buffer.position(startPosition);
- tabletReq.tablet = Tablet.deserialize(buffer);
- tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
- }
+ final PipeTransferTabletRawReq tabletReq =
+ toTPipeTransferRawReq(transferReq.body, new TabletStringInternPool());
tabletReq.version = transferReq.version;
tabletReq.type = transferReq.type;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
index 2458e5e243f..d395bf6cf5f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReqV2.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkReques
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.pipe.sink.util.TabletStatementConverter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTableModelTabletEventSorter;
import
org.apache.iotdb.db.pipe.sink.util.sorter.PipeTreeModelTabletEventSorter;
@@ -116,9 +118,14 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
}
public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(final
ByteBuffer buffer) {
+ return toTPipeTransferRawReq(buffer, new TabletStringInternPool());
+ }
+
+ public static PipeTransferTabletRawReqV2 toTPipeTransferRawReq(
+ final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
final PipeTransferTabletRawReqV2 tabletReq = new
PipeTransferTabletRawReqV2();
- tabletReq.deserializeTPipeTransferRawReq(buffer);
+ tabletReq.deserializeTPipeTransferRawReq(buffer, tabletStringInternPool);
tabletReq.version = IoTDBSinkRequestVersion.VERSION_1.getVersion();
tabletReq.type = PipeRequestType.TRANSFER_TABLET_RAW_V2.getType();
@@ -153,7 +160,7 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
final TPipeTransferReq transferReq) {
final PipeTransferTabletRawReqV2 tabletReq = new
PipeTransferTabletRawReqV2();
- tabletReq.deserializeTPipeTransferRawReq(transferReq.body);
+ tabletReq.deserializeTPipeTransferRawReq(transferReq.body, new
TabletStringInternPool());
tabletReq.body = transferReq.body;
tabletReq.version = transferReq.version;
@@ -202,11 +209,20 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
/////////////////////////////// Util ///////////////////////////////
public void deserializeTPipeTransferRawReq(final ByteBuffer buffer) {
+ deserializeTPipeTransferRawReq(buffer, new TabletStringInternPool());
+ }
+
+ public void deserializeTPipeTransferRawReq(
+ final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
+ final TabletStringInternPool internPool =
+ Objects.nonNull(tabletStringInternPool)
+ ? tabletStringInternPool
+ : new TabletStringInternPool();
final int startPosition = buffer.position();
try {
// V2: read databaseName, readDatabaseName = true
final InsertTabletStatement insertTabletStatement =
-
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true);
+
TabletStatementConverter.deserializeStatementFromTabletFormat(buffer, true,
internPool);
this.isAligned = insertTabletStatement.isAligned();
// databaseName is already set in deserializeStatementFromTabletFormat
when
// readDatabaseName=true
@@ -216,9 +232,9 @@ public class PipeTransferTabletRawReqV2 extends
PipeTransferTabletRawReq {
// If Statement deserialization fails, fallback to Tablet format
// Reset buffer position for Tablet deserialization
buffer.position(startPosition);
- this.tablet = Tablet.deserialize(buffer);
+ this.tablet = PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
internPool);
this.isAligned = ReadWriteIOUtils.readBool(buffer);
- this.dataBaseName = ReadWriteIOUtils.readString(buffer);
+ this.dataBaseName =
internPool.intern(ReadWriteIOUtils.readString(buffer));
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 917720220bf..58df22a20f4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -460,7 +460,7 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
for (int rowIndex = 0; rowIndex < tablet.getRowSize(); ++rowIndex) {
// Filter null value
- if (tablet.getBitMaps()[columnIndex].isMarked(rowIndex)) {
+ if (tablet.isNull(rowIndex, columnIndex)) {
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
index e8b8e36cb49..773d40e99d1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/TabletStatementConverter.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.commons.utils.PathUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.pipe.resource.memory.InsertNodeMemoryEstimator;
import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDevicePathCache;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Objects;
/**
* Utility class for converting between InsertTabletStatement and Tablet
format ByteBuffer. This
@@ -76,12 +78,21 @@ public class TabletStatementConverter {
*/
public static InsertTabletStatement deserializeStatementFromTabletFormat(
final ByteBuffer byteBuffer, final boolean readDatabaseName) throws
IllegalPathException {
+ return deserializeStatementFromTabletFormat(byteBuffer, readDatabaseName,
null);
+ }
+
+ public static InsertTabletStatement deserializeStatementFromTabletFormat(
+ final ByteBuffer byteBuffer,
+ final boolean readDatabaseName,
+ final TabletStringInternPool tabletStringInternPool)
+ throws IllegalPathException {
final InsertTabletStatement statement = new InsertTabletStatement();
// Calculate memory size during deserialization, use INSTANCE_SIZE constant
long memorySize = InsertTabletStatement.getInstanceSize();
- final String insertTargetName = ReadWriteIOUtils.readString(byteBuffer);
+ final String insertTargetName =
+ intern(ReadWriteIOUtils.readString(byteBuffer),
tabletStringInternPool);
final int rowSize = ReadWriteIOUtils.readInt(byteBuffer);
@@ -118,7 +129,7 @@ public class TabletStatementConverter {
for (int i = 0; i < schemaSize; i++) {
final boolean hasSchema =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
if (hasSchema) {
- final Pair<String, TSDataType> pair = readMeasurement(byteBuffer);
+ final Pair<String, TSDataType> pair = readMeasurement(byteBuffer,
tabletStringInternPool);
measurement[i] = pair.getLeft();
dataTypes[i] = pair.getRight();
columnCategories[i] =
@@ -169,15 +180,12 @@ public class TabletStatementConverter {
if (isBitMapsNotNull) {
// Use the method that returns both BitMap array and memory size
final Pair<BitMap[], Long> bitMapsAndMemory =
- readBitMapsFromBufferWithMemory(byteBuffer, schemaSize);
+ readBitMapsFromBufferWithMemory(byteBuffer, schemaSize, rowSize);
bitMaps = bitMapsAndMemory.getLeft();
bitMapsMemorySize = bitMapsAndMemory.getRight();
} else {
- // Calculate memory for empty BitMap array: array header + references
- bitMaps = new BitMap[schemaSize];
- bitMapsMemorySize =
- org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
- NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * schemaSize);
+ bitMaps = null;
+ bitMapsMemorySize = 0;
}
// Add bitMaps memory to total
@@ -217,7 +225,8 @@ public class TabletStatementConverter {
// Read databaseName if requested (V2 format)
if (readDatabaseName) {
- final String databaseName = ReadWriteIOUtils.readString(byteBuffer);
+ final String databaseName =
+ intern(ReadWriteIOUtils.readString(byteBuffer),
tabletStringInternPool);
if (databaseName != null) {
statement.setDatabaseName(databaseName);
// Calculate memory for databaseName
@@ -226,7 +235,9 @@ public class TabletStatementConverter {
if (PathUtils.isTableModelDatabase(databaseName)) {
statement.setWriteToTable(true);
// For table model, insertTargetName is table name, convert to
lowercase
- statement.setDevicePath(new
PartialPath(insertTargetName.toLowerCase(), false));
+ statement.setDevicePath(
+ new PartialPath(
+ intern(insertTargetName.toLowerCase(),
tabletStringInternPool), false));
statement.setColumnCategories(columnCategories);
memorySize += columnCategoriesMemorySize;
@@ -269,6 +280,11 @@ public class TabletStatementConverter {
return deserializeStatementFromTabletFormat(byteBuffer, false);
}
+ private static String intern(
+ final String value, final TabletStringInternPool tabletStringInternPool)
{
+ return Objects.nonNull(tabletStringInternPool) ?
tabletStringInternPool.intern(value) : value;
+ }
+
/**
* Skip a string in ByteBuffer without reading it. This is more efficient
than reading and
* discarding the string.
@@ -289,10 +305,13 @@ public class TabletStatementConverter {
* @param buffer ByteBuffer containing serialized measurement schema
* @return Pair of measurement name and data type
*/
- private static Pair<String, TSDataType> readMeasurement(final ByteBuffer
buffer) {
+ private static Pair<String, TSDataType> readMeasurement(
+ final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
// Read measurement name and data type
final Pair<String, TSDataType> pair =
- new Pair<>(ReadWriteIOUtils.readString(buffer),
TSDataType.deserializeFrom(buffer));
+ new Pair<>(
+ intern(ReadWriteIOUtils.readString(buffer),
tabletStringInternPool),
+ TSDataType.deserializeFrom(buffer));
// Skip encoding type (byte) and compression type (byte) - 2 bytes total
buffer.position(buffer.position() + 2);
@@ -315,13 +334,11 @@ public class TabletStatementConverter {
* array and the calculated memory size.
*/
private static Pair<BitMap[], Long> readBitMapsFromBufferWithMemory(
- final ByteBuffer byteBuffer, final int columns) {
+ final ByteBuffer byteBuffer, final int columns, final int rowSize) {
final BitMap[] bitMaps = new BitMap[columns];
- // Calculate memory: array header + object references
- long memorySize =
- org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
- NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns);
+ long bitMapsMemorySize = 0;
+ boolean hasMarkedBitMap = false;
for (int i = 0; i < columns; i++) {
final boolean hasBitMap =
BytesUtils.byteToBool(ReadWriteIOUtils.readByte(byteBuffer));
@@ -329,18 +346,30 @@ public class TabletStatementConverter {
final int size = ReadWriteIOUtils.readInt(byteBuffer);
final Binary valueBinary = ReadWriteIOUtils.readBinary(byteBuffer);
final byte[] byteArray = valueBinary.getValues();
- bitMaps[i] = new BitMap(size, byteArray);
+ final BitMap bitMap = new BitMap(size, byteArray);
+ if (bitMap.isAllUnmarked(Math.min(rowSize, bitMap.getSize()))) {
+ continue;
+ }
+ bitMaps[i] = bitMap;
+ hasMarkedBitMap = true;
// Calculate memory for this BitMap: BitMap object + byte array
// BitMap shallow size + byte array (array header + array length)
- memorySize +=
+ bitMapsMemorySize +=
SIZE_OF_BITMAP
+ org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
NUM_BYTES_ARRAY_HEADER + byteArray.length);
}
}
- return new Pair<>(bitMaps, memorySize);
+ if (!hasMarkedBitMap) {
+ return new Pair<>(null, 0L);
+ }
+ return new Pair<>(
+ bitMaps,
+ bitMapsMemorySize
+ + org.apache.tsfile.utils.RamUsageEstimator.alignObjectSize(
+ NUM_BYTES_ARRAY_HEADER + NUM_BYTES_OBJECT_REF * columns));
}
/**
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
index c98c978988d..8f81dd43032 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilder.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.util.builder;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.tsfile.enums.ColumnCategory;
@@ -243,7 +244,7 @@ public class PipeTableModelTsFileBuilder extends
PipeTsFileBuilder {
aggregatedSchemas.addAll(tablet.getSchemas());
aggregatedColumnCategories.addAll(tablet.getColumnTypes());
aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
- aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps()));
+
aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet)));
// Remove the aggregated tablet
tablets.pollFirst();
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java
index 8c89109e1ed..fb275a1893f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTableModelTsFileBuilderV2.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -202,7 +203,7 @@ public class PipeTableModelTsFileBuilderV2 extends
PipeTsFileBuilder {
.map(schema -> (MeasurementSchema) schema)
.toArray(MeasurementSchema[]::new);
Object[] values = Arrays.copyOf(tablet.getValues(),
tablet.getValues().length);
- BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(),
tablet.getBitMaps().length);
+ BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
ColumnCategory[] columnCategory = tablet.getColumnTypes().toArray(new
ColumnCategory[0]);
// convert date value to int refer to
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
index 34e30c99d4a..844fc6042d8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilder.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.pipe.sink.util.builder;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.external.commons.io.FileUtils;
@@ -230,7 +231,7 @@ public class PipeTreeModelTsFileBuilder extends
PipeTsFileBuilder {
// Aggregate the current tablet's data
aggregatedSchemas.addAll(tablet.getSchemas());
aggregatedValues.addAll(Arrays.asList(tablet.getValues()));
- aggregatedBitMaps.addAll(Arrays.asList(tablet.getBitMaps()));
+
aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet)));
// Remove the aggregated tablet
tablets.pollFirst();
} else {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java
index 48e8982be80..07703695d16 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/builder/PipeTreeModelTsFileBuilderV2.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.pipe.sink.util.builder;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.i18n.DataNodePipeMessages;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
@@ -146,7 +147,7 @@ public class PipeTreeModelTsFileBuilderV2 extends
PipeTsFileBuilder {
.map(schema -> (MeasurementSchema) schema)
.toArray(MeasurementSchema[]::new);
Object[] values = Arrays.copyOf(tablet.getValues(),
tablet.getValues().length);
- BitMap[] bitMaps = Arrays.copyOf(tablet.getBitMaps(),
tablet.getBitMaps().length);
+ BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
// convert date value to int refer to
//
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
index 46a3fc6df94..55c7a3ea788 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/sorter/PipeInsertEventSorter.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.pipe.sink.util.sorter;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.tsfile.enums.TSDataType;
@@ -106,7 +107,7 @@ public class PipeInsertEventSorter {
}
if (bitMapsModified) {
- dataAdapter.setBitMaps(bitMaps);
+ dataAdapter.setBitMaps(PipeTabletUtils.compactBitMaps(bitMaps,
deDuplicatedSize));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index a6289141531..adf14a39381 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -45,6 +45,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGr
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.BitMapUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -332,7 +333,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
protected InsertTabletNode getEmptySplit(int count) {
long[] subTimes = new long[count];
Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
- BitMap[] newBitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
+ BitMap[] newBitMaps = initBitmapsForSplit(dataTypes.length, count);
return new InsertTabletNode(
getPlanNodeId(),
targetPath,
@@ -370,7 +371,10 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
if (dataTypes[k] != null) {
System.arraycopy(columns[k], start, subNode.columns[k], destLoc,
length);
}
- if (subNode.bitMaps != null && this.bitMaps[k] != null) {
+ if (subNode.bitMaps != null
+ && subNode.bitMaps[k] != null
+ && k < this.bitMaps.length
+ && this.bitMaps[k] != null) {
BitMap.copyOfRange(this.bitMaps[k], start, subNode.bitMaps[k],
destLoc, length);
}
}
@@ -379,6 +383,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
subNode.setRange(locs);
subNode.setDataRegionReplicaSet(entry.getKey());
+ subNode.bitMaps = BitMapUtils.compactBitMaps(subNode.bitMaps,
subNode.rowCount);
return subNode;
}
@@ -441,6 +446,24 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return bitMaps;
}
+ protected BitMap[] initBitmapsForSplit(int columnSize, int rowSize) {
+ if (this.bitMaps == null) {
+ return null;
+ }
+
+ final int sourceRowCount = rowCount > 0 ? rowCount : times == null ? 0 :
times.length;
+ final BitMap[] splitBitMaps = new BitMap[columnSize];
+ boolean hasBitMap = false;
+ for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) {
+ if (this.bitMaps[i] != null
+ && !this.bitMaps[i].isAllUnmarked(Math.min(sourceRowCount,
this.bitMaps[i].getSize()))) {
+ splitBitMaps[i] = new BitMap(rowSize);
+ hasBitMap = true;
+ }
+ }
+ return hasBitMap ? splitBitMaps : null;
+ }
+
@Override
public void markFailedMeasurement(int index) {
if (measurements[index] == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
index 8d24ad77364..3302d8d7fc4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/RelationalInsertTabletNode.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.Ta
import org.apache.iotdb.db.storageengine.dataregion.memtable.AbstractMemTable;
import
org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
+import org.apache.iotdb.db.utils.BitMapUtils;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
@@ -180,7 +181,7 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
protected InsertTabletNode getEmptySplit(int count) {
long[] subTimes = new long[count];
Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
- BitMap[] newBitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
+ BitMap[] newBitMaps = initBitmapsForSplit(dataTypes.length, count);
RelationalInsertTabletNode split =
new RelationalInsertTabletNode(
getPlanNodeId(),
@@ -442,7 +443,10 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
if (dataTypes[i] != null) {
System.arraycopy(columns[i], start, subNode.columns[i], destLoc,
length);
}
- if (subNode.bitMaps != null && this.bitMaps[i] != null) {
+ if (subNode.bitMaps != null
+ && subNode.bitMaps[i] != null
+ && i < this.bitMaps.length
+ && this.bitMaps[i] != null) {
BitMap.copyOfRange(this.bitMaps[i], start, subNode.bitMaps[i],
destLoc, length);
}
}
@@ -451,6 +455,7 @@ public class RelationalInsertTabletNode extends
InsertTabletNode {
subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
subNode.setRange(locs);
subNode.setDataRegionReplicaSet(entry.getKey());
+ subNode.bitMaps = BitMapUtils.compactBitMaps(subNode.bitMaps,
subNode.rowCount);
result.add(subNode);
return result;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 19946305570..82f5990e037 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -43,6 +43,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.BitMapUtils;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.tsfile.enums.ColumnCategory;
@@ -390,7 +391,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
statement.setMeasurementSchemas(measurementSchemas);
statement.setDataTypes(dataTypes);
if (this.nullBitMaps != null) {
- statement.setBitMaps(copiedBitMaps);
+ statement.setBitMaps(BitMapUtils.compactBitMaps(copiedBitMaps,
rowCount));
}
statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
insertTabletStatementList.add(statement);
@@ -833,7 +834,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
tabletColumnTypes,
timestamps,
tabletValues,
- bitMaps,
+ BitMapUtils.compactBitMaps(bitMaps, rowSize),
rowSize);
} catch (final Exception e) {
throw new MetadataException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index af83d12c307..764d6763184 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -117,9 +117,7 @@ public class DefaultOperationQuota implements
OperationQuota {
case BATCH_INSERT:
// InsertTabletStatement
InsertTabletStatement insertTabletStatement =
(InsertTabletStatement) s;
- for (BitMap bitMap : insertTabletStatement.getBitMaps()) {
- avgSize += bitMap.getSize();
- }
+ avgSize += calculationWrite(insertTabletStatement.getBitMaps());
break;
case BATCH_INSERT_ONE_DEVICE:
// InsertRowsOfOneDeviceStatement
@@ -152,10 +150,12 @@ public class DefaultOperationQuota implements
OperationQuota {
for (int i = 0;
i <
insertMultiTabletsStatement.getInsertTabletStatementList().size();
i++) {
- for (BitMap bitMap :
-
insertMultiTabletsStatement.getInsertTabletStatementList().get(i).getBitMaps())
{
- avgSize += bitMap.getSize();
- }
+ avgSize +=
+ calculationWrite(
+ insertMultiTabletsStatement
+ .getInsertTabletStatementList()
+ .get(i)
+ .getBitMaps());
}
}
break;
@@ -179,6 +179,20 @@ public class DefaultOperationQuota implements
OperationQuota {
return size;
}
+ private long calculationWrite(BitMap[] bitMaps) {
+ if (bitMaps == null) {
+ return 0;
+ }
+
+ long size = 0;
+ for (BitMap bitMap : bitMaps) {
+ if (bitMap != null) {
+ size += bitMap.getSize();
+ }
+ }
+ return size;
+ }
+
private long estimateConsume(int numReqs, long avgSize) {
if (numReqs > 0) {
return avgSize * numReqs;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
new file mode 100644
index 00000000000..c6e1aecbd3d
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.tsfile.utils.BitMap;
+
+import java.util.Objects;
+
+public final class BitMapUtils {
+
+ private BitMapUtils() {}
+
+ public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int
rowCount) {
+ if (Objects.isNull(bitMaps)) {
+ return null;
+ }
+
+ boolean hasMarkedBitMap = false;
+ for (int i = 0; i < bitMaps.length; ++i) {
+ if (Objects.nonNull(bitMaps[i])
+ && bitMaps[i].isAllUnmarked(Math.min(rowCount,
bitMaps[i].getSize()))) {
+ bitMaps[i] = null;
+ }
+ if (Objects.nonNull(bitMaps[i])) {
+ hasMarkedBitMap = true;
+ }
+ }
+ return hasMarkedBitMap ? bitMaps : null;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 367578f2a40..da3dee91caa 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -198,11 +198,6 @@ public class PipeTabletInsertionEventTest {
final Object[] values = new Object[schemas.length];
// create tablet for insertRowNode
- BitMap[] bitMapsForInsertRowNode = new BitMap[schemas.length];
- for (int i = 0; i < schemas.length; i++) {
- bitMapsForInsertRowNode[i] = new BitMap(1);
- }
-
values[0] = new int[1];
values[1] = new long[1];
values[2] = new float[1];
@@ -228,20 +223,9 @@ public class PipeTabletInsertionEventTest {
}
tabletForInsertRowNode =
- new Tablet(
- deviceId,
- Arrays.asList(schemas),
- new long[] {times[0]},
- values,
- bitMapsForInsertRowNode,
- 1);
+ new Tablet(deviceId, Arrays.asList(schemas), new long[] {times[0]},
values, null, 1);
// create tablet for insertTabletNode
- BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length];
- for (int i = 0; i < schemas.length; i++) {
- bitMapsForInsertTabletNode[i] = new BitMap(times.length);
- }
-
values[0] = new int[times.length];
values[1] = new long[times.length];
values[2] = new float[times.length];
@@ -268,13 +252,7 @@ public class PipeTabletInsertionEventTest {
tabletForInsertTabletNode = new Tablet(deviceId, Arrays.asList(schemas),
times.length);
tabletForInsertTabletNode =
- new Tablet(
- deviceId,
- Arrays.asList(schemas),
- times,
- values,
- bitMapsForInsertTabletNode,
- times.length);
+ new Tablet(deviceId, Arrays.asList(schemas), times, values, null,
times.length);
}
@Test
@@ -341,6 +319,37 @@ public class PipeTabletInsertionEventTest {
Assert.assertTrue(isAligned4);
}
+ @Test
+ public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception
{
+ final BitMap[] bitMaps = new BitMap[schemas.length];
+ bitMaps[0] = new BitMap(times.length);
+ bitMaps[1] = new BitMap(times.length);
+ bitMaps[1].mark(1);
+
+ final InsertTabletNode nodeWithSparseColumn =
+ new InsertTabletNode(
+ new PlanNodeId("plannode bitmap"),
+ new PartialPath(deviceId),
+ false,
+ measurementIds,
+ dataTypes,
+ schemas,
+ times,
+ bitMaps,
+ insertTabletNode.getColumns(),
+ times.length);
+
+ final Tablet tablet =
+ new TabletInsertionEventTreePatternParser(
+ nodeWithSparseColumn, new PrefixTreePattern(pattern))
+ .convertToTablet();
+
+ Assert.assertNotNull(tablet.getBitMaps());
+ Assert.assertNull(tablet.getBitMaps()[0]);
+ Assert.assertNotNull(tablet.getBitMaps()[1]);
+ Assert.assertTrue(tablet.isNull(1, 1));
+ }
+
@Test
public void convertToTabletWithFilteredRowsForTest() throws Exception {
TabletInsertionEventTreePatternParser container1 =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index cdde28bce38..3ce07680ce6 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -58,6 +58,7 @@ import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.TsFileGeneratorUtils;
import org.apache.tsfile.write.TsFileWriter;
@@ -79,6 +80,7 @@ import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
@@ -103,6 +105,7 @@ public class TsFileInsertionEventParserTest {
"iotdb.query.parser.performance.enabled";
private static final String MANUAL_TABLE_PARSER_PERFORMANCE_TEST =
"iotdb.table.parser.performance.enabled";
+ private static final long BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE = 1024 *
1024L;
private File alignedTsFile;
private File nonalignedTsFile;
@@ -276,6 +279,80 @@ public class TsFileInsertionEventParserTest {
}
}
+ @Test
+ public void testQueryParserSkipsUnnecessaryBitMaps() throws Exception {
+ testTreeParserSkipsUnnecessaryBitMaps(true);
+ }
+
+ @Test
+ public void testScanParserSkipsUnnecessaryBitMaps() throws Exception {
+ testTreeParserSkipsUnnecessaryBitMaps(false);
+ }
+
+ @Test
+ public void testTableParserSkipsUnnecessaryBitMaps() throws Exception {
+ final long originalPipeMaxReaderChunkSize =
+ PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE);
+
+ try {
+ alignedTsFile = new File("table-parser-bitmap.tsfile");
+ if (alignedTsFile.exists()) {
+ Assert.assertTrue(alignedTsFile.delete());
+ }
+
+ final List<IMeasurementSchema> schemaList =
+ Arrays.asList(
+ new MeasurementSchema("tag0", TSDataType.STRING),
+ new MeasurementSchema("dense", TSDataType.INT64),
+ new MeasurementSchema("sparse", TSDataType.INT64));
+ final List<String> columnNameList = Arrays.asList("tag0", "dense",
"sparse");
+ final List<TSDataType> dataTypeList =
+ Arrays.asList(TSDataType.STRING, TSDataType.INT64, TSDataType.INT64);
+ final List<ColumnCategory> columnCategoryList =
+ Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD,
ColumnCategory.FIELD);
+
+ final Tablet tablet =
+ new Tablet("bitmap_table", columnNameList, dataTypeList,
columnCategoryList, 2);
+ for (int rowIndex = 0; rowIndex < 2; ++rowIndex) {
+ tablet.addTimestamp(rowIndex, rowIndex);
+ tablet.addValue(rowIndex, 0, "tag-value");
+ tablet.addValue(rowIndex, 1, (long) rowIndex);
+ tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null);
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerTableSchema(new TableSchema("bitmap_table", schemaList,
columnCategoryList));
+ writer.writeTable(tablet);
+ }
+
+ try (final TsFileInsertionEventTableParser parser =
+ new TsFileInsertionEventTableParser(
+ alignedTsFile,
+ new TablePattern(true, null, null),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ null,
+ false)) {
+ final Iterator<TabletInsertionEvent> iterator =
parser.toTabletInsertionEvents().iterator();
+ Assert.assertTrue(iterator.hasNext());
+ final Tablet parsedTablet =
+ ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet();
+ assertBitMapExistence(parsedTablet, false, false, true);
+ Assert.assertTrue(parsedTablet.isNull(1, 2));
+ Assert.assertFalse(iterator.hasNext());
+ }
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ }
+ }
+
@Test
public void manualTestScanParserSplitPerformance() throws Exception {
Assume.assumeTrue(
@@ -970,6 +1047,81 @@ public class TsFileInsertionEventParserTest {
alignedTsFile, new PrefixTreePattern("root"), Long.MIN_VALUE,
Long.MAX_VALUE, isQuery, 4);
}
+ private void testTreeParserSkipsUnnecessaryBitMaps(final boolean isQuery)
throws Exception {
+ final long originalPipeMaxReaderChunkSize =
+ PipeConfig.getInstance().getPipeMaxReaderChunkSize();
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(BITMAP_TEST_PIPE_MAX_READER_CHUNK_SIZE);
+
+ try {
+ alignedTsFile =
+ new File(isQuery ? "query-parser-bitmap.tsfile" :
"scan-parser-bitmap.tsfile");
+ if (alignedTsFile.exists()) {
+ Assert.assertTrue(alignedTsFile.delete());
+ }
+
+ final List<IMeasurementSchema> schemaList =
+ Arrays.asList(
+ new MeasurementSchema("dense", TSDataType.INT64),
+ new MeasurementSchema("sparse", TSDataType.INT64));
+ final Tablet tablet = new Tablet("root.sg.d", schemaList, 2);
+ for (int rowIndex = 0; rowIndex < 2; ++rowIndex) {
+ tablet.addTimestamp(rowIndex, rowIndex);
+ tablet.addValue("dense", rowIndex, (long) rowIndex);
+ tablet.addValue("sparse", rowIndex, rowIndex == 0 ? 100L : null);
+ }
+
+ try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
+ writer.registerAlignedTimeseries(new PartialPath("root.sg.d"),
schemaList);
+ writer.writeAligned(tablet);
+ }
+
+ try (final TsFileInsertionEventParser parser =
+ isQuery
+ ? new TsFileInsertionEventQueryParser(
+ alignedTsFile,
+ new PrefixTreePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null)
+ : new TsFileInsertionEventScanParser(
+ alignedTsFile,
+ new PrefixTreePattern("root"),
+ Long.MIN_VALUE,
+ Long.MAX_VALUE,
+ null,
+ null,
+ false)) {
+ final Iterator<TabletInsertionEvent> iterator =
parser.toTabletInsertionEvents().iterator();
+ Assert.assertTrue(iterator.hasNext());
+ final Tablet parsedTablet =
+ ((PipeRawTabletInsertionEvent) iterator.next()).convertToTablet();
+ assertBitMapExistence(parsedTablet, false, true);
+ Assert.assertTrue(parsedTablet.isNull(1, 1));
+ Assert.assertFalse(iterator.hasNext());
+ }
+ } finally {
+ CommonDescriptor.getInstance()
+ .getConfig()
+ .setPipeMaxReaderChunkSize(originalPipeMaxReaderChunkSize);
+ }
+ }
+
+ private void assertBitMapExistence(
+ final Tablet tablet, final boolean... expectedColumnHasBitMap) {
+ final BitMap[] bitMaps = tablet.getBitMaps();
+ Assert.assertNotNull(bitMaps);
+ Assert.assertEquals(expectedColumnHasBitMap.length, bitMaps.length);
+ for (int i = 0; i < expectedColumnHasBitMap.length; ++i) {
+ if (expectedColumnHasBitMap[i]) {
+ Assert.assertNotNull(bitMaps[i]);
+ } else {
+ Assert.assertNull(bitMaps[i]);
+ }
+ }
+ }
+
private void generateLargeAlignedTsFile(
final File tsFile,
final List<IMeasurementSchema> schemaList,
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
new file mode 100644
index 00000000000..9ef48fe52ec
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tablet;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeTabletUtilsTest {
+
+ @Test
+ public void testPutValueUnmarksReusedNullRow() {
+ final List<IMeasurementSchema> schemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.FLOAT),
+ new MeasurementSchema("s2", TSDataType.FLOAT));
+ final Tablet tablet = new Tablet("root.sg.d1", schemas, 2);
+
+ PipeTabletUtils.markNullValue(tablet, 0, 0);
+ PipeTabletUtils.markNullValue(tablet, 0, 1);
+
+ PipeTabletUtils.putValue(tablet, 0, 0, TSDataType.FLOAT, 1.0f);
+ PipeTabletUtils.putTimestamp(tablet, 0, 1L);
+ PipeTabletUtils.compactBitMaps(tablet);
+
+ Assert.assertFalse(tablet.isNull(0, 0));
+ Assert.assertTrue(tablet.isNull(0, 1));
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index 38704ec7fea..10573e5609d 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -46,6 +46,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -54,6 +55,7 @@ import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
import org.apache.tsfile.write.record.Tablet;
@@ -494,6 +496,28 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
}
+ @Test
+ public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames()
throws IOException {
+ final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+ tabletBuffers.add(
+ serializeTablet(createSingleValueTablet(new String("root.sg.d"), new
String("s1")), false));
+ tabletBuffers.add(
+ serializeTablet(createSingleValueTablet(new String("root.sg.d"), new
String("s1")), false));
+
+ final PipeTransferTabletBatchReq deserializedReq =
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(
+
PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(),
tabletBuffers));
+ final Pair<InsertRowsStatement, InsertMultiTabletsStatement> statements =
+ deserializedReq.constructStatements();
+ final List<InsertTabletStatement> insertTabletStatements =
+ statements.getRight().getInsertTabletStatementList();
+
+ Assert.assertEquals(2, insertTabletStatements.size());
+ Assert.assertSame(
+ insertTabletStatements.get(0).getMeasurements()[0],
+ insertTabletStatements.get(1).getMeasurements()[0]);
+ }
+
@Test
public void testPipeTransferTabletBatchReqV2() throws IOException {
final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
@@ -770,4 +794,24 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertEquals(resp.getStatus(), deserializeResp.getStatus());
Assert.assertEquals(resp.getEndWritingOffset(),
deserializeResp.getEndWritingOffset());
}
+
+ private static Tablet createSingleValueTablet(final String deviceId, final
String measurement) {
+ final List<IMeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32));
+
+ final Tablet tablet = new Tablet(deviceId, schemaList, 8);
+ tablet.addTimestamp(0, 1);
+ tablet.addValue(measurement, 0, 1);
+ return tablet;
+ }
+
+ private static ByteBuffer serializeTablet(final Tablet tablet, final boolean
isAligned)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index 3b8f9723809..01857cb5f8a 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -47,6 +47,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -205,6 +206,9 @@ public class WritePlanNodeSplitTest {
insertTabletNode.setColumns(
new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90,
100}});
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
+ final BitMap[] bitMaps = new BitMap[] {new
BitMap(insertTabletNode.getRowCount())};
+ bitMaps[0].mark(2);
+ insertTabletNode.setBitMaps(bitMaps);
DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
dataPartitionQueryParam.setDeviceID(
@@ -224,6 +228,12 @@ public class WritePlanNodeSplitTest {
Assert.assertEquals(tabletNode.getTimes().length, 2);
TConsensusGroupId regionId =
tabletNode.getDataRegionReplicaSet().getRegionId();
Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()),
regionId.getId());
+ if (tabletNode.getTimes()[0] == 1) {
+ Assert.assertNotNull(tabletNode.getBitMaps());
+ Assert.assertTrue(tabletNode.getBitMaps()[0].isMarked(0));
+ } else {
+ Assert.assertNull(tabletNode.getBitMaps());
+ }
}
insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 2"));
@@ -271,6 +281,9 @@ public class WritePlanNodeSplitTest {
relationalInsertTabletNode.setColumnCategories(
new TsTableColumnCategory[] {TsTableColumnCategory.TAG,
TsTableColumnCategory.FIELD});
relationalInsertTabletNode.setRowCount(12);
+ final BitMap[] bitMaps = new BitMap[] {new BitMap(12), new BitMap(12)};
+ bitMaps[1].mark(2);
+ relationalInsertTabletNode.setBitMaps(bitMaps);
List<DataPartitionQueryParam> dataPartitionQueryParamList = new
ArrayList<>();
DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
@@ -300,6 +313,13 @@ public class WritePlanNodeSplitTest {
Assert.assertTrue(tabletNode.getTimes()[0] < tabletNode.getTimes()[1]);
TConsensusGroupId regionId =
tabletNode.getDataRegionReplicaSet().getRegionId();
Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()),
regionId.getId());
+ if (tabletNode.getTimes()[0] == 1) {
+ Assert.assertNotNull(tabletNode.getBitMaps());
+ Assert.assertNull(tabletNode.getBitMaps()[0]);
+ Assert.assertTrue(tabletNode.getBitMaps()[1].isMarked(0));
+ } else {
+ Assert.assertNull(tabletNode.getBitMaps());
+ }
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
new file mode 100644
index 00000000000..5f11039eeb3
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.rescon.quotas;
+
+import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
+import org.apache.iotdb.common.rpc.thrift.ThrottleType;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.utils.BitMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+
+public class DefaultOperationQuotaTest {
+
+ @Test
+ public void testCheckQuotaWithNullAndSparseBitMaps() throws Exception {
+ final DefaultOperationQuota quota = new
DefaultOperationQuota(createQuotaLimiter());
+
+ final InsertTabletStatement tabletWithoutBitMaps = new
InsertTabletStatement();
+ tabletWithoutBitMaps.setBitMaps(null);
+ quota.checkQuota(1, 0, tabletWithoutBitMaps);
+
+ final InsertTabletStatement tabletWithSparseBitMaps = new
InsertTabletStatement();
+ final BitMap bitMap = new BitMap(8);
+ bitMap.mark(0);
+ tabletWithSparseBitMaps.setBitMaps(new BitMap[] {null, bitMap});
+ quota.checkQuota(1, 0, tabletWithSparseBitMaps);
+
+ final InsertMultiTabletsStatement multiTabletsStatement = new
InsertMultiTabletsStatement();
+ multiTabletsStatement.setInsertTabletStatementList(
+ Arrays.asList(tabletWithoutBitMaps, tabletWithSparseBitMaps));
+ quota.checkQuota(1, 0, multiTabletsStatement);
+ }
+
+ private static QuotaLimiter createQuotaLimiter() {
+ final Map<ThrottleType, TTimedQuota> quotas = new
EnumMap<>(ThrottleType.class);
+ for (final ThrottleType throttleType : ThrottleType.values()) {
+ quotas.put(throttleType, new TTimedQuota(60_000L, 1_000_000_000L));
+ }
+ return QuotaLimiter.fromThrottle(Collections.unmodifiableMap(quotas));
+ }
+}