[ 
https://issues.apache.org/jira/browse/DRILL-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16354481#comment-16354481
 ] 

ASF GitHub Bot commented on DRILL-6123:
---------------------------------------

Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1107#discussion_r166427892
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/MergeJoinBatch.java ---
    @@ -102,20 +105,78 @@
       private final List<Comparator> comparators;
       private final JoinRelType joinType;
       private JoinWorker worker;
    +  private final long outputBatchSize;
     
       private static final String LEFT_INPUT = "LEFT INPUT";
       private static final String RIGHT_INPUT = "RIGHT INPUT";
     
    +  private class MergeJoinMemoryManager extends AbstractRecordBatchMemoryManager {
    +    private int leftRowWidth;
    +    private int rightRowWidth;
    +
    +    /**
    +     * mergejoin operates on one record at a time from the left and right batches
    +     * using RecordIterator abstraction. We have a callback mechanism to get notified
    +     * when new batch is loaded in record iterator.
    +     * This can get called in the middle of current output batch we are building.
    +     * when this gets called, adjust number of output rows for the current batch and
    +     * update the value to be used for subsequent batches.
    +     */
    +    @Override
    +    public void update(int inputIndex) {
    +      switch(inputIndex) {
    +        case 0:
    +          final RecordBatchSizer leftSizer = new RecordBatchSizer(left);
    +          leftRowWidth = leftSizer.netRowWidth();
    +          break;
    +        case 1:
    +          final RecordBatchSizer rightSizer = new RecordBatchSizer(right);
    +          rightRowWidth = rightSizer.netRowWidth();
    +        default:
    +          break;
    +      }
    +
    +      final int newOutgoingRowWidth = leftRowWidth + rightRowWidth;
    +
    +      // If outgoing row width is 0, just return. This is possible for empty batches or
    +      // when first set of batches come with OK_NEW_SCHEMA and no data.
    +      if (newOutgoingRowWidth == 0) {
    +        return;
    +      }
    +
    +      // update the value to be used for next batch(es)
    +      setOutputRowCount(Math.min(ValueVector.MAX_ROW_COUNT,
    +        Math.max(RecordBatchSizer.safeDivide(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR, newOutgoingRowWidth), MIN_NUM_ROWS)));
    +
    +      // Adjust for the current batch.
    +      // calculate memory used so far based on previous outgoing row width and how many rows we already processed.
    +      final long memoryUsed = status.getOutPosition() * getOutgoingRowWidth();
    +      // This is the remaining memory.
    +      final long remainingMemory = Math.max(outputBatchSize/WORST_CASE_FRAGMENTATION_FACTOR - memoryUsed, 0);
    +      // These are number of rows we can fit in remaining memory based on new outgoing row width.
    +      final int numOutputRowsRemaining = RecordBatchSizer.safeDivide(remainingMemory, newOutgoingRowWidth);
    +
    +      final int adjustedOutputRowCount = Math.min(MAX_NUM_ROWS, Math.max(status.getOutPosition() + numOutputRowsRemaining, MIN_NUM_ROWS));
    +      status.setOutputRowCount(adjustedOutputRowCount);
    +      setOutgoingRowWidth(newOutgoingRowWidth);
    --- End diff --
    
    Yes, this is how it works.
    We read from the left and right sides using the RecordIterator abstraction,
    which reads full record batches underneath and returns one record at a time
    with each next call. A callback fires whenever the record iterator loads a
    new batch, so that the row widths can be updated. When I get the callback, I
    adjust the row count for the current output batch based on the memory
    remaining for that batch, and I also compute and save the row count to use
    for the next full batch. In innerNext, when we start working on the next
    output batch, I set the target output row count from this saved value.
    
    Addressed all other code review comments. Please take a look when you get a 
chance.
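
For readers following the arithmetic, here is a minimal standalone sketch of the two calculations in the diff above. It is not the Drill code: the class name `OutputRowCountSketch`, the method names, and the constant values (64K maximum rows, a fragmentation factor of 2) are assumptions made for illustration, and `safeDivide` is a simple stand-in whose rounding may differ from `RecordBatchSizer.safeDivide`.

```java
public class OutputRowCountSketch {
    // Assumed values for illustration only; the real constants live in Drill.
    static final int MAX_NUM_ROWS = 65536;
    static final int MIN_NUM_ROWS = 1;
    static final int WORST_CASE_FRAGMENTATION_FACTOR = 2;

    // Stand-in helper: returns 0 for a zero denominator, otherwise truncating division.
    static int safeDivide(long num, long denom) {
        return denom == 0 ? 0 : (int) (num / denom);
    }

    // Row count to use for subsequent full output batches.
    static int rowsForNextBatch(long outputBatchSize, int newRowWidth) {
        long budget = outputBatchSize / WORST_CASE_FRAGMENTATION_FACTOR;
        return Math.min(MAX_NUM_ROWS,
                Math.max(safeDivide(budget, newRowWidth), MIN_NUM_ROWS));
    }

    // Adjusted row count for the output batch that is already partly built.
    static int rowsForCurrentBatch(long outputBatchSize, int rowsWritten,
                                   int oldRowWidth, int newRowWidth) {
        long budget = outputBatchSize / WORST_CASE_FRAGMENTATION_FACTOR;
        // Memory consumed so far, at the row width in effect until now.
        // Widen to long before multiplying so the product cannot overflow int.
        long memoryUsed = (long) rowsWritten * oldRowWidth;
        long remainingMemory = Math.max(budget - memoryUsed, 0);
        // Rows that still fit at the new row width.
        int rowsRemaining = safeDivide(remainingMemory, newRowWidth);
        return Math.min(MAX_NUM_ROWS,
                Math.max(rowsWritten + rowsRemaining, MIN_NUM_ROWS));
    }

    public static void main(String[] args) {
        long batchSize = 16L * 1024 * 1024; // hypothetical 16 MiB output batch size
        System.out.println(rowsForNextBatch(batchSize, 1024));             // 8192
        System.out.println(rowsForCurrentBatch(batchSize, 1000, 1024, 2048)); // 4596
    }
}
```

In the second call, 1000 rows at 1024 bytes have used 1,024,000 bytes of the 8 MiB budget, so 3596 more rows of 2048 bytes fit and the current batch is capped at 4596 rows.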



> Limit batch size for Merge Join based on memory
> -----------------------------------------------
>
>                 Key: DRILL-6123
>                 URL: https://issues.apache.org/jira/browse/DRILL-6123
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Flow
>    Affects Versions: 1.12.0
>            Reporter: Padma Penumarthy
>            Assignee: Padma Penumarthy
>            Priority: Major
>             Fix For: 1.13.0
>
>
> Merge join limits output batch size to 32K rows irrespective of row size. 
> This can create very large or very small batches (in terms of memory), 
> depending on the average row width. Change this to compute the output row 
> count from the memory specified with the new outputBatchSize option and the 
> average row width of the incoming left and right batches. The output row 
> count will be at least 1 and at most 64K. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
