Github user Ben-Zvi commented on a diff in the pull request:
https://github.com/apache/drill/pull/938#discussion_r138433967
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
@@ -500,22 +516,45 @@ private void initializeSetup(RecordBatch newIncoming) throws SchemaChangeException
       */
      private void updateEstMaxBatchSize(RecordBatch incoming) {
        if ( estMaxBatchSize > 0 ) { return; } // no handling of a schema (or varchar) change
+       // Use the sizer to get the input row width and the length of the longest varchar column
        RecordBatchSizer sizer = new RecordBatchSizer(incoming);
        logger.trace("Incoming sizer: {}",sizer);
        // An empty batch only has the schema, can not tell actual length of varchars
        // else use the actual varchars length, each capped at 50 (to match the space allocation)
-       estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
-       estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
+       long estInputRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : sizer.netRowWidthCap50();
        // Get approx max (varchar) column width to get better memory allocation
-       maxColumnWidth = Math.max(sizer.maxSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
+       maxColumnWidth = Math.max(sizer.maxAvgColumnSize(), VARIABLE_MIN_WIDTH_VALUE_SIZE);
        maxColumnWidth = Math.min(maxColumnWidth, VARIABLE_MAX_WIDTH_VALUE_SIZE);
-       logger.trace("{} phase. Estimated row width: {} batch size: {} memory limit: {} max column width: {}",
-           isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
+       //
+       // Calculate the estimated max (internal) batch (i.e. Keys batch + Values batch) size
+       // (which is used to decide when to spill)
+       // Also calculate the values batch size (used as a reserve to overcome an OOM)
+       //
+       Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator();
+       int fieldId = 0;
+       while (outgoingIter.hasNext()) {
+         ValueVector vv = outgoingIter.next().getValueVector();
+         MaterializedField mr = vv.getField();
+         int fieldSize = vv instanceof VariableWidthVector ? maxColumnWidth : TypeHelper.getSize(mr.getType());
+         estRowWidth += fieldSize;
+         estOutputRowWidth += fieldSize;
+         if ( fieldId < numGroupByOutFields ) { fieldId++; }
+         else { estValuesRowWidth += fieldSize; }
+       }
+       // multiply by the max number of rows in a batch to get the final estimated max size
+       estMaxBatchSize = Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
--- End diff --
Most of these estimates cover the internal "worst case". Only the "outgoing" one describes the outgoing batch (and it is also used for spilling, which is internal).
In any case, none of these estimates _throttle_ the outgoing batch size; that logic is unchanged from the original code (likely up to MAX_BATCH_SIZE rows).
Making such a change should be a separate project.
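To make the worst-case arithmetic concrete, here is a minimal standalone sketch of the estimate computed in the diff (class name, constants, and field widths are assumed for illustration; this is not Drill's actual API): sum the capped per-field widths of the outgoing (keys + values) row, take the wider of that and the input row, and multiply by the max rows per batch.

// A minimal sketch, with assumed values: MAX_BATCH_SIZE (64K rows) and the
// 50-byte varchar cap mirror the constants referenced above; widths are invented.
import java.util.List;

public class BatchSizeEstimateSketch {
  static final int MAX_BATCH_SIZE = 65536; // assumed max rows per batch
  static final int VARCHAR_CAP = 50;       // assumed cap per varchar column

  // Sum the capped per-field widths of the outgoing row, then take the wider
  // of that and the input row width, times the max rows per batch.
  static long estimateMaxBatchSize(List<Integer> outFieldWidths, long estInputRowWidth) {
    long estRowWidth = 0;
    for (int width : outFieldWidths) {
      estRowWidth += Math.min(width, VARCHAR_CAP);
    }
    return Math.max(estRowWidth, estInputRowWidth) * MAX_BATCH_SIZE;
  }

  public static void main(String[] args) {
    // e.g. a BIGINT key (8B), a varchar key (capped at 50B), a BIGINT value (8B)
    long est = estimateMaxBatchSize(List.of(8, 50, 8), 40L);
    System.out.println(est); // (8 + 50 + 8) * 65536 = 4325376 bytes
  }
}

As noted above, this number only gates spilling; it does not throttle the size of the batches actually sent downstream.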
---