This is an automated email from the ASF dual-hosted git repository.
Caideyipi pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 17e73ee7126 [To dev/1.3] [Pipe] Optimize memory usage (#17775)
17e73ee7126 is described below
commit 17e73ee71265a67abc60293bd50a6033c9c09aed
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 16:36:00 2026 +0800
[To dev/1.3] [Pipe] Optimize memory usage (#17775)
---
.../iotdb/db/pipe/event/common/row/PipeRow.java | 5 +-
.../db/pipe/event/common/row/PipeRowCollector.java | 25 ++-
.../common/tablet/PipeRawTabletInsertionEvent.java | 1 +
.../pipe/event/common/tablet/PipeTabletUtils.java | 245 +++++++++++++++++++++
.../tablet/TabletInsertionDataContainer.java | 58 ++---
.../query/TsFileInsertionQueryDataContainer.java | 5 +-
.../TsFileInsertionQueryDataTabletIterator.java | 45 ++--
.../scan/TsFileInsertionScanDataContainer.java | 105 +++++----
.../pipe/resource/memory/PipeMemoryWeightUtil.java | 12 +-
.../batch/PipeTabletEventTsFileBatch.java | 5 +-
.../request/PipeTransferTabletBatchReq.java | 6 +-
.../request/PipeTransferTabletRawReq.java | 19 +-
.../sink/protocol/opcua/server/OpcUaNameSpace.java | 5 +-
.../db/pipe/sink/util/PipeTabletEventSorter.java | 8 +
.../planner/plan/node/write/InsertTabletNode.java | 21 +-
.../plan/statement/crud/InsertTabletStatement.java | 3 +-
.../rescon/quotas/DefaultOperationQuota.java | 28 ++-
.../org/apache/iotdb/db/utils/BitMapUtils.java | 46 ++++
.../pipe/event/PipeTabletInsertionEventTest.java | 36 ++-
.../event/common/tablet/PipeTabletUtilsTest.java | 72 ++++++
.../pipe/sink/PipeDataNodeThriftRequestTest.java | 40 ++++
.../planner/node/write/WritePlanNodeSplitTest.java | 10 +
.../rescon/quotas/DefaultOperationQuotaTest.java | 64 ++++++
23 files changed, 727 insertions(+), 137 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
index 33bf0a5925c..a8579b32bb7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRow.java
@@ -152,7 +152,10 @@ public class PipeRow implements Row {
@Override
public boolean isNull(final int columnIndex) {
- return bitMaps[columnIndex].isMarked(rowIndex);
+ return bitMaps != null
+ && columnIndex < bitMaps.length
+ && bitMaps[columnIndex] != null
+ && bitMaps[columnIndex].isMarked(rowIndex);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
index 42560f23a5c..c26b05f6756 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/row/PipeRowCollector.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletEventConverter;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.pipe.api.access.Row;
import org.apache.iotdb.pipe.api.collector.RowCollector;
@@ -66,27 +67,26 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
Pair<Integer, Integer> rowCountAndMemorySize =
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(pipeRow);
tablet = new Tablet(deviceId, measurementSchemaList,
rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
isAligned = pipeRow.isAligned();
}
final int rowIndex = tablet.rowSize;
- tablet.addTimestamp(rowIndex, row.getTime());
+ PipeTabletUtils.putTimestamp(tablet, rowIndex, row.getTime());
for (int i = 0; i < row.size(); i++) {
final Object value = row.getObject(i);
- if (value instanceof org.apache.iotdb.pipe.api.type.Binary) {
- tablet.addValue(
- measurementSchemaArray[i].getMeasurementId(),
- rowIndex,
-
PipeBinaryTransformer.transformToBinary((org.apache.iotdb.pipe.api.type.Binary)
value));
- } else {
- tablet.addValue(measurementSchemaArray[i].getMeasurementId(),
rowIndex, value);
- }
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ measurementSchemaArray[i].getType(),
+ value instanceof org.apache.iotdb.pipe.api.type.Binary
+ ? PipeBinaryTransformer.transformToBinary(
+ (org.apache.iotdb.pipe.api.type.Binary) value)
+ : value);
if (row.isNull(i)) {
- tablet.bitMaps[i].mark(rowIndex);
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
}
}
- tablet.rowSize++;
if (tablet.rowSize == tablet.getMaxRowNumber()) {
collectTabletInsertionEvent();
@@ -95,6 +95,7 @@ public class PipeRowCollector extends
PipeRawTabletEventConverter implements Row
private void collectTabletInsertionEvent() {
if (tablet != null) {
+ PipeTabletUtils.compactBitMaps(tablet);
tabletInsertionEventList.add(
new PipeRawTabletInsertionEvent(
tablet,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
index d322291934f..261e3c7a8b3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java
@@ -323,6 +323,7 @@ public class PipeRawTabletInsertionEvent extends
EnrichedEvent
public Tablet convertToTablet() {
if (!shouldParseTimeOrPattern()) {
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
new file mode 100644
index 00000000000..0a6b073b5b6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtils.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tablet;
+
+import org.apache.iotdb.db.utils.BitMapUtils;
+
+import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import java.time.LocalDate;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public final class PipeTabletUtils {
+
+ private PipeTabletUtils() {}
+
+ public static final class TabletStringInternPool {
+
+ private final Map<String, String> internedStrings = new HashMap<>();
+
+ public String intern(final String value) {
+ if (Objects.isNull(value)) {
+ return null;
+ }
+
+ final String internedValue = internedStrings.get(value);
+ if (Objects.nonNull(internedValue)) {
+ return internedValue;
+ }
+
+ internedStrings.put(value, value);
+ return value;
+ }
+
+ public void intern(final String[] values) {
+ if (Objects.isNull(values)) {
+ return;
+ }
+
+ for (int i = 0; i < values.length; ++i) {
+ values[i] = intern(values[i]);
+ }
+ }
+
+ public void intern(final List<String> values) {
+ if (Objects.isNull(values)) {
+ return;
+ }
+
+ for (int i = 0; i < values.size(); ++i) {
+ values.set(i, intern(values.get(i)));
+ }
+ }
+
+ public Tablet intern(final Tablet tablet) {
+ if (Objects.isNull(tablet)) {
+ return null;
+ }
+
+ tablet.setDeviceId(intern(tablet.deviceId));
+ internMeasurementSchemas(tablet.getSchemas());
+ return tablet;
+ }
+
+ public void internMeasurementSchemas(final List<MeasurementSchema>
schemas) {
+ if (Objects.isNull(schemas)) {
+ return;
+ }
+
+ for (final MeasurementSchema schema : schemas) {
+ intern(schema);
+ }
+ }
+
+ public MeasurementSchema intern(final MeasurementSchema schema) {
+ if (Objects.isNull(schema)) {
+ return null;
+ }
+
+ schema.setMeasurementId(intern(schema.getMeasurementId()));
+ schema.setProps(intern(schema.getProps()));
+ return schema;
+ }
+
+ private Map<String, String> intern(final Map<String, String> props) {
+ if (Objects.isNull(props) || props.isEmpty()) {
+ return props;
+ }
+
+ final Map<String, String> internedProps = new HashMap<>(props.size());
+ for (final Map.Entry<String, String> entry : props.entrySet()) {
+ internedProps.put(intern(entry.getKey()), intern(entry.getValue()));
+ }
+ return internedProps;
+ }
+ }
+
+ public static Tablet internTablet(
+ final Tablet tablet, final TabletStringInternPool
tabletStringInternPool) {
+ return Objects.nonNull(tabletStringInternPool) ?
tabletStringInternPool.intern(tablet) : tablet;
+ }
+
+ public static void compactBitMaps(final Tablet tablet) {
+ if (Objects.isNull(tablet)) {
+ return;
+ }
+ tablet.bitMaps = compactBitMaps(tablet.bitMaps, tablet.rowSize);
+ }
+
+ public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int
rowCount) {
+ return BitMapUtils.compactBitMaps(bitMaps, rowCount);
+ }
+
+ public static BitMap[] copyBitMapsOrCreateEmpty(final Tablet tablet) {
+ final BitMap[] bitMaps = tablet.bitMaps;
+ return Objects.nonNull(bitMaps)
+ ? Arrays.copyOf(bitMaps, bitMaps.length)
+ : new BitMap[getColumnCount(tablet)];
+ }
+
+ public static void markNullValue(final Tablet tablet, final int rowIndex,
final int columnIndex) {
+ final BitMap[] bitMaps = ensureBitMaps(tablet, columnIndex + 1);
+ if (Objects.isNull(bitMaps[columnIndex])) {
+ bitMaps[columnIndex] = new BitMap(tablet.getMaxRowNumber());
+ }
+ bitMaps[columnIndex].mark(rowIndex);
+ }
+
+ public static void putTimestamp(final Tablet tablet, final int rowIndex,
final long timestamp) {
+ tablet.timestamps[rowIndex] = timestamp;
+ tablet.rowSize = Math.max(tablet.rowSize, rowIndex + 1);
+ }
+
+ public static void putValue(
+ final Tablet tablet,
+ final int rowIndex,
+ final int columnIndex,
+ final TSDataType dataType,
+ final Object value) {
+ switch (dataType) {
+ case BOOLEAN:
+ ((boolean[]) tablet.values[columnIndex])[rowIndex] = (Boolean) value;
+ break;
+ case INT32:
+ ((int[]) tablet.values[columnIndex])[rowIndex] = (Integer) value;
+ break;
+ case DATE:
+ ((LocalDate[]) tablet.values[columnIndex])[rowIndex] = (LocalDate)
value;
+ break;
+ case INT64:
+ case TIMESTAMP:
+ ((long[]) tablet.values[columnIndex])[rowIndex] = (Long) value;
+ break;
+ case FLOAT:
+ ((float[]) tablet.values[columnIndex])[rowIndex] = (Float) value;
+ break;
+ case DOUBLE:
+ ((double[]) tablet.values[columnIndex])[rowIndex] = (Double) value;
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ ((Binary[]) tablet.values[columnIndex])[rowIndex] = toBinary(value);
+ break;
+ default:
+ throw new UnSupportedDataTypeException("Unsupported data type: " +
dataType);
+ }
+ unmarkNullValue(tablet, rowIndex, columnIndex);
+ }
+
+ private static void unmarkNullValue(
+ final Tablet tablet, final int rowIndex, final int columnIndex) {
+ final BitMap[] bitMaps = tablet.bitMaps;
+ if (Objects.nonNull(bitMaps)
+ && columnIndex < bitMaps.length
+ && Objects.nonNull(bitMaps[columnIndex])) {
+ bitMaps[columnIndex].unmark(rowIndex);
+ }
+ }
+
+ private static BitMap[] ensureBitMaps(final Tablet tablet, final int
minColumnCount) {
+ final int columnCount = Math.max(getColumnCount(tablet), minColumnCount);
+ BitMap[] bitMaps = tablet.bitMaps;
+ if (Objects.isNull(bitMaps)) {
+ bitMaps = new BitMap[columnCount];
+ tablet.bitMaps = bitMaps;
+ } else if (bitMaps.length < columnCount) {
+ final BitMap[] expandedBitMaps = new BitMap[columnCount];
+ System.arraycopy(bitMaps, 0, expandedBitMaps, 0, bitMaps.length);
+ bitMaps = expandedBitMaps;
+ tablet.bitMaps = bitMaps;
+ }
+ return bitMaps;
+ }
+
+ private static int getColumnCount(final Tablet tablet) {
+ if (Objects.nonNull(tablet.getSchemas())) {
+ return tablet.getSchemas().size();
+ }
+ return Objects.nonNull(tablet.values) ? tablet.values.length : 0;
+ }
+
+ private static Binary toBinary(final Object value) {
+ if (Objects.isNull(value)) {
+ return Binary.EMPTY_VALUE;
+ }
+ if (value instanceof Binary) {
+ return (Binary) value;
+ }
+ if (value instanceof byte[]) {
+ return new Binary((byte[]) value);
+ }
+ if (value instanceof String) {
+ return new Binary(((String)
value).getBytes(TSFileConfig.STRING_CHARSET));
+ }
+ throw new IllegalArgumentException(
+ String.format("Expected Binary, byte[] or String, but was %s.",
value.getClass()));
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
index 133dbb5bff8..d8c2bccaa97 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/TabletInsertionDataContainer.java
@@ -198,6 +198,7 @@ public class TabletInsertionDataContainer {
}
this.rowCount = this.timestampColumn.length;
+ this.nullValueColumnBitmaps =
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"InsertRowNode({}) is parsed to zero rows according to the
pattern({}) and time range [{}, {}], the corresponding source event({}) will be
ignored.",
@@ -217,7 +218,6 @@ public class TabletInsertionDataContainer {
this.isAligned = insertTabletNode.isAligned();
final long[] originTimestampColumn = insertTabletNode.getTimes();
- final int originRowSize = originTimestampColumn.length;
final List<Integer> rowIndexList =
generateRowIndexList(originTimestampColumn);
this.timestampColumn = rowIndexList.stream().mapToLong(i ->
originTimestampColumn[i]).toArray();
@@ -243,18 +243,7 @@ public class TabletInsertionDataContainer {
final String[] originColumnNameStringList =
insertTabletNode.getMeasurements();
final TSDataType[] originValueColumnTypes =
insertTabletNode.getDataTypes();
final Object[] originValueColumns = insertTabletNode.getColumns();
- final BitMap[] originBitMapList =
- (insertTabletNode.getBitMaps() == null
- ? IntStream.range(0, originColumnSize)
- .boxed()
- .map(o -> new BitMap(originRowSize))
- .toArray(BitMap[]::new)
- : insertTabletNode.getBitMaps());
- for (int i = 0; i < originBitMapList.length; i++) {
- if (originBitMapList[i] == null) {
- originBitMapList[i] = new BitMap(originRowSize);
- }
- }
+ final BitMap[] originBitMapList = insertTabletNode.getBitMaps();
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -277,7 +266,7 @@ public class TabletInsertionDataContainer {
originValueColumns[i],
rowIndexList,
false,
- originBitMapList[i],
+ getBitMap(originBitMapList, i),
bitMap);
}
this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
@@ -285,6 +274,7 @@ public class TabletInsertionDataContainer {
}
this.rowCount = this.timestampColumn.length;
+ this.nullValueColumnBitmaps =
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
if (rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"InsertTabletNode({}) is parsed to zero rows according to the
pattern({}) and time range [{}, {}], the corresponding source event({}) will be
ignored.",
@@ -338,18 +328,7 @@ public class TabletInsertionDataContainer {
}
final Object[] originValueColumns =
tablet.values; // we do not reduce value columns here by origin row
size
- final BitMap[] originBitMapList =
- tablet.bitMaps == null
- ? IntStream.range(0, originColumnSize)
- .boxed()
- .map(o -> new BitMap(tablet.getMaxRowNumber()))
- .toArray(BitMap[]::new)
- : tablet.bitMaps; // We do not reduce bitmaps here by origin row
size
- for (int i = 0; i < originBitMapList.length; i++) {
- if (originBitMapList[i] == null) {
- originBitMapList[i] = new BitMap(tablet.getMaxRowNumber());
- }
- }
+ final BitMap[] originBitMapList = tablet.bitMaps;
for (int i = 0; i <
originColumnIndex2FilteredColumnIndexMapperList.length; i++) {
if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
@@ -372,7 +351,7 @@ public class TabletInsertionDataContainer {
originValueColumns[i],
rowIndexList,
false,
- originBitMapList[i],
+ getBitMap(originBitMapList, i),
bitMap);
}
this.nullValueColumnBitmaps[filteredColumnIndex] = bitMap;
@@ -380,6 +359,7 @@ public class TabletInsertionDataContainer {
}
this.rowCount = this.timestampColumn.length;
+ this.nullValueColumnBitmaps =
PipeTabletUtils.compactBitMaps(nullValueColumnBitmaps, rowCount);
if (this.rowCount == 0 && LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Tablet({}) is parsed to zero rows according to the pattern({}) and
time range [{}, {}], the corresponding source event({}) will be ignored.",
@@ -471,7 +451,7 @@ public class TabletInsertionDataContainer {
: (int[]) originValueColumn;
final int[] valueColumns = new int[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0;
nullValueColumnBitmap.mark(i);
} else {
@@ -493,7 +473,7 @@ public class TabletInsertionDataContainer {
: (LocalDate[]) originValueColumn;
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap,
rowIndexList.get(i))) {
valueColumns[i] = EMPTY_LOCALDATE;
nullValueColumnBitmap.mark(i);
} else {
@@ -507,7 +487,7 @@ public class TabletInsertionDataContainer {
? new int[] {(int) originValueColumn}
: (int[]) originValueColumn;
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap,
rowIndexList.get(i))) {
valueColumns[i] = EMPTY_LOCALDATE;
nullValueColumnBitmap.mark(i);
} else {
@@ -527,7 +507,7 @@ public class TabletInsertionDataContainer {
: (long[]) originValueColumn;
final long[] valueColumns = new long[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0L;
nullValueColumnBitmap.mark(i);
} else {
@@ -544,7 +524,7 @@ public class TabletInsertionDataContainer {
: (float[]) originValueColumn;
final float[] valueColumns = new float[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0F;
nullValueColumnBitmap.mark(i);
} else {
@@ -561,7 +541,7 @@ public class TabletInsertionDataContainer {
: (double[]) originValueColumn;
final double[] valueColumns = new double[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = 0D;
nullValueColumnBitmap.mark(i);
} else {
@@ -578,7 +558,7 @@ public class TabletInsertionDataContainer {
: (boolean[]) originValueColumn;
final boolean[] valueColumns = new boolean[rowIndexList.size()];
for (int i = 0; i < rowIndexList.size(); ++i) {
- if (originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ if (isNullValue(originNullValueColumnBitmap, rowIndexList.get(i)))
{
valueColumns[i] = false;
nullValueColumnBitmap.mark(i);
} else {
@@ -599,7 +579,7 @@ public class TabletInsertionDataContainer {
for (int i = 0; i < rowIndexList.size(); ++i) {
if (Objects.isNull(binaryValueColumns[rowIndexList.get(i)])
||
Objects.isNull(binaryValueColumns[rowIndexList.get(i)].getValues())
- || originNullValueColumnBitmap.isMarked(rowIndexList.get(i))) {
+ || isNullValue(originNullValueColumnBitmap,
rowIndexList.get(i))) {
valueColumns[i] = Binary.EMPTY_VALUE;
nullValueColumnBitmap.mark(i);
} else {
@@ -659,6 +639,14 @@ public class TabletInsertionDataContainer {
}
}
+ private static BitMap getBitMap(final BitMap[] bitMaps, final int index) {
+ return Objects.nonNull(bitMaps) && index < bitMaps.length ? bitMaps[index]
: null;
+ }
+
+ private static boolean isNullValue(final BitMap bitMap, final int rowIndex) {
+ return Objects.nonNull(bitMap) && bitMap.isMarked(rowIndex);
+ }
+
//////////////////////////// process ////////////////////////////
public List<TabletInsertionEvent> processRowByRow(final BiConsumer<Row,
RowCollector> consumer) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
index 4353e4984a2..e1fa58f5a0f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataContainer.java
@@ -25,6 +25,7 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -70,6 +71,7 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
private final Iterator<Map.Entry<IDeviceID, List<String>>>
deviceMeasurementsMapIterator;
private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
private final Map<String, TSDataType> measurementDataTypeMap;
+ private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
@TestOnly
public TsFileInsertionQueryDataContainer(
@@ -385,7 +387,8 @@ public class TsFileInsertionQueryDataContainer extends
TsFileInsertionDataContai
entry.getValue(),
timeFilterExpression,
allocatedMemoryBlockForTablet,
- currentModifications);
+ currentModifications,
+ tabletStringInternPool);
} catch (final Exception e) {
close();
throw new PipeException(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
index e16c7113da3..2e81f4aa335 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/query/TsFileInsertionQueryDataTabletIterator.java
@@ -20,6 +20,8 @@
package org.apache.iotdb.db.pipe.event.common.tsfile.container.query;
import org.apache.iotdb.commons.path.PatternTreeMap;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -37,6 +39,7 @@ import org.apache.tsfile.read.common.RowRecord;
import org.apache.tsfile.read.expression.IExpression;
import org.apache.tsfile.read.expression.QueryExpression;
import org.apache.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -57,6 +60,7 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
private final String deviceId;
private final List<String> measurements;
+ private final List<MeasurementSchema> schemas;
private final IExpression timeFilterExpression;
@@ -76,20 +80,28 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
final List<String> measurements,
final IExpression timeFilterExpression,
final PipeMemoryBlock allocatedBlockForTablet,
- final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
currentModifications)
+ final PatternTreeMap<Modification, PatternTreeMapFactory.ModsSerializer>
currentModifications,
+ final TabletStringInternPool tabletStringInternPool)
throws IOException {
this.tsFileReader = tsFileReader;
this.measurementDataTypeMap = measurementDataTypeMap;
- this.deviceId = deviceId;
+ this.deviceId = tabletStringInternPool.intern(deviceId);
this.measurements =
measurements.stream()
.filter(
measurement ->
// time column in aligned time-series should not be a
query column
measurement != null && !measurement.isEmpty())
+ .map(tabletStringInternPool::intern)
.sorted()
.collect(Collectors.toList());
+ this.schemas = new ArrayList<>();
+ for (final String measurement : this.measurements) {
+ final TSDataType dataType =
+ measurementDataTypeMap.get(this.deviceId +
TsFileConstant.PATH_SEPARATOR + measurement);
+ schemas.add(new MeasurementSchema(measurement, dataType));
+ }
this.timeFilterExpression = timeFilterExpression;
@@ -99,7 +111,7 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
this.measurementModsList =
ModsOperationUtil.initializeMeasurementMods(
- deviceId, this.measurements, currentModifications);
+ this.deviceId, this.measurements, currentModifications);
}
private QueryDataSet buildQueryDataSet() throws IOException {
@@ -133,18 +145,9 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
}
private Tablet buildNextTablet() throws IOException {
- final List<MeasurementSchema> schemas = new ArrayList<>();
- for (final String measurement : measurements) {
- final TSDataType dataType =
- measurementDataTypeMap.get(deviceId + TsFileConstant.PATH_SEPARATOR
+ measurement);
- schemas.add(new MeasurementSchema(measurement, dataType));
- }
-
Tablet tablet = null;
if (!queryDataSet.hasNext()) {
- tablet = new Tablet(deviceId, schemas, 1);
- tablet.initBitMaps();
- return tablet;
+ return new Tablet(deviceId, schemas, 1);
}
boolean isFirstRow = true;
@@ -156,7 +159,6 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
Pair<Integer, Integer> rowCountAndMemorySize =
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(rowRecord);
tablet = new Tablet(deviceId, schemas,
rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
if (allocatedBlockForTablet.getMemoryUsageInBytes() <
rowCountAndMemorySize.getRight()) {
PipeDataNodeResourceManager.memory()
.forceResize(allocatedBlockForTablet,
rowCountAndMemorySize.getRight());
@@ -172,27 +174,30 @@ public class TsFileInsertionQueryDataTabletIterator
implements Iterator<Tablet>
final int fieldSize = fields.size();
for (int i = 0; i < fieldSize; i++) {
final Field field = fields.get(i);
- final String measurement = measurements.get(i);
+ final TSDataType dataType = schemas.get(i).getType();
// Check if this value is deleted by mods
if (field == null
|| ModsOperationUtil.isDelete(rowRecord.getTimestamp(),
measurementModsList.get(i))) {
- tablet.bitMaps[i].mark(rowIndex);
+ if (dataType != null && dataType.isBinary()) {
+ PipeTabletUtils.putValue(tablet, rowIndex, i, dataType,
Binary.EMPTY_VALUE);
+ }
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
} else {
- tablet.addValue(measurement, rowIndex,
field.getObjectValue(schemas.get(i).getType()));
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, dataType,
field.getObjectValue(schemas.get(i).getType()));
isNeedFillTime = true;
}
}
if (isNeedFillTime) {
- tablet.addTimestamp(rowIndex, rowRecord.getTimestamp());
+ PipeTabletUtils.putTimestamp(tablet, rowIndex,
rowRecord.getTimestamp());
}
- tablet.rowSize++;
-
if (tablet.rowSize == tablet.getMaxRowNumber()) {
break;
}
}
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
index 9366d0f62df..e903c7340e4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java
@@ -26,6 +26,8 @@ import
org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
@@ -93,6 +95,7 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
private String currentDevice;
private boolean currentIsAligned;
private final List<MeasurementSchema> currentMeasurements = new
ArrayList<>();
+ private final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
// Cached time chunk
@@ -272,7 +275,6 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
if (!data.hasCurrent()) {
tablet = new Tablet(currentDevice, currentMeasurements, 1);
- tablet.initBitMaps();
// Ignore the memory cost of tablet
PipeDataNodeResourceManager.memory().forceResize(allocatedMemoryBlockForTablet,
0);
return tablet;
@@ -288,7 +290,6 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
PipeMemoryWeightUtil.calculateTabletRowCountAndMemory(data);
tablet =
new Tablet(currentDevice, currentMeasurements,
rowCountAndMemorySize.getLeft());
- tablet.initBitMaps();
if (allocatedMemoryBlockForTablet.getMemoryUsageInBytes()
< rowCountAndMemorySize.getRight()) {
PipeDataNodeResourceManager.memory()
@@ -300,10 +301,8 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
final int rowIndex = tablet.rowSize;
if (putValueToColumns(data, tablet, rowIndex)) {
- tablet.addTimestamp(rowIndex, data.currentTime());
+ PipeTabletUtils.putTimestamp(tablet, rowIndex, data.currentTime());
}
-
- tablet.rowSize++;
}
data.next();
@@ -318,13 +317,13 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
if (tablet == null) {
tablet = new Tablet(currentDevice, currentMeasurements, 1);
- tablet.initBitMaps();
}
// Switch chunk reader iff current chunk is all consumed
if (!data.hasCurrent()) {
prepareData();
}
+ PipeTabletUtils.compactBitMaps(tablet);
return tablet;
} catch (final Exception e) {
close();
@@ -372,81 +371,100 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
private boolean putValueToColumns(final BatchData data, final Tablet tablet,
final int rowIndex) {
- final Object[] columns = tablet.values;
boolean isNeedFillTime = false;
if (data.getDataType() == TSDataType.VECTOR) {
- for (int i = 0; i < columns.length; ++i) {
+ for (int i = 0; i < tablet.getSchemas().size(); ++i) {
final TsPrimitiveType primitiveType = data.getVector()[i];
+ final TSDataType type = tablet.getSchemas().get(i).getType();
if (Objects.isNull(primitiveType)
|| ModsOperationUtil.isDelete(data.currentTime(),
modsInfos.get(i))) {
- tablet.bitMaps[i].mark(rowIndex);
- final TSDataType type = tablet.getSchemas().get(i).getType();
if (type == TSDataType.TEXT || type == TSDataType.BLOB || type ==
TSDataType.STRING) {
- ((Binary[]) columns[i])[rowIndex] = Binary.EMPTY_VALUE;
- }
- if (type == TSDataType.DATE) {
- ((LocalDate[]) columns[i])[rowIndex] = EMPTY_DATE;
+ PipeTabletUtils.putValue(tablet, rowIndex, i, type,
Binary.EMPTY_VALUE);
}
+ PipeTabletUtils.markNullValue(tablet, rowIndex, i);
continue;
}
isNeedFillTime = true;
- switch (tablet.getSchemas().get(i).getType()) {
+ switch (type) {
case BOOLEAN:
- ((boolean[]) columns[i])[rowIndex] = primitiveType.getBoolean();
+ PipeTabletUtils.putValue(tablet, rowIndex, i, type,
primitiveType.getBoolean());
break;
case INT32:
- ((int[]) columns[i])[rowIndex] = primitiveType.getInt();
+ PipeTabletUtils.putValue(tablet, rowIndex, i, type,
primitiveType.getInt());
break;
case DATE:
- ((LocalDate[]) columns[i])[rowIndex] =
- DateUtils.parseIntToLocalDate(primitiveType.getInt());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, i, type,
DateUtils.parseIntToLocalDate(primitiveType.getInt()));
break;
case INT64:
case TIMESTAMP:
- ((long[]) columns[i])[rowIndex] = primitiveType.getLong();
+ PipeTabletUtils.putValue(tablet, rowIndex, i, type,
primitiveType.getLong());
break;
case FLOAT:
- ((float[]) columns[i])[rowIndex] = primitiveType.getFloat();
+ PipeTabletUtils.putValue(tablet, rowIndex, i, type,
primitiveType.getFloat());
break;
case DOUBLE:
- ((double[]) columns[i])[rowIndex] = primitiveType.getDouble();
+ PipeTabletUtils.putValue(tablet, rowIndex, i, type,
primitiveType.getDouble());
break;
case TEXT:
case BLOB:
case STRING:
- ((Binary[]) columns[i])[rowIndex] = primitiveType.getBinary();
+ final Binary binary = primitiveType.getBinary();
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ i,
+ type,
+ Objects.isNull(binary) || Objects.isNull(binary.getValues())
+ ? Binary.EMPTY_VALUE
+ : binary);
break;
default:
throw new UnSupportedDataTypeException("UnSupported" +
primitiveType.getDataType());
}
}
} else {
+ if (!modsInfos.isEmpty()
+ && ModsOperationUtil.isDelete(data.currentTime(), modsInfos.get(0)))
{
+ return false;
+ }
+
isNeedFillTime = true;
- switch (tablet.getSchemas().get(0).getType()) {
+ final TSDataType type = tablet.getSchemas().get(0).getType();
+ switch (type) {
case BOOLEAN:
- ((boolean[]) columns[0])[rowIndex] = data.getBoolean();
+ PipeTabletUtils.putValue(tablet, rowIndex, 0, type,
data.getBoolean());
break;
case INT32:
- ((int[]) columns[0])[rowIndex] = data.getInt();
+ PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getInt());
break;
case DATE:
- ((LocalDate[]) columns[0])[rowIndex] =
DateUtils.parseIntToLocalDate(data.getInt());
+ PipeTabletUtils.putValue(
+ tablet, rowIndex, 0, type,
DateUtils.parseIntToLocalDate(data.getInt()));
break;
case INT64:
case TIMESTAMP:
- ((long[]) columns[0])[rowIndex] = data.getLong();
+ PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getLong());
break;
case FLOAT:
- ((float[]) columns[0])[rowIndex] = data.getFloat();
+ PipeTabletUtils.putValue(tablet, rowIndex, 0, type, data.getFloat());
break;
case DOUBLE:
- ((double[]) columns[0])[rowIndex] = data.getDouble();
+ PipeTabletUtils.putValue(tablet, rowIndex, 0, type,
data.getDouble());
break;
case TEXT:
case BLOB:
case STRING:
- ((Binary[]) columns[0])[rowIndex] = data.getBinary();
+ final Binary binary = data.getBinary();
+ PipeTabletUtils.putValue(
+ tablet,
+ rowIndex,
+ 0,
+ type,
+ Objects.isNull(binary) || Objects.isNull(binary.getValues())
+ ? Binary.EMPTY_VALUE
+ : binary);
break;
default:
throw new UnSupportedDataTypeException("UnSupported" +
data.getDataType());
@@ -560,13 +578,13 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
? new ChunkReader(chunk, filter)
: new SinglePageWholeChunkReader(chunk);
currentIsAligned = false;
+ final String measurementID =
+ tabletStringInternPool.intern(chunkHeader.getMeasurementID());
currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ new MeasurementSchema(measurementID,
chunkHeader.getDataType()));
modsInfos.addAll(
ModsOperationUtil.initializeMeasurementMods(
- currentDevice,
- Collections.singletonList(chunkHeader.getMeasurementID()),
- currentModifications));
+ currentDevice, Collections.singletonList(measurementID),
currentModifications));
return;
}
case MetaMarker.VALUE_CHUNK_HEADER:
@@ -615,9 +633,11 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
}
// Increase value index
+ final String measurementID =
+
tabletStringInternPool.intern(chunkHeader.getMeasurementID());
final int valueIndex =
measurementIndexMap.compute(
- chunkHeader.getMeasurementID(),
+ measurementID,
(measurement, index) -> Objects.nonNull(index) ? index +
1 : 0);
// Emit when encountered non-sequential value chunk, or the
chunk size exceeds
@@ -677,13 +697,13 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
valueChunkSize += chunkHeader.getDataSize();
valueChunkPageMemorySize += currentValueChunkPageMemorySize;
valueChunkList.add(chunk);
+ final String measurementID =
+ tabletStringInternPool.intern(chunkHeader.getMeasurementID());
currentMeasurements.add(
- new MeasurementSchema(chunkHeader.getMeasurementID(),
chunkHeader.getDataType()));
+ new MeasurementSchema(measurementID,
chunkHeader.getDataType()));
modsInfos.addAll(
ModsOperationUtil.initializeMeasurementMods(
- currentDevice,
- Collections.singletonList(chunkHeader.getMeasurementID()),
- currentModifications));
+ currentDevice, Collections.singletonList(measurementID),
currentModifications));
break;
}
case MetaMarker.CHUNK_GROUP_HEADER:
@@ -702,7 +722,10 @@ public class TsFileInsertionScanDataContainer extends
TsFileInsertionDataContain
timeChunkPageMemorySizeList.clear();
measurementIndexMap.clear();
- currentDevice = pattern.mayOverlapWithDevice(deviceID) ? deviceID
: null;
+ currentDevice =
+ pattern.mayOverlapWithDevice(deviceID)
+ ? tabletStringInternPool.intern(deviceID)
+ : null;
break;
}
case MetaMarker.OPERATION_INDEX_RANGE:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
index a707f554c51..a22522666c2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java
@@ -179,14 +179,14 @@ public class PipeMemoryWeightUtil {
return new Pair<>(1, 0);
}
- // Calculate row number according to the max size of a pipe tablet.
- // "-100" is the estimated size of other data structures in a pipe tablet.
+ // Calculate row number according to the max size of a pipe tablet. "100"
is the estimated size
+ // of other data structures in a pipe tablet.
// "*8" converts bytes to bits, because the bitmap size is 1 bit per
schema.
- // Here we estimate the max use of
int sizeLimit =
- Math.min(
-
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
- (int) (inputNum * rowBytesUsed * 1.2));
+ (int)
+ Math.min(
+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
+ Math.min(Integer.MAX_VALUE, 100 + inputNum * (double)
rowBytesUsed * 1.2));
int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
rowNumber = Math.max(1, rowNumber);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
index 36ca10daa72..2e69b239277 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventTsFileBatch.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -361,7 +362,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
// Aggregate the current tablet's data
aggregatedSchemas.addAll(tablet.getSchemas());
aggregatedValues.addAll(Arrays.asList(tablet.values));
- aggregatedBitMaps.addAll(Arrays.asList(tablet.bitMaps));
+
aggregatedBitMaps.addAll(Arrays.asList(PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet)));
// Remove the aggregated tablet
tablets.pollFirst();
} else {
@@ -563,7 +564,7 @@ public class PipeTabletEventTsFileBatch extends
PipeTabletEventBatch {
.map(schema -> (MeasurementSchema) schema)
.toArray(MeasurementSchema[]::new);
Object[] values = Arrays.copyOf(tablet.values, tablet.values.length);
- BitMap[] bitMaps = Arrays.copyOf(tablet.bitMaps,
tablet.bitMaps.length);
+ BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
// convert date value to int refer to
//
org.apache.iotdb.db.storageengine.dataregion.memtable.WritableMemChunk.writeNonAlignedTablet
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
index 94a838ee0ad..266894060dc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
import org.apache.iotdb.commons.utils.TestOnly;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
@@ -34,7 +35,6 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.ReadWriteIOUtils;
-import org.apache.tsfile.write.record.Tablet;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -130,6 +130,7 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
public static PipeTransferTabletBatchReq fromTPipeTransferReq(
final TPipeTransferReq transferReq) {
final PipeTransferTabletBatchReq batchReq = new
PipeTransferTabletBatchReq();
+ final TabletStringInternPool tabletStringInternPool = new
TabletStringInternPool();
// Binary req, for rolling upgrading
ReadWriteIOUtils.readInt(transferReq.body);
@@ -144,8 +145,7 @@ public class PipeTransferTabletBatchReq extends
TPipeTransferReq {
size = ReadWriteIOUtils.readInt(transferReq.body);
for (int i = 0; i < size; ++i) {
batchReq.tabletReqs.add(
- PipeTransferTabletRawReq.toTPipeTransferRawReq(
- Tablet.deserialize(transferReq.body),
ReadWriteIOUtils.readBool(transferReq.body)));
+ PipeTransferTabletRawReq.toTPipeTransferRawReq(transferReq.body,
tabletStringInternPool));
}
batchReq.version = transferReq.version;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
index 47bf4d44897..60619fd4268 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletRawReq.java
@@ -22,6 +22,8 @@ package
org.apache.iotdb.db.pipe.sink.payload.evolvable.request;
import org.apache.iotdb.commons.exception.MetadataException;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.IoTDBSinkRequestVersion;
import
org.apache.iotdb.commons.pipe.sink.payload.thrift.request.PipeRequestType;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils.TabletStringInternPool;
import org.apache.iotdb.db.pipe.sink.util.PipeTabletEventSorter;
import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
@@ -82,6 +84,17 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
return tabletReq;
}
+ public static PipeTransferTabletRawReq toTPipeTransferRawReq(
+ final ByteBuffer buffer, final TabletStringInternPool
tabletStringInternPool) {
+ final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
+
+ tabletReq.tablet =
+ PipeTabletUtils.internTablet(Tablet.deserialize(buffer),
tabletStringInternPool);
+ tabletReq.isAligned = ReadWriteIOUtils.readBool(buffer);
+
+ return tabletReq;
+ }
+
/////////////////////////////// Thrift ///////////////////////////////
public static PipeTransferTabletRawReq toTPipeTransferReq(
@@ -105,10 +118,8 @@ public class PipeTransferTabletRawReq extends
TPipeTransferReq {
}
public static PipeTransferTabletRawReq fromTPipeTransferReq(final
TPipeTransferReq transferReq) {
- final PipeTransferTabletRawReq tabletReq = new PipeTransferTabletRawReq();
-
- tabletReq.tablet = Tablet.deserialize(transferReq.body);
- tabletReq.isAligned = ReadWriteIOUtils.readBool(transferReq.body);
+ final PipeTransferTabletRawReq tabletReq =
+ toTPipeTransferRawReq(transferReq.body, new TabletStringInternPool());
tabletReq.version = transferReq.version;
tabletReq.type = transferReq.type;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
index 3a42ec87969..713a87b2e35 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/opcua/server/OpcUaNameSpace.java
@@ -374,7 +374,10 @@ public class OpcUaNameSpace extends
ManagedNamespaceWithLifecycle {
for (int rowIndex = 0; rowIndex < tablet.rowSize; ++rowIndex) {
// Filter null value
- if (tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
+ if (tablet.bitMaps != null
+ && columnIndex < tablet.bitMaps.length
+ && tablet.bitMaps[columnIndex] != null
+ && tablet.bitMaps[columnIndex].isMarked(rowIndex)) {
continue;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
index 17e7a7c13e2..50beb6405e9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/util/PipeTabletEventSorter.java
@@ -19,6 +19,8 @@
package org.apache.iotdb.db.pipe.sink.util;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
+
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.utils.Binary;
import org.apache.tsfile.utils.BitMap;
@@ -129,6 +131,7 @@ public class PipeTabletEventSorter {
// Col: [6, 1]
private void sortAndMayDeduplicateValuesAndBitMaps() {
int columnIndex = 0;
+ boolean bitMapsModified = false;
for (int i = 0, size = tablet.getSchemas().size(); i < size; i++) {
final IMeasurementSchema schema = tablet.getSchemas().get(i);
if (schema != null) {
@@ -145,10 +148,15 @@ public class PipeTabletEventSorter {
if (tablet.bitMaps != null && tablet.bitMaps[columnIndex] != null) {
tablet.bitMaps[columnIndex] = deDuplicatedBitMap;
+ bitMapsModified = true;
}
columnIndex++;
}
}
+
+ if (bitMapsModified) {
+ tablet.bitMaps = PipeTabletUtils.compactBitMaps(tablet.bitMaps,
deDuplicatedSize);
+ }
}
private Object reorderValueListAndBitMap(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
index faacc10eccd..6a1e9bf1bb4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/write/InsertTabletNode.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.WritePlanNode;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
+import org.apache.iotdb.db.utils.BitMapUtils;
import org.apache.iotdb.db.utils.QueryDataSetUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -277,7 +278,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
long[] subTimes = new long[count];
int destLoc = 0;
Object[] values = initTabletValues(dataTypes.length, count, dataTypes);
- BitMap[] bitMaps = this.bitMaps == null ? null :
initBitmaps(dataTypes.length, count);
+ BitMap[] bitMaps = initBitmapsForSplit(dataTypes.length, count);
System.arraycopy(times, start, subTimes, destLoc, end - start);
for (int k = 0; k < values.length; k++) {
if (dataTypes[k] != null) {
@@ -302,6 +303,7 @@ public class InsertTabletNode extends InsertNode implements
WALEntryValue {
subNode.setFailedMeasurementNumber(getFailedMeasurementNumber());
subNode.setRange(locs);
subNode.setDataRegionReplicaSet(entry.getKey());
+ subNode.bitMaps = BitMapUtils.compactBitMaps(subNode.bitMaps,
subNode.rowCount);
result.add(subNode);
}
}
@@ -366,6 +368,23 @@ public class InsertTabletNode extends InsertNode
implements WALEntryValue {
return bitMaps;
}
+ protected BitMap[] initBitmapsForSplit(int columnSize, int rowSize) {
+ if (this.bitMaps == null) {
+ return null;
+ }
+
+ final int sourceRowCount = rowCount > 0 ? rowCount : times == null ? 0 :
times.length;
+ final BitMap[] splitBitMaps = new BitMap[columnSize];
+ boolean hasBitMap = false;
+ for (int i = 0; i < columnSize && i < this.bitMaps.length; ++i) {
+ if (this.bitMaps[i] != null && !this.bitMaps[i].isAllUnmarked()) {
+ splitBitMaps[i] = new BitMap(rowSize);
+ hasBitMap = true;
+ }
+ }
+ return hasBitMap ? splitBitMaps : null;
+ }
+
@Override
public void markFailedMeasurement(int index) {
if (measurements[index] == null) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
index 2e393678c5d..525bb48f0dd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/InsertTabletStatement.java
@@ -34,6 +34,7 @@ import
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeDeviceP
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ISchemaValidation;
import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+import org.apache.iotdb.db.utils.BitMapUtils;
import org.apache.iotdb.db.utils.CommonUtils;
import org.apache.tsfile.enums.TSDataType;
@@ -326,7 +327,7 @@ public class InsertTabletStatement extends
InsertBaseStatement implements ISchem
statement.setMeasurementSchemas(measurementSchemas);
statement.setDataTypes(dataTypes);
if (this.bitMaps != null) {
- statement.setBitMaps(copiedBitMaps);
+ statement.setBitMaps(BitMapUtils.compactBitMaps(copiedBitMaps,
rowCount));
}
statement.setFailedMeasurementIndex2Info(failedMeasurementIndex2Info);
insertTabletStatementList.add(statement);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
index e4f1170ddb3..3da4db4b408 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuota.java
@@ -116,9 +116,7 @@ public class DefaultOperationQuota implements
OperationQuota {
case BATCH_INSERT:
// InsertTabletStatement
InsertTabletStatement insertTabletStatement =
(InsertTabletStatement) s;
- for (BitMap bitMap : insertTabletStatement.getBitMaps()) {
- avgSize += bitMap.getSize();
- }
+ avgSize += calculationWrite(insertTabletStatement.getBitMaps());
break;
case BATCH_INSERT_ONE_DEVICE:
// InsertRowsOfOneDeviceStatement
@@ -151,10 +149,12 @@ public class DefaultOperationQuota implements
OperationQuota {
for (int i = 0;
i <
insertMultiTabletsStatement.getInsertTabletStatementList().size();
i++) {
- for (BitMap bitMap :
-
insertMultiTabletsStatement.getInsertTabletStatementList().get(i).getBitMaps())
{
- avgSize += bitMap.getSize();
- }
+ avgSize +=
+ calculationWrite(
+ insertMultiTabletsStatement
+ .getInsertTabletStatementList()
+ .get(i)
+ .getBitMaps());
}
}
break;
@@ -178,6 +178,20 @@ public class DefaultOperationQuota implements
OperationQuota {
return size;
}
+ private long calculationWrite(BitMap[] bitMaps) {
+ if (bitMaps == null) {
+ return 0;
+ }
+
+ long size = 0;
+ for (BitMap bitMap : bitMaps) {
+ if (bitMap != null) {
+ size += bitMap.getSize();
+ }
+ }
+ return size;
+ }
+
private long estimateConsume(int numReqs, long avgSize) {
if (numReqs > 0) {
return avgSize * numReqs;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
new file mode 100644
index 00000000000..ba30c8847b4
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/BitMapUtils.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.utils;
+
+import org.apache.tsfile.utils.BitMap;
+
+import java.util.Objects;
+
+public final class BitMapUtils {
+
+ private BitMapUtils() {}
+
+ public static BitMap[] compactBitMaps(final BitMap[] bitMaps, final int
rowCount) {
+ if (Objects.isNull(bitMaps)) {
+ return null;
+ }
+
+ boolean hasMarkedBitMap = false;
+ for (int i = 0; i < bitMaps.length; ++i) {
+ if (Objects.nonNull(bitMaps[i]) && bitMaps[i].isAllUnmarked()) {
+ bitMaps[i] = null;
+ }
+ if (Objects.nonNull(bitMaps[i])) {
+ hasMarkedBitMap = true;
+ }
+ }
+ return hasMarkedBitMap ? bitMaps : null;
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
index 4a7d77eab8d..308c5458dc9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTabletInsertionEventTest.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import org.apache.iotdb.db.pipe.event.common.tablet.PipeTabletUtils;
import
org.apache.iotdb.db.pipe.event.common.tablet.TabletInsertionDataContainer;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
@@ -217,7 +218,7 @@ public class PipeTabletInsertionEventTest {
tabletForInsertRowNode.values = values;
tabletForInsertRowNode.timestamps = new long[] {times[0]};
tabletForInsertRowNode.rowSize = 1;
- tabletForInsertRowNode.bitMaps = bitMapsForInsertRowNode;
+ tabletForInsertRowNode.bitMaps =
PipeTabletUtils.compactBitMaps(bitMapsForInsertRowNode, 1);
// create tablet for insertTabletNode
BitMap[] bitMapsForInsertTabletNode = new BitMap[schemas.length];
@@ -253,7 +254,8 @@ public class PipeTabletInsertionEventTest {
tabletForInsertTabletNode.values = values;
tabletForInsertTabletNode.timestamps = times;
tabletForInsertTabletNode.rowSize = times.length;
- tabletForInsertTabletNode.bitMaps = bitMapsForInsertTabletNode;
+ tabletForInsertTabletNode.bitMaps =
+ PipeTabletUtils.compactBitMaps(bitMapsForInsertTabletNode,
times.length);
}
@Test
@@ -318,6 +320,36 @@ public class PipeTabletInsertionEventTest {
Assert.assertTrue(isAligned4);
}
+ @Test
+ public void convertToTabletSkipsUnnecessaryBitMapsForTest() throws Exception
{
+ final BitMap[] bitMaps = new BitMap[schemas.length];
+ bitMaps[0] = new BitMap(times.length);
+ bitMaps[1] = new BitMap(times.length);
+ bitMaps[1].mark(1);
+
+ final InsertTabletNode nodeWithSparseColumn =
+ new InsertTabletNode(
+ new PlanNodeId("plannode bitmap"),
+ new PartialPath(deviceId),
+ false,
+ measurementIds,
+ dataTypes,
+ schemas,
+ times,
+ bitMaps,
+ insertTabletNode.getColumns(),
+ times.length);
+
+ final Tablet tablet =
+ new TabletInsertionDataContainer(nodeWithSparseColumn, new
PrefixPipePattern(pattern))
+ .convertToTablet();
+
+ Assert.assertNotNull(tablet.bitMaps);
+ Assert.assertNull(tablet.bitMaps[0]);
+ Assert.assertNotNull(tablet.bitMaps[1]);
+ Assert.assertTrue(tablet.bitMaps[1].isMarked(1));
+ }
+
@Test
public void convertToTabletWithFilteredRowsForTest() {
TabletInsertionDataContainer container1 =
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
new file mode 100644
index 00000000000..8bf32bd066f
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeTabletUtilsTest.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.event.common.tablet;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class PipeTabletUtilsTest {
+
+ @Test
+ public void testPutValueUnmarksReusedNullRow() {
+ final List<MeasurementSchema> schemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.FLOAT),
+ new MeasurementSchema("s2", TSDataType.FLOAT));
+ final Tablet tablet = new Tablet("root.sg.d1", schemas, 2);
+
+ PipeTabletUtils.markNullValue(tablet, 0, 0);
+ PipeTabletUtils.markNullValue(tablet, 0, 1);
+
+ PipeTabletUtils.putValue(tablet, 0, 0, TSDataType.FLOAT, 1.0f);
+ PipeTabletUtils.putTimestamp(tablet, 0, 1L);
+ PipeTabletUtils.compactBitMaps(tablet);
+
+ Assert.assertNull(tablet.bitMaps[0]);
+ Assert.assertTrue(tablet.bitMaps[1].isMarked(0));
+ }
+
+ @Test
+ public void testCopyBitMapsOrCreateEmptyWithNullBitMaps() {
+ final List<MeasurementSchema> schemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.FLOAT),
+ new MeasurementSchema("s2", TSDataType.FLOAT));
+ final Tablet tablet = new Tablet("root.sg.d1", schemas, 2);
+ tablet.addTimestamp(0, 1L);
+ tablet.addValue("s1", 0, 1.0f);
+ tablet.addValue("s2", 0, 2.0f);
+
+ Assert.assertNull(tablet.bitMaps);
+
+ final BitMap[] bitMaps = PipeTabletUtils.copyBitMapsOrCreateEmpty(tablet);
+
+ Assert.assertEquals(schemas.size(), bitMaps.length);
+ Assert.assertNull(bitMaps[0]);
+ Assert.assertNull(bitMaps[1]);
+ }
+}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
index ee9b7218dab..4e4d11aacbf 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeDataNodeThriftRequestTest.java
@@ -321,6 +321,46 @@ public class PipeDataNodeThriftRequestTest {
Assert.assertFalse(deserializedReq.getTabletReqs().get(0).getIsAligned());
}
+ @Test
+ public void testPipeTransferTabletBatchReqInternsRepeatedMeasurementNames()
throws IOException {
+ final List<ByteBuffer> tabletBuffers = new ArrayList<>();
+ tabletBuffers.add(
+ serializeTablet(createSingleValueTablet(new String("root.sg.d"), new
String("s1")), false));
+ tabletBuffers.add(
+ serializeTablet(createSingleValueTablet(new String("root.sg.d"), new
String("s1")), false));
+
+ final PipeTransferTabletBatchReq deserializedReq =
+ PipeTransferTabletBatchReq.fromTPipeTransferReq(
+
PipeTransferTabletBatchReq.toTPipeTransferReq(Collections.emptyList(),
tabletBuffers));
+ final Tablet firstTablet =
deserializedReq.getTabletReqs().get(0).getTablet();
+ final Tablet secondTablet =
deserializedReq.getTabletReqs().get(1).getTablet();
+
+ Assert.assertSame(firstTablet.deviceId, secondTablet.deviceId);
+ Assert.assertSame(
+ firstTablet.getSchemas().get(0).getMeasurementId(),
+ secondTablet.getSchemas().get(0).getMeasurementId());
+ }
+
+ private static Tablet createSingleValueTablet(final String deviceId, final
String measurement) {
+ final List<MeasurementSchema> schemaList = new ArrayList<>();
+ schemaList.add(new MeasurementSchema(measurement, TSDataType.INT32));
+ final Tablet tablet = new Tablet(deviceId, schemaList, 1);
+ tablet.addTimestamp(0, 1);
+ tablet.addValue(measurement, 0, 1);
+ tablet.rowSize = 1;
+ return tablet;
+ }
+
+ private static ByteBuffer serializeTablet(final Tablet tablet, final boolean
isAligned)
+ throws IOException {
+ try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
+ final DataOutputStream outputStream = new
DataOutputStream(byteArrayOutputStream)) {
+ tablet.serialize(outputStream);
+ ReadWriteIOUtils.write(isAligned, outputStream);
+ return ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0,
byteArrayOutputStream.size());
+ }
+ }
+
@Test
public void testPipeTransferFilePieceReq() throws IOException {
final byte[] body = "testPipeTransferFilePieceReq".getBytes();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
index a93a22b6e9f..305a3197cfe 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/planner/node/write/WritePlanNodeSplitTest.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.BitMap;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -201,6 +202,9 @@ public class WritePlanNodeSplitTest {
insertTabletNode.setColumns(
new Object[] {new int[] {-20, -10, 10, 20, 30, 40, 50, 60, 70, 80, 90,
100}});
insertTabletNode.setRowCount(insertTabletNode.getTimes().length);
+ final BitMap[] bitMaps = new BitMap[] {new
BitMap(insertTabletNode.getRowCount())};
+ bitMaps[0].mark(2);
+ insertTabletNode.setBitMaps(bitMaps);
DataPartitionQueryParam dataPartitionQueryParam = new
DataPartitionQueryParam();
dataPartitionQueryParam.setDevicePath(insertTabletNode.getDevicePath().getFullPath());
@@ -219,6 +223,12 @@ public class WritePlanNodeSplitTest {
Assert.assertEquals(tabletNode.getTimes().length, 2);
TConsensusGroupId regionId =
tabletNode.getDataRegionReplicaSet().getRegionId();
Assert.assertEquals(getRegionIdByTime(tabletNode.getMinTime()),
regionId.getId());
+ if (tabletNode.getTimes()[0] == 1) {
+ Assert.assertNotNull(tabletNode.getBitMaps());
+ Assert.assertTrue(tabletNode.getBitMaps()[0].isMarked(0));
+ } else {
+ Assert.assertNull(tabletNode.getBitMaps());
+ }
}
insertTabletNode = new InsertTabletNode(new PlanNodeId("plan node 2"));
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
new file mode 100644
index 00000000000..5f11039eeb3
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/rescon/quotas/DefaultOperationQuotaTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.rescon.quotas;
+
+import org.apache.iotdb.common.rpc.thrift.TTimedQuota;
+import org.apache.iotdb.common.rpc.thrift.ThrottleType;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+
+import org.apache.tsfile.utils.BitMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Map;
+
+public class DefaultOperationQuotaTest {
+
+ @Test
+ public void testCheckQuotaWithNullAndSparseBitMaps() throws Exception {
+ final DefaultOperationQuota quota = new
DefaultOperationQuota(createQuotaLimiter());
+
+ final InsertTabletStatement tabletWithoutBitMaps = new
InsertTabletStatement();
+ tabletWithoutBitMaps.setBitMaps(null);
+ quota.checkQuota(1, 0, tabletWithoutBitMaps);
+
+ final InsertTabletStatement tabletWithSparseBitMaps = new
InsertTabletStatement();
+ final BitMap bitMap = new BitMap(8);
+ bitMap.mark(0);
+ tabletWithSparseBitMaps.setBitMaps(new BitMap[] {null, bitMap});
+ quota.checkQuota(1, 0, tabletWithSparseBitMaps);
+
+ final InsertMultiTabletsStatement multiTabletsStatement = new
InsertMultiTabletsStatement();
+ multiTabletsStatement.setInsertTabletStatementList(
+ Arrays.asList(tabletWithoutBitMaps, tabletWithSparseBitMaps));
+ quota.checkQuota(1, 0, multiTabletsStatement);
+ }
+
+ private static QuotaLimiter createQuotaLimiter() {
+ final Map<ThrottleType, TTimedQuota> quotas = new
EnumMap<>(ThrottleType.class);
+ for (final ThrottleType throttleType : ThrottleType.values()) {
+ quotas.put(throttleType, new TTimedQuota(60_000L, 1_000_000_000L));
+ }
+ return QuotaLimiter.fromThrottle(Collections.unmodifiableMap(quotas));
+ }
+}