This is an automated email from the ASF dual-hosted git repository. Caideyipi pushed a commit to branch improve-metadata-oom-diagnostics in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 99fe166a16c71b4d959d1037675ea084d0dfb2d4 Author: Caideyipi <[email protected]> AuthorDate: Fri Jun 26 15:14:45 2026 +0800 Improve metadata OOM diagnostics --- .../db/queryengine/common/MPPQueryContext.java | 295 ++++++++++++++++++++- .../common/schematree/ClusterSchemaTree.java | 8 + .../queryengine/plan/analyze/AnalyzeVisitor.java | 8 + .../queryengine/plan/analyze/ExpressionUtils.java | 2 + .../analyze/schema/ClusterSchemaFetchExecutor.java | 9 + .../db/queryengine/common/MPPQueryContextTest.java | 125 +++++++++ 6 files changed, 444 insertions(+), 3 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java index 852028e6a2b..66be5aa4829 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.queryengine.common; +import org.apache.iotdb.calc.exception.MemoryNotEnoughException; import org.apache.iotdb.calc.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; @@ -43,6 +44,7 @@ import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils; import org.apache.iotdb.db.queryengine.plan.analyze.QueryType; import org.apache.iotdb.db.queryengine.plan.analyze.TypeProvider; import org.apache.iotdb.db.queryengine.plan.analyze.lock.SchemaLockType; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; import org.apache.iotdb.db.queryengine.plan.planner.memory.NotThreadSafeMemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.relational.function.tvf.read_tsfile.ExternalTsFileQueryResource; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ExplainOutputFormat; @@ -61,6 +63,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -142,6 +145,21 @@ public class MPPQueryContext implements IAuditEntity { private boolean releaseSchemaTreeAfterAnalyzing = true; private LongConsumer reserveMemoryForSchemaTreeFunc = null; + private boolean reservingMemoryForSchemaTree = false; + + private boolean resultSetColumnMemoryTrackingEnabled = false; + private boolean alignByDeviceForResultSetColumnTracking = false; + private long seriesLimitForResultSetColumnTracking = 0; + private long seriesOffsetForResultSetColumnTracking = 0; + private long matchedSourceColumnsForResultSet = 0; + private long expandedSourceColumnsForResultSet = 0; + private long sourceColumnMemoryCostForResultSet = 0; + private long generatedResultSetColumns = 0; + private long generatedResultSetColumnMemoryCost = 0; + private long schemaFetchEstimatedMemoryCost = 0; + private long schemaFetchReservedMemoryCost = 0; + private long schemaFetchDeserializedColumnCount = 0; + private boolean userQuery = false; /** @@ -218,8 +236,17 @@ public class MPPQueryContext implements IAuditEntity { if (reserveMemoryForSchemaTreeFunc == null) { return; } - reserveMemoryForSchemaTreeFunc.accept(memoryCost); + schemaFetchEstimatedMemoryCost += memoryCost; + reservingMemoryForSchemaTree = true; + try { + reserveMemoryForSchemaTreeFunc.accept(memoryCost); + } catch (MemoryNotEnoughException e) { + throw enrichSchemaFetchMemoryNotEnoughException(e, memoryCost); + } finally { + reservingMemoryForSchemaTree = false; + } this.reservedMemoryCostForSchemaTree += memoryCost; + this.schemaFetchReservedMemoryCost += memoryCost; } public void setReleaseSchemaTreeAfterAnalyzing(boolean releaseSchemaTreeAfterAnalyzing) { @@ -244,6 +271,7 @@ public class MPPQueryContext implements IAuditEntity { } this.initResultNodeContext(); this.releaseAllMemoryReservedForFrontEnd(); + this.resetResultSetColumnMemoryTracking(); } private void cleanUpCte() { @@ -540,11 +568,25 @@ public class MPPQueryContext implements IAuditEntity { * single-threaded manner. */ public void reserveMemoryForFrontEnd(final long bytes) { - this.memoryReservationManager.reserveMemoryCumulatively(bytes); + try { + this.memoryReservationManager.reserveMemoryCumulatively(bytes); + } catch (MemoryNotEnoughException e) { + if (reservingMemoryForSchemaTree) { + throw e; + } + throw enrichResultSetColumnMemoryNotEnoughException(e, bytes); + } } public void reserveMemoryForFrontEndImmediately() { - this.memoryReservationManager.reserveMemoryImmediately(); + try { + this.memoryReservationManager.reserveMemoryImmediately(); + } catch (MemoryNotEnoughException e) { + if (reservingMemoryForSchemaTree) { + throw e; + } + throw enrichResultSetColumnMemoryNotEnoughException(e, extractRequestedMemory(e)); + } } public void releaseAllMemoryReservedForFrontEnd() { @@ -555,6 +597,253 @@ public class MPPQueryContext implements IAuditEntity { this.memoryReservationManager.releaseMemoryCumulatively(bytes); } + public void initResultSetColumnMemoryTracking( + long seriesLimit, long seriesOffset, boolean alignByDevice) { + resetResultSetColumnMemoryTracking(); + resultSetColumnMemoryTrackingEnabled = true; + seriesLimitForResultSetColumnTracking = seriesLimit; + seriesOffsetForResultSetColumnTracking = seriesOffset; + alignByDeviceForResultSetColumnTracking = alignByDevice; + } + + public void recordMatchedSourceColumnsForResultSet(long columnCount) { + if (resultSetColumnMemoryTrackingEnabled && columnCount > 0) { + matchedSourceColumnsForResultSet += columnCount; + } + } + + public void recordExpandedSourceColumnForResultSet(long memoryCost) { + if (!resultSetColumnMemoryTrackingEnabled) { + return; + } + expandedSourceColumnsForResultSet++; + sourceColumnMemoryCostForResultSet += Math.max(memoryCost, 0); + } + + public void recordGeneratedResultSetColumn(long memoryCost) { + if (!resultSetColumnMemoryTrackingEnabled) { + return; + } + generatedResultSetColumns++; + generatedResultSetColumnMemoryCost += Math.max(memoryCost, 0); + } + + public void recordSchemaFetchDeserializedColumns(long columnCount) { + if (columnCount > 0) { + schemaFetchDeserializedColumnCount += columnCount; + } + } + + private void resetResultSetColumnMemoryTracking() { + resultSetColumnMemoryTrackingEnabled = false; + alignByDeviceForResultSetColumnTracking = false; + seriesLimitForResultSetColumnTracking = 0; + seriesOffsetForResultSetColumnTracking = 0; + matchedSourceColumnsForResultSet = 0; + expandedSourceColumnsForResultSet = 0; + sourceColumnMemoryCostForResultSet = 0; + generatedResultSetColumns = 0; + generatedResultSetColumnMemoryCost = 0; + schemaFetchEstimatedMemoryCost = 0; + schemaFetchReservedMemoryCost = 0; + schemaFetchDeserializedColumnCount = 0; + } + + private MemoryNotEnoughException enrichResultSetColumnMemoryNotEnoughException( + MemoryNotEnoughException e, long requestedBytes) { + if (!resultSetColumnMemoryTrackingEnabled + || (matchedSourceColumnsForResultSet == 0 + && expandedSourceColumnsForResultSet == 0 + && generatedResultSetColumns == 0)) { + return e; + } + + long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(); + long shortageBytes = + requestedBytes > 0 && requestedBytes > freeBytes ? requestedBytes - freeBytes : -1; + long exceededColumns = estimateExceededColumns(freeBytes, requestedBytes); + + return new MemoryNotEnoughException( + String.format( + Locale.ROOT, + "Not enough memory while analyzing metadata for query result columns. " + + "The result set has too many columns. " + + "Before the failure, IoTDB had matched %,d source columns for result-column " + + "expansion, expanded %,d source columns, and generated %,d result-set columns. " + + "%s" + + "Current series pagination is %s. " + + "Use SLIMIT/SOFFSET to reduce returned series%s, narrow the path pattern, " + + "or increase query memory%s. " + + "Memory details: source-column memory for result expansion %s, " + + "generated-result-column memory %s, requested this time %s, current free memory %s. " + + "Original error: %s", + matchedSourceColumnsForResultSet, + expandedSourceColumnsForResultSet, + generatedResultSetColumns, + exceededColumns > 0 + ? String.format( + Locale.ROOT, + "The matched source columns exceed the estimated current memory capacity by " + + "at least %,d columns. ", + exceededColumns) + : "", + formatSeriesPaginationForDiagnostics(), + alignByDeviceForResultSetColumnTracking + ? "" + : ", use ALIGN BY DEVICE to reduce cross-device result columns", + shortageBytes > 0 + ? " by at least " + formatBytes(shortageBytes) + : " for the query engine/operator memory pool", + formatBytes(sourceColumnMemoryCostForResultSet), + formatBytes(generatedResultSetColumnMemoryCost), + formatBytes(requestedBytes), + formatBytes(freeBytes), + e.getMessage())); + } + + private MemoryNotEnoughException enrichSchemaFetchMemoryNotEnoughException( + MemoryNotEnoughException e, long requestedBytes) { + long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(); + if (!resultSetColumnMemoryTrackingEnabled && schemaFetchDeserializedColumnCount == 0) { + return e; + } + + long shortageBytes = + requestedBytes > 0 && requestedBytes > freeBytes ? requestedBytes - freeBytes : -1; + long exceededColumns = estimateExceededSchemaFetchColumns(freeBytes, requestedBytes); + + return new MemoryNotEnoughException( + String.format( + Locale.ROOT, + "Not enough memory while fetching metadata for query analysis. " + + "The result set may have too many columns. " + + "Before the failure, IoTDB had deserialized %,d time-series columns from schema " + + "fetch results. Schema fetch memory may be reserved before safely deserializing " + + "the whole fetched metadata, so this count can be lower than the matched schema " + + "columns. %s" + + "Current series pagination is %s. " + + "Use SLIMIT/SOFFSET to reduce returned series%s, narrow the path pattern, " + + "or increase query memory%s. " + + "Memory details: fetched schema tree estimated memory %s, " + + "fetched schema tree reserved memory %s, requested this time %s, " + + "current free memory %s. Original error: %s", + schemaFetchDeserializedColumnCount, + exceededColumns > 0 + ? String.format( + Locale.ROOT, + "The fetched schema columns exceed the estimated current memory capacity by " + + "at least %,d columns. ", + exceededColumns) + : "", + formatSeriesPaginationForDiagnostics(), + alignByDeviceForResultSetColumnTracking + ? "" + : ", use ALIGN BY DEVICE to reduce cross-device result columns", + shortageBytes > 0 + ? " by at least " + formatBytes(shortageBytes) + : " for the query engine/operator memory pool", + formatBytes(schemaFetchEstimatedMemoryCost), + formatBytes(schemaFetchReservedMemoryCost), + formatBytes(requestedBytes), + formatBytes(freeBytes), + e.getMessage())); + } + + private long estimateExceededColumns(long freeBytes, long requestedBytes) { + long avgColumnMemory; + if (expandedSourceColumnsForResultSet > 0 && sourceColumnMemoryCostForResultSet > 0) { + avgColumnMemory = + Math.max(1, sourceColumnMemoryCostForResultSet / expandedSourceColumnsForResultSet); + } else if (requestedBytes > 0) { + avgColumnMemory = requestedBytes; + } else { + return -1; + } + long estimatedCapacity = + (sourceColumnMemoryCostForResultSet + Math.max(freeBytes, 0)) / avgColumnMemory; + long columnsToCompare = + Math.max(matchedSourceColumnsForResultSet, expandedSourceColumnsForResultSet + 1); + return Math.max(0, columnsToCompare - estimatedCapacity); + } + + private long estimateExceededSchemaFetchColumns(long freeBytes, long requestedBytes) { + if (schemaFetchDeserializedColumnCount <= 0) { + return -1; + } + + long avgColumnMemory; + long columnsToCompare = schemaFetchDeserializedColumnCount; + if (schemaFetchReservedMemoryCost > 0) { + avgColumnMemory = + Math.max( + 1, divideCeil(schemaFetchReservedMemoryCost, schemaFetchDeserializedColumnCount)); + if (requestedBytes > 0) { + columnsToCompare += Math.max(1, divideCeil(requestedBytes, avgColumnMemory)); + } + } else if (requestedBytes > 0) { + avgColumnMemory = Math.max(1, divideCeil(requestedBytes, schemaFetchDeserializedColumnCount)); + } else { + return -1; + } + + long estimatedCapacity = + (schemaFetchReservedMemoryCost + Math.max(freeBytes, 0)) / avgColumnMemory; + return Math.max(0, columnsToCompare - estimatedCapacity); + } + + private static long divideCeil(long dividend, long divisor) { + return dividend / divisor + (dividend % divisor == 0 ? 0 : 1); + } + + private String formatSeriesPaginationForDiagnostics() { + return String.format( + Locale.ROOT, + "SLIMIT=%s, SOFFSET=%,d", + seriesLimitForResultSetColumnTracking > 0 + ? String.format(Locale.ROOT, "%,d", seriesLimitForResultSetColumnTracking) + : "not set", + seriesOffsetForResultSetColumnTracking); + } + + private static long extractRequestedMemory(MemoryNotEnoughException e) { + String message = e.getMessage(); + if (message == null) { + return -1; + } + String marker = "the memory requested this time is "; + int start = message.indexOf(marker); + if (start < 0) { + return -1; + } + start += marker.length(); + int end = message.indexOf('B', start); + if (end < 0) { + return -1; + } + try { + return Long.parseLong(message.substring(start, end)); + } catch (NumberFormatException ignored) { + return -1; + } + } + + private static String formatBytes(long bytes) { + if (bytes < 0) { + return "unknown"; + } + if (bytes < 1024) { + return bytes + " B"; + } + double value = bytes; + String[] units = {"B", "KB", "MB", "GB", "TB"}; + int unitIndex = 0; + while (value >= 1024 && unitIndex < units.length - 1) { + value /= 1024; + unitIndex++; + } + return String.format(Locale.ROOT, "%.2f %s (%d B)", value, units[unitIndex], bytes); + } + public boolean useSampledAvgTimeseriesOperandMemCost() { return numsOfSampledTimeseriesOperand >= MIN_SIZE_TO_USE_SAMPLED_TIMESERIES_OPERAND_MEM_COST; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java index f822ef397ac..49493b88047 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/schematree/ClusterSchemaTree.java @@ -570,10 +570,16 @@ public class ClusterSchemaTree implements ISchemaTree { private Map<Integer, Template> templateMap = new HashMap<>(); private boolean isFirstBatch = true; + private long measurementCount = 0; + public boolean isFirstBatch() { return isFirstBatch; } + public long getMeasurementCount() { + return measurementCount; + } + public void deserializeFromBatch(InputStream inputStream) throws IOException { isFirstBatch = false; while (inputStream.available() > 0) { @@ -581,6 +587,7 @@ public class ClusterSchemaTree implements ISchemaTree { if (nodeType == SCHEMA_MEASUREMENT_NODE) { SchemaMeasurementNode measurementNode = SchemaMeasurementNode.deserialize(inputStream); stack.push(measurementNode); + measurementCount++; if (measurementNode.isLogicalView()) { hasLogicalView = true; } @@ -638,6 +645,7 @@ public class ClusterSchemaTree implements ISchemaTree { // templateMap is set to the returned schema tree, so we should create a new one templateMap = new HashMap<>(); isFirstBatch = true; + measurementCount = 0; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java index 22d5b0518ac..82c90e3376e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java @@ -280,6 +280,10 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> // check for semantic errors queryStatement.semanticCheck(); + context.initResultSetColumnMemoryTracking( + queryStatement.getSeriesLimit(), + queryStatement.getSeriesOffset(), + queryStatement.isAlignByDevice()); // fetch model inference information and check analyzeModelInference(analysis, queryStatement); @@ -716,6 +720,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> groupByLevelHelper.applyLevels( isCountStar, resultExpression, resultColumn.getAlias(), analysis); Expression normalizedOutputExpression = normalizeExpression(outputExpression); + queryContext.recordGeneratedResultSetColumn(normalizedOutputExpression.ramBytesUsed()); analyzeExpressionType(analysis, normalizedOutputExpression); outputExpressionSet.add( new Pair<>( @@ -780,6 +785,7 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> checkAliasUniqueness(resultColumn.getAlias(), aliasSet); Expression normalizedExpression = normalizeExpression(resultExpression); + queryContext.recordGeneratedResultSetColumn(normalizedExpression.ramBytesUsed()); analyzeExpressionType(analysis, normalizedExpression); outputExpressions.add( new Pair<>( @@ -879,6 +885,8 @@ public class AnalyzeVisitor extends StatementVisitor<Analysis, MPPQueryContext> Expression lowerCaseMeasurementExpression = toLowerCaseExpression(measurementExpression); analyzeExpressionType(analysis, lowerCaseMeasurementExpression); + queryContext.recordGeneratedResultSetColumn( + lowerCaseMeasurementExpression.ramBytesUsed()); outputExpressions.add( new Pair<>( lowerCaseMeasurementExpression, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java index 1d9bb7838d5..aab50a32b65 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/ExpressionUtils.java @@ -70,6 +70,7 @@ public class ExpressionUtils { final List<? extends PartialPath> actualPaths, final MPPQueryContext queryContext) { List<Expression> resultExpressions = new ArrayList<>(); + queryContext.recordMatchedSourceColumnsForResultSet(actualPaths.size()); for (PartialPath actualPath : actualPaths) { Expression expression = reconstructTimeSeriesOperand(rawExpression, actualPath); long memCost; @@ -81,6 +82,7 @@ public class ExpressionUtils { } queryContext.reserveMemoryForFrontEnd(memCost); + queryContext.recordExpandedSourceColumnForResultSet(memCost); resultExpressions.add(expression); } return resultExpressions; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java index e9b5da05955..c5621b49ee3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java @@ -61,6 +61,8 @@ import java.util.Optional; import java.util.Set; import java.util.function.Consumer; +import static org.apache.iotdb.commons.schema.SchemaConstant.ALL_MATCH_PATTERN; + class ClusterSchemaFetchExecutor { private final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig(); @@ -331,6 +333,8 @@ class ClusterSchemaFetchExecutor { // for data from old version ClusterSchemaTree deserializedSchemaTree = ClusterSchemaTree.deserialize(inputStream); if (context != null) { + context.recordSchemaFetchDeserializedColumns( + deserializedSchemaTree.searchMeasurementPaths(ALL_MATCH_PATTERN).left.size()); context.reserveMemoryForSchemaTree(deserializedSchemaTree.ramBytesUsed()); } resultSchemaTree.mergeSchemaTree(deserializedSchemaTree); @@ -341,7 +345,12 @@ class ClusterSchemaFetchExecutor { context.reserveMemoryForSchemaTree(memCost); } } + long measurementCountBeforeDeserialization = deserializer.getMeasurementCount(); deserializer.deserializeFromBatch(inputStream); + if (context != null) { + context.recordSchemaFetchDeserializedColumns( + deserializer.getMeasurementCount() - measurementCountBeforeDeserialization); + } if (type == 3) { // 'type == 3' indicates this batch is finished resultSchemaTree.mergeSchemaTree(deserializer.finish()); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/common/MPPQueryContextTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/common/MPPQueryContextTest.java new file mode 100644 index 00000000000..9f5803e1930 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/common/MPPQueryContextTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.queryengine.common; + +import org.apache.iotdb.calc.exception.MemoryNotEnoughException; +import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner; + +import org.junit.Assert; +import org.junit.Test; + +public class MPPQueryContextTest { + + private static final long MEMORY_BATCH_THRESHOLD = 1024L * 1024L; + + @Test + public void resultSetColumnMemoryNotEnoughExceptionContainsColumnDiagnostics() { + MPPQueryContext context = new MPPQueryContext(new QueryId("result_column_oom_test")); + long failedRequestBytes = requestLargerThanFreeOperatorMemory(); + + context.initResultSetColumnMemoryTracking(100, 5, false); + context.recordMatchedSourceColumnsForResultSet(10); + context.recordExpandedSourceColumnForResultSet(failedRequestBytes); + context.recordGeneratedResultSetColumn(128); + + MemoryNotEnoughException exception = + Assert.assertThrows( + MemoryNotEnoughException.class, + () -> context.reserveMemoryForFrontEnd(failedRequestBytes)); + + String message = exception.getMessage(); + assertContains(message, "Not enough memory while analyzing metadata for query result columns."); + assertContains(message, "The result set has too many columns."); + assertContains(message, "matched 10 source columns"); + assertContains(message, "expanded 1 source columns"); + assertContains(message, "generated 1 result-set columns"); + assertContains(message, "exceed the estimated current memory capacity by at least"); + assertContains(message, "SLIMIT=100, SOFFSET=5"); + assertContains(message, "Use SLIMIT/SOFFSET"); + assertContains(message, "ALIGN BY DEVICE"); + assertContains(message, "increase query memory by at least"); + assertContains(message, "requested this time"); + assertContains(message, "current free memory"); + assertContains(message, "Original error:"); + } + + @Test + public void schemaFetchMemoryNotEnoughExceptionContainsFetchedColumnDiagnostics() { + MPPQueryContext context = new MPPQueryContext(new QueryId("schema_fetch_oom_test")); + long failedRequestBytes = requestLargerThanFreeOperatorMemory(); + + context.initResultSetColumnMemoryTracking(0, 2, true); + context.recordSchemaFetchDeserializedColumns(4); + context.setReserveMemoryForSchemaTreeFunc( + bytes -> { + throw new MemoryNotEnoughException("schema fetch OOM"); + }); + + MemoryNotEnoughException exception = + Assert.assertThrows( + MemoryNotEnoughException.class, + () -> context.reserveMemoryForSchemaTree(failedRequestBytes)); + + String message = exception.getMessage(); + assertContains(message, "Not enough memory while fetching metadata for query analysis."); + assertContains(message, "deserialized 4 time-series columns"); + assertContains(message, "fetched schema columns exceed the estimated current memory capacity"); + assertContains(message, "SLIMIT=not set, SOFFSET=2"); + assertContains(message, "Use SLIMIT/SOFFSET"); + assertContains(message, "increase query memory by at least"); + assertContains(message, "fetched schema tree estimated memory"); + assertContains(message, "requested this time"); + assertContains(message, "schema fetch OOM"); + } + + @Test + public void schemaFetchMemoryNotEnoughExceptionKeepsOriginalWithoutColumnContext() { + MPPQueryContext context = new MPPQueryContext(new QueryId("schema_fetch_without_context")); + MemoryNotEnoughException expected = new MemoryNotEnoughException("original schema OOM"); + context.setReserveMemoryForSchemaTreeFunc( + bytes -> { + throw expected; + }); + + MemoryNotEnoughException actual = + Assert.assertThrows( + MemoryNotEnoughException.class, () -> context.reserveMemoryForSchemaTree(1)); + + Assert.assertSame(expected, actual); + Assert.assertEquals("original schema OOM", actual.getMessage()); + } + + private static long requestLargerThanFreeOperatorMemory() { + long freeBytes = LocalExecutionPlanner.getInstance().getFreeMemoryForOperators(); + if (freeBytes < MEMORY_BATCH_THRESHOLD) { + return MEMORY_BATCH_THRESHOLD; + } + if (freeBytes >= Long.MAX_VALUE - 1) { + return Long.MAX_VALUE; + } + return freeBytes + 1; + } + + private static void assertContains(String message, String expected) { + Assert.assertTrue( + String.format("Expected message to contain <%s>, but was <%s>.", expected, message), + message.contains(expected)); + } +}
