[ https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16051081#comment-16051081 ]
ASF GitHub Bot commented on DRILL-5457: --------------------------------------- Github user paul-rogers commented on a diff in the pull request: https://github.com/apache/drill/pull/822#discussion_r122312536 --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java --- @@ -512,122 +509,122 @@ private void updateEstMaxBatchSize(RecordBatch incoming) { } } + /** + * Read and process (i.e., insert into the hash table and aggregate) records from the current batch. + * Once complete, get the incoming NEXT batch and process it as well, etc. + * For 1st phase, may return when an early output needs to be performed. + * + * @return Agg outcome status + */ @Override public AggOutcome doWork() { - try { - // Note: Keeping the outer and inner try blocks here to maintain some similarity with - // StreamingAggregate which does somethings conditionally in the outer try block. - // In the future HashAggregate may also need to perform some actions conditionally - // in the outer try block. - - assert ! handlingSpills || currentIndex < Integer.MAX_VALUE; - outside: - while (true) { + while (true) { - // This would be called only once - after actual data arrives on incoming - if ( schema == null && incoming.getRecordCount() > 0 ) { - this.schema = incoming.getSchema(); - // Calculate the number of partitions based on actual incoming data - delayedSetup(); - } + // This would be called only once - first time actual data arrives on incoming + if ( schema == null && incoming.getRecordCount() > 0 ) { + this.schema = incoming.getSchema(); + currentBatchRecordCount = incoming.getRecordCount(); // initialize for first non empty batch + // Calculate the number of partitions based on actual incoming data + delayedSetup(); + } - // loop through existing records, aggregating the values as necessary. 
- if (EXTRA_DEBUG_1) { - logger.debug("Starting outer loop of doWork()..."); + // + // loop through existing records in this batch, aggregating the values as necessary. + // + if (EXTRA_DEBUG_1) { + logger.debug("Starting outer loop of doWork()..."); + } + for (; underlyingIndex < currentBatchRecordCount; incIndex()) { + if (EXTRA_DEBUG_2) { + logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); } - for (; underlyingIndex < incoming.getRecordCount(); incIndex()) { - if (EXTRA_DEBUG_2) { - logger.debug("Doing loop with values underlying {}, current {}", underlyingIndex, currentIndex); - } - checkGroupAndAggrValues(currentIndex); - // If adding a group discovered a memory pressure during 1st phase, then start - // outputing some partition to free memory. - if ( earlyOutput ) { - outputCurrentBatch(); - incIndex(); // next time continue with the next incoming row - return AggOutcome.RETURN_OUTCOME; - } + checkGroupAndAggrValues(currentIndex); + // If adding a group discovered a memory pressure during 1st phase, then start + // outputing some partition downstream in order to free memory. + if ( earlyOutput ) { + outputCurrentBatch(); + incIndex(); // next time continue with the next incoming row + return AggOutcome.RETURN_OUTCOME; } + } + + if (EXTRA_DEBUG_1) { + logger.debug("Processed {} records", underlyingIndex); + } - if (EXTRA_DEBUG_1) { - logger.debug("Processed {} records", underlyingIndex); + // Cleanup the previous batch since we are done processing it. + for (VectorWrapper<?> v : incoming) { --- End diff -- Two comments/questions here. First, can we be sure that there is always a Selection Vector Remover between the hash agg and anything that can emit a batch that uses an SV4 (such as sort)? Otherwise, the `getValueVector()` method won't work. Second, is it certain that no other code references the vectors from this container? 
If both of those invariants are met, then, yes, it is the job of this operator to release buffers created by the upstream operator. > Support Spill to Disk for the Hash Aggregate Operator > ----------------------------------------------------- > > Key: DRILL-5457 > URL: https://issues.apache.org/jira/browse/DRILL-5457 > Project: Apache Drill > Issue Type: Improvement > Components: Execution - Relational Operators > Affects Versions: 1.10.0 > Reporter: Boaz Ben-Zvi > Assignee: Boaz Ben-Zvi > Fix For: 1.11.0 > > > Support gradually spilling memory to disk as the available memory gets too > small to allow in-memory work for the Hash Aggregate Operator. -- This message was sent by Atlassian JIRA (v6.4.14#64029)