This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixBug0513-dev1.3 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit e1eea3161f99306cccc551e33e4d9c675ab413e9 Author: shuwenwei <[email protected]> AuthorDate: Wed Feb 11 18:17:08 2026 +0800 Fixed concurrency issues caused by write and flush sorting during query execution (#17193) --- .../dataregion/flush/MemTableFlushTask.java | 1 + .../memtable/AbstractWritableMemChunk.java | 43 ++++++++++- .../memtable/AlignedWritableMemChunk.java | 88 +++++++++++++--------- .../dataregion/memtable/IWritableMemChunk.java | 2 + .../dataregion/memtable/WritableMemChunk.java | 38 +++++----- .../db/utils/datastructure/AlignedTVList.java | 10 +++ .../iotdb/db/utils/datastructure/BinaryTVList.java | 9 +++ .../db/utils/datastructure/BooleanTVList.java | 9 +++ .../iotdb/db/utils/datastructure/DoubleTVList.java | 9 +++ .../iotdb/db/utils/datastructure/FloatTVList.java | 9 +++ .../iotdb/db/utils/datastructure/IntTVList.java | 9 +++ .../iotdb/db/utils/datastructure/LongTVList.java | 9 +++ .../iotdb/db/utils/datastructure/TVList.java | 2 + .../dataregion/memtable/PrimitiveMemTableTest.java | 45 +++++++++++ 14 files changed, 227 insertions(+), 56 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 48efe55c223..0c5ccc9ac1b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -278,6 +278,7 @@ public class MemTableFlushTask { times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE]; } writableMemChunk.encode(ioTaskQueue, encodeInfo, times); + writableMemChunk.releaseTemporaryTvListForFlush(); long subTaskTime = System.currentTimeMillis() - starTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 1b9dbbcae60..5167ea96b56 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -45,6 +45,8 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { protected static long RETRY_INTERVAL_MS = 100L; protected static long MAX_WAIT_QUERY_MS = 60 * 1000L; + protected TVList workingListForFlush; + /** * Release the TVList if there is no query on it. Otherwise, it should set the first query as the * owner. TVList is released until all queries finish. If it throws memory-not-enough exception @@ -194,7 +196,46 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { public abstract IMeasurementSchema getSchema(); @Override - public abstract void sortTvListForFlush(); + public void sortTvListForFlush() { + TVList workingList = getWorkingTVList(); + if (workingList.isSorted()) { + workingListForFlush = workingList; + return; + } + + /* + * Concurrency background: + * + * A query may start earlier and record the current row count (rows) of the TVList as its visible range. + * After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList. + * + * During sorting, the underlying indices array of the TVList may be reordered. + * If the query continues to use the previously recorded rows as its upper bound, + * it may convert a logical index to a physical index via the updated indices array. + * + * In this case, the converted physical index may exceed the previously visible + * rows range, leading to invalid access or unexpected behavior. + * + * To avoid this issue, when there are active queries on the working TVList, we must + * clone the times and indices before sorting, so that the flush sort does not mutate + * the data structures that concurrent queries rely on. + */ + boolean needCloneTimesAndIndicesInWorkingTVList; + workingList.lockQueryList(); + try { + needCloneTimesAndIndicesInWorkingTVList = !workingList.getQueryContextSet().isEmpty(); + } finally { + workingList.unlockQueryList(); + } + workingListForFlush = + needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; + workingListForFlush.sort(); + } + + @Override + public void releaseTemporaryTvListForFlush() { + workingListForFlush = null; + } @Override public abstract int delete(long lowerBound, long upperBound); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java index 2fca3ca398a..06bd5404be0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedWritableMemChunk.java @@ -63,6 +63,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { private final Map<String, Integer> measurementIndexMap; private List<TSDataType> dataTypes; private final List<IMeasurementSchema> schemaList; + // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask private AlignedTVList list; private List<AlignedTVList> sortedList; private long sortedRowCount = 0; @@ -273,13 +274,6 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { return minTime; } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public int delete(long lowerBound, long upperBound) { int deletedNumber = list.delete(lowerBound, upperBound); @@ -322,7 +316,8 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { BlockingQueue<Object> ioTaskQueue, long maxNumberOfPointsInChunk, int maxNumberOfPointsInPage) { - BitMap allValueColDeletedMap = list.getAllValueColDeletedMap(); + AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush; + BitMap allValueColDeletedMap = alignedWorkingListForFlush.getAllValueColDeletedMap(); boolean[] timeDuplicateInfo = null; @@ -334,8 +329,10 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { int pointNumInPage = 0; int pointNumInChunk = 0; - for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { - long time = list.getTime(sortedRowIndex); + for (int sortedRowIndex = 0; + sortedRowIndex < alignedWorkingListForFlush.rowCount(); + sortedRowIndex++) { + long time = alignedWorkingListForFlush.getTime(sortedRowIndex); if (pointNumInPage == 0) { pageRange.add(sortedRowIndex); } @@ -356,14 +353,16 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { } int nextRowIndex = sortedRowIndex + 1; - while (nextRowIndex < list.rowCount() + while (nextRowIndex < alignedWorkingListForFlush.rowCount() && allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(nextRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(nextRowIndex))) { nextRowIndex++; } - if (nextRowIndex != list.rowCount() && time == list.getTime(nextRowIndex)) { + if (nextRowIndex != alignedWorkingListForFlush.rowCount() + && time == alignedWorkingListForFlush.getTime(nextRowIndex)) { if (Objects.isNull(timeDuplicateInfo)) { - timeDuplicateInfo = new boolean[list.rowCount()]; + timeDuplicateInfo = new boolean[alignedWorkingListForFlush.rowCount()]; } timeDuplicateInfo[sortedRowIndex] = true; } @@ -371,7 +370,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { } if (pointNumInPage != 0) { - pageRange.add(list.rowCount() - 1); + pageRange.add(alignedWorkingListForFlush.rowCount() - 1); } if (pointNumInChunk != 0) { chunkRange.add(pageRange); @@ -387,7 +386,8 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { boolean[] timeDuplicateInfo, BitMap allValueColDeletedMap, int maxNumberOfPointsInPage) { - List<TSDataType> dataTypes = list.getTsDataTypes(); + AlignedTVList alignedWorkingListForFlush = (AlignedTVList) workingListForFlush; + List<TSDataType> dataTypes = alignedWorkingListForFlush.getTsDataTypes(); Pair<Long, Integer>[] lastValidPointIndexForTimeDupCheck = new Pair[dataTypes.size()]; for (List<Integer> pageRange : chunkRange) { AlignedChunkWriterImpl alignedChunkWriter = new AlignedChunkWriterImpl(schemaList); @@ -404,16 +404,18 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { sortedRowIndex++) { // skip empty row if (allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) { continue; } // skip time duplicated rows - long time = list.getTime(sortedRowIndex); + long time = alignedWorkingListForFlush.getTime(sortedRowIndex); if (Objects.nonNull(timeDuplicateInfo)) { - if (!list.isNullValue(list.getValueIndex(sortedRowIndex), columnIndex)) { + if (!alignedWorkingListForFlush.isNullValue( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex), columnIndex)) { lastValidPointIndexForTimeDupCheck[columnIndex].left = time; lastValidPointIndexForTimeDupCheck[columnIndex].right = - list.getValueIndex(sortedRowIndex); + alignedWorkingListForFlush.getValueIndex(sortedRowIndex); } if (timeDuplicateInfo[sortedRowIndex]) { continue; @@ -434,41 +436,55 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { && (time == lastValidPointIndexForTimeDupCheck[columnIndex].left)) { originRowIndex = lastValidPointIndexForTimeDupCheck[columnIndex].right; } else { - originRowIndex = list.getValueIndex(sortedRowIndex); + originRowIndex = alignedWorkingListForFlush.getValueIndex(sortedRowIndex); } - boolean isNull = list.isNullValue(originRowIndex, columnIndex); + boolean isNull = alignedWorkingListForFlush.isNullValue(originRowIndex, columnIndex); switch (tsDataType) { case BOOLEAN: alignedChunkWriter.writeByColumn( time, - !isNull && list.getBooleanByValueIndex(originRowIndex, columnIndex), + !isNull + && alignedWorkingListForFlush.getBooleanByValueIndex( + originRowIndex, columnIndex), isNull); break; case INT32: case DATE: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getIntByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getIntByValueIndex( + originRowIndex, columnIndex), isNull); break; case INT64: case TIMESTAMP: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getLongByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getLongByValueIndex( + originRowIndex, columnIndex), isNull); break; case FLOAT: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getFloatByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getFloatByValueIndex( + originRowIndex, columnIndex), isNull); break; case DOUBLE: alignedChunkWriter.writeByColumn( time, - isNull ? 0 : list.getDoubleByValueIndex(originRowIndex, columnIndex), + isNull + ? 0 + : alignedWorkingListForFlush.getDoubleByValueIndex( + originRowIndex, columnIndex), isNull); break; case TEXT: @@ -476,7 +492,10 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { case BLOB: alignedChunkWriter.writeByColumn( time, - isNull ? null : list.getBinaryByValueIndex(originRowIndex, columnIndex), + isNull + ? null + : alignedWorkingListForFlush.getBinaryByValueIndex( + originRowIndex, columnIndex), isNull); break; default: @@ -486,18 +505,20 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { alignedChunkWriter.nextColumn(); } - long[] times = new long[Math.min(maxNumberOfPointsInPage, list.rowCount())]; + long[] times = + new long[Math.min(maxNumberOfPointsInPage, alignedWorkingListForFlush.rowCount())]; int pointsInPage = 0; for (int sortedRowIndex = pageRange.get(pageNum * 2); sortedRowIndex <= pageRange.get(pageNum * 2 + 1); sortedRowIndex++) { // skip empty row if (allValueColDeletedMap != null - && allValueColDeletedMap.isMarked(list.getValueIndex(sortedRowIndex))) { + && allValueColDeletedMap.isMarked( + alignedWorkingListForFlush.getValueIndex(sortedRowIndex))) { continue; } if (Objects.isNull(timeDuplicateInfo) || !timeDuplicateInfo[sortedRowIndex]) { - times[pointsInPage++] = list.getTime(sortedRowIndex); + times[pointsInPage++] = alignedWorkingListForFlush.getTime(sortedRowIndex); } } alignedChunkWriter.write(times, pointsInPage, 0); @@ -513,8 +534,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { } @Override - public synchronized void encode( - BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { if (TVLIST_SORT_THRESHOLD == 0) { encodeWorkingAlignedTVList( ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk, encodeInfo.maxNumberOfPointsInPage); @@ -525,7 +545,7 @@ public class AlignedWritableMemChunk extends AbstractWritableMemChunk { // create MergeSortAlignedTVListIterator. List<AlignedTVList> alignedTvLists = new ArrayList<>(sortedList); - alignedTvLists.add(list); + alignedTvLists.add((AlignedTVList) workingListForFlush); List<Integer> columnIndexList = buildColumnIndexList(schemaList); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index 18ca5201bb0..99404502437 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -94,6 +94,8 @@ public interface IWritableMemChunk extends WALEntryValue { */ void sortTvListForFlush(); + void releaseTemporaryTvListForFlush(); + default long getMaxTime() { return Long.MAX_VALUE; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java index fd616ac94fa..90e96597b0c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/WritableMemChunk.java @@ -54,6 +54,7 @@ import static org.apache.iotdb.db.utils.MemUtils.getBinarySize; public class WritableMemChunk extends AbstractWritableMemChunk { private IMeasurementSchema schema; + // Note: Use AbstractWritableMemChunk.workingListForFlush instead of list in FlushTask private TVList list; private List<TVList> sortedList; private long sortedRowCount = 0; @@ -241,13 +242,6 @@ public class WritableMemChunk extends AbstractWritableMemChunk { throw new UnSupportedDataTypeException(UNSUPPORTED_TYPE + schema.getType()); } - @Override - public synchronized void sortTvListForFlush() { - if (!list.isSorted()) { - list.sort(); - } - } - @Override public TVList getWorkingTVList() { return list; @@ -374,50 +368,53 @@ public class WritableMemChunk extends AbstractWritableMemChunk { ChunkWriterImpl chunkWriterImpl = createIChunkWriter(); long dataSizeInCurrentChunk = 0; int pointNumInCurrentChunk = 0; - for (int sortedRowIndex = 0; sortedRowIndex < list.rowCount(); sortedRowIndex++) { - if (list.isNullValue(list.getValueIndex(sortedRowIndex))) { + for (int sortedRowIndex = 0; + sortedRowIndex < workingListForFlush.rowCount(); + sortedRowIndex++) { + if (workingListForFlush.isNullValue(workingListForFlush.getValueIndex(sortedRowIndex))) { continue; } - long time = list.getTime(sortedRowIndex); + long time = workingListForFlush.getTime(sortedRowIndex); // skip duplicated data - if ((sortedRowIndex + 1 < list.rowCount() && (time == list.getTime(sortedRowIndex + 1)))) { + if ((sortedRowIndex + 1 < workingListForFlush.rowCount() + && (time == workingListForFlush.getTime(sortedRowIndex + 1)))) { continue; } // store last point for SDT - if (sortedRowIndex + 1 == list.rowCount()) { + if (sortedRowIndex + 1 == workingListForFlush.rowCount()) { chunkWriterImpl.setLastPoint(true); } switch (tsDataType) { case BOOLEAN: - chunkWriterImpl.write(time, list.getBoolean(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getBoolean(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 1L; break; case INT32: case DATE: - chunkWriterImpl.write(time, list.getInt(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getInt(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 4L; break; case INT64: case TIMESTAMP: - chunkWriterImpl.write(time, list.getLong(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getLong(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 8L; break; case FLOAT: - chunkWriterImpl.write(time, list.getFloat(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getFloat(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 4L; break; case DOUBLE: - chunkWriterImpl.write(time, list.getDouble(sortedRowIndex)); + chunkWriterImpl.write(time, workingListForFlush.getDouble(sortedRowIndex)); dataSizeInCurrentChunk += 8L + 8L; break; case TEXT: case BLOB: case STRING: - Binary value = list.getBinary(sortedRowIndex); + Binary value = workingListForFlush.getBinary(sortedRowIndex); chunkWriterImpl.write(time, value); dataSizeInCurrentChunk += 8L + getBinarySize(value); break; @@ -452,8 +449,7 @@ public class WritableMemChunk extends AbstractWritableMemChunk { } @Override - public synchronized void encode( - BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { + public void encode(BlockingQueue<Object> ioTaskQueue, BatchEncodeInfo encodeInfo, long[] times) { if (TVLIST_SORT_THRESHOLD == 0) { encodeWorkingTVList( ioTaskQueue, encodeInfo.maxNumberOfPointsInChunk, encodeInfo.targetChunkSize); @@ -467,7 +463,7 @@ public class WritableMemChunk extends AbstractWritableMemChunk { // create MultiTvListIterator. It need not handle float/double precision here. List<TVList> tvLists = new ArrayList<>(sortedList); - tvLists.add(list); + tvLists.add(workingListForFlush); MemPointIterator timeValuePairIterator = MemPointIteratorFactory.create( schema.getType(), tvLists, encodeInfo.maxNumberOfPointsInPage); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 615bfcaf4ca..b99e784faad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -137,6 +137,16 @@ public abstract class AlignedTVList extends TVList { return alignedTvList; } + @Override + public synchronized AlignedTVList cloneForFlushSort() { + AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); + cloneAs(cloneList); + cloneList.memoryBinaryChunkSize = this.memoryBinaryChunkSize; + cloneList.values = this.values; + cloneList.bitMaps = this.bitMaps; + return cloneList; + } + @Override public synchronized AlignedTVList clone() { AlignedTVList cloneList = AlignedTVList.newAlignedList(new ArrayList<>(dataTypes)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java index dc4ff5529d4..b274bc8ef65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BinaryTVList.java @@ -63,6 +63,15 @@ public abstract class BinaryTVList extends TVList { } } + @Override + public synchronized TVList cloneForFlushSort() { + BinaryTVList cloneList = BinaryTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BinaryTVList clone() { BinaryTVList cloneList = BinaryTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java index b8eb0e508bf..5e3461acb2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/BooleanTVList.java @@ -62,6 +62,15 @@ public abstract class BooleanTVList extends TVList { } } + @Override + public synchronized TVList cloneForFlushSort() { + BooleanTVList cloneList = BooleanTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized BooleanTVList clone() { BooleanTVList cloneList = BooleanTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java index f61995ef062..753ca2020a8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/DoubleTVList.java @@ -63,6 +63,15 @@ public abstract class DoubleTVList extends TVList { } } + @Override + public synchronized TVList cloneForFlushSort() { + DoubleTVList cloneList = DoubleTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized DoubleTVList clone() { DoubleTVList cloneList = DoubleTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java index 3623fa49a3e..517dc208211 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/FloatTVList.java @@ -63,6 +63,15 @@ public abstract class FloatTVList extends TVList { } } + @Override + public synchronized TVList cloneForFlushSort() { + FloatTVList cloneList = FloatTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized FloatTVList clone() { FloatTVList cloneList = FloatTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java index 758cd64053b..8593400fa79 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/IntTVList.java @@ -62,6 +62,15 @@ public abstract class IntTVList extends TVList { } } + @Override + public synchronized TVList cloneForFlushSort() { + IntTVList cloneList = IntTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized IntTVList clone() { IntTVList cloneList = IntTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java index 7b4bd8d82d2..544d2cb7dd0 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/LongTVList.java @@ -62,6 +62,15 @@ public abstract class LongTVList extends TVList { } } + @Override + public synchronized TVList cloneForFlushSort() { + LongTVList cloneList = LongTVList.newList(); + cloneAs(cloneList); + cloneList.bitMap = this.bitMap; + cloneList.values = this.values; + return cloneList; + } + @Override public synchronized LongTVList clone() { LongTVList cloneList = LongTVList.newList(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index b4fd0d52221..d21ecc83e0f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -548,6 +548,8 @@ public abstract class TVList implements WALEntryValue { protected abstract void expandValues(); + public abstract TVList cloneForFlushSort(); + @Override public abstract TVList clone(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index ff476c60e4e..b366a48ab77 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -177,6 +177,51 @@ public class PrimitiveMemTableTest { } } + @Test + public void testWriteAndFlushSortDuringQuerySortTVListAndActualQueryExecution() + throws QueryProcessException, IOException, IllegalPathException { + + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + for (int i = 1000; i < 2000; i++) { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + for (int i = 100; i < 200; i++) { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + MeasurementPath path = new MeasurementPath("root.test.d1.s1", TSDataType.INT32); + memTable.delete(path, path.getDevicePath(), 150, 160); + path = new MeasurementPath("root.test.d1.s2", TSDataType.INT32); + memTable.delete(path, path.getDevicePath(), 150, 160); + path = new MeasurementPath("root.test.d1.s3", TSDataType.INT32); + memTable.delete(path, path.getDevicePath(), 150, 160); + ResourceByPathUtils resourcesByPathUtils = + ResourceByPathUtils.getResourceInstance( + new AlignedPath("root.test.d1", Arrays.asList("s1", "s2", "s3"), measurementSchemas)); + ReadOnlyMemChunk readOnlyMemChunk = + resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( + new QueryContext(), memTable, null, Long.MAX_VALUE, null); + + for (int i = 1; i <= 50; i++) { + memTable.writeAlignedRow( + new PlainDeviceID("root.test.d1"), measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.getWritableMemChunk(new PlainDeviceID("root.test.d1"), "").sortTvListForFlush(); + + readOnlyMemChunk.sortTvLists(); + + MemPointIterator memPointIterator = readOnlyMemChunk.createMemPointIterator(Ordering.ASC, null); + while (memPointIterator.hasNextBatch()) { + memPointIterator.nextBatch(); + } + } + @Test public void memSeriesToStringTest() throws IOException { TSDataType dataType = TSDataType.INT32;
