This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new a37a992a146 refactor: load max page&chunk configuration in
ReadOnlyMemchunk and M… (#15310)
a37a992a146 is described below
commit a37a992a146c02811c03c24a7dd833c1d31e27b3
Author: shizy <[email protected]>
AuthorDate: Fri Apr 18 09:42:25 2025 +0800
refactor: load max page&chunk configuration in ReadOnlyMemchunk and M…
(#15310)
* refactor: load max page&chunk configuration in ReadOnlyMemchunk and
MemTableFlushTask
* bug: fix testFlushMultiAlignedBinaryChunks
---
.idea/icon.png | Bin 6736 -> 0 bytes
.../dataregion/flush/MemTableFlushTask.java | 18 +-
.../memtable/AlignedReadOnlyMemChunk.java | 6 +-
.../memtable/AlignedWritableMemChunk.java | 44 +++--
.../dataregion/memtable/ReadOnlyMemChunk.java | 19 +-
.../dataregion/memtable/WritableMemChunk.java | 19 +-
.../db/utils/datastructure/AlignedTVList.java | 32 ++--
.../db/utils/datastructure/BatchEncodeInfo.java | 16 +-
.../datastructure/MemPointIteratorFactory.java | 200 ++++++++++++++-------
.../MergeSortMultiAlignedTVListIterator.java | 26 +--
.../MergeSortMultiTVListIterator.java | 16 +-
.../datastructure/MultiAlignedTVListIterator.java | 18 +-
.../utils/datastructure/MultiTVListIterator.java | 13 +-
.../OrderedMultiAlignedTVListIterator.java | 6 +-
.../datastructure/OrderedMultiTVListIterator.java | 5 +-
.../iotdb/db/utils/datastructure/TVList.java | 39 ++--
.../reader/chunk/MemAlignedChunkLoaderTest.java | 4 +-
.../read/reader/chunk/MemChunkLoaderTest.java | 19 +-
18 files changed, 306 insertions(+), 194 deletions(-)
diff --git a/.idea/icon.png b/.idea/icon.png
deleted file mode 100644
index 493aca9320e..00000000000
Binary files a/.idea/icon.png and /dev/null differ
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index d1a3f3c5bab..61abb78c24f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -67,6 +67,8 @@ public class MemTableFlushTask {
private static final IoTDBConfig config =
IoTDBDescriptor.getInstance().getConfig();
private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
config.getTargetChunkPointNum();
+ private final long TARGET_CHUNK_SIZE = config.getTargetChunkSize();
/* storage group name -> last time */
private static final Map<String, Long> flushPointsCache = new
ConcurrentHashMap<>();
@@ -107,7 +109,15 @@ public class MemTableFlushTask {
this.dataRegionId = dataRegionId;
this.encodingTaskFuture = SUB_TASK_POOL_MANAGER.submit(encodingTask);
this.ioTaskFuture = SUB_TASK_POOL_MANAGER.submit(ioTask);
- this.encodeInfo = new BatchEncodeInfo(0, 0, 0);
+
+ this.encodeInfo =
+ new BatchEncodeInfo(
+ 0,
+ 0,
+ 0,
+ MAX_NUMBER_OF_POINTS_IN_PAGE,
+ MAX_NUMBER_OF_POINTS_IN_CHUNK,
+ TARGET_CHUNK_SIZE);
LOGGER.debug(
"flush task of database {} memtable is created, flushing to file {}.",
storageGroup,
@@ -259,6 +269,12 @@ public class MemTableFlushTask {
long starTime = System.currentTimeMillis();
IWritableMemChunk writableMemChunk = (IWritableMemChunk) task;
if (writableMemChunk instanceof AlignedWritableMemChunk && times
== null) {
+ encodeInfo.maxNumberOfPointsInChunk =
+ Math.min(
+ MAX_NUMBER_OF_POINTS_IN_CHUNK,
+ (TARGET_CHUNK_SIZE
+ / ((AlignedWritableMemChunk) writableMemChunk)
+ .getAvgPointSizeOfLargestColumn()));
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index d178c574b67..bfe78f0336d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -140,7 +140,8 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextBatch()) {
// create pageTimeStatistics and pageValueStatistics for new page
@@ -314,7 +315,8 @@ public class AlignedReadOnlyMemChunk extends
ReadOnlyMemChunk {
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
index 14a0f7172ff..bca23f4df1f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java
@@ -29,7 +29,6 @@ import
org.apache.iotdb.db.utils.datastructure.MemPointIterator;
import org.apache.iotdb.db.utils.datastructure.MemPointIteratorFactory;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.read.common.TimeRange;
import org.apache.tsfile.utils.Binary;
@@ -66,11 +65,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
private long sortedRowCount = 0;
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- private long maxNumberOfPointsInChunk = CONFIG.getTargetChunkPointNum();
private final int TVLIST_SORT_THRESHOLD = CONFIG.getTvListSortThreshold();
- private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
private static final String UNSUPPORTED_TYPE = "Unsupported data type:";
@@ -320,11 +315,11 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
@SuppressWarnings({"squid:S6541", "squid:S3776"})
- public void encodeWorkingAlignedTVList(BlockingQueue<Object> ioTaskQueue) {
+ public void encodeWorkingAlignedTVList(
+ BlockingQueue<Object> ioTaskQueue,
+ long maxNumberOfPointsInChunk,
+ int maxNumberOfPointsInPage) {
BitMap allValueColDeletedMap = list.getAllValueColDeletedMap();
- int avgPointSizeOfLargestColumn = list.getAvgPointSizeOfLargestColumn();
- maxNumberOfPointsInChunk =
- Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
boolean[] timeDuplicateInfo = null;
@@ -343,7 +338,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
pointNumInPage++;
pointNumInChunk++;
- if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (pointNumInPage == maxNumberOfPointsInPage) {
pageRange.add(sortedRowIndex);
pointNumInPage = 0;
}
@@ -379,14 +374,16 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
chunkRange.add(pageRange);
}
- handleEncoding(ioTaskQueue, chunkRange, timeDuplicateInfo,
allValueColDeletedMap);
+ handleEncoding(
+ ioTaskQueue, chunkRange, timeDuplicateInfo, allValueColDeletedMap,
maxNumberOfPointsInPage);
}
private void handleEncoding(
BlockingQueue<Object> ioTaskQueue,
List<List<Integer>> chunkRange,
boolean[] timeDuplicateInfo,
- BitMap allValueColDeletedMap) {
+ BitMap allValueColDeletedMap,
+ int maxNumberOfPointsInPage) {
List<TSDataType> dataTypes = list.getTsDataTypes();
Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new
Pair[dataTypes.size()];
for (List<Integer> pageRange : chunkRange) {
@@ -486,7 +483,7 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
alignedChunkWriter.nextColumn();
}
- long[] times = new long[Math.min(MAX_NUMBER_OF_POINTS_IN_PAGE,
list.rowCount())];
+ long[] times = new long[Math.min(maxNumberOfPointsInPage,
list.rowCount())];
int pointsInPage = 0;
for (int sortedRowIndex = pageRange.get(pageNum * 2);
sortedRowIndex <= pageRange.get(pageNum * 2 + 1);
@@ -516,7 +513,8 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
public synchronized void encode(
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
if (TVLIST_SORT_THRESHOLD == 0) {
- encodeWorkingAlignedTVList(ioTaskQueue);
+ encodeWorkingAlignedTVList(
+ ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk,
encodeInfo.maxNumberOfPointsInPage);
return;
}
@@ -527,16 +525,17 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
alignedTvLists.add(list);
List<Integer> columnIndexList = buildColumnIndexList(schemaList);
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(dataTypes, columnIndexList,
alignedTvLists);
+ MemPointIteratorFactory.create(
+ dataTypes, columnIndexList, alignedTvLists,
encodeInfo.maxNumberOfPointsInPage);
while (timeValuePairIterator.hasNextBatch()) {
timeValuePairIterator.encodeBatch(alignedChunkWriter, encodeInfo, times);
- if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage) {
alignedChunkWriter.write(times, encodeInfo.pointNumInPage, 0);
encodeInfo.pointNumInPage = 0;
}
- if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk) {
alignedChunkWriter.sealCurrentPage();
alignedChunkWriter.clearPageWriter();
try {
@@ -772,4 +771,15 @@ public class AlignedWritableMemChunk extends
AbstractWritableMemChunk {
}
return filteredTimestamps.stream().mapToLong(Long::valueOf).toArray();
}
+
+ // Choose maximum avgPointSizeOfLargestColumn among working and sorted
AlignedTVList as
+ // approximate calculation
+ public int getAvgPointSizeOfLargestColumn() {
+ int avgPointSizeOfLargestColumn = list.getAvgPointSizeOfLargestColumn();
+ for (AlignedTVList alignedTVList : sortedList) {
+ avgPointSizeOfLargestColumn =
+ Math.max(avgPointSizeOfLargestColumn,
alignedTVList.getAvgPointSizeOfLargestColumn());
+ }
+ return avgPointSizeOfLargestColumn;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 22fac1604ca..ed2fee4294f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -138,7 +138,13 @@ public class ReadOnlyMemChunk {
Statistics<? extends Serializable> chunkStatistics =
Statistics.getStatsByType(dataType);
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
timeValuePairIterator =
- MemPointIteratorFactory.create(dataType, tvLists, deletionList,
floatPrecision, encoding);
+ MemPointIteratorFactory.create(
+ dataType,
+ tvLists,
+ deletionList,
+ floatPrecision,
+ encoding,
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextBatch()) {
// statistics for current batch
Statistics<? extends Serializable> pageStatistics =
Statistics.getStatsByType(dataType);
@@ -247,7 +253,12 @@ public class ReadOnlyMemChunk {
List<TVList> tvLists = new ArrayList<>(tvListQueryMap.keySet());
MemPointIterator timeValuePairIterator =
MemPointIteratorFactory.create(
- getDataType(), tvLists, deletionList, floatPrecision, encoding);
+ getDataType(),
+ tvLists,
+ deletionList,
+ floatPrecision,
+ encoding,
+ MAX_NUMBER_OF_POINTS_IN_PAGE);
while (timeValuePairIterator.hasNextTimeValuePair()) {
TimeValuePair tvPair = timeValuePairIterator.nextTimeValuePair();
@@ -322,8 +333,4 @@ public class ReadOnlyMemChunk {
public MemPointIterator getMemPointIterator() {
return timeValuePairIterator;
}
-
- public int getMaxNumberOfPointsInPage() {
- return MAX_NUMBER_OF_POINTS_IN_PAGE;
- }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
index 32a8152041e..01e91387612 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java
@@ -61,8 +61,6 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
private static final Logger LOGGER =
LoggerFactory.getLogger(WritableMemChunk.class);
private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
CONFIG.getTargetChunkPointNum();
private final int TVLIST_SORT_THRESHOLD = CONFIG.getTvListSortThreshold();
public WritableMemChunk(IMeasurementSchema schema) {
@@ -368,7 +366,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
return out.toString();
}
- public void encodeWorkingTVList(BlockingQueue<Object> ioTaskQueue) {
+ public void encodeWorkingTVList(
+ BlockingQueue<Object> ioTaskQueue, long maxNumberOfPointsInChunk, long
targetChunkSize) {
TSDataType tsDataType = schema.getType();
ChunkWriterImpl chunkWriterImpl = createIChunkWriter();
@@ -426,8 +425,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
break;
}
pointNumInCurrentChunk++;
- if (pointNumInCurrentChunk > MAX_NUMBER_OF_POINTS_IN_CHUNK
- || dataSizeInCurrentChunk > TARGET_CHUNK_SIZE) {
+ if (pointNumInCurrentChunk > maxNumberOfPointsInChunk
+ || dataSizeInCurrentChunk > targetChunkSize) {
chunkWriterImpl.sealCurrentPage();
chunkWriterImpl.clearPageWriter();
try {
@@ -455,7 +454,8 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
public synchronized void encode(
BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[]
times) {
if (TVLIST_SORT_THRESHOLD == 0) {
- encodeWorkingTVList(ioTaskQueue);
+ encodeWorkingTVList(
+ ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk,
encodeInfo.targetChunkSize);
return;
}
@@ -468,12 +468,13 @@ public class WritableMemChunk extends
AbstractWritableMemChunk {
List<TVList> tvLists = new ArrayList<>(sortedList);
tvLists.add(list);
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(schema.getType(), tvLists);
+ MemPointIteratorFactory.create(
+ schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage);
while (timeValuePairIterator.hasNextBatch()) {
timeValuePairIterator.encodeBatch(chunkWriterImpl, encodeInfo, times);
- if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
- || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.dataSizeInChunk >= encodeInfo.targetChunkSize) {
chunkWriterImpl.sealCurrentPage();
chunkWriterImpl.clearPageWriter();
try {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
index 007b56dd9a2..4b616967621 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java
@@ -19,14 +19,12 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.storageengine.dataregion.wal.buffer.IWALByteBufferView;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALWriteUtils;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -1335,9 +1333,15 @@ public abstract class AlignedTVList extends TVList {
List<Integer> columnIndexList,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
return new AlignedTVListIterator(
- dataTypeList, columnIndexList, valueColumnsDeletionList,
floatPrecision, encodingList);
+ dataTypeList,
+ columnIndexList,
+ valueColumnsDeletionList,
+ floatPrecision,
+ encodingList,
+ maxNumberOfPointsInPage);
}
/* AlignedTVList Iterator */
@@ -1355,18 +1359,14 @@ public abstract class AlignedTVList extends TVList {
private final List<int[]> valueColumnDeleteCursor = new ArrayList<>();
- private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
-
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
- private long maxNumberOfPointsInChunk =
- IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
-
public AlignedTVListIterator(
List<TSDataType> dataTypeList,
List<Integer> columnIndexList,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
- super(null, null, null);
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
+ super(null, null, null, maxNumberOfPointsInPage);
this.dataTypeList = dataTypeList;
this.columnIndexList =
(columnIndexList == null)
@@ -1380,10 +1380,6 @@ public abstract class AlignedTVList extends TVList {
for (int i = 0; i < dataTypeList.size(); i++) {
valueColumnDeleteCursor.add(new int[] {0});
}
- int avgPointSizeOfLargestColumn = getAvgPointSizeOfLargestColumn();
- long TARGET_CHUNK_SIZE =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
- maxNumberOfPointsInChunk =
- Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
}
@Override
@@ -1547,7 +1543,7 @@ public abstract class AlignedTVList extends TVList {
int startIndex = index;
// time column
for (; index < rows; index++) {
- if (validRowCount >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (validRowCount >= maxNumberOfPointsInPage) {
break;
}
// skip empty row
@@ -1691,8 +1687,8 @@ public abstract class AlignedTVList extends TVList {
int startIndex = index;
// time column
for (; index < rows; index++) {
- if (encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk
- || encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.pointNumInPage >=
encodeInfo.maxNumberOfPointsInPage) {
break;
}
// skip empty row
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
index b836524438c..d4b3d7e9ccc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BatchEncodeInfo.java
@@ -21,15 +21,29 @@ package org.apache.iotdb.db.utils.datastructure;
// BatchEncodeInfo struct
public class BatchEncodeInfo {
+ // used by encode/encodeBatch during flush
+ public int maxNumberOfPointsInPage;
+ public long maxNumberOfPointsInChunk;
+ public long targetChunkSize;
+
public int pointNumInPage;
public int pointNumInChunk;
public long dataSizeInChunk;
public boolean lastIterator;
- public BatchEncodeInfo(int pointNumInPage, int pointNumInChunk, long
dataSizeInChunk) {
+ public BatchEncodeInfo(
+ int pointNumInPage,
+ int pointNumInChunk,
+ long dataSizeInChunk,
+ int maxNumberOfPointsInPage,
+ long maxNumberOfPointsInChunk,
+ long targetChunkSize) {
this.pointNumInPage = pointNumInPage;
this.pointNumInChunk = pointNumInChunk;
this.dataSizeInChunk = dataSizeInChunk;
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
+ this.maxNumberOfPointsInChunk = maxNumberOfPointsInChunk;
+ this.targetChunkSize = targetChunkSize;
this.lastIterator = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
index 25600e05d07..b622cc0295e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MemPointIteratorFactory.java
@@ -31,30 +31,38 @@ public class MemPointIteratorFactory {
}
// TVListIterator
- private static MemPointIterator single(List<TVList> tvLists) {
- return tvLists.get(0).iterator(null, null, null);
+ private static MemPointIterator single(List<TVList> tvLists, int
maxNumberOfPointsInPage) {
+ return tvLists.get(0).iterator(null, null, null, maxNumberOfPointsInPage);
}
- private static MemPointIterator single(List<TVList> tvLists, List<TimeRange>
deletionList) {
- return tvLists.get(0).iterator(deletionList, null, null);
+ private static MemPointIterator single(
+ List<TVList> tvLists, List<TimeRange> deletionList, int
maxNumberOfPointsInPage) {
+ return tvLists.get(0).iterator(deletionList, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator single(
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
- return tvLists.get(0).iterator(deletionList, floatPrecision, encoding);
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ return tvLists.get(0).iterator(deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
// MergeSortMultiTVListIterator
- private static MemPointIterator mergeSort(TSDataType tsDataType,
List<TVList> tvLists) {
- return new MergeSortMultiTVListIterator(tsDataType, tvLists, null, null,
null);
+ private static MemPointIterator mergeSort(
+ TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
+ return new MergeSortMultiTVListIterator(
+ tsDataType, tvLists, null, null, null, maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
- TSDataType tsDataType, List<TVList> tvLists, List<TimeRange>
deletionList) {
- return new MergeSortMultiTVListIterator(tsDataType, tvLists, deletionList,
null, null);
+ TSDataType tsDataType,
+ List<TVList> tvLists,
+ List<TimeRange> deletionList,
+ int maxNumberOfPointsInPage) {
+ return new MergeSortMultiTVListIterator(
+ tsDataType, tvLists, deletionList, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
@@ -62,19 +70,26 @@ public class MemPointIteratorFactory {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiTVListIterator(
- tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
// OrderedMultiTVListIterator
- private static MemPointIterator ordered(TSDataType tsDataType, List<TVList>
tvLists) {
- return new OrderedMultiTVListIterator(tsDataType, tvLists, null, null,
null);
+ private static MemPointIterator ordered(
+ TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
+ return new OrderedMultiTVListIterator(
+ tsDataType, tvLists, null, null, null, maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
- TSDataType tsDataType, List<TVList> tvLists, List<TimeRange>
deletionList) {
- return new OrderedMultiTVListIterator(tsDataType, tvLists, deletionList,
null, null);
+ TSDataType tsDataType,
+ List<TVList> tvLists,
+ List<TimeRange> deletionList,
+ int maxNumberOfPointsInPage) {
+ return new OrderedMultiTVListIterator(
+ tsDataType, tvLists, deletionList, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
@@ -82,27 +97,38 @@ public class MemPointIteratorFactory {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiTVListIterator(
- tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
// AlignedTVListIterator
private static MemPointIterator single(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
- List<AlignedTVList> alignedTvLists) {
- return alignedTvLists.get(0).iterator(tsDataTypes, columnIndexList, null,
null, null);
+ List<AlignedTVList> alignedTvLists,
+ int maxNumberOfPointsInPage) {
+ return alignedTvLists
+ .get(0)
+ .iterator(tsDataTypes, columnIndexList, null, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator single(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- List<List<TimeRange>> valueColumnsDeletionList) {
+ List<List<TimeRange>> valueColumnsDeletionList,
+ int maxNumberOfPointsInPage) {
return alignedTvLists
.get(0)
- .iterator(tsDataTypes, columnIndexList, valueColumnsDeletionList,
null, null);
+ .iterator(
+ tsDataTypes,
+ columnIndexList,
+ valueColumnsDeletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator single(
@@ -111,29 +137,43 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
return alignedTvLists
.get(0)
.iterator(
- tsDataTypes, columnIndexList, valueColumnsDeletionList,
floatPrecision, encodingList);
+ tsDataTypes,
+ columnIndexList,
+ valueColumnsDeletionList,
+ floatPrecision,
+ encodingList,
+ maxNumberOfPointsInPage);
}
// MergeSortMultiAlignedTVListIterator
private static MemPointIterator mergeSort(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
- List<AlignedTVList> alignedTvLists) {
+ List<AlignedTVList> alignedTvLists,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiAlignedTVListIterator(
- tsDataTypes, columnIndexList, alignedTvLists, null, null, null);
+ tsDataTypes, columnIndexList, alignedTvLists, null, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- List<List<TimeRange>> valueColumnsDeletionList) {
+ List<List<TimeRange>> valueColumnsDeletionList,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiAlignedTVListIterator(
- tsDataTypes, columnIndexList, alignedTvLists,
valueColumnsDeletionList, null, null);
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ valueColumnsDeletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator mergeSort(
@@ -142,32 +182,42 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
return new MergeSortMultiAlignedTVListIterator(
tsDataTypes,
columnIndexList,
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
}
// OrderedMultiAlignedTVListIterator
private static MemPointIterator ordered(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
- List<AlignedTVList> alignedTvLists) {
+ List<AlignedTVList> alignedTvLists,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiAlignedTVListIterator(
- tsDataTypes, columnIndexList, alignedTvLists, null, null, null);
+ tsDataTypes, columnIndexList, alignedTvLists, null, null, null,
maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- List<List<TimeRange>> valueColumnsDeletionList) {
+ List<List<TimeRange>> valueColumnsDeletionList,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiAlignedTVListIterator(
- tsDataTypes, columnIndexList, alignedTvLists,
valueColumnsDeletionList, null, null);
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ valueColumnsDeletionList,
+ null,
+ null,
+ maxNumberOfPointsInPage);
}
private static MemPointIterator ordered(
@@ -176,34 +226,40 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
return new OrderedMultiAlignedTVListIterator(
tsDataTypes,
columnIndexList,
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
}
- public static MemPointIterator create(TSDataType tsDataType, List<TVList>
tvLists) {
+ public static MemPointIterator create(
+ TSDataType tsDataType, List<TVList> tvLists, int
maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
- return single(tvLists);
+ return single(tvLists, maxNumberOfPointsInPage);
} else if (isCompleteOrdered(tvLists)) {
- return ordered(tsDataType, tvLists);
+ return ordered(tsDataType, tvLists, maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataType, tvLists);
+ return mergeSort(tsDataType, tvLists, maxNumberOfPointsInPage);
}
}
public static MemPointIterator create(
- TSDataType tsDataType, List<TVList> tvLists, List<TimeRange>
deletionList) {
+ TSDataType tsDataType,
+ List<TVList> tvLists,
+ List<TimeRange> deletionList,
+ int maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
- return single(tvLists, deletionList);
+ return single(tvLists, deletionList, maxNumberOfPointsInPage);
} else if (isCompleteOrdered(tvLists)) {
- return ordered(tsDataType, tvLists, deletionList);
+ return ordered(tsDataType, tvLists, deletionList,
maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataType, tvLists, deletionList);
+ return mergeSort(tsDataType, tvLists, deletionList,
maxNumberOfPointsInPage);
}
}
@@ -212,26 +268,30 @@ public class MemPointIteratorFactory {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
if (tvLists.size() == 1) {
- return single(tvLists, deletionList, floatPrecision, encoding);
+ return single(tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
} else if (isCompleteOrdered(tvLists)) {
- return ordered(tsDataType, tvLists, deletionList, floatPrecision,
encoding);
+ return ordered(
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataType, tvLists, deletionList, floatPrecision,
encoding);
+ return mergeSort(
+ tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
}
public static MemPointIterator create(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
- List<AlignedTVList> alignedTvLists) {
+ List<AlignedTVList> alignedTvLists,
+ int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
- return single(tsDataTypes, columnIndexList, alignedTvLists);
+ return single(tsDataTypes, columnIndexList, alignedTvLists,
maxNumberOfPointsInPage);
} else if (isCompleteOrdered(alignedTvLists)) {
- return ordered(tsDataTypes, columnIndexList, alignedTvLists);
+ return ordered(tsDataTypes, columnIndexList, alignedTvLists,
maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataTypes, columnIndexList, alignedTvLists);
+ return mergeSort(tsDataTypes, columnIndexList, alignedTvLists,
maxNumberOfPointsInPage);
}
}
@@ -239,13 +299,29 @@ public class MemPointIteratorFactory {
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
- List<List<TimeRange>> valueColumnsDeletionList) {
+ List<List<TimeRange>> valueColumnsDeletionList,
+ int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
- return single(tsDataTypes, columnIndexList, alignedTvLists,
valueColumnsDeletionList);
+ return single(
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ valueColumnsDeletionList,
+ maxNumberOfPointsInPage);
} else if (isCompleteOrdered(alignedTvLists)) {
- return ordered(tsDataTypes, columnIndexList, alignedTvLists,
valueColumnsDeletionList);
+ return ordered(
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ valueColumnsDeletionList,
+ maxNumberOfPointsInPage);
} else {
- return mergeSort(tsDataTypes, columnIndexList, alignedTvLists,
valueColumnsDeletionList);
+ return mergeSort(
+ tsDataTypes,
+ columnIndexList,
+ alignedTvLists,
+ valueColumnsDeletionList,
+ maxNumberOfPointsInPage);
}
}
@@ -255,7 +331,8 @@ public class MemPointIteratorFactory {
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
if (alignedTvLists.size() == 1) {
return single(
tsDataTypes,
@@ -263,7 +340,8 @@ public class MemPointIteratorFactory {
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
} else if (isCompleteOrdered(alignedTvLists)) {
return ordered(
tsDataTypes,
@@ -271,7 +349,8 @@ public class MemPointIteratorFactory {
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
} else {
return mergeSort(
tsDataTypes,
@@ -279,7 +358,8 @@ public class MemPointIteratorFactory {
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
index 9a4f4c5776a..42fbf6fb4fd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiAlignedTVListIterator.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
@@ -54,23 +51,22 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
new PriorityQueue<>(
(a, b) -> a.left.equals(b.left) ? b.right.compareTo(a.right) :
a.left.compareTo(b.left));
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private long maxNumberOfPointsInChunk = CONFIG.getTargetChunkPointNum();
-
public MergeSortMultiAlignedTVListIterator(
List<TSDataType> tsDataTypes,
List<Integer> columnIndexList,
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
super(
tsDataTypes,
columnIndexList,
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
this.probeIterators =
IntStream.range(0,
alignedTvListIterators.size()).boxed().collect(Collectors.toSet());
this.bitMap = new BitMap(tsDataTypeList.size());
@@ -80,16 +76,6 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
for (int i = 0; i < tsDataTypeList.size(); i++) {
valueColumnDeleteCursor.add(new int[] {0});
}
- if (!alignedTvLists.isEmpty()) {
- int avgPointSizeOfLargestColumn =
- alignedTvLists.stream()
- .mapToInt(AlignedTVList::getAvgPointSizeOfLargestColumn)
- .max()
- .getAsInt();
- long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- maxNumberOfPointsInChunk =
- Math.min(maxNumberOfPointsInChunk, (TARGET_CHUNK_SIZE /
avgPointSizeOfLargestColumn));
- }
}
@Override
@@ -254,8 +240,8 @@ public class MergeSortMultiAlignedTVListIterator extends
MultiAlignedTVListItera
encodeInfo.pointNumInChunk++;
// new page
- if (encodeInfo.pointNumInPage >= MAX_NUMBER_OF_POINTS_IN_PAGE
- || encodeInfo.pointNumInChunk >= maxNumberOfPointsInChunk) {
+ if (encodeInfo.pointNumInPage >= encodeInfo.maxNumberOfPointsInPage
+ || encodeInfo.pointNumInChunk >=
encodeInfo.maxNumberOfPointsInChunk) {
alignedChunkWriterImpl.write(times, encodeInfo.pointNumInPage, 0);
encodeInfo.pointNumInPage = 0;
break;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
index 85528b27ba6..74636222acc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MergeSortMultiTVListIterator.java
@@ -19,9 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
@@ -44,17 +41,14 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
new PriorityQueue<>(
(a, b) -> a.left.equals(b.left) ? b.right.compareTo(a.right) :
a.left.compareTo(b.left));
- private static final IoTDBConfig CONFIG =
IoTDBDescriptor.getInstance().getConfig();
- private final long TARGET_CHUNK_SIZE = CONFIG.getTargetChunkSize();
- private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
CONFIG.getTargetChunkPointNum();
-
public MergeSortMultiTVListIterator(
TSDataType tsDataType,
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
- super(tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ super(tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
this.probeIterators =
IntStream.range(0,
tvListIterators.size()).boxed().collect(Collectors.toList());
}
@@ -147,8 +141,8 @@ public class MergeSortMultiTVListIterator extends
MultiTVListIterator {
}
encodeInfo.pointNumInChunk++;
- if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
- || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.dataSizeInChunk >= encodeInfo.targetChunkSize) {
break;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
index 5ef966f2d27..42292dc27fc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiAlignedTVListIterator.java
@@ -20,7 +20,6 @@
package org.apache.iotdb.db.utils.datastructure;
import org.apache.tsfile.block.column.ColumnBuilder;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -49,8 +48,8 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
protected List<TsBlock> tsBlocks;
protected long currentTime;
- protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // used by nextBatch during query
+ protected final int maxNumberOfPointsInPage;
protected MultiAlignedTVListIterator(
List<TSDataType> tsDataTypeList,
@@ -58,19 +57,26 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
this.tsDataTypeList = tsDataTypeList;
this.columnIndexList = columnIndexList;
this.alignedTvListIterators = new ArrayList<>(alignedTvLists.size());
for (AlignedTVList alignedTvList : alignedTvLists) {
alignedTvListIterators.add(
alignedTvList.iterator(
- tsDataTypeList, columnIndexList, null, floatPrecision,
encodingList));
+ tsDataTypeList,
+ columnIndexList,
+ null,
+ floatPrecision,
+ encodingList,
+ maxNumberOfPointsInPage));
}
this.valueColumnsDeletionList = valueColumnsDeletionList;
this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encodingList = encodingList;
this.tsBlocks = new ArrayList<>();
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
@Override
@@ -125,7 +131,7 @@ public abstract class MultiAlignedTVListIterator implements
MemPointIterator {
// Time column
TimeColumnBuilder timeBuilder = builder.getTimeColumnBuilder();
- while (hasNextTimeValuePair() && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (hasNextTimeValuePair() && builder.getPositionCount() <
maxNumberOfPointsInPage) {
timeBuilder.writeLong(currentTime);
for (int columnIndex = 0; columnIndex < tsDataTypeList.size();
columnIndex++) {
// Value column
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
index d400f629971..aa9ec257311 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/MultiTVListIterator.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.db.utils.datastructure;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -46,23 +45,25 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
protected int iteratorIndex = 0;
protected int rowIndex = 0;
- protected final int MAX_NUMBER_OF_POINTS_IN_PAGE =
- TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
+ // used by nextBatch during query
+ protected final int maxNumberOfPointsInPage;
protected MultiTVListIterator(
TSDataType tsDataType,
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
this.tsDataType = tsDataType;
this.tvListIterators = new ArrayList<>(tvLists.size());
for (TVList tvList : tvLists) {
- tvListIterators.add(tvList.iterator(deletionList, null, null));
+ tvListIterators.add(tvList.iterator(deletionList, null, null,
maxNumberOfPointsInPage));
}
this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encoding = encoding;
this.tsBlocks = new ArrayList<>();
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
@Override
@@ -102,7 +103,7 @@ public abstract class MultiTVListIterator implements
MemPointIterator {
@Override
public TsBlock nextBatch() {
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(tsDataType));
- while (hasNextTimeValuePair() && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (hasNextTimeValuePair() && builder.getPositionCount() <
maxNumberOfPointsInPage) {
TVList.TVListIterator iterator = tvListIterators.get(iteratorIndex);
builder.getTimeColumnBuilder().writeLong(currentTime);
switch (tsDataType) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
index 0ab9e2bac89..ecd6c496dfb 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiAlignedTVListIterator.java
@@ -42,14 +42,16 @@ public class OrderedMultiAlignedTVListIterator extends
MultiAlignedTVListIterato
List<AlignedTVList> alignedTvLists,
List<List<TimeRange>> valueColumnsDeletionList,
Integer floatPrecision,
- List<TSEncoding> encodingList) {
+ List<TSEncoding> encodingList,
+ int maxNumberOfPointsInPage) {
super(
tsDataTypes,
columnIndexList,
alignedTvLists,
valueColumnsDeletionList,
floatPrecision,
- encodingList);
+ encodingList,
+ maxNumberOfPointsInPage);
this.bitMap = new BitMap(tsDataTypeList.size());
this.valueColumnDeleteCursor = new ArrayList<>();
for (int i = 0; i < tsDataTypeList.size(); i++) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
index e20ae061f75..eeb69ddb95b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/OrderedMultiTVListIterator.java
@@ -32,8 +32,9 @@ public class OrderedMultiTVListIterator extends
MultiTVListIterator {
List<TVList> tvLists,
List<TimeRange> deletionList,
Integer floatPrecision,
- TSEncoding encoding) {
- super(tsDataType, tvLists, deletionList, floatPrecision, encoding);
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ super(tsDataType, tvLists, deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
index c7a916d25b2..cc499ac1e0b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java
@@ -20,13 +20,11 @@
package org.apache.iotdb.db.utils.datastructure;
import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue;
import org.apache.iotdb.db.storageengine.rescon.memory.PrimitiveArrayManager;
import org.apache.iotdb.db.utils.MathUtils;
-import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.TimeValuePair;
@@ -645,8 +643,11 @@ public abstract class TVList implements WALEntryValue {
}
public TVListIterator iterator(
- List<TimeRange> deletionList, Integer floatPrecision, TSEncoding
encoding) {
- return new TVListIterator(deletionList, floatPrecision, encoding);
+ List<TimeRange> deletionList,
+ Integer floatPrecision,
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
+ return new TVListIterator(deletionList, floatPrecision, encoding,
maxNumberOfPointsInPage);
}
/* TVList Iterator */
@@ -661,15 +662,14 @@ public abstract class TVList implements WALEntryValue {
private final int floatPrecision;
private final TSEncoding encoding;
- private final int MAX_NUMBER_OF_POINTS_IN_PAGE =
-
TSFileDescriptor.getInstance().getConfig().getMaxNumberOfPointsInPage();
- private final long TARGET_CHUNK_SIZE =
- IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
- private final long MAX_NUMBER_OF_POINTS_IN_CHUNK =
- IoTDBDescriptor.getInstance().getConfig().getTargetChunkPointNum();
+ // used by nextBatch during query
+ protected final int maxNumberOfPointsInPage;
public TVListIterator(
- List<TimeRange> deletionList, Integer floatPrecision, TSEncoding
encoding) {
+ List<TimeRange> deletionList,
+ Integer floatPrecision,
+ TSEncoding encoding,
+ int maxNumberOfPointsInPage) {
this.deletionList = deletionList;
this.floatPrecision = floatPrecision != null ? floatPrecision : 0;
this.encoding = encoding;
@@ -677,6 +677,7 @@ public abstract class TVList implements WALEntryValue {
this.rows = rowCount;
this.probeNext = false;
this.tsBlocks = new ArrayList<>();
+ this.maxNumberOfPointsInPage = maxNumberOfPointsInPage;
}
protected void prepareNext() {
@@ -739,7 +740,7 @@ public abstract class TVList implements WALEntryValue {
TsBlockBuilder builder = new
TsBlockBuilder(Collections.singletonList(dataType));
switch (dataType) {
case BOOLEAN:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -753,7 +754,7 @@ public abstract class TVList implements WALEntryValue {
break;
case INT32:
case DATE:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -767,7 +768,7 @@ public abstract class TVList implements WALEntryValue {
break;
case INT64:
case TIMESTAMP:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -780,7 +781,7 @@ public abstract class TVList implements WALEntryValue {
}
break;
case FLOAT:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -796,7 +797,7 @@ public abstract class TVList implements WALEntryValue {
}
break;
case DOUBLE:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -814,7 +815,7 @@ public abstract class TVList implements WALEntryValue {
case TEXT:
case BLOB:
case STRING:
- while (index < rows && builder.getPositionCount() <
MAX_NUMBER_OF_POINTS_IN_PAGE) {
+ while (index < rows && builder.getPositionCount() <
maxNumberOfPointsInPage) {
long time = getTime(index);
if (!isNullValue(getValueIndex(index))
&& !isPointDeleted(time, deletionList, deleteCursor)
@@ -893,8 +894,8 @@ public abstract class TVList implements WALEntryValue {
String.format("Data type %s is not supported.", dataType));
}
encodeInfo.pointNumInChunk++;
- if (encodeInfo.pointNumInChunk >= MAX_NUMBER_OF_POINTS_IN_CHUNK
- || encodeInfo.dataSizeInChunk >= TARGET_CHUNK_SIZE) {
+ if (encodeInfo.pointNumInChunk >= encodeInfo.maxNumberOfPointsInChunk
+ || encodeInfo.dataSizeInChunk >= encodeInfo.targetChunkSize) {
break;
}
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
index 1422ec48e11..feea095d389 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemAlignedChunkLoaderTest.java
@@ -57,6 +57,7 @@ import static org.junit.Assert.fail;
public class MemAlignedChunkLoaderTest {
private static final String BINARY_STR = "ty love zm";
+ private static final int maxNumberOfPointsInPage = 1000;
@Test
public void testMemAlignedChunkLoader() throws IOException {
@@ -78,7 +79,6 @@ public class MemAlignedChunkLoaderTest {
Mockito.when(timeStatistics.getCount()).thenReturn(2L);
timeStatitsticsList.add(timeStatistics);
Mockito.when(chunk.getTimeStatisticsList()).thenReturn(timeStatitsticsList);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
List<Statistics<? extends Serializable>[]> valuesStatitsticsList = new
ArrayList<>();
Statistics<? extends Serializable>[] valuesStatistics = new Statistics[6];
@@ -114,7 +114,7 @@ public class MemAlignedChunkLoaderTest {
List<AlignedTVList> alignedTvLists =
alignedTvListMap.keySet().stream().map(x -> (AlignedTVList)
x).collect(Collectors.toList());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(dataTypes, null, alignedTvLists);
+ MemPointIteratorFactory.create(dataTypes, null, alignedTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
index 7ed50f2e009..9ea7da676f9 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/read/reader/chunk/MemChunkLoaderTest.java
@@ -59,17 +59,17 @@ import static org.junit.Assert.fail;
public class MemChunkLoaderTest {
private static final String BINARY_STR = "ty love zm";
+ private static final int maxNumberOfPointsInPage = 1000;
@Test
public void testBooleanMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.BOOLEAN);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> booleanTvListMap = buildBooleanTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(booleanTvListMap);
List<TVList> booleanTvLists = new ArrayList<>(booleanTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.BOOLEAN, booleanTvLists);
+ MemPointIteratorFactory.create(TSDataType.BOOLEAN, booleanTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -143,12 +143,11 @@ public class MemChunkLoaderTest {
public void testInt32MemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.INT32);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> int32TvListMap = buildInt32TvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(int32TvListMap);
List<TVList> int32TvLists = new ArrayList<>(int32TvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.INT32, int32TvLists);
+ MemPointIteratorFactory.create(TSDataType.INT32, int32TvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -223,12 +222,11 @@ public class MemChunkLoaderTest {
public void testInt64MemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.INT64);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> int64TvListMap = buildInt64TvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(int64TvListMap);
List<TVList> int64TvLists = new ArrayList<>(int64TvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.INT64, int64TvLists);
+ MemPointIteratorFactory.create(TSDataType.INT64, int64TvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -303,12 +301,11 @@ public class MemChunkLoaderTest {
public void testFloatMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.FLOAT);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> floatTvListMap = buildFloatTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(floatTvListMap);
List<TVList> floatTvLists = new ArrayList<>(floatTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.FLOAT, floatTvLists);
+ MemPointIteratorFactory.create(TSDataType.FLOAT, floatTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -383,12 +380,11 @@ public class MemChunkLoaderTest {
public void testDoubleMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.DOUBLE);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> doubleTvListMap = buildDoubleTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(doubleTvListMap);
List<TVList> doubleTvLists = new ArrayList<>(doubleTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.DOUBLE, doubleTvLists);
+ MemPointIteratorFactory.create(TSDataType.DOUBLE, doubleTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);
@@ -463,12 +459,11 @@ public class MemChunkLoaderTest {
public void testTextMemChunkLoader() throws IOException {
ReadOnlyMemChunk chunk = Mockito.mock(ReadOnlyMemChunk.class);
Mockito.when(chunk.getDataType()).thenReturn(TSDataType.TEXT);
- Mockito.when(chunk.getMaxNumberOfPointsInPage()).thenReturn(1000);
Map<TVList, Integer> textTvListMap = buildTextTvListMap();
Mockito.when(chunk.getTvListQueryMap()).thenReturn(textTvListMap);
List<TVList> textTvLists = new ArrayList<>(textTvListMap.keySet());
MemPointIterator timeValuePairIterator =
- MemPointIteratorFactory.create(TSDataType.TEXT, textTvLists);
+ MemPointIteratorFactory.create(TSDataType.TEXT, textTvLists,
maxNumberOfPointsInPage);
timeValuePairIterator.nextBatch();
Mockito.when(chunk.getMemPointIterator()).thenReturn(timeValuePairIterator);