DRILL-6177: Merge Join - Allocate memory for outgoing value vectors based on sizes of incoming batches.
closes #1125 Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/766315ea Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/766315ea Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/766315ea Branch: refs/heads/master Commit: 766315ea17377199897d685ab801edd38394fe01 Parents: 31e0f29 Author: Padma Penumarthy <[email protected]> Authored: Tue Mar 6 16:09:43 2018 -0800 Committer: Ben-Zvi <[email protected]> Committed: Wed Mar 7 15:42:11 2018 -0800 ---------------------------------------------------------------------- .../exec/physical/impl/join/MergeJoinBatch.java | 23 ++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/766315ea/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java index f612ae2..2155f0a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java @@ -57,7 +57,6 @@ import org.apache.drill.exec.record.VectorContainer; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.AbstractRecordBatchMemoryManager; -import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.complex.AbstractContainerVector; @@ -114,6 +113,9 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { private int leftRowWidth; private int rightRowWidth; + private RecordBatchSizer leftSizer; + private RecordBatchSizer rightSizer; + /** * mergejoin operates on one record at a time from the left and right batches * using RecordIterator abstraction. We have a callback mechanism to get notified @@ -126,11 +128,11 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { public void update(int inputIndex) { switch(inputIndex) { case 0: - final RecordBatchSizer leftSizer = new RecordBatchSizer(left); + leftSizer = new RecordBatchSizer(left); leftRowWidth = leftSizer.netRowWidth(); break; case 1: - final RecordBatchSizer rightSizer = new RecordBatchSizer(right); + rightSizer = new RecordBatchSizer(right); rightRowWidth = rightSizer.netRowWidth(); default: break; @@ -158,6 +160,14 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { status.setTargetOutputRowCount(status.getOutPosition() + numOutputRowsRemaining); setOutgoingRowWidth(newOutgoingRowWidth); } + + @Override + public RecordBatchSizer.ColumnSize getColumnSize(String name) { + if (leftSizer != null && leftSizer.getColumn(name) != null) { + return leftSizer.getColumn(name); + } + return rightSizer == null ? null : rightSizer.getColumn(name); + } } private final MergeJoinMemoryManager mergeJoinMemoryManager = new MergeJoinMemoryManager(); @@ -492,8 +502,13 @@ public class MergeJoinBatch extends AbstractRecordBatch<MergeJoinPOP> { } else { container.zeroVectors(); } + + // Allocate memory for the vectors. + // This will iteratively allocate memory for all nested columns underneath. + int outputRowCount = mergeJoinMemoryManager.getOutputRowCount(); for (VectorWrapper w : container) { - AllocationHelper.allocateNew(w.getValueVector(), Character.MAX_VALUE); + RecordBatchSizer.ColumnSize colSize = mergeJoinMemoryManager.getColumnSize(w.getField().getName()); + colSize.allocateVector(w.getValueVector(), outputRowCount); } container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
