This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch opt_aligned_tvlist in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit c186fd4300a1494b9a8fea72f150be6ec5ae39a0 Author: HTHou <[email protected]> AuthorDate: Mon Sep 1 19:03:28 2025 +0800 dev --- .../iotdb/AlignedTimeseriesSessionExample.java | 87 ++++++++++++++-------- .../dataregion/memtable/AbstractMemTable.java | 1 + .../memtable/AlignedWritableMemChunk.java | 9 +-- .../dataregion/memtable/TsFileProcessor.java | 19 +++-- 4 files changed, 67 insertions(+), 49 deletions(-) diff --git a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java index 288bd65ddcb..84e570bef23 100644 --- a/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java +++ b/example/session/src/main/java/org/apache/iotdb/AlignedTimeseriesSessionExample.java @@ -63,45 +63,45 @@ public class AlignedTimeseriesSessionExample { // set session fetchSize session.setFetchSize(10000); - // createTemplate(); - createAlignedTimeseries(); - createAlignedTimeseriesWithNullPartical(); - - insertAlignedRecord(); + // // createTemplate(); + // createAlignedTimeseries(); + // createAlignedTimeseriesWithNullPartical(); + // + // insertAlignedRecord(); // insertAlignedRecords(); // insertAlignedRecordsOfOneDevice(); // insertAlignedStringRecord(); // insertAlignedStringRecords(); - // insertTabletWithAlignedTimeseriesMethod1(); + insertTabletWithAlignedTimeseriesMethod1(); // insertTabletWithAlignedTimeseriesMethod2(); // insertNullableTabletWithAlignedTimeseries(); // insertTabletsWithAlignedTimeseries(); - session.executeNonQueryStatement(FLUSH); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); - System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); - System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); - - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); - session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); - System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); - selectTest(); - selectWithValueFilterTest(); - selectWithLastTest(); - selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement(FLUSH); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 5"); + // System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 5"); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement("delete from root.sg_1.d1.s2 where time <= 3"); + // System.out.println("execute sql delete from root.sg_1.d1.s2 where time <= 3"); + // + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); + // session.executeNonQueryStatement("delete from root.sg_1.d1.s1 where time <= 10"); + // System.out.println("execute sql delete from root.sg_1.d1.s1 where time <= 10"); + // selectTest(); + // selectWithValueFilterTest(); + // selectWithLastTest(); + // selectWithLastTestWithoutValueFilter(); // selectWithValueFilterTest(); // selectWithGroupByTest(); @@ -347,7 +347,6 @@ public class AlignedTimeseriesSessionExample { // only measurementId and data type in MeasurementSchema take effects in Tablet List<IMeasurementSchema> schemaList = new ArrayList<>(); schemaList.add(new MeasurementSchema("s1", TSDataType.INT64)); - schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); Tablet tablet = new Tablet(ROOT_SG1_D1, schemaList); long timestamp = 1; @@ -357,8 +356,30 @@ public class AlignedTimeseriesSessionExample { tablet.addTimestamp(rowIndex, timestamp); tablet.addValue( schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextLong()); + + if (tablet.getRowSize() == tablet.getMaxRowNumber()) { + session.insertAlignedTablet(tablet, true); + tablet.reset(); + } + timestamp++; + } + + if (tablet.getRowSize() != 0) { + session.insertAlignedTablet(tablet); + tablet.reset(); + } + + schemaList = new ArrayList<>(); + schemaList.add(new MeasurementSchema("s2", TSDataType.INT32)); + + tablet = new Tablet(ROOT_SG1_D1, schemaList); + timestamp = 1; + + for (long row = 1; row < 100; row++) { + int rowIndex = tablet.getRowSize(); + tablet.addTimestamp(rowIndex, timestamp); tablet.addValue( - schemaList.get(1).getMeasurementName(), rowIndex, new SecureRandom().nextInt()); + schemaList.get(0).getMeasurementName(), rowIndex, new SecureRandom().nextInt()); if (tablet.getRowSize() == tablet.getMaxRowNumber()) { session.insertAlignedTablet(tablet, true); @@ -372,7 +393,7 @@ public class AlignedTimeseriesSessionExample { tablet.reset(); } - session.executeNonQueryStatement(FLUSH); + // session.executeNonQueryStatement(FLUSH); } /** Method 2 for insert tablet with aligned timeseries */ diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java index f2f4d5e2943..72c673831bc 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractMemTable.java @@ -787,6 +787,7 @@ public abstract class AbstractMemTable implements IMemTable { @Override public void addTVListRamCost(long cost) { this.tvListRamCost += cost; + System.out.println(tvListRamCost); } @Override diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index d7b7b4b027c..a48692ba737 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -867,14 +867,11 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { public long getTvListArrayMemCostIncrement1( List<String> insertingMeasurements, List<TSDataType> insertingTypes) { long size = 0; - List<BitMap> bitMaps = list.getBitMap(); + List<List<BitMap>> bitMaps = list.getBitMaps(); // value & bitmap array mem size for (int column = 0; column < dataTypes.size(); column++) { - TSDataType type = dataTypes.get(column); - if (type != null) { - if (bitMaps != null && bitMaps.get(column) != null) { - size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; - } + if (bitMaps != null && bitMaps.get(column) != null) { + size += (long) PrimitiveArrayManager.ARRAY_SIZE / 8 + 1; } } int newMeasurementCount = 0; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java index d28880fd4cf..67ae375b15a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/TsFileProcessor.java @@ -1065,6 +1065,14 @@ public class TsFileProcessor { AlignedWritableMemChunk alignedMemChunk = (AlignedWritableMemChunk) memChunk; int currentPointNum = alignedMemChunk.alignedListSize(); int newPointNum = currentPointNum + incomingPointNum; + // calculate how many new arrays will be added after this insertion + int currentArrayCnt = + currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); + int newArrayCnt = + newPointNum / PrimitiveArrayManager.ARRAY_SIZE + + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); + long acquireArray = newArrayCnt - currentArrayCnt; List<String> insertingMeasurements = new ArrayList<>(); List<TSDataType> insertingTypes = new ArrayList<>(); for (int i = 0; i < dataTypes.length; i++) { @@ -1083,20 +1091,11 @@ public class TsFileProcessor { if (!alignedMemChunk.containsMeasurement(measurementIds[i])) { // add a new column in the TVList, the new column should be as long as existing ones memIncrements[0] += - (currentPointNum / PrimitiveArrayManager.ARRAY_SIZE + 1) + newArrayCnt * AlignedTVList.emptyValueListArrayMemCost(); } } - // calculate how many new arrays will be added after this insertion - int currentArrayCnt = - currentPointNum / PrimitiveArrayManager.ARRAY_SIZE - + (currentPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - int newArrayCnt = - newPointNum / PrimitiveArrayManager.ARRAY_SIZE - + (newPointNum % PrimitiveArrayManager.ARRAY_SIZE > 0 ? 1 : 0); - long acquireArray = newArrayCnt - currentArrayCnt; - if (acquireArray != 0) { // memory of extending the TVList memIncrements[0] +=
