[ https://issues.apache.org/jira/browse/DRILL-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354481#comment-16354481 ]
ASF GitHub Bot commented on DRILL-6123:
---------------------------------------

Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1107#discussion_r166427892

--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---
@@ -102,20 +105,78 @@
   private final List<Comparator> comparators;
   private final JoinRelType joinType;
   private JoinWorker worker;
+  private final long outputBatchSize;

   private static final String LEFT_INPUT = "LEFT INPUT";
   private static final String RIGHT_INPUT = "RIGHT INPUT";

+  private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager {
+    private int leftRowWidth;
+    private int rightRowWidth;
+
+    /**
+     * mergejoin operates on one record at a time from the left and right batches
+     * using RecordIterator abstraction. We have a callback mechanism to get notified
+     * when new batch is loaded in record iterator.
+     * This can get called in the middle of current output batch we are building.
+     * when this gets called, adjust number of output rows for the current batch and
+     * update the value to be used for subsequent batches.
+     */
+    @Override
+    public void update(int inputIndex) {
+      switch(inputIndex) {
+        case 0:
+          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
+          leftRowWidth = leftSizer.netRowWidth();
+          break;
+        case 1:
+          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
+          rightRowWidth = rightSizer.netRowWidth();
+        default:
+          break;
+      }
+
+      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
+
+      // If outgoing row width is 0, just return. This is possible for empty batches or
+      // when first set of batches come with OK_NEW_SCHEMA and no data.
+      if (newOutgoingRowWidth == 0) {
+        return;
+      }
+
+      // update the value to be used for next batch(es)
+      setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
+        Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, newOutgoingRowWidth), MIN_NUM_ROWS)));
+
+      // Adjust for the current batch.
+      // calculate memory used so far based on previous outgoing row width and how many rows we already processed.
+      final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
+      // This is the remaining memory.
+      final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0);
+      // These are number of rows we can fit in remaining memory based on new outgoing row width.
+      final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
+
+      final int adjustedOutputRowCount = Math.min(MAX_NUM_ROWS, Math.max(status.getOutPosition() + numOutputRowsRemaining, MIN_NUM_ROWS));
+      status.setOutputRowCount(adjustedOutputRowCount);
+      setOutgoingRowWidth(newOutgoingRowWidth);
--- End diff --

Yes, this is how it works. We read from the left and right sides using the RecordIterator abstraction, which reads full record batches underneath and returns one record at a time on each next() call. A callback notifies us whenever the record iterator loads a new batch, so that we can adjust the row widths. When the callback fires, I adjust the row count for the output batch currently being built, based on the memory still available for that batch, and I also compute and save the row count to use for the next full batch. In innerNext(), when we start working on the next output batch, I set the target output row count from this saved value.

Addressed all other code review comments. Please take a look when you get a chance.
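A minimal, self-contained Java sketch of the mechanism described in the reply may help: a sizing object receives a callback whenever either input loads a new batch, recomputes the row count for the next full output batch, and resets the limit for the batch already in progress based on the memory its rows consumed at the old width. `MergeJoinSizingSketch`, `MemoryManager`, `startOutputBatch()` and `setOutPosition()` are hypothetical names, not Drill APIs; row widths are passed in directly instead of being measured with `RecordBatchSizer`, and the value 2 for `WORST_CASE_FRAGMENTATION_FACTOR` is an assumption.

```java
/**
 * Hypothetical, self-contained sketch of the callback-driven sizing described
 * in the review reply. These classes are NOT Drill APIs; the constant values
 * (especially the fragmentation factor) are assumptions for illustration.
 */
public class MergeJoinSizingSketch {
    static final int MIN_NUM_ROWS = 1;
    static final int MAX_NUM_ROWS = 64 * 1024;
    static final int WORST_CASE_FRAGMENTATION_FACTOR = 2; // assumed value

    public static final class MemoryManager {
        private final long outputBatchSize;
        private int leftRowWidth;
        private int rightRowWidth;
        private int outgoingRowWidth;
        private int nextBatchRowCount = MAX_NUM_ROWS;    // target for the next full output batch
        private int currentBatchRowCount = MAX_NUM_ROWS; // limit for the batch being built now
        private int outPosition;                         // rows already written to the current batch

        public MemoryManager(long outputBatchSize) {
            this.outputBatchSize = outputBatchSize;
        }

        /** Callback for "an input side loaded a new batch" (0 = left, 1 = right). */
        public void update(int inputIndex, int newRowWidth) {
            if (inputIndex == 0) {
                leftRowWidth = newRowWidth;
            } else {
                rightRowWidth = newRowWidth;
            }
            int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
            if (newOutgoingRowWidth == 0) {
                return; // empty or schema-only batches: nothing to size yet
            }
            long budget = outputBatchSize / WORST_CASE_FRAGMENTATION_FACTOR;

            // Row count to use for subsequent full output batches.
            nextBatchRowCount = clamp(budget / newOutgoingRowWidth);

            // Adjust the batch in progress: rows written so far used the old width.
            long memoryUsed = (long) outPosition * outgoingRowWidth;
            long remainingMemory = Math.max(budget - memoryUsed, 0);
            long rowsThatStillFit = remainingMemory / newOutgoingRowWidth;
            currentBatchRowCount = clamp(outPosition + rowsThatStillFit);

            outgoingRowWidth = newOutgoingRowWidth;
        }

        /** Analogous to innerNext() starting a new output batch. */
        public void startOutputBatch() {
            outPosition = 0;
            currentBatchRowCount = nextBatchRowCount;
        }

        public void setOutPosition(int rows) { outPosition = rows; }
        public int getCurrentBatchRowCount() { return currentBatchRowCount; }
        public int getNextBatchRowCount() { return nextBatchRowCount; }

        // Keep the row count within [MIN_NUM_ROWS, MAX_NUM_ROWS].
        private static int clamp(long rows) {
            return (int) Math.min(MAX_NUM_ROWS, Math.max(rows, MIN_NUM_ROWS));
        }
    }

    public static void main(String[] args) {
        MemoryManager mm = new MemoryManager(1_000_000L); // assumed 1 MB output budget
        mm.update(0, 50);          // left batch loaded: 50-byte rows
        mm.update(1, 50);          // right batch loaded: 50-byte rows
        mm.startOutputBatch();
        System.out.println(mm.getCurrentBatchRowCount()); // 5000
        mm.setOutPosition(1000);   // 1000 rows written at width 100
        mm.update(1, 150);         // right side loads wider rows mid-batch
        System.out.println(mm.getCurrentBatchRowCount()); // 3000
        System.out.println(mm.getNextBatchRowCount());    // 2500
    }
}
```

With a 1 MB budget and two 50-byte inputs, the first output batch targets 5000 rows. If the right side switches to 150-byte rows after 1000 rows have been written, those rows have already consumed 100,000 of the 500,000 usable bytes, so the current batch is capped at 3000 rows while later batches use 2500.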
> Limit batch size for Merge Join based on memory
> -----------------------------------------------
>
>                 Key: DRILL-6123
>                 URL: https://issues.apache.org/jira/browse/DRILL-6123
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>    Affects Versions: 1.12.0
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>             Fix For: 1.13.0
>
>
> Merge join limits the output batch size to 32K rows, irrespective of row size.
> This can create very large or very small batches (in terms of memory),
> depending on the average row width. Change this to compute the output row
> count from the memory specified by the new outputBatchSize option and the
> average row width of the incoming left and right batches. The output row
> count will be at least 1 and at most 64K.
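To make the issue's motivation concrete, the following Java sketch compares the old fixed 32K-row limit with a memory-based row count clamped to the 1 to 64K range. The 16 MB budget and the row widths are made-up inputs, `BatchSizeComparison` is a hypothetical class, and, unlike the code in the diff, it does not apply a fragmentation factor.

```java
/**
 * Hypothetical comparison of the old fixed row limit with a memory-based
 * row count. Budget and widths are illustrative inputs, not Drill defaults.
 */
public class BatchSizeComparison {
    static final int FIXED_ROW_LIMIT = 32 * 1024; // old behavior: 32K rows regardless of width
    static final int MIN_ROWS = 1;
    static final int MAX_ROWS = 64 * 1024;

    /** Rows of the combined left+right width that fit in the budget, clamped to [1, 64K]. */
    static int memoryBasedRowCount(long outputBatchSize, int leftRowWidth, int rightRowWidth) {
        long outgoingRowWidth = (long) leftRowWidth + rightRowWidth;
        if (outgoingRowWidth <= 0) {
            return MAX_ROWS; // assumption: nothing to size yet, so keep the upper bound
        }
        long rows = outputBatchSize / outgoingRowWidth;
        return (int) Math.min(MAX_ROWS, Math.max(rows, MIN_ROWS));
    }

    public static void main(String[] args) {
        long outputBatchSize = 16L * 1024 * 1024; // assumed 16 MB budget
        int[][] widths = { {4, 4}, {1000, 1000}, {10_000_000, 10_000_000} };
        for (int[] w : widths) {
            long rowWidth = (long) w[0] + w[1];
            int rows = memoryBasedRowCount(outputBatchSize, w[0], w[1]);
            System.out.printf("row width %d: fixed %d rows = %d bytes, memory-based %d rows = %d bytes%n",
                    rowWidth, FIXED_ROW_LIMIT, FIXED_ROW_LIMIT * rowWidth, rows, rows * rowWidth);
        }
    }
}
```

For 8-byte rows the fixed limit produces batches of only 262,144 bytes, while for 2000-byte rows it produces 65,536,000 bytes. The memory-based count caps the narrow case at 64K rows, keeps the wide case just under the 16 MB budget (8388 rows), and still emits 1 row when a single row exceeds the budget.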