This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch improve-metadata-oom-diagnostics
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 99fe166a16c71b4d959d1037675ea084d0dfb2d4
Author: Caideyipi <[email protected]>
AuthorDate: Fri Jun 26 15:14:45 2026 +0800

    Improve metadata OOM diagnostics
---
 .../db/queryengine/common/MPPQueryContext.java     | 295 ++++++++++++++++++++-
 .../common/schematree/ClusterSchemaTree.java       |   8 +
 .../queryengine/plan/analyze/AnalyzeVisitor.java   |   8 +
 .../queryengine/plan/analyze/ExpressionUtils.java  |   2 +
 .../analyze/schema/ClusterSchemaFetchExecutor.java |   9 +
 .../db/queryengine/common/MPPQueryContextTest.java | 125 +++++++++
 6 files changed, 444 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
index 852028e6a2b..66be5aa4829 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.common;
 
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
 import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
@@ -43,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider;
 import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
 import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager;
 import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryResource;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainOutputFormat;
@@ -61,6 +63,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -142,6 +145,21 @@ public class MPPQueryContext implements IAuditEntity {
   private boolean releaseSchemaTreeAfterAnalyzing = true;
   private LongConsumer reserveMemoryForSchemaTreeFunc = null;
 
+  private boolean reservingMemoryForSchemaTree = false;
+
+  private boolean resultSetColumnMemoryTrackingEnabled = false;
+  private boolean alignByDeviceForResultSetColumnTracking = false;
+  private long seriesLimitForResultSetColumnTracking = 0;
+  private long seriesOffsetForResultSetColumnTracking = 0;
+  private long matchedSourceColumnsForResultSet = 0;
+  private long expandedSourceColumnsForResultSet = 0;
+  private long sourceColumnMemoryCostForResultSet = 0;
+  private long generatedResultSetColumns = 0;
+  private long generatedResultSetColumnMemoryCost = 0;
+  private long schemaFetchEstimatedMemoryCost = 0;
+  private long schemaFetchReservedMemoryCost = 0;
+  private long schemaFetchDeserializedColumnCount = 0;
+
   private boolean userQuery = false;
 
   /**
@@ -218,8 +236,17 @@ public class MPPQueryContext implements IAuditEntity {
     if (reserveMemoryForSchemaTreeFunc == null) {
       return;
     }
-    reserveMemoryForSchemaTreeFunc.accept(memoryCost);
+    schemaFetchEstimatedMemoryCost += memoryCost;
+    reservingMemoryForSchemaTree = true;
+    try {
+      reserveMemoryForSchemaTreeFunc.accept(memoryCost);
+    } catch (MemoryNotEnoughException e) {
+      throw enrichSchemaFetchMemoryNotEnoughException(e, memoryCost);
+    } finally {
+      reservingMemoryForSchemaTree = false;
+    }
     this.reservedMemoryCostForSchemaTree += memoryCost;
+    this.schemaFetchReservedMemoryCost += memoryCost;
   }
 
   public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) {
@@ -244,6 +271,7 @@ public class MPPQueryContext implements IAuditEntity {
     }
     this.initResultNodeContext();
     this.releaseAllMemoryReservedForFrontEnd();
+    this.resetResultSetColumnMemoryTracking();
   }
 
   private void cleanUpCte() {
@@ -540,11 +568,25 @@ public class MPPQueryContext implements IAuditEntity {
    * single-threaded manner.
    */
   public void reserveMemoryForFrontEnd(final long bytes) {
-    this.memoryReservationManager.reserveMemoryCumulatively(bytes);
+    try {
+      this.memoryReservationManager.reserveMemoryCumulatively(bytes);
+    } catch (MemoryNotEnoughException e) {
+      if (reservingMemoryForSchemaTree) {
+        throw e;
+      }
+      throw enrichResultSetColumnMemoryNotEnoughException(e, bytes);
+    }
   }
 
   public void reserveMemoryForFrontEndImmediately() {
-    this.memoryReservationManager.reserveMemoryImmediately();
+    try {
+      this.memoryReservationManager.reserveMemoryImmediately();
+    } catch (MemoryNotEnoughException e) {
+      if (reservingMemoryForSchemaTree) {
+        throw e;
+      }
+      throw enrichResultSetColumnMemoryNotEnoughException(e, extractRequestedMemory(e));
+    }
   }
 
   public void releaseAllMemoryReservedForFrontEnd() {
@@ -555,6 +597,253 @@ public class MPPQueryContext implements IAuditEntity {
     this.memoryReservationManager.releaseMemoryCumulatively(bytes);
   }
 
+  public void initResultSetColumnMemoryTracking(
+      long seriesLimit, long seriesOffset, boolean alignByDevice) {
+    resetResultSetColumnMemoryTracking();
+    resultSetColumnMemoryTrackingEnabled = true;
+    seriesLimitForResultSetColumnTracking = seriesLimit;
+    seriesOffsetForResultSetColumnTracking = seriesOffset;
+    alignByDeviceForResultSetColumnTracking = alignByDevice;
+  }
+
+  public void recordMatchedSourceColumnsForResultSet(long columnCount) {
+    if (resultSetColumnMemoryTrackingEnabled && columnCount > 0) {
+      matchedSourceColumnsForResultSet += columnCount;
+    }
+  }
+
+  public void recordExpandedSourceColumnForResultSet(long memoryCost) {
+    if (!resultSetColumnMemoryTrackingEnabled) {
+      return;
+    }
+    expandedSourceColumnsForResultSet++;
+    sourceColumnMemoryCostForResultSet += Math.max(memoryCost, 0);
+  }
+
+  public void recordGeneratedResultSetColumn(long memoryCost) {
+    if (!resultSetColumnMemoryTrackingEnabled) {
+      return;
+    }
+    generatedResultSetColumns++;
+    generatedResultSetColumnMemoryCost += Math.max(memoryCost, 0);
+  }
+
+  public void recordSchemaFetchDeserializedColumns(long columnCount) {
+    if (columnCount > 0) {
+      schemaFetchDeserializedColumnCount += columnCount;
+    }
+  }
+
+  private void resetResultSetColumnMemoryTracking() {
+    resultSetColumnMemoryTrackingEnabled = false;
+    alignByDeviceForResultSetColumnTracking = false;
+    seriesLimitForResultSetColumnTracking = 0;
+    seriesOffsetForResultSetColumnTracking = 0;
+    matchedSourceColumnsForResultSet = 0;
+    expandedSourceColumnsForResultSet = 0;
+    sourceColumnMemoryCostForResultSet = 0;
+    generatedResultSetColumns = 0;
+    generatedResultSetColumnMemoryCost = 0;
+    schemaFetchEstimatedMemoryCost = 0;
+    schemaFetchReservedMemoryCost = 0;
+    schemaFetchDeserializedColumnCount = 0;
+  }
+
+  private MemoryNotEnoughException enrichResultSetColumnMemoryNotEnoughException(
+      MemoryNotEnoughException e, long requestedBytes) {
+    if (!resultSetColumnMemoryTrackingEnabled
+        || (matchedSourceColumnsForResultSet == 0
+            && expandedSourceColumnsForResultSet == 0
+            && generatedResultSetColumns == 0)) {
+      return e;
+    }
+
+    long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators();
+    long shortageBytes =
+        requestedBytes > 0 && requestedBytes > freeBytes ? requestedBytes - freeBytes : -1;
+    long exceededColumns = estimateExceededColumns(freeBytes, requestedBytes);
+
+    return new MemoryNotEnoughException(
+        String.format(
+            Locale.ROOT,
+            "Not enough memory while analyzing metadata for query result columns. "
+                + "The result set has too many columns. "
+                + "Before the failure, IoTDB had matched %,d source columns for result-column "
+                + "expansion, expanded %,d source columns, and generated %,d result-set columns. "
+                + "%s"
+                + "Current series pagination is %s. "
+                + "Use SLIMIT/SOFFSET to reduce returned series%s, narrow the path pattern, "
+                + "or increase query memory%s. "
+                + "Memory details: source-column memory for result expansion %s, "
+                + "generated-result-column memory %s, requested this time %s, current free memory %s. "
+                + "Original error: %s",
+            matchedSourceColumnsForResultSet,
+            expandedSourceColumnsForResultSet,
+            generatedResultSetColumns,
+            exceededColumns > 0
+                ? String.format(
+                    Locale.ROOT,
+                    "The matched source columns exceed the estimated current memory capacity by "
+                        + "at least %,d columns. ",
+                    exceededColumns)
+                : "",
+            formatSeriesPaginationForDiagnostics(),
+            alignByDeviceForResultSetColumnTracking
+                ? ""
+                : ", use ALIGN BY DEVICE to reduce cross-device result columns",
+            shortageBytes > 0
+                ? " by at least " + formatBytes(shortageBytes)
+                : " for the query engine/operator memory pool",
+            formatBytes(sourceColumnMemoryCostForResultSet),
+            formatBytes(generatedResultSetColumnMemoryCost),
+            formatBytes(requestedBytes),
+            formatBytes(freeBytes),
+            e.getMessage()));
+  }
+
+  private MemoryNotEnoughException enrichSchemaFetchMemoryNotEnoughException(
+      MemoryNotEnoughException e, long requestedBytes) {
+    long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators();
+    if (!resultSetColumnMemoryTrackingEnabled && schemaFetchDeserializedColumnCount == 0) {
+      return e;
+    }
+
+    long shortageBytes =
+        requestedBytes > 0 && requestedBytes > freeBytes ? requestedBytes - freeBytes : -1;
+    long exceededColumns = estimateExceededSchemaFetchColumns(freeBytes, requestedBytes);
+
+    return new MemoryNotEnoughException(
+        String.format(
+            Locale.ROOT,
+            "Not enough memory while fetching metadata for query analysis. "
+                + "The result set may have too many columns. "
+                + "Before the failure, IoTDB had deserialized %,d time-series columns from schema "
+                + "fetch results. Schema fetch memory may be reserved before safely deserializing "
+                + "the whole fetched metadata, so this count can be lower than the matched schema "
+                + "columns. %s"
+                + "Current series pagination is %s. "
+                + "Use SLIMIT/SOFFSET to reduce returned series%s, narrow the path pattern, "
+                + "or increase query memory%s. "
+                + "Memory details: fetched schema tree estimated memory %s, "
+                + "fetched schema tree reserved memory %s, requested this time %s, "
+                + "current free memory %s. Original error: %s",
+            schemaFetchDeserializedColumnCount,
+            exceededColumns > 0
+                ? String.format(
+                    Locale.ROOT,
+                    "The fetched schema columns exceed the estimated current memory capacity by "
+                        + "at least %,d columns. ",
+                    exceededColumns)
+                : "",
+            formatSeriesPaginationForDiagnostics(),
+            alignByDeviceForResultSetColumnTracking
+                ? ""
+                : ", use ALIGN BY DEVICE to reduce cross-device result columns",
+            shortageBytes > 0
+                ? " by at least " + formatBytes(shortageBytes)
+                : " for the query engine/operator memory pool",
+            formatBytes(schemaFetchEstimatedMemoryCost),
+            formatBytes(schemaFetchReservedMemoryCost),
+            formatBytes(requestedBytes),
+            formatBytes(freeBytes),
+            e.getMessage()));
+  }
+
+  private long estimateExceededColumns(long freeBytes, long requestedBytes) {
+    long avgColumnMemory;
+    if (expandedSourceColumnsForResultSet > 0 && sourceColumnMemoryCostForResultSet > 0) {
+      avgColumnMemory =
+          Math.max(1, sourceColumnMemoryCostForResultSet / expandedSourceColumnsForResultSet);
+    } else if (requestedBytes > 0) {
+      avgColumnMemory = requestedBytes;
+    } else {
+      return -1;
+    }
+    long estimatedCapacity =
+        (sourceColumnMemoryCostForResultSet + Math.max(freeBytes, 0)) / avgColumnMemory;
+    long columnsToCompare =
+        Math.max(matchedSourceColumnsForResultSet, expandedSourceColumnsForResultSet + 1);
+    return Math.max(0, columnsToCompare - estimatedCapacity);
+  }
+
+  private long estimateExceededSchemaFetchColumns(long freeBytes, long requestedBytes) {
+    if (schemaFetchDeserializedColumnCount <= 0) {
+      return -1;
+    }
+
+    long avgColumnMemory;
+    long columnsToCompare = schemaFetchDeserializedColumnCount;
+    if (schemaFetchReservedMemoryCost > 0) {
+      avgColumnMemory =
+          Math.max(
+              1, divideCeil(schemaFetchReservedMemoryCost, schemaFetchDeserializedColumnCount));
+      if (requestedBytes > 0) {
+        columnsToCompare += Math.max(1, divideCeil(requestedBytes, avgColumnMemory));
+      }
+    } else if (requestedBytes > 0) {
+      avgColumnMemory = Math.max(1, divideCeil(requestedBytes, schemaFetchDeserializedColumnCount));
+    } else {
+      return -1;
+    }
+
+    long estimatedCapacity =
+        (schemaFetchReservedMemoryCost + Math.max(freeBytes, 0)) / avgColumnMemory;
+    return Math.max(0, columnsToCompare - estimatedCapacity);
+  }
+
+  private static long divideCeil(long dividend, long divisor) {
+    return dividend / divisor + (dividend % divisor == 0 ? 0 : 1);
+  }
+
+  private String formatSeriesPaginationForDiagnostics() {
+    return String.format(
+        Locale.ROOT,
+        "SLIMIT=%s, SOFFSET=%,d",
+        seriesLimitForResultSetColumnTracking > 0
+            ? String.format(Locale.ROOT, "%,d", seriesLimitForResultSetColumnTracking)
+            : "not set",
+        seriesOffsetForResultSetColumnTracking);
+  }
+
+  private static long extractRequestedMemory(MemoryNotEnoughException e) {
+    String message = e.getMessage();
+    if (message == null) {
+      return -1;
+    }
+    String marker = "the memory requested this time is ";
+    int start = message.indexOf(marker);
+    if (start < 0) {
+      return -1;
+    }
+    start += marker.length();
+    int end = message.indexOf('B', start);
+    if (end < 0) {
+      return -1;
+    }
+    try {
+      return Long.parseLong(message.substring(start, end));
+    } catch (NumberFormatException ignored) {
+      return -1;
+    }
+  }
+
+  private static String formatBytes(long bytes) {
+    if (bytes < 0) {
+      return "unknown";
+    }
+    if (bytes < 1024) {
+      return bytes + " B";
+    }
+    double value = bytes;
+    String[] units = {"B", "KB", "MB", "GB", "TB"};
+    int unitIndex = 0;
+    while (value >= 1024 && unitIndex < units.length - 1) {
+      value /= 1024;
+      unitIndex++;
+    }
+    return String.format(Locale.ROOT, "%.2f %s (%d B)", value, units[unitIndex], bytes);
+  }
+
   public boolean useSampledAvgTimeseriesOperandMemCost() {
     return numsOfSampledTimeseriesOperand >= MIN_SIZE_TO_USE_SAMPLED_TIMESERIES_OPERAND_MEM_COST;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
index f822ef397ac..49493b88047 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java
@@ -570,10 +570,16 @@ public class ClusterSchemaTree implements ISchemaTree {
     private Map<Integer, Template> templateMap = new HashMap<>();
     private boolean isFirstBatch = true;
 
+    private long measurementCount = 0;
+
     public boolean isFirstBatch() {
       return isFirstBatch;
     }
 
+    public long getMeasurementCount() {
+      return measurementCount;
+    }
+
     public void deserializeFromBatch(InputStream inputStream) throws IOException {
       isFirstBatch = false;
       while (inputStream.available() > 0) {
@@ -581,6 +587,7 @@ public class ClusterSchemaTree implements ISchemaTree {
         if (nodeType == SCHEMA_MEASUREMENT_NODE) {
           SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream);
           stack.push(measurementNode);
+          measurementCount++;
           if (measurementNode.isLogicalView()) {
             hasLogicalView = true;
           }
@@ -638,6 +645,7 @@ public class ClusterSchemaTree implements ISchemaTree {
       // templateMap is set to the returned schema tree, so we should create a new one
       templateMap = new HashMap<>();
       isFirstBatch = true;
+      measurementCount = 0;
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
index 22d5b0518ac..82c90e3376e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java
@@ -280,6 +280,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
       // check for semantic errors
       queryStatement.semanticCheck();
 
+      context.initResultSetColumnMemoryTracking(
+          queryStatement.getSeriesLimit(),
+          queryStatement.getSeriesOffset(),
+          queryStatement.isAlignByDevice());
       // fetch model inference information and check
       analyzeModelInference(analysis, queryStatement);
 
@@ -716,6 +720,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
             groupByLevelHelper.applyLevels(
                 isCountStar, resultExpression, resultColumn.getAlias(), analysis);
         Expression normalizedOutputExpression = normalizeExpression(outputExpression);
+        queryContext.recordGeneratedResultSetColumn(normalizedOutputExpression.ramBytesUsed());
         analyzeExpressionType(analysis, normalizedOutputExpression);
         outputExpressionSet.add(
             new Pair<>(
@@ -780,6 +785,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           checkAliasUniqueness(resultColumn.getAlias(), aliasSet);
 
           Expression normalizedExpression = normalizeExpression(resultExpression);
+          queryContext.recordGeneratedResultSetColumn(normalizedExpression.ramBytesUsed());
           analyzeExpressionType(analysis, normalizedExpression);
           outputExpressions.add(
               new Pair<>(
@@ -879,6 +885,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext>
           Expression lowerCaseMeasurementExpression = toLowerCaseExpression(measurementExpression);
           analyzeExpressionType(analysis, lowerCaseMeasurementExpression);
 
+          queryContext.recordGeneratedResultSetColumn(
+              lowerCaseMeasurementExpression.ramBytesUsed());
           outputExpressions.add(
               new Pair<>(
                   lowerCaseMeasurementExpression,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
index 1d9bb7838d5..aab50a32b65 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java
@@ -70,6 +70,7 @@ public class ExpressionUtils {
       final List<? extends PartialPath> actualPaths,
       final MPPQueryContext queryContext) {
     List<Expression> resultExpressions = new ArrayList<>();
+    queryContext.recordMatchedSourceColumnsForResultSet(actualPaths.size());
     for (PartialPath actualPath : actualPaths) {
       Expression expression = reconstructTimeSeriesOperand(rawExpression, actualPath);
       long memCost;
@@ -81,6 +82,7 @@ public class ExpressionUtils {
       }
       queryContext.reserveMemoryForFrontEnd(memCost);
 
+      queryContext.recordExpandedSourceColumnForResultSet(memCost);
       resultExpressions.add(expression);
     }
     return resultExpressions;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
index e9b5da05955..c5621b49ee3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java
@@ -61,6 +61,8 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 
+import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_PATTERN;
+
 class ClusterSchemaFetchExecutor {
 
   private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
@@ -331,6 +333,8 @@ class ClusterSchemaFetchExecutor {
         // for data from old version
         ClusterSchemaTree deserializedSchemaTree = ClusterSchemaTree.deserialize(inputStream);
         if (context != null) {
+          context.recordSchemaFetchDeserializedColumns(
+              deserializedSchemaTree.searchMeasurementPaths(ALL_MATCH_PATTERN).left.size());
           context.reserveMemoryForSchemaTree(deserializedSchemaTree.ramBytesUsed());
         }
         resultSchemaTree.mergeSchemaTree(deserializedSchemaTree);
@@ -341,7 +345,12 @@ class ClusterSchemaFetchExecutor {
             context.reserveMemoryForSchemaTree(memCost);
           }
         }
+        long measurementCountBeforeDeserialization = deserializer.getMeasurementCount();
         deserializer.deserializeFromBatch(inputStream);
+        if (context != null) {
+          context.recordSchemaFetchDeserializedColumns(
+              deserializer.getMeasurementCount() - measurementCountBeforeDeserialization);
+        }
         if (type == 3) {
           // 'type == 3' indicates this batch is finished
           resultSchemaTree.mergeSchemaTree(deserializer.finish());
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/common/MPPQueryContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/common/MPPQueryContextTest.java
new file mode 100644
index 00000000000..9f5803e1930
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/common/MPPQueryContextTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.common;
+
+import org.apache.iotdb.calc.exception.MemoryNotEnoughException;
+import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MPPQueryContextTest {
+
+  private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L;
+
+  @Test
+  public void resultSetColumnMemoryNotEnoughExceptionContainsColumnDiagnostics() {
+    MPPQueryContext context = new MPPQueryContext(new QueryId("result_column_oom_test"));
+    long failedRequestBytes = requestLargerThanFreeOperatorMemory();
+
+    context.initResultSetColumnMemoryTracking(100, 5, false);
+    context.recordMatchedSourceColumnsForResultSet(10);
+    context.recordExpandedSourceColumnForResultSet(failedRequestBytes);
+    context.recordGeneratedResultSetColumn(128);
+
+    MemoryNotEnoughException exception =
+        Assert.assertThrows(
+            MemoryNotEnoughException.class,
+            () -> context.reserveMemoryForFrontEnd(failedRequestBytes));
+
+    String message = exception.getMessage();
+    assertContains(message, "Not enough memory while analyzing metadata for query result columns.");
+    assertContains(message, "The result set has too many columns.");
+    assertContains(message, "matched 10 source columns");
+    assertContains(message, "expanded 1 source columns");
+    assertContains(message, "generated 1 result-set columns");
+    assertContains(message, "exceed the estimated current memory capacity by at least");
+    assertContains(message, "SLIMIT=100, SOFFSET=5");
+    assertContains(message, "Use SLIMIT/SOFFSET");
+    assertContains(message, "ALIGN BY DEVICE");
+    assertContains(message, "increase query memory by at least");
+    assertContains(message, "requested this time");
+    assertContains(message, "current free memory");
+    assertContains(message, "Original error:");
+  }
+
+  @Test
+  public void schemaFetchMemoryNotEnoughExceptionContainsFetchedColumnDiagnostics() {
+    MPPQueryContext context = new MPPQueryContext(new QueryId("schema_fetch_oom_test"));
+    long failedRequestBytes = requestLargerThanFreeOperatorMemory();
+
+    context.initResultSetColumnMemoryTracking(0, 2, true);
+    context.recordSchemaFetchDeserializedColumns(4);
+    context.setReserveMemoryForSchemaTreeFunc(
+        bytes -> {
+          throw new MemoryNotEnoughException("schema fetch OOM");
+        });
+
+    MemoryNotEnoughException exception =
+        Assert.assertThrows(
+            MemoryNotEnoughException.class,
+            () -> context.reserveMemoryForSchemaTree(failedRequestBytes));
+
+    String message = exception.getMessage();
+    assertContains(message, "Not enough memory while fetching metadata for query analysis.");
+    assertContains(message, "deserialized 4 time-series columns");
+    assertContains(message, "fetched schema columns exceed the estimated current memory capacity");
+    assertContains(message, "SLIMIT=not set, SOFFSET=2");
+    assertContains(message, "Use SLIMIT/SOFFSET");
+    assertContains(message, "increase query memory by at least");
+    assertContains(message, "fetched schema tree estimated memory");
+    assertContains(message, "requested this time");
+    assertContains(message, "schema fetch OOM");
+  }
+
+  @Test
+  public void schemaFetchMemoryNotEnoughExceptionKeepsOriginalWithoutColumnContext() {
+    MPPQueryContext context = new MPPQueryContext(new QueryId("schema_fetch_without_context"));
+    MemoryNotEnoughException expected = new MemoryNotEnoughException("original schema OOM");
+    context.setReserveMemoryForSchemaTreeFunc(
+        bytes -> {
+          throw expected;
+        });
+
+    MemoryNotEnoughException actual =
+        Assert.assertThrows(
+            MemoryNotEnoughException.class, () -> context.reserveMemoryForSchemaTree(1));
+
+    Assert.assertSame(expected, actual);
+    Assert.assertEquals("original schema OOM", actual.getMessage());
+  }
+
+  private static long requestLargerThanFreeOperatorMemory() {
+    long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators();
+    if (freeBytes < MEMORY_BATCH_THRESHOLD) {
+      return MEMORY_BATCH_THRESHOLD;
+    }
+    if (freeBytes >= Long.MAX_VALUE - 1) {
+      return Long.MAX_VALUE;
+    }
+    return freeBytes + 1;
+  }
+
+  private static void assertContains(String message, String expected) {
+    Assert.assertTrue(
+        String.format("Expected message to contain <%s>, but was <%s>.", expected, message),
+        message.contains(expected));
+  }
+}

Reply via email to