Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r119975302
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
     
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }
     
       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +      if ( schema == null ) { return; } // not set up; nothing to clean
    +      for ( int i = 0; i < numPartitions; i++) {
    +          if (htables[i] != null) {
    +              htables[i].clear();
    +              htables[i] = null;
    +          }
    +          if ( batchHolders[i] != null) {
    +              for (BatchHolder bh : batchHolders[i]) {
    +                    bh.clear();
    +              }
    +              batchHolders[i].clear();
    +              batchHolders[i] = null;
    +          }
    +
    +          // delete any (still active) output spill file
    +          if ( outputStream[i] != null && spillFiles[i] != null) {
    +            try {
    +              spillSet.delete(spillFiles[i]);
    +            } catch(IOException e) {
    +              logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i]);
    +            }
    +          }
    +      }
    +      // delete any spill file left in unread spilled partitions
    +      while ( ! spilledPartitionsList.isEmpty() ) {
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        try {
    +          spillSet.delete(sp.spillFile);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file {}", sp.spillFile);
    +        }
    +      }
    +      spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }
     
    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., hash table plus batches),
    +  // then reallocate them in a pristine state to allow the partition to continue receiving rows.
    +  private void reinitPartition(int part) throws SchemaChangeException, ClassTransformationException, IOException {
    +    assert htables[part] != null;
    +    htables[part].reset();
    +    if ( batchHolders[part] != null) {
    +      for (BatchHolder bh : batchHolders[part]) {
             bh.clear();
           }
    -      batchHolders.clear();
    -      batchHolders = null;
    +      batchHolders[part].clear();
         }
    +    batchHolders[part] = new ArrayList<BatchHolder>(); // First BatchHolder is created when the first put request is received.
       }
     
    -//  private final AggOutcome setOkAndReturn() {
    -//    this.outcome = IterOutcome.OK;
    -//    for (VectorWrapper<?> v : outgoing) {
    -//      v.getValueVector().getMutator().setValueCount(outputCount);
    -//    }
    -//    return AggOutcome.RETURN_OUTCOME;
    -//  }
     
       private final void incIndex() {
         underlyingIndex++;
         if (underlyingIndex >= incoming.getRecordCount()) {
           currentIndex = Integer.MAX_VALUE;
           return;
         }
    -    currentIndex = getVectorIndex(underlyingIndex);
    +    try { currentIndex = getVectorIndex(underlyingIndex); }
    +    catch (SchemaChangeException sc) { throw new DrillRuntimeException(sc); }
       }
     
       private final void resetIndex() {
         underlyingIndex = -1;
         incIndex();
       }
     
    -  private void addBatchHolder() {
    +  private boolean isSpilled(int part) {
    +    return outputStream[part] != null;
    +  }
    +  /**
    +   * Which partition should be chosen for flushing out (i.e., spill or return)?
    +   * - The current partition (to which a new batch holder is added) has priority,
    +   *   because its last batch holder is full.
    +   * - The largest previously spilled partition also has some priority, as it is
    +   *   already spilled; but spilling too few rows (e.g., a single batch) gains nothing.
    +   * - The largest non-spilled partition has some priority as well, to free more memory.
    +   * These three options need to be weighed against one another.
    +   *
    +   *  @param currPart The partition that hit the memory limit (gets priority)
    +   *  @return The partition (number) chosen to be spilled
    +   */
    +  private int chooseAPartitionToFlush(int currPart) {
    +    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the current partition
    +    int currPartSize = batchHolders[currPart].size();
    +    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current if size is 1
    +    // first find the largest spilled partition
    +    int maxSizeSpilled = -1;
    +    int indexMaxSpilled = -1;
    +    for (int isp = 0; isp < numPartitions; isp++ ) {
    +      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
    +        maxSizeSpilled = batchHolders[isp].size();
    +        indexMaxSpilled = isp;
    +      }
    +    }
    +    // Give the current (if already spilled) some priority
    +    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
    +      maxSizeSpilled = currPartSize ;
    +      indexMaxSpilled = currPart;
    +    }
    +    // now find the largest non-spilled partition
    +    int maxSize = -1;
    +    int indexMax = -1;
    +    // Use the largest spilled partition (if found) as a baseline, weighted by a factor of 4
    +    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
    +      indexMax = indexMaxSpilled;
    +      maxSize = 4 * maxSizeSpilled ;
    +    }
    +    for ( int insp = 0; insp < numPartitions; insp++) {
    +      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
    +        indexMax = insp;
    +        maxSize = batchHolders[insp].size();
    +      }
    +    }
    +    // again - priority to the current partition
    +    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
    +      return currPart;
    +    }
    +    if ( maxSize <= 1 ) { // Cannot make progress by spilling a single batch!
    +      return -1; // try skipping this spill
    +    }
    +    return indexMax;
    +  }
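    +
    +  // Worked illustration of the heuristic above (hypothetical sizes, not taken
    +  // from the patch): suppose 4 partitions with batch counts P0=3 (already
    +  // spilled), P1=8, P2=5, P3=2, and currPart = P2.
    +  //  - The largest spilled partition is P0, so maxSizeSpilled = 3.
    +  //  - P0 seeds the non-spilled scan with a weighted maxSize = 4 * 3 = 12,
    +  //    which none of the non-spilled sizes (8, 5, 2) exceeds.
    +  //  - P2 is not spilled and 5 + 1 < 12, so the current partition gets no priority.
    +  //  - Hence chooseAPartitionToFlush(2) returns 0: appending to P0's existing
    +  //    spill file is preferred over starting a new spill for the larger P1.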
    +
    +  /**
    +   * Iterate through the batches of the given partition, writing them to a file.
    +   *
    +   * @param part The partition (number) to spill
    +   */
    +  private void spillAPartition(int part) {
    +
    +    ArrayList<BatchHolder> currPartition = batchHolders[part];
    +    rowsInPartition = 0;
    +    if ( EXTRA_DEBUG_SPILL ) {
    +      logger.debug("HashAggregate: Spilling partition {} current cycle {} part size {}", part, cycleNum, currPartition.size());
    +    }
    +
    +    if ( currPartition.size() == 0 ) { return; } // in case empty - nothing to spill
    +
    +    // If this is the first spill for this partition, create an output stream
    +    if ( ! isSpilled(part) ) {
    +
    +      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
    +
    +      try {
    +        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to open spill file: " + spillFiles[part]);
    +      }
    +    }
    +
    +    for (int currOutBatchIndex = 0; currOutBatchIndex < currPartition.size(); currOutBatchIndex++ ) {
    +
    +      // get the number of records in the batch holder that are pending output
    +      int numPendingOutput = currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +      rowsInPartition += numPendingOutput;  // for logging
    +      rowsSpilled += numPendingOutput;
    +
    +      allocateOutgoing(numPendingOutput);
    +
    +      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder);
    +      int numOutputRecords = outNumRecordsHolder.value;
    +
    +      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value);
    +
    +      // set the value count for outgoing batch value vectors
    +      /* int i = 0; */
    +      for (VectorWrapper<?> v : outgoing) {
    +        v.getValueVector().getMutator().setValueCount(numOutputRecords);
    +        /*
    +        // print out the first row to be spilled ( varchar, varchar, bigint )
    +        try {
    +          if (i++ < 2) {
    +            NullableVarCharVector vv = ((NullableVarCharVector) v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          } else {
    +            NullableBigIntVector vv = ((NullableBigIntVector) v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          }
    +        } catch (Exception e) { logger.info("While printing the first row - Got an exception = {}",e); }
    +        */
    +      }
    +
    +      outContainer.setRecordCount(numPendingOutput);
    +      WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
    +      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      try {
    +        outputBatch.writeToStream(outputStream[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to write to output stream: " + outputStream[part].toString());
    +      }
    +      outContainer.zeroVectors();
    +      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
    +    }
    +
    +    spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
    +
    +    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition {}", rowsInPartition, currPartition.size(), part);
    +  }
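    +
    +  // For context: reading a spilled batch back (done via SpilledRecordbatch when
    +  // a spilled partition is later re-processed) mirrors the write path above.
    +  // A minimal sketch, assuming SpillSet.openForInput() is the counterpart of
    +  // the openForOutput() call used when spilling:
    +  //
    +  //   InputStream in = spillSet.openForInput(spillFiles[part]);
    +  //   VectorAccessibleSerializable vas = new VectorAccessibleSerializable(allocator);
    +  //   vas.readFromStream(in);               // rebuild the vectors of one batch
    +  //   VectorContainer readBack = vas.get(); // the deserialized batch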
    +
    +  private void addBatchHolder(int part) {
    +
         BatchHolder bh = newBatchHolder();
    -    batchHolders.add(bh);
    +    batchHolders[part].add(bh);
     
         if (EXTRA_DEBUG_1) {
    -      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
    +      logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders[part].size());
         }
     
         bh.setup();
       }
     
    -  // Overridden in the generated class when created as plain Java code.
    -
    +  // These methods are overridden in the generated class when created as plain Java code.
       protected BatchHolder newBatchHolder() {
         return new BatchHolder();
       }
    +  protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
    +    return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
    +  }
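    +
    +  // A hedged sketch of why these factory hooks exist: when the template is
    +  // compiled as "plain Java", the generated subclass overrides them so that
    +  // the inner classes are instantiated from the generated type, e.g.
    +  // (hypothetical generated name):
    +  //
    +  //   public class HashAggregatorGen1 extends HashAggTemplate {
    +  //     @Override
    +  //     protected BatchHolder newBatchHolder() { return new BatchHolder(); }
    +  //   }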
     
    +  /**
    +   * Output the next batch from partition "nextPartitionToReturn"
    +   *
    +   * @return iteration outcome (e.g., OK, NONE ...)
    +   */
       @Override
       public IterOutcome outputCurrentBatch() {
    -    if (outBatchIndex >= batchHolders.size()) {
    -      this.outcome = IterOutcome.NONE;
    -      return outcome;
    +
    +    // when incoming was an empty batch, just finish up
    +    if ( schema == null ) {
    +      logger.trace("Incoming was empty; output is an empty batch.");
    +      this.outcome = IterOutcome.NONE; // no records were read
    +      allFlushed = true;
    +      return this.outcome;
    --- End diff --
    
    This code was changed anyway, as part of eliminating RESTART.


