[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051144#comment-16051144
 ] 

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122324273
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch 
incoming) {
         }
       }
     
    +  /**
    +   *  Read and process (i.e., insert into the hash table and aggregate) 
records from the current batch.
    +   *  Once complete, get the incoming NEXT batch and process it as well, 
etc.
    +   *  For 1st phase, may return when an early output needs to be performed.
    +   *
    +   * @return Agg outcome status
    +   */
       @Override
       public AggOutcome doWork() {
    -    try {
    -      // Note: Keeping the outer and inner try blocks here to maintain 
some similarity with
    -      // StreamingAggregate which does somethings conditionally in the 
outer try block.
    -      // In the future HashAggregate may also need to perform some actions 
conditionally
    -      // in the outer try block.
    -
    -      assert ! handlingSpills || currentIndex < Integer.MAX_VALUE;
     
    -      outside:
    -      while (true) {
    +    while (true) {
     
    -        // This would be called only once - after actual data arrives on 
incoming
    -        if ( schema == null && incoming.getRecordCount() > 0 ) {
    -          this.schema = incoming.getSchema();
    -          // Calculate the number of partitions based on actual incoming 
data
    -          delayedSetup();
    -        }
    +      // This would be called only once - first time actual data arrives 
on incoming
    +      if ( schema == null && incoming.getRecordCount() > 0 ) {
    +        this.schema = incoming.getSchema();
    +        currentBatchRecordCount = incoming.getRecordCount(); // initialize 
for first non empty batch
    +        // Calculate the number of partitions based on actual incoming data
    +        delayedSetup();
    +      }
     
    -        // loop through existing records, aggregating the values as 
necessary.
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Starting outer loop of doWork()...");
    +      //
    +      //  loop through existing records in this batch, aggregating the 
values as necessary.
    +      //
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Starting outer loop of doWork()...");
    +      }
    +      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
    +        if (EXTRA_DEBUG_2) {
    +          logger.debug("Doing loop with values underlying {}, current {}", 
underlyingIndex, currentIndex);
             }
    -        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
    -          if (EXTRA_DEBUG_2) {
    -            logger.debug("Doing loop with values underlying {}, current 
{}", underlyingIndex, currentIndex);
    -          }
    -          checkGroupAndAggrValues(currentIndex);
    -          // If adding a group discovered a memory pressure during 1st 
phase, then start
    -          // outputing some partition to free memory.
    -          if ( earlyOutput ) {
    -            outputCurrentBatch();
    -            incIndex(); // next time continue with the next incoming row
    -            return AggOutcome.RETURN_OUTCOME;
    -          }
    +        checkGroupAndAggrValues(currentIndex);
    +        // If adding a group discovered a memory pressure during 1st 
phase, then start
    +        // outputing some partition downstream in order to free memory.
    +        if ( earlyOutput ) {
    +          outputCurrentBatch();
    +          incIndex(); // next time continue with the next incoming row
    +          return AggOutcome.RETURN_OUTCOME;
             }
    +      }
    +
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Processed {} records", underlyingIndex);
    +      }
     
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Processed {} records", underlyingIndex);
    +      // Cleanup the previous batch since we are done processing it.
    +      for (VectorWrapper<?> v : incoming) {
    +        v.getValueVector().clear();
    +      }
    +      //
    +      // Get the NEXT input batch, initially from the upstream, later (if 
there was a spill)
    +      // from one of the spill files (The spill case is handled 
differently here to avoid
    +      // collecting stats on the spilled records)
    +      //
    +      if ( handlingSpills ) {
    +        outcome = context.shouldContinue() ? incoming.next() : 
IterOutcome.STOP;
    +      } else {
    +        long beforeAlloc = allocator.getAllocatedMemory();
    +
    +        // Get the next RecordBatch from the incoming (i.e. upstream 
operator)
    +        outcome = outgoing.next(0, incoming);
    +
    +        // If incoming batch is bigger than our estimate - adjust the 
estimate to match
    +        long afterAlloc = allocator.getAllocatedMemory();
    +        long incomingBatchSize = afterAlloc - beforeAlloc;
    +        if ( estMaxBatchSize < incomingBatchSize) {
    +          logger.trace("Found a bigger incoming batch: {} , prior estimate 
was: {}", incomingBatchSize, estMaxBatchSize);
    +          estMaxBatchSize = incomingBatchSize;
             }
    +      }
     
    -        try {
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Received IterOutcome of {}", outcome);
    +      }
     
    -          while (true) {
    -            // Cleanup the previous batch since we are done processing it.
    -            long pre = allocator.getAllocatedMemory();
    -            for (VectorWrapper<?> v : incoming) {
    -              v.getValueVector().clear();
    -            }
    -            long beforeAlloc = allocator.getAllocatedMemory();
    +      // Handle various results from getting the next batch
    +      switch (outcome) {
    +        case OUT_OF_MEMORY:
    +        case NOT_YET:
    +          return AggOutcome.RETURN_OUTCOME;
     
    -            // Get the next RecordBatch from the incoming
    -            IterOutcome out = outgoing.next(0, incoming);
    +        case OK_NEW_SCHEMA:
    --- End diff --
    
    This case leads to immediate failure; even the first batch does not get 
here. The error message returned is "Hash aggregate does not support schema 
change". (Again -- all of this is original code.)



> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk as the available memory becomes too 
> small to allow in-memory work for the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to