This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 1d3578026d8 [to dev/1.3] Fix query reuse of flushing memtable TVList (#17709)
1d3578026d8 is described below

commit 1d3578026d8e19d5801f4d47fe0a9e03c141678b
Author: shuwenwei <[email protected]>
AuthorDate: Tue May 19 14:27:43 2026 +0800

    [to dev/1.3] Fix query reuse of flushing memtable TVList (#17709)
---
 .../execution/fragment/QueryContext.java           |  4 +-
 .../schemaregion/utils/ResourceByPathUtils.java    | 40 ++++++++++++++-
 .../dataregion/flush/MemTableFlushTask.java        |  1 -
 .../memtable/AbstractWritableMemChunk.java         | 20 ++++++--
 .../memtable/AlignedReadOnlyMemChunk.java          |  2 +-
 .../dataregion/memtable/IWritableMemChunk.java     |  3 ++
 .../dataregion/memtable/ReadOnlyMemChunk.java      |  2 +-
 .../fragment/FragmentInstanceExecutionTest.java    |  6 +--
 .../dataregion/memtable/PrimitiveMemTableTest.java | 59 +++++++++++++++++++++-
 9 files changed, 123 insertions(+), 14 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
index 8d62e207766..d1b235ff7b1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/QueryContext.java
@@ -217,7 +217,7 @@ public class QueryContext {
     this.queryStatistics = queryStatistics;
   }
 
-  public void addTVListToSet(Map<TVList, Integer> tvListMap) {
-    tvListSet.addAll(tvListMap.keySet());
+  public void addTVListToSet(Set<TVList> set) {
+    tvListSet.addAll(set);
   }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
index c64d32c1e32..fa2f603d6fa 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java
@@ -158,10 +158,46 @@ public abstract class ResourceByPathUtils {
       }
 
       if (!isWorkMemTable) {
+        /*
+         * 1. Q1 queries this TVList while it is still in the working memtable and records a smaller
+         *    visible row count.
+         * 2. Later writes append out-of-order rows to the same TVList, then FLUSH moves the
+         *    memtable to the flushing list.
+         * 3. Q2 queries the flushing memtable. If Q2 directly reuses the original mutable TVList,
+         *    Q2's query-side sort may reorder the indices in place.
+         * 4. Q1 continues to read with its old row count and the reordered indices. The converted
+         *    value index can exceed Q1's bitmap range and cause out-of-bound access.
+         *
+         * Therefore, this flushing branch can reuse the original list only when it is already
+         * sorted or no active query is using it. Otherwise, Q2 should read from
+         * workingListForFlush.
+         */
+        boolean canUseListDirectly = list.isSorted() || 
list.getQueryContextSet().isEmpty();
         LOGGER.debug(
             "Flushing MemTable - add current query context to mutable TVList's 
query list");
-        list.getQueryContextSet().add(context);
-        tvListQueryMap.put(list, list.rowCount());
+        if (canUseListDirectly) {
+          list.getQueryContextSet().add(context);
+          tvListQueryMap.put(list, list.rowCount());
+        } else {
+          TVList workingListForFlushSort = 
memChunk.initWorkingListForFlushIfNecessary(list, true);
+          /*
+           * The query will read from workingListForFlushSort, but cloneForFlushSort() only clones
+           * times and indices. The value arrays and bitmaps are still shared with the original
+           * list.
+           *
+           * Therefore, this query must also hold the original list until it finishes. Adding
+           * context to list.getQueryContextSet() lets flush/query cleanup see that the original
+           * list is still in use. Adding list to context.tvListSet makes
+           * releaseTVListOwnedByQuery() remove this context from the original list later.
+           *
+           * Do not put the original list into tvListQueryMap here. The actual read path must use
+           * workingListForFlushSort to avoid sorting the original list in place.
+           */
+          list.getQueryContextSet().add(context);
+          context.addTVListToSet(Collections.singleton(list));
+          workingListForFlushSort.getQueryContextSet().add(context);
+          tvListQueryMap.put(workingListForFlushSort, 
workingListForFlushSort.rowCount());
+        }
       } else {
         if (list.isSorted() || list.getQueryContextSet().isEmpty()) {
           LOGGER.debug(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
index 0c5ccc9ac1b..48efe55c223 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/flush/MemTableFlushTask.java
@@ -278,7 +278,6 @@ public class MemTableFlushTask {
                 times = new long[MAX_NUMBER_OF_POINTS_IN_PAGE];
               }
               writableMemChunk.encode(ioTaskQueue, encodeInfo, times);
-              writableMemChunk.releaseTemporaryTvListForFlush();
               long subTaskTime = System.currentTimeMillis() - starTime;
               
WRITING_METRICS.recordFlushSubTaskCost(WritingMetrics.ENCODING_TASK, 
subTaskTime);
               memSerializeTime += subTaskTime;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
index 5167ea96b56..6c773942fb7 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java
@@ -206,8 +206,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
     /*
      * Concurrency background:
      *
-     * A query may start earlier and record the current row count (rows) of the TVList as its visible range.
-     *  After that, new unseq writes may arrive and immediately trigger a flush, which will sort the TVList.
+     * A query may start earlier and record the current row count (rows) of the TVList as its
+     * visible range. After that, new unseq writes may arrive and immediately trigger a flush, which
+     * will sort the TVList.
      *
      * During sorting, the underlying indices array of the TVList may be reordered.
      * If the query continues to use the previously recorded rows as its upper bound,
@@ -219,6 +220,9 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
      * To avoid this issue, when there are active queries on the working TVList, we must
      * clone the times and indices before sorting, so that the flush sort does not mutate
      * the data structures that concurrent queries rely on.
+     *
+     * Flushing-memtable queries may also reuse workingListForFlush instead of the original working
+     * TVList for the same reason.
      */
     boolean needCloneTimesAndIndicesInWorkingTVList;
     workingList.lockQueryList();
@@ -228,7 +232,7 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
       workingList.unlockQueryList();
     }
     workingListForFlush =
-        needCloneTimesAndIndicesInWorkingTVList ? 
workingList.cloneForFlushSort() : workingList;
+        initWorkingListForFlushIfNecessary(workingList, 
needCloneTimesAndIndicesInWorkingTVList);
     workingListForFlush.sort();
   }
 
@@ -267,4 +271,14 @@ public abstract class AbstractWritableMemChunk implements IWritableMemChunk {
 
   @Override
   public abstract int serializedSize();
+
+  @Override
+  public synchronized TVList initWorkingListForFlushIfNecessary(
+      TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList) {
+    if (workingListForFlush == null) {
+      workingListForFlush =
+          needCloneTimesAndIndicesInWorkingTVList ? 
workingList.cloneForFlushSort() : workingList;
+    }
+    return workingListForFlush;
+  }
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
index 30976fb6790..bb2ee311d30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java
@@ -104,7 +104,7 @@ public class AlignedReadOnlyMemChunk extends ReadOnlyMemChunk {
     this.valueStatisticsList = new ArrayList<>();
     this.alignedTvListQueryMap = alignedTvListQueryMap;
     this.columnIndexList = columnIndexList;
-    this.context.addTVListToSet(alignedTvListQueryMap);
+    this.context.addTVListToSet(alignedTvListQueryMap.keySet());
   }
 
   @Override
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
index 99404502437..376136e1d12 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/IWritableMemChunk.java
@@ -126,4 +126,7 @@ public interface IWritableMemChunk extends WALEntryValue {
   TVList getWorkingTVList();
 
   void setWorkingTVList(TVList list);
+
+  TVList initWorkingListForFlushIfNecessary(
+      TVList workingList, boolean needCloneTimesAndIndicesInWorkingTVList);
 }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
index 9e46740a759..c0a71bf7edc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java
@@ -127,7 +127,7 @@ public class ReadOnlyMemChunk {
     this.deletionList = deletionList;
     this.tvListQueryMap = tvListQueryMap;
     this.pageStatisticsList = new ArrayList<>();
-    this.context.addTVListToSet(tvListQueryMap);
+    this.context.addTVListToSet(tvListQueryMap.keySet());
   }
 
   public void sortTvLists() {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
index 22e5c360ae3..cfc7f887dcf 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java
@@ -44,7 +44,7 @@ import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk;
 import org.apache.iotdb.db.utils.datastructure.AlignedTVList;
 import org.apache.iotdb.db.utils.datastructure.TVList;
 
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
@@ -133,13 +133,13 @@ public class FragmentInstanceExecutionTest {
         FragmentInstanceExecution execution1 =
             createFragmentInstanceExecution(1, instanceNotificationExecutor);
         FragmentInstanceContext fragmentInstanceContext1 = 
execution1.getFragmentInstanceContext();
-        fragmentInstanceContext1.addTVListToSet(ImmutableMap.of(tvList, 0));
+        fragmentInstanceContext1.addTVListToSet(ImmutableSet.of(tvList));
         tvList.getQueryContextSet().add(fragmentInstanceContext1);
 
         FragmentInstanceExecution execution2 =
             createFragmentInstanceExecution(2, instanceNotificationExecutor);
         FragmentInstanceContext fragmentInstanceContext2 = 
execution2.getFragmentInstanceContext();
-        fragmentInstanceContext2.addTVListToSet(ImmutableMap.of(tvList, 0));
+        fragmentInstanceContext2.addTVListToSet(ImmutableSet.of(tvList));
         tvList.getQueryContextSet().add(fragmentInstanceContext2);
 
         // mock flush's behavior
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
index b366a48ab77..b9259db0a0f 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/memtable/PrimitiveMemTableTest.java
@@ -222,6 +222,63 @@ public class PrimitiveMemTableTest {
     }
   }
 
+  @Test
+  public void testFlushingQueryDoesNotSortWorkingTVListUsedByPreviousQuery()
+      throws QueryProcessException, IOException, IllegalPathException {
+
+    PrimitiveMemTable memTable = new PrimitiveMemTable("root.test", "0");
+    List<IMeasurementSchema> measurementSchemas =
+        Arrays.asList(
+            new MeasurementSchema("s1", TSDataType.INT32),
+            new MeasurementSchema("s2", TSDataType.INT32),
+            new MeasurementSchema("s3", TSDataType.INT32));
+    PlainDeviceID deviceID = new PlainDeviceID("root.test.d1");
+    for (int i = 1000; i < 2000; i++) {
+      memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] 
{i, i, i});
+    }
+
+    ResourceByPathUtils resourcesByPathUtils =
+        ResourceByPathUtils.getResourceInstance(
+            new AlignedPath("root.test.d1", Arrays.asList("s1", "s2", "s3"), 
measurementSchemas));
+    AlignedReadOnlyMemChunk firstQueryMemChunk =
+        (AlignedReadOnlyMemChunk)
+            resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+                new QueryContext(1), memTable, null, Long.MAX_VALUE, null);
+    TVList originalWorkingList = memTable.getWritableMemChunk(deviceID, 
"").getWorkingTVList();
+    Assert.assertSame(
+        originalWorkingList,
+        
firstQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next());
+
+    for (int i = 1; i <= 50; i++) {
+      memTable.writeAlignedRow(deviceID, measurementSchemas, i, new Object[] 
{i, i, i});
+    }
+    MeasurementPath path = new MeasurementPath("root.test.d1.s1", 
TSDataType.INT32);
+    memTable.delete(path, path.getDevicePath(), 1, 10);
+    path = new MeasurementPath("root.test.d1.s2", TSDataType.INT32);
+    memTable.delete(path, path.getDevicePath(), 1, 10);
+    path = new MeasurementPath("root.test.d1.s3", TSDataType.INT32);
+    memTable.delete(path, path.getDevicePath(), 1, 10);
+    Assert.assertFalse(originalWorkingList.isSorted());
+
+    AlignedReadOnlyMemChunk flushingQueryMemChunk =
+        (AlignedReadOnlyMemChunk)
+            resourcesByPathUtils.getReadOnlyMemChunkFromMemTable(
+                new QueryContext(2), memTable, new ArrayList<>(), 
Long.MAX_VALUE, null);
+    TVList flushingQueryList =
+        
flushingQueryMemChunk.getAligendTvListQueryMap().keySet().iterator().next();
+    Assert.assertNotSame(originalWorkingList, flushingQueryList);
+
+    flushingQueryMemChunk.sortTvLists();
+    Assert.assertFalse(originalWorkingList.isSorted());
+
+    firstQueryMemChunk.sortTvLists();
+    MemPointIterator memPointIterator =
+        firstQueryMemChunk.createMemPointIterator(Ordering.ASC, null);
+    while (memPointIterator.hasNextBatch()) {
+      memPointIterator.nextBatch();
+    }
+  }
+
   @Test
   public void memSeriesToStringTest() throws IOException {
     TSDataType dataType = TSDataType.INT32;
@@ -743,7 +800,7 @@ public class PrimitiveMemTableTest {
     list.getQueryContextSet().add(queryContext);
     Map<TVList, Integer> tvlistMap = new HashMap<>();
     tvlistMap.put(list, 100);
-    queryContext.addTVListToSet(tvlistMap);
+    queryContext.addTVListToSet(tvlistMap.keySet());
 
     // fragment instance execution
     IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class);

Reply via email to