DRILL-6126: Allocate memory for value vectors upfront in flatten operator
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/31e0f29a Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/31e0f29a Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/31e0f29a Branch: refs/heads/master Commit: 31e0f29a6140a19eda8de615e615208f51f2cf96 Parents: 47c5d1f Author: Padma Penumarthy <[email protected]> Authored: Tue Mar 6 16:09:13 2018 -0800 Committer: Ben-Zvi <[email protected]> Committed: Wed Mar 7 15:41:26 2018 -0800 ---------------------------------------------------------------------- .../impl/flatten/FlattenRecordBatch.java | 34 +++++++++++--------- .../AbstractRecordBatchMemoryManager.java | 24 ++++++++++++-- 2 files changed, 40 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/31e0f29a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java index 4a910ef..9dd1770 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/flatten/FlattenRecordBatch.java @@ -104,25 +104,24 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { @Override public void update() { // Get sizing information for the batch. 
- RecordBatchSizer sizer = new RecordBatchSizer(incoming); + setRecordBatchSizer(new RecordBatchSizer(incoming)); final TypedFieldId typedFieldId = incoming.getValueVectorId(popConfig.getColumn()); final MaterializedField field = incoming.getSchema().getColumn(typedFieldId.getFieldIds()[0]); // Get column size of flatten column. - RecordBatchSizer.ColumnSize columnSize = RecordBatchSizer.getColumn(incoming.getValueAccessorById(field.getValueClass(), - typedFieldId.getFieldIds()).getValueVector(), field.getName()); + RecordBatchSizer.ColumnSize columnSize = getRecordBatchSizer().getColumn(field.getName()); // Average rowWidth of flatten column - final int avgRowWidthFlattenColumn = RecordBatchSizer.safeDivide(columnSize.netSize, incoming.getRecordCount()); + final int avgRowWidthFlattenColumn = columnSize.getNetSizePerEntry(); // Average rowWidth excluding the flatten column. - final int avgRowWidthWithOutFlattenColumn = sizer.netRowWidth() - avgRowWidthFlattenColumn; + final int avgRowWidthWithOutFlattenColumn = getRecordBatchSizer().netRowWidth() - avgRowWidthFlattenColumn; // Average rowWidth of single element in the flatten list. // subtract the offset vector size from column data size. final int avgRowWidthSingleFlattenEntry = - RecordBatchSizer.safeDivide(columnSize.netSize - (OFFSET_VECTOR_WIDTH * columnSize.valueCount), columnSize.elementCount); + RecordBatchSizer.safeDivide(columnSize.getTotalNetSize() - (OFFSET_VECTOR_WIDTH * columnSize.getValueCount()), columnSize.getElementCount()); // Average rowWidth of outgoing batch. final int avgOutgoingRowWidth = avgRowWidthWithOutFlattenColumn + avgRowWidthSingleFlattenEntry; @@ -130,13 +129,16 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { // Number of rows in outgoing batch setOutputRowCount(outputBatchSize, avgOutgoingRowWidth); + // Limit to lower bound of total number of rows possible for this batch + // i.e. all rows fit within memory budget. 
+ setOutputRowCount(Math.min(columnSize.getElementCount(), getOutputRowCount())); + logger.debug("flatten incoming batch sizer : {}, outputBatchSize : {}," + - "avgOutgoingRowWidth : {}, outputRowCount : {}", sizer, outputBatchSize, avgOutgoingRowWidth, getOutputRowCount()); + "avgOutgoingRowWidth : {}, outputRowCount : {}", getRecordBatchSizer(), outputBatchSize, + avgOutgoingRowWidth, getOutputRowCount()); } - } - public FlattenRecordBatch(FlattenPOP pop, RecordBatch incoming, FragmentContext context) throws OutOfMemoryException { super(pop, context, incoming); @@ -199,7 +201,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { int incomingRecordCount = incoming.getRecordCount(); - if (!doAlloc()) { + if (!doAlloc(flattenMemoryManager.getOutputRowCount())) { outOfMemory = true; return IterOutcome.OUT_OF_MEMORY; } @@ -235,7 +237,7 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { private void handleRemainder() { int remainingRecordCount = flattener.getFlattenField().getAccessor().getInnerValueCount() - remainderIndex; - if (!doAlloc()) { + if (!doAlloc(remainingRecordCount)) { outOfMemory = true; return; } @@ -266,12 +268,12 @@ public class FlattenRecordBatch extends AbstractSingleRecordBatch<FlattenPOP> { complexWriters.add(writer); } - private boolean doAlloc() { - //Allocate vv in the allocationVectors. + private boolean doAlloc(int recordCount) { + for (ValueVector v : this.allocationVectors) { - if (!v.allocateNewSafe()) { - return false; - } + // This will iteratively allocate memory for nested columns underneath. + RecordBatchSizer.ColumnSize colSize = flattenMemoryManager.getColumnSize(v.getField().getName()); + colSize.allocateVector(v, recordCount); } //Allocate vv for complexWriters. 
http://git-wip-us.apache.org/repos/asf/drill/blob/31e0f29a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java index b91ede0..1abd365 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractRecordBatchMemoryManager.java @@ -27,6 +27,7 @@ public abstract class AbstractRecordBatchMemoryManager { protected static final int MIN_NUM_ROWS = 1; private int outputRowCount = MAX_NUM_ROWS; private int outgoingRowWidth; + private RecordBatchSizer sizer; public void update(int inputIndex) {}; @@ -41,14 +42,20 @@ public abstract class AbstractRecordBatchMemoryManager { * the min and max that is allowed. */ public void setOutputRowCount(int targetBatchSize, int rowWidth) { - this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, rowWidth)); + this.outputRowCount = adjustOutputRowCount(RecordBatchSizer.safeDivide(targetBatchSize, rowWidth)); + } + + public void setOutputRowCount(int outputRowCount) { + this.outputRowCount = outputRowCount; } /** * This will adjust rowCount taking into account the min and max that is allowed. + * It rounds rowCount down to one less than the largest power of two not exceeding it, for better memory utilization. + * The -1 accounts for offset vectors, which need one more entry than the row count.
*/ public static int adjustOutputRowCount(int rowCount) { - return (Math.min(MAX_NUM_ROWS, Math.max(rowCount, MIN_NUM_ROWS))); + return (Math.min(MAX_NUM_ROWS, Math.max(Integer.highestOneBit(rowCount) - 1, MIN_NUM_ROWS))); } public void setOutgoingRowWidth(int outgoingRowWidth) { @@ -58,4 +65,17 @@ public abstract class AbstractRecordBatchMemoryManager { public int getOutgoingRowWidth() { return outgoingRowWidth; } + + public void setRecordBatchSizer(RecordBatchSizer sizer) { + this.sizer = sizer; + } + + public RecordBatchSizer getRecordBatchSizer() { + return sizer; + } + + public RecordBatchSizer.ColumnSize getColumnSize(String name) { + return sizer.getColumn(name); + } + }
