This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 34cc34643d3 Fix query reuse of flushing memtable TVList (#17718)
34cc34643d3 is described below
commit 34cc34643d37fbe5a861914c51ced5295a00c2b7
Author: shuwenwei <[email protected]>
AuthorDate: Tue May 19 16:55:19 2026 +0800
Fix query reuse of flushing memtable TVList (#17718)
---
.../execution/fragment/QueryContext.java | 4 +-
.../schemaregion/utils/ResourceByPathUtils.java | 40 ++++++++++++++-
.../dataregion/flush/MemTableFlushTask.java | 1 -
.../memtable/AbstractWritableMemChunk.java | 19 +++++--
.../memtable/AlignedReadOnlyMemChunk.java | 2 +-
.../dataregion/memtable/IWritableMemChunk.java | 3 ++
.../dataregion/memtable/ReadOnlyMemChunk.java | 2 +-
.../fragment/FragmentInstanceExecutionTest.java | 6 +--
.../dataregion/memtable/PrimitiveMemTableTest.java | 59 +++++++++++++++++++++-
9 files changed, 122 insertions(+), 14 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index 5330903412b..d3edbe2e0d0 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -267,8 +267,8 @@ public class QueryContext {
this.ignoreAllNullRows = ignoreAllNullRows;
}
- public void addTVListToSet(Map<TVList, Integer> tvListMap) {
- tvListSet.addAll(tvListMap.keySet());
+ public void addTVListToSet(Set<TVList> set) {
+ tvListSet.addAll(set);
}
public void addRowLevelFilteredCount(long count) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index 6a2e554900e..b9cf97d76f7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -166,10 +166,46 @@ public abstract class ResourceByPathUtils {
}
if (!isWorkMemTable) {
+ /*
+ * 1. Q1 queries this TVList while it is still in the working memtable and records a smaller
+ * visible row count.
+ * 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the
+ * memtable to the flushing list.
+ * 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList,
+ * Q2's query-side sort may reorder the indices in place.
+ * 4. Q1 continues to read with its old row count and the reordered indices. The converted
+ * value index can exceed Q1's bitmap range and cause out-of-bound access.
+ *
+ * Therefore, this flushing branch can reuse the original list only when it is already
+ * sorted or no active query is using it. Otherwise, Q2 should read from
+ * workingListForFlush.
+ */
+ boolean canUseListDirectly = list.isSorted() || list.getQueryContextSet().isEmpty();
LOGGER.debug(
"Flushing MemTable - add current query context to mutable TVList's query list");
- list.getQueryContextSet().add(context);
- tvListQueryMap.put(list, list.rowCount());
+ if (canUseListDirectly) {
+ list.getQueryContextSet().add(context);
+ tvListQueryMap.put(list, list.rowCount());
+ } else {
+ TVList workingListForFlushSort = memChunk.initWorkingListForFlushIfNecessary(list, true);
+ /*
+ * The query will read from workingListForFlushSort, but cloneForFlushSort() only clones
+ * times and indices. The value arrays and bitmaps are still shared with the original
+ * list.
+ *
+ * Therefore, this query must also hold the original list until it finishes. Adding
+ * context to list.getQueryContextSet() lets flush/query cleanup see that the original
+ * list is still in use. Adding list to context.tvListSet makes
+ * releaseTVListOwnedByQuery() remove this context from the original list later.
+ *
+ * Do not put the original list into tvListQueryMap here. The actual read path must use
+ * workingListForFlushSort to avoid sorting the original list in place.
+ */
+ list.getQueryContextSet().add(context);
+ context.addTVListToSet(Collections.singleton(list));
+ workingListForFlushSort.getQueryContextSet().add(context);
+ tvListQueryMap.put(workingListForFlushSort, workingListForFlushSort.rowCount());
+ }
} else {
if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
LOGGER.debug(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 574470a2bc4..a8794b1ebf6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -273,7 +273,6 @@ public class MemTableFlushTask {
times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
}
writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
- writableMemChunk.releaseTemporaryTvListForFlush();
long subTaskTime = System.currentTimeMillis() - starTime;
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, subTaskTime);
memSerializeTime += subTaskTime;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index 6ae9333b2de..873f49c54e2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -210,8 +210,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
/*
* Concurrency background:
*
- * A query may start earlier and record the current row count (rows) of the TVList as its visible range.
- * After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
+ * A query may start earlier and record the current row count (rows) of the TVList as its
+ * visible range. After that, new unseq writes may arrive and immediately trigger a flush, which
+ * will sort the TVList.
*
* During sorting, the underlying indices array of the TVList may be reordered.
* If the query continues to use the previously recorded rows as its upper bound,
@@ -223,6 +224,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
* To avoid this issue, when there are active queries on the working TVList, we must
* clone the times and indices before sorting, so that the flush sort does not mutate
* the data structures that concurrent queries rely on.
+ *
+ * Flushing-memtable queries may also reuse workingListForFlush instead of the original working
+ * TVList for the same reason.
*/
boolean needCloneTimesAndIndicesInWorkingTVList;
workingList.lockQueryList();
@@ -232,7 +236,7 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
workingList.unlockQueryList();
}
workingListForFlush =
- needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList;
+ initWorkingListForFlushIfNecessary(workingList, needCloneTimesAndIndicesInWorkingTVList);
workingListForFlush.sort();
}
@@ -274,4 +278,13 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
@Override
public abstract void setEncryptParameter(EncryptParameter encryptParameter);
+
+ public synchronized TVList initWorkingListForFlushIfNecessary(
+ TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) {
+ if (workingListForFlush == null) {
+ workingListForFlush =
+ needCloneTimesAndIndicesInWorkingTVList ? workingList.cloneForFlushSort() : workingList;
+ }
+ return workingListForFlush;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index 05c1602bf2a..01a61eee5cc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -109,7 +109,7 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk {
this.valueStatisticsList = new ArrayList<>();
this.alignedTvListQueryMap = alignedTvListQueryMap;
this.columnIndexList = columnIndexList;
- this.context.addTVListToSet(alignedTvListQueryMap);
+ this.context.addTVListToSet(alignedTvListQueryMap.keySet());
}
@Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index af55d7df9d1..e57f30680f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -132,4 +132,7 @@ public interface IWritableMemChunk extends WALEntryValue {
void setWorkingTVList(TVList list);
void setEncryptParameter(EncryptParameter encryptParameter);
+
+ TVList initWorkingListForFlushIfNecessary(
+ TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 496035e91ad..4c32be0e798 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -128,7 +128,7 @@ public class ReadOnlyMemChunk {
this.deletionList = deletionList;
this.tvListQueryMap = tvListQueryMap;
this.pageStatisticsList = new ArrayList<>();
- this.context.addTVListToSet(tvListQueryMap);
+ this.context.addTVListToSet(tvListQueryMap.keySet());
}
public void sortTvLists() {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 93290fdf4da..e8c7994fb20 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
import org.apache.iotdb.db.utils.datastructure.TVList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.file.metadata.enums.CompressionType;
@@ -145,13 +145,13 @@ public class FragmentInstanceExecutionTest {
FragmentInstanceExecution execution1 =
createFragmentInstanceExecution(1, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext1 =
execution1.getFragmentInstanceContext();
- fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+ fragmentInstanceContext1.addTVListToSet(ImmutableSet.of(tvList));
tvList.getQueryContextSet().add(fragmentInstanceContext1);
FragmentInstanceExecution execution2 =
createFragmentInstanceExecution(2, instanceNotificationExecutor);
FragmentInstanceContext fragmentInstanceContext2 =
execution2.getFragmentInstanceContext();
- fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+ fragmentInstanceContext2.addTVListToSet(ImmutableSet.of(tvList));
tvList.getQueryContextSet().add(fragmentInstanceContext2);
// mock flush's behavior
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index c47297fbeb4..65621034e06 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -259,6 +259,63 @@ public class PrimitiveMemTableTest {
}
}
+ @Test
+ public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery()
+ throws QueryProcessException, IOException, IllegalPathException {
+
+ PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+ List<IMeasurementSchema> measurementSchemas =
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.INT32),
+ new MeasurementSchema("s3", TSDataType.INT32));
+ StringArrayDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
+ for (int i = 1000; i < 2000; i++) {
+ memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i});
+ }
+
+ ResourceByPathUtils resourcesByPathUtils =
+ ResourceByPathUtils.getResourceInstance(
+ new AlignedFullPath(deviceID, Arrays.asList("s1", "s2", "s3"), measurementSchemas));
+ AlignedReadOnlyMemChunk firstQueryMemChunk =
+ (AlignedReadOnlyMemChunk)
+ resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+ new QueryContext(1, false), memTable, null, Long.MAX_VALUE, null);
+ TVList originalWorkingList = memTable.getWritableMemChunk(deviceID, "").getWorkingTVList();
+ Assert.assertSame(
+ originalWorkingList,
+ firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next());
+
+ for (int i = 1; i <= 50; i++) {
+ memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] {i, i, i});
+ }
+ memTable.delete(
+ new TreeDeletionEntry(new MeasurementPath("root.test.d1.s1", TSDataType.INT32), 1, 10));
+ memTable.delete(
+ new TreeDeletionEntry(new MeasurementPath("root.test.d1.s2", TSDataType.INT32), 1, 10));
+ memTable.delete(
+ new TreeDeletionEntry(new MeasurementPath("root.test.d1.s3", TSDataType.INT32), 1, 10));
+ Assert.assertFalse(originalWorkingList.isSorted());
+
+ AlignedReadOnlyMemChunk flushingQueryMemChunk =
+ (AlignedReadOnlyMemChunk)
+ resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+ new QueryContext(2, false), memTable, new ArrayList<>(), Long.MAX_VALUE, null);
+ TVList flushingQueryList =
+ flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next();
+ Assert.assertNotSame(originalWorkingList, flushingQueryList);
+
+ flushingQueryMemChunk.sortTvLists();
+ Assert.assertFalse(originalWorkingList.isSorted());
+
+ firstQueryMemChunk.sortTvLists();
+ MemPointIterator memPointIterator =
+ firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null);
+ while (memPointIterator.hasNextBatch()) {
+ memPointIterator.nextBatch();
+ }
+ }
+
@Test
public void memSeriesToStringTest() throws IOException {
TSDataType dataType = TSDataType.INT32;
@@ -767,7 +824,7 @@ public class PrimitiveMemTableTest {
list.getQueryContextSet().add(queryContext);
Map<TVList, Integer> tvlistMap = new HashMap<>();
tvlistMap.put(list, 100);
- queryContext.addTVListToSet(tvlistMap);
+ queryContext.addTVListToSet(tvlistMap.keySet());
// fragment instance execution
IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);