This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixBug0518 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0e867158973c1fd7aba0b6c0e5444713d3e64e9d Author: shuwenwei <[email protected]> AuthorDate: Mon May 18 18:54:47 2026 +0800 Fix query reuse of flushing memtable TVList --- .../schemaregion/utils/ResourceByPathUtils.java | 29 ++++++++++- .../dataregion/flush/MemTableFlushTask.java | 1 - .../memtable/AbstractWritableMemChunk.java | 19 ++++++-- .../dataregion/memtable/IWritableMemChunk.java | 3 ++ .../dataregion/memtable/PrimitiveMemTableTest.java | 57 ++++++++++++++++++++++ 5 files changed, 103 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index 6a2e554900e..2c7397c0be6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -166,10 +166,35 @@ public abstract class ResourceByPathUtils { } if (!isWorkMemTable) { + /* + * 1. Q1 queries this TVList while it is still in the working memtable and records a smaller + * visible row count. + * 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the + * memtable to the flushing list. + * 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList, + * Q2's query-side sort may reorder the indices in place. + * 4. Q1 continues to read with its old row count and the reordered indices. The converted + * value index can exceed Q1's bitmap range and cause out-of-bound access. + * + * Therefore, this flushing branch can reuse the original list only when it is already + * sorted or no active query is using it. Otherwise, Q2 should read from + * workingListForFlush. + */ + boolean canUseListDirectly = list.isSorted() || list.getQueryContextSet().isEmpty(); LOGGER.debug( "Flushing MemTable - add current query context to mutable TVList's query list"); - list.getQueryContextSet().add(context); - tvListQueryMap.put(list, list.rowCount()); + if (canUseListDirectly) { + list.getQueryContextSet().add(context); + tvListQueryMap.put(list, list.rowCount()); + } else { + TVList workingListForFlushSort = memChunk.initWorkingListForFlushIfNecessary(list, true); + // The flush list shares value arrays with the original list, so keep the original list + // referenced by this query until the query finishes. + list.getQueryContextSet().add(context); + workingListForFlushSort.getQueryContextSet().add(context); + context.addTVListToSet(Collections.singletonMap(list, 0)); + tvListQueryMap.put(workingListForFlushSort, workingListForFlushSort.rowCount()); + } } else { if (list.isSorted() || list.getQueryContextSet().isEmpty()) { LOGGER.debug( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java index 574470a2bc4..a8794b1ebf6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java @@ -273,7 +273,6 @@ public class MemTableFlushTask { times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE]; } writableMemChunk.encode(ioTaskQueue, encodeInfo, times); - writableMemChunk.releaseTemporaryTvListForFlush(); long subTaskTime = System.currentTimeMillis() - starTime; WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime); memSerializeTime += subTaskTime; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index 6ae9333b2de..873f49c54e2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -210,8 +210,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { /* * Concurrency background: * - * A query may start earlier and record the current row count (rows) of the TVList as its visible range. - * After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList. + * A query may start earlier and record the current row count (rows) of the TVList as its + * visible range. After that, new unseq writes may arrive and immediately trigger a flush, which + * will sort the TVList. * * During sorting, the underlying indices array of the TVList may be reordered. * If the query continues to use the previously recorded rows as its upper bound, @@ -223,6 +224,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { * To avoid this issue, when there are active queries on the working TVList, we must * clone the times and indices before sorting, so that the flush sort does not mutate * the data structures that concurrent queries rely on. + * + * Flushing-memtable queries may also reuse workingListForFlush instead of the original working + * TVList for the same reason. */ boolean needCloneTimesAndIndicesInWorkingTVList; workingList.lockQueryList(); @@ -232,7 +236,7 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { workingList.unlockQueryList(); } workingListForFlush = - needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; + initWorkingListForFlushIfNecessary(workingList, needCloneTimesAndIndicesInWorkingTVList); workingListForFlush.sort(); } @@ -274,4 +278,13 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk { @Override public abstract void setEncryptParameter(EncryptParameter encryptParameter); + + public synchronized TVList initWorkingListForFlushIfNecessary( + TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) { + if (workingListForFlush == null) { + workingListForFlush = + needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList; + } + return workingListForFlush; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java index af55d7df9d1..e57f30680f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java @@ -132,4 +132,7 @@ public interface IWritableMemChunk extends WALEntryValue { void setWorkingTVList(TVList list); void setEncryptParameter(EncryptParameter encryptParameter); + + TVList initWorkingListForFlushIfNecessary( + TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java index c47297fbeb4..92ec98b796e 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java @@ -259,6 +259,63 @@ public class PrimitiveMemTableTest { } } + @Test + public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery() + throws QueryProcessException, IOException, IllegalPathException { + + PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0"); + List<IMeasurementSchema> measurementSchemas = + Arrays.asList( + new MeasurementSchema("s1", TSDataType.INT32), + new MeasurementSchema("s2", TSDataType.INT32), + new MeasurementSchema("s3", TSDataType.INT32)); + StringArrayDeviceID deviceID = new StringArrayDeviceID("root.test.d1"); + for (int i = 1000; i < 2000; i++) { + memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i}); + } + + ResourceByPathUtils resourcesByPathUtils = + ResourceByPathUtils.getResourceInstance( + new AlignedFullPath(deviceID, Arrays.asList("s1", "s2", "s3"), measurementSchemas)); + AlignedReadOnlyMemChunk firstQueryMemChunk = + (AlignedReadOnlyMemChunk) + resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( + new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null); + TVList originalWorkingList = memTable.getWritableMemChunk(deviceID, "").getWorkingTVList(); + Assert.assertSame( + originalWorkingList, + firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next()); + + for (int i = 1; i <= 50; i++) { + memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i}); + } + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", TSDataType.INT32), 1, 10)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", TSDataType.INT32), 1, 10)); + memTable.delete( + new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", TSDataType.INT32), 1, 10)); + Assert.assertFalse(originalWorkingList.isSorted()); + + AlignedReadOnlyMemChunk flushingQueryMemChunk = + (AlignedReadOnlyMemChunk) + resourcesByPathUtils.getReadOnlyMemChunkFromMemTable( + new QueryContext(2, false), memTable, new ArrayList<>(), Long.MAX_VALUE, null); + TVList flushingQueryList = + flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next(); + Assert.assertNotSame(originalWorkingList, flushingQueryList); + + flushingQueryMemChunk.sortTvLists(); + Assert.assertFalse(originalWorkingList.isSorted()); + + firstQueryMemChunk.sortTvLists(); + MemPointIterator memPointIterator = + firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null); + while (memPointIterator.hasNextBatch()) { + memPointIterator.nextBatch(); + } + } + @Test public void memSeriesToStringTest() throws IOException { TSDataType dataType = TSDataType.INT32;
