Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/938#discussion_r137939287
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeExcepti
     */
    private void updateEstMaxBatchSize(RecordBatch incoming) {
      if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or varchar) change
+     // Use the sizer to get the input row width and the length of the longest varchar column
      RecordBatchSizer sizer = new RecordBatchSizer(incoming);
      logger.trace("Incoming sizer: {}",sizer);
      // An empty batch only has the schema, can not tell actual length of varchars
      // else use the actual varchars length, each capped at 50 (to match the space allocation)
-     estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
-     estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+     long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
      // Get approx max (varchar) column width to get better memory allocation
-     maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+     maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
      maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
-     logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
-         isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+     //
+     // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
+     // (which is used to decide when to spill)
+     // Also calculate the values batch size (used as a reserve to overcome an OOM)
+     //
+     Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+     int fieldId = 0;
+     while (outgoingIter.hasNext()) {
+       ValueVector vv = outgoingIter.next().getValueVector();
+       MaterializedField mr = vv.getField();
+       int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth : TypeHelper.getSize(mr.getType());
+       estRowWidth += fieldSize;
+       estOutputRowWidth += fieldSize;
+       if ( fieldId < numGroupByOutFields ) { fieldId++; }
+       else { estValuesRowWidth += fieldSize; }
+     }
+     // multiply by the max number of rows in a batch to get the final estimated max size
+     estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
--- End diff --
Here, the output batch size is fixed based on the number of rows. Suppose a
sort consumes the output of this operator, and the sort has a memory ceiling
of x MB. Can the code here create batches larger than x/2 MB, meaning that
the sort is forced to consume batches so large that it can't buffer two of
them and still spill?
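To make that concrete (the numbers here are purely illustrative): if
estRowWidth works out to 200 bytes and MAX_BATCH_SIZE is 64K rows, each
output batch is about 200 * 65,536 ≈ 13 MB, so a downstream sort with, say,
a 20 MB ceiling could never hold two such batches before spilling.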
In other words, does the code attempt to control overall output batch memory
use, instead of just assuming that we always output `MAX_BATCH_SIZE` rows
regardless of their memory footprint?
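
As a sketch of the alternative (illustrative only, not proposing exact code;
`memoryBudgetBytes` is a hypothetical per-batch output limit, not an existing
option, while the other names mirror the patch):

  // Sketch: derive the planned output row count from a memory budget
  // rather than always planning for MAX_BATCH_SIZE rows.
  static int cappedOutputRows(long memoryBudgetBytes, long estOutputRowWidth,
                              int maxBatchSize) {
    if (estOutputRowWidth <= 0) { return maxBatchSize; } // no estimate yet
    long rowsThatFit = memoryBudgetBytes / estOutputRowWidth;
    // clamp to [1, maxBatchSize]: always emit at least one row
    return (int) Math.max(1, Math.min(maxBatchSize, rowsThatFit));
  }

estMaxBatchSize would then be the estimated row width times this capped row
count, keeping each output batch under the downstream operator's budget even
when rows are wide.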
---