shizy818 commented on code in PR #14616:
URL: https://github.com/apache/iotdb/pull/14616#discussion_r1909718601
##########
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java:
##########
@@ -534,34 +717,210 @@ private void handleEncoding(
}
}
+ private void writePageValuesIntoWriter(
+ IChunkWriter chunkWriter,
+ long[] times,
+ PageColumnAccessInfo[] pageColumnAccessInfo,
+ MergeSortAlignedTVListIterator timeValuePairIterator) {
+ AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
+
+ // update value statistics
+ for (int columnIndex = 0; columnIndex < dataTypes.size(); columnIndex++) {
+ ValueChunkWriter valueChunkWriter =
+ alignedChunkWriter.getValueChunkWriterByIndex(columnIndex);
+ PageColumnAccessInfo pageAccessInfo = pageColumnAccessInfo[columnIndex];
+ switch (dataTypes.get(columnIndex)) {
+ case BOOLEAN:
+ for (int index = 0; index < pageAccessInfo.count(); index++) {
+ int[] accessInfo = pageAccessInfo.get(index);
+ TsPrimitiveType value =
+ timeValuePairIterator.getPrimitiveObject(accessInfo,
columnIndex);
+ valueChunkWriter.write(
+ times[index], value != null && value.getBoolean(), value ==
null);
+ }
+ break;
+ case INT32:
+ case DATE:
+ for (int index = 0; index < pageAccessInfo.count(); index++) {
+ int[] accessInfo = pageAccessInfo.get(index);
+ TsPrimitiveType value =
+ timeValuePairIterator.getPrimitiveObject(accessInfo,
columnIndex);
+ valueChunkWriter.write(times[index], value == null ? 0 :
value.getInt(), value == null);
+ }
+ break;
+ case INT64:
+ case TIMESTAMP:
+ for (int index = 0; index < pageAccessInfo.count(); index++) {
+ int[] accessInfo = pageAccessInfo.get(index);
+ TsPrimitiveType value =
+ timeValuePairIterator.getPrimitiveObject(accessInfo,
columnIndex);
+ valueChunkWriter.write(
+ times[index], value == null ? 0L : value.getLong(), value ==
null);
+ }
+ break;
+ case FLOAT:
+ for (int index = 0; index < pageAccessInfo.count(); index++) {
+ int[] accessInfo = pageAccessInfo.get(index);
+ TsPrimitiveType value =
+ timeValuePairIterator.getPrimitiveObject(accessInfo,
columnIndex);
+ valueChunkWriter.write(
+ times[index], value == null ? 0f : value.getFloat(), value ==
null);
+ }
+ break;
+ case DOUBLE:
+ for (int index = 0; index < pageAccessInfo.count(); index++) {
+ int[] accessInfo = pageAccessInfo.get(index);
+ TsPrimitiveType value =
+ timeValuePairIterator.getPrimitiveObject(accessInfo,
columnIndex);
+ valueChunkWriter.write(
+ times[index], value == null ? 0d : value.getDouble(), value ==
null);
+ }
+ break;
+ case TEXT:
+ case BLOB:
+ case STRING:
+ for (int index = 0; index < pageAccessInfo.count(); index++) {
+ int[] accessInfo = pageAccessInfo.get(index);
+ TsPrimitiveType value =
+ timeValuePairIterator.getPrimitiveObject(accessInfo,
columnIndex);
+ valueChunkWriter.write(
+ times[index],
+ value == null ? Binary.EMPTY_VALUE : value.getBinary(),
+ value == null);
+ }
+ break;
+ default:
+ throw new UnSupportedDataTypeException(
+ String.format("Data type %s is not supported.",
dataTypes.get(columnIndex)));
+ }
+ }
+ }
+
+ @Override
+ public synchronized void encode(BlockingQueue<Object> ioTaskQueue) {
+ if (TVLIST_SORT_THRESHOLD == 0) {
+ encodeWorkingAlignedTVList(ioTaskQueue);
+ return;
+ }
+
+ AlignedChunkWriterImpl alignedChunkWriter = new
AlignedChunkWriterImpl(schemaList);
+ // create MergeSortAlignedTVListIterator.
+ List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList);
+ alignedTvLists.add(list);
+ MergeSortAlignedTVListIterator timeValuePairIterator =
+ new MergeSortAlignedTVListIterator(
+ alignedTvLists, dataTypes, null, null, null, ignoreAllNullRows);
+
+ int pointNumInPage = 0;
+ int pointNumInChunk = 0;
+ long[] times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
+
+ PageColumnAccessInfo[] pageColumnAccessInfo = new
PageColumnAccessInfo[dataTypes.size()];
+ for (int i = 0; i < pageColumnAccessInfo.length; i++) {
+ pageColumnAccessInfo[i] = new PageColumnAccessInfo();
+ }
+
+ while (timeValuePairIterator.hasNextTimeValuePair()) {
+ // prepare column access info for current page
+ int[][] accessInfo = timeValuePairIterator.getColumnAccessInfo();
+ times[pointNumInPage] = timeValuePairIterator.getTime();
+ for (int i = 0; i < dataTypes.size(); i++) {
+ pageColumnAccessInfo[i].add(accessInfo[i]);
+ }
+ timeValuePairIterator.step();
+ pointNumInPage++;
+ pointNumInChunk++;
+
+ if (pointNumInPage == MAX_NUMBER_OF_POINTS_IN_PAGE
+ || pointNumInChunk >= maxNumberOfPointsInChunk) {
+ writePageValuesIntoWriter(
+ alignedChunkWriter, times, pageColumnAccessInfo,
timeValuePairIterator);
+ alignedChunkWriter.write(times, pointNumInPage, 0);
+ for (PageColumnAccessInfo columnAccessInfo : pageColumnAccessInfo) {
+ columnAccessInfo.reset();
+ }
+ pointNumInPage = 0;
+ }
+
+ if (pointNumInChunk >= maxNumberOfPointsInChunk) {
+ alignedChunkWriter.sealCurrentPage();
+ alignedChunkWriter.clearPageWriter();
+ try {
+ ioTaskQueue.put(alignedChunkWriter);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ alignedChunkWriter = new AlignedChunkWriterImpl(schemaList);
+ pointNumInChunk = 0;
+ }
+ }
Review Comment:
Logically it makes sense that AlignedChunkWriter should handle page splitting
itself. But TsFile is a separate project now. Shall we do this improvement in
the next PR?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]