GitHub user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r122323606
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch incoming) {
         }
       }
     
    +  /**
    +   *  Read and process (i.e., insert into the hash table and aggregate) records from the current batch.
    +   *  Once complete, get the incoming NEXT batch and process it as well, etc.
    +   *  For 1st phase, may return when an early output needs to be performed.
    +   *
    +   * @return Agg outcome status
    +   */
       @Override
       public AggOutcome doWork() {
    -    try {
    -      // Note: Keeping the outer and inner try blocks here to maintain some similarity with
    -      // StreamingAggregate which does somethings conditionally in the outer try block.
    -      // In the future HashAggregate may also need to perform some actions conditionally
    -      // in the outer try block.
    -
    -      assert ! handlingSpills || currentIndex < Integer.MAX_VALUE;
     
    -      outside:
    -      while (true) {
    +    while (true) {
     
    -        // This would be called only once - after actual data arrives on incoming
    -        if ( schema == null && incoming.getRecordCount() > 0 ) {
    -          this.schema = incoming.getSchema();
    -          // Calculate the number of partitions based on actual incoming data
    -          delayedSetup();
    -        }
    +      // This would be called only once - first time actual data arrives on incoming
    +      if ( schema == null && incoming.getRecordCount() > 0 ) {
    +        this.schema = incoming.getSchema();
    +        currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch
    +        // Calculate the number of partitions based on actual incoming data
    +        delayedSetup();
    +      }
     
    -        // loop through existing records, aggregating the values as necessary.
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Starting outer loop of doWork()...");
    +      //
    +      //  loop through existing records in this batch, aggregating the values as necessary.
    +      //
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Starting outer loop of doWork()...");
    +      }
    +      for (; underlyingIndex < currentBatchRecordCount; incIndex()) {
    +        if (EXTRA_DEBUG_2) {
    +          logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
             }
    -        for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
    -          if (EXTRA_DEBUG_2) {
    -            logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex);
    -          }
    -          checkGroupAndAggrValues(currentIndex);
    -          // If adding a group discovered a memory pressure during 1st phase, then start
    -          // outputing some partition to free memory.
    -          if ( earlyOutput ) {
    -            outputCurrentBatch();
    -            incIndex(); // next time continue with the next incoming row
    -            return AggOutcome.RETURN_OUTCOME;
    -          }
    +        checkGroupAndAggrValues(currentIndex);
    +        // If adding a group discovered a memory pressure during 1st phase, then start
    +        // outputing some partition downstream in order to free memory.
    +        if ( earlyOutput ) {
    +          outputCurrentBatch();
    +          incIndex(); // next time continue with the next incoming row
    +          return AggOutcome.RETURN_OUTCOME;
             }
    +      }
    +
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Processed {} records", underlyingIndex);
    +      }
     
    -        if (EXTRA_DEBUG_1) {
    -          logger.debug("Processed {} records", underlyingIndex);
    +      // Cleanup the previous batch since we are done processing it.
    +      for (VectorWrapper<?> v : incoming) {
    +        v.getValueVector().clear();
    +      }
    +      //
    +      // Get the NEXT input batch, initially from the upstream, later (if there was a spill)
    +      // from one of the spill files (The spill case is handled differently here to avoid
    +      // collecting stats on the spilled records)
    +      //
    +      if ( handlingSpills ) {
    +        outcome = context.shouldContinue() ? incoming.next() : IterOutcome.STOP;
    +      } else {
    +        long beforeAlloc = allocator.getAllocatedMemory();
    +
    +        // Get the next RecordBatch from the incoming (i.e. upstream operator)
    +        outcome = outgoing.next(0, incoming);
    +
    +        // If incoming batch is bigger than our estimate - adjust the estimate to match
    +        long afterAlloc = allocator.getAllocatedMemory();
    +        long incomingBatchSize = afterAlloc - beforeAlloc;
    +        if ( estMaxBatchSize < incomingBatchSize) {
    +          logger.trace("Found a bigger incoming batch: {} , prior estimate was: {}", incomingBatchSize, estMaxBatchSize);
    +          estMaxBatchSize = incomingBatchSize;
             }
    +      }
     
    -        try {
    +      if (EXTRA_DEBUG_1) {
    +        logger.debug("Received IterOutcome of {}", outcome);
    +      }
     
    -          while (true) {
    -            // Cleanup the previous batch since we are done processing it.
    -            long pre = allocator.getAllocatedMemory();
    -            for (VectorWrapper<?> v : incoming) {
    -              v.getValueVector().clear();
    -            }
    -            long beforeAlloc = allocator.getAllocatedMemory();
    +      // Handle various results from getting the next batch
    +      switch (outcome) {
    +        case OUT_OF_MEMORY:
    +        case NOT_YET:
    --- End diff --
    
    Again, this is the original Hash Agg code, just re-indented because the enclosing while(true) and try blocks were removed. The "outcome" is the result of fetching the next incoming batch, hence OUT_OF_MEMORY may (?) occur if a grossly abnormal incoming batch shows up unexpectedly.
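
    To make the point concrete, here is a minimal, self-contained sketch of the two steps around fetching the next batch: growing the batch-size estimate when a larger batch arrives, and then switching on the outcome. The class NextBatchSketch, the Action enum, and both method names are hypothetical stand-ins rather than Drill code, and since the diff is cut off right after "case NOT_YET:", what those two cases actually do (here: hand the outcome back to the caller) is an assumption.

```java
// Hypothetical, simplified stand-ins; these are NOT Drill's actual classes.
final class NextBatchSketch {

  // OUT_OF_MEMORY, NOT_YET and STOP appear in the diff; OK and NONE are assumed here.
  enum IterOutcome { OK, NOT_YET, OUT_OF_MEMORY, NONE, STOP }

  // Hypothetical summary of what the caller does next.
  enum Action { RETURN_TO_CALLER, PROCESS_BATCH, FINISH_INPUT }

  // Mirrors the estimate adjustment in the diff: the size of the incoming
  // batch is taken as the growth in allocated memory across the fetch, and
  // the estimate only ever grows.
  static long adjustEstimate(long estMaxBatchSize, long beforeAlloc, long afterAlloc) {
    long incomingBatchSize = afterAlloc - beforeAlloc;
    return Math.max(estMaxBatchSize, incomingBatchSize);
  }

  // Dispatch on the outcome of fetching the next batch.
  static Action dispatch(IterOutcome outcome) {
    switch (outcome) {
      case OUT_OF_MEMORY: // e.g. an abnormally large batch could not be allocated
      case NOT_YET:       // upstream has nothing ready yet
        // Assumption: both cases share one handler and go back to the caller.
        return Action.RETURN_TO_CALLER;
      case OK:
        return Action.PROCESS_BATCH;
      default:
        // Assumption: any other outcome ends the input loop.
        return Action.FINISH_INPUT;
    }
  }
}
```

    With these stand-ins, adjustEstimate(100, 1000, 1250) yields 250 (the estimate grows to match the bigger batch), while adjustEstimate(300, 1000, 1250) stays at 300.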

