This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 256f3e72652 Pipe: Repair the table model construction TabletBatch process causing memory expansion (#16123)
256f3e72652 is described below
commit 256f3e726520beaeb4d5cac527873930c9d77cb9
Author: Zhenyu Luo <[email protected]>
AuthorDate: Fri Aug 8 09:27:19 2025 +0800
Pipe: Repair the table model construction TabletBatch process causing memory expansion (#16123)
* Pipe: Repair the table model construction TabletBatch process causing memory expansion
* add ut
* update PipeTabletEventPlainBatch
* update PipeTabletEventPlainBatch
---
.../evolvable/batch/PipeTabletEventPlainBatch.java | 108 ++++++++++++++-
.../pipe/sink/PipeTabletEventPlainBatchTest.java | 147 +++++++++++++++++++++
.../db/pipe/sink/PipeTabletEventSorterTest.java | 4 +-
3 files changed, 251 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
index a68c5ae9602..0db905a6dc0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java
@@ -28,16 +28,22 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.utils.Binary;
+import org.apache.tsfile.utils.BitMap;
import org.apache.tsfile.utils.Pair;
import org.apache.tsfile.utils.PublicBAOS;
import org.apache.tsfile.utils.RamUsageEstimator;
import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
import org.apache.tsfile.write.record.Tablet;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.time.LocalDate;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -102,23 +108,28 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
final String databaseName = insertTablets.getKey();
for (final Map.Entry<String, Pair<Integer, List<Tablet>>> tabletEntry :
insertTablets.getValue().entrySet()) {
- final List<Tablet> batchTablets = new ArrayList<>();
+ // needCopyFlag and tablet
+ final List<Pair<Boolean, Tablet>> batchTablets = new ArrayList<>();
for (final Tablet tablet : tabletEntry.getValue().getRight()) {
boolean success = false;
- for (final Tablet batchTablet : batchTablets) {
- if (batchTablet.append(tablet, tabletEntry.getValue().getLeft())) {
+ for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
+ if (tabletPair.getLeft()) {
+ tabletPair.setRight(copyTablet(tabletPair.getRight()));
+ tabletPair.setLeft(Boolean.FALSE);
+ }
+ if (tabletPair.getRight().append(tablet, tabletEntry.getValue().getLeft())) {
success = true;
break;
}
}
if (!success) {
- batchTablets.add(tablet);
+ batchTablets.add(new Pair<>(Boolean.TRUE, tablet));
}
}
- for (final Tablet batchTablet : batchTablets) {
+ for (final Pair<Boolean, Tablet> tabletPair : batchTablets) {
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
- batchTablet.serialize(outputStream);
+ tabletPair.getRight().serialize(outputStream);
ReadWriteIOUtils.write(true, outputStream);
tabletBuffers.add(
ByteBuffer.wrap(byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size()));
@@ -214,4 +225,89 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
currentBatch.getRight().add(tablet);
return PipeMemoryWeightUtil.calculateTabletSizeInBytes(tablet) + 4;
}
+
+ public static Tablet copyTablet(final Tablet tablet) {
+ final Object[] copiedValues = new Object[tablet.getValues().length];
+ for (int i = 0; i < tablet.getValues().length; i++) {
+ if (tablet.getValues()[i] == null
+ || tablet.getSchemas() == null
+ || tablet.getSchemas().get(i) == null) {
+ continue;
+ }
+ copiedValues[i] =
+ copyValueList(
+ tablet.getValues()[i], tablet.getSchemas().get(i).getType(), tablet.getRowSize());
+ }
+
+ BitMap[] bitMaps = null;
+ if (tablet.getBitMaps() != null) {
+ bitMaps =
+ Arrays.stream(tablet.getBitMaps())
+ .map(
+ bitMap -> {
+ if (bitMap != null) {
+ final byte[] data = bitMap.getByteArray();
+ return new BitMap(bitMap.getSize(), Arrays.copyOf(data, data.length));
+ }
+ return null;
+ })
+ .toArray(BitMap[]::new);
+ }
+
+ return new Tablet(
+ tablet.getTableName(),
+ new ArrayList<>(tablet.getSchemas()),
+ new ArrayList<>(tablet.getColumnTypes()),
+ Arrays.copyOf(tablet.getTimestamps(), tablet.getRowSize()),
+ copiedValues,
+ bitMaps,
+ tablet.getRowSize());
+ }
+
+ private static Object copyValueList(
+ final Object valueList, final TSDataType dataType, final int rowSize) {
+ switch (dataType) {
+ case BOOLEAN:
+ final boolean[] boolValues = (boolean[]) valueList;
+ final boolean[] copiedBoolValues = new boolean[rowSize];
+ System.arraycopy(boolValues, 0, copiedBoolValues, 0, rowSize);
+ return copiedBoolValues;
+ case INT32:
+ final int[] intValues = (int[]) valueList;
+ final int[] copiedIntValues = new int[rowSize];
+ System.arraycopy(intValues, 0, copiedIntValues, 0, rowSize);
+ return copiedIntValues;
+ case DATE:
+ final LocalDate[] dateValues = (LocalDate[]) valueList;
+ final LocalDate[] copiedDateValues = new LocalDate[rowSize];
+ System.arraycopy(dateValues, 0, copiedDateValues, 0, rowSize);
+ return copiedDateValues;
+ case INT64:
+ case TIMESTAMP:
+ final long[] longValues = (long[]) valueList;
+ final long[] copiedLongValues = new long[rowSize];
+ System.arraycopy(longValues, 0, copiedLongValues, 0, rowSize);
+ return copiedLongValues;
+ case FLOAT:
+ final float[] floatValues = (float[]) valueList;
+ final float[] copiedFloatValues = new float[rowSize];
+ System.arraycopy(floatValues, 0, copiedFloatValues, 0, rowSize);
+ return copiedFloatValues;
+ case DOUBLE:
+ final double[] doubleValues = (double[]) valueList;
+ final double[] copiedDoubleValues = new double[rowSize];
+ System.arraycopy(doubleValues, 0, copiedDoubleValues, 0, rowSize);
+ return copiedDoubleValues;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ final Binary[] binaryValues = (Binary[]) valueList;
+ final Binary[] copiedBinaryValues = new Binary[rowSize];
+ System.arraycopy(binaryValues, 0, copiedBinaryValues, 0, rowSize);
+ return copiedBinaryValues;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.", dataType));
+ }
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java
new file mode 100644
index 00000000000..1815c205b57
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventPlainBatchTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.db.pipe.sink.payload.evolvable.batch.PipeTabletEventPlainBatch;
+
+import org.apache.tsfile.write.record.Tablet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+public class PipeTabletEventPlainBatchTest {
+
+ @Test
+ public void constructTabletBatch() {
+ Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
+
+ Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+ Assert.assertNotSame(tablet, copyTablet);
+ Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+ for (int i = 0; i < tablet.getSchemas().size(); i++) {
+ for (int j = 0; j < tablet.getRowSize(); j++) {
+ Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+ }
+ }
+ }
+
+ @Test
+ public void constructTabletBatch1() {
+ Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
+ tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
+ Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+ Assert.assertNotSame(tablet, copyTablet);
+ Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+ for (int i = 0; i < tablet.getSchemas().size(); i++) {
+ for (int j = 0; j < tablet.getRowSize(); j++) {
+ Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+ }
+ }
+ }
+
+ @Test
+ public void constructTabletBatch2() {
+ Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
+ tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
+
+ tablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+ tablet.getBitMaps()[1].markAll();
+ tablet.getValues()[1] = null;
+
+ Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+ Assert.assertNotSame(tablet, copyTablet);
+ Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+ for (int i = 0; i < tablet.getSchemas().size(); i++) {
+ if (i == 1) {
+ Assert.assertTrue(tablet.getBitMaps()[i].isAllMarked());
+ Assert.assertNull(copyTablet.getValues()[1]);
+ continue;
+ }
+
+ for (int j = 0; j < tablet.getRowSize(); j++) {
+ Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+ }
+ }
+ }
+
+ @Test
+ public void constructTabletBatch3() {
+ Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
+ tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
+
+ tablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+ tablet.getBitMaps()[1] = null;
+
+ Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+ Assert.assertNotSame(tablet, copyTablet);
+ Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+ for (int i = 0; i < tablet.getSchemas().size(); i++) {
+ if (i == 1) {
+ Assert.assertNull(tablet.getBitMaps()[i]);
+ continue;
+ }
+
+ for (int j = 0; j < tablet.getRowSize(); j++) {
+ Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+ }
+ }
+ }
+
+ @Test
+ public void constructTabletBatch4() {
+ Tablet tablet = PipeTabletEventSorterTest.generateTablet("test", 10, true, true);
+ tablet.append(PipeTabletEventSorterTest.generateTablet("test", 10, true, true));
+
+ List<Integer> rowIndices = new ArrayList<>(tablet.getSchemas().size());
+ Random random = new Random();
+ for (int i = 0; i < tablet.getSchemas().size(); i++) {
+ int r = random.nextInt(tablet.getRowSize());
+ rowIndices.add(r);
+ tablet.addValue(tablet.getSchemas().get(i).getMeasurementName(), r, null);
+ }
+
+ Tablet copyTablet = PipeTabletEventPlainBatch.copyTablet(tablet);
+
+ Assert.assertNotSame(tablet, copyTablet);
+ Assert.assertEquals(tablet.getRowSize(), copyTablet.getRowSize());
+
+ for (int i = 0; i < tablet.getSchemas().size(); i++) {
+ for (int j = 0; j < tablet.getRowSize(); j++) {
+ if (rowIndices.get(i) == j) {
+ Assert.assertTrue(tablet.getBitMaps()[i].isMarked(j));
+ Assert.assertNull(tablet.getValue(j, i));
+ continue;
+ }
+ Assert.assertEquals(tablet.getValue(j, i), copyTablet.getValue(j, i));
+ }
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java
index 4e9a9a95005..eb5ffd475d3 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/PipeTabletEventSorterTest.java
@@ -277,7 +277,7 @@ public class PipeTabletEventSorterTest {
}
}
- private Tablet generateTablet(
+ static Tablet generateTablet(
final String tableName,
final int deviceIDNum,
final boolean hasDuplicates,
@@ -389,7 +389,7 @@ public class PipeTabletEventSorterTest {
return tablet;
}
- public LocalDate getDate(final int value) {
+ public static LocalDate getDate(final int value) {
Date date = new Date(value);
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
try {