[ 
https://issues.apache.org/jira/browse/DRILL-5457?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16027251#comment-16027251
 ] 

ASF GitHub Bot commented on DRILL-5457:
---------------------------------------

Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r118814014
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -400,114 +782,411 @@ public IterOutcome getOutcome() {
     
       @Override
       public int getOutputCount() {
    -    // return outputCount;
         return lastBatchOutputCount;
       }
     
       @Override
       public void cleanup() {
    -    if (htable != null) {
    -      htable.clear();
    -      htable = null;
    -    }
    +      if ( schema == null ) { return; } // not set up; nothing to clean
    +      for ( int i = 0; i < numPartitions; i++) {
    +          if (htables[i] != null) {
    +              htables[i].clear();
    +              htables[i] = null;
    +          }
    +          if ( batchHolders[i] != null) {
    +              for (BatchHolder bh : batchHolders[i]) {
    +                    bh.clear();
    +              }
    +              batchHolders[i].clear();
    +              batchHolders[i] = null;
    +          }
    +
    +          // delete any (still active) output spill file
    +          if ( outputStream[i] != null && spillFiles[i] != null) {
    +            try {
    +              spillSet.delete(spillFiles[i]);
    +            } catch(IOException e) {
    +              logger.warn("Cleanup: Failed to delete spill file 
{}",spillFiles[i]);
    +            }
    +          }
    +      }
    +      // delete any spill file left in unread spilled partitions
    +      while ( ! spilledPartitionsList.isEmpty() ) {
    +        SpilledPartition sp = spilledPartitionsList.remove(0);
    +        try {
    +          spillSet.delete(sp.spillFile);
    +        } catch(IOException e) {
    +          logger.warn("Cleanup: Failed to delete spill file 
{}",sp.spillFile);
    +        }
    +      }
    +      spillSet.close(); // delete the spill directory(ies)
         htIdxHolder = null;
         materializedValueFields = null;
         outStartIdxHolder = null;
         outNumRecordsHolder = null;
    +  }
     
    -    if (batchHolders != null) {
    -      for (BatchHolder bh : batchHolders) {
    +  // First free the memory used by the given (spilled) partition (i.e., 
hash table plus batches)
    +  // then reallocate them in pristine state to allow the partition to 
continue receiving rows
    +  private void reinitPartition(int part) throws SchemaChangeException, 
ClassTransformationException, IOException {
    +    assert htables[part] != null;
    +    htables[part].reset();
    +    if ( batchHolders[part] != null) {
    +      for (BatchHolder bh : batchHolders[part]) {
             bh.clear();
           }
    -      batchHolders.clear();
    -      batchHolders = null;
    +      batchHolders[part].clear();
         }
    +    batchHolders[part] = new ArrayList<BatchHolder>(); // First 
BatchHolder is created when the first put request is received.
       }
     
    -//  private final AggOutcome setOkAndReturn() {
    -//    this.outcome = IterOutcome.OK;
    -//    for (VectorWrapper<?> v : outgoing) {
    -//      v.getValueVector().getMutator().setValueCount(outputCount);
    -//    }
    -//    return AggOutcome.RETURN_OUTCOME;
    -//  }
     
       private final void incIndex() {
         underlyingIndex++;
         if (underlyingIndex >= incoming.getRecordCount()) {
           currentIndex = Integer.MAX_VALUE;
           return;
         }
    -    currentIndex = getVectorIndex(underlyingIndex);
    +    try { currentIndex = getVectorIndex(underlyingIndex); }
    +    catch (SchemaChangeException sc) { throw new 
DrillRuntimeException(sc);}
       }
     
       private final void resetIndex() {
         underlyingIndex = -1;
         incIndex();
       }
     
    -  private void addBatchHolder() {
    +  private boolean isSpilled(int part) {
    +    return outputStream[part] != null;
    +  }
    +  /**
    +   * Which partition to choose for flushing out (i.e. spill or return) ?
    +   * - The current partition (to which a new bach holder is added) has a 
priority,
    +   *   because its last batch holder is full.
    +   * - Also the largest prior spilled partition has some priority, as it 
is already spilled;
    +   *   but spilling too few rows (e.g. a single batch) gets us nothing.
    +   * - So the largest non-spilled partition has some priority, to get more 
memory freed.
    +   * Need to weigh the above three options.
    +   *
    +   *  @param currPart - The partition that hit the memory limit (gets a 
priority)
    +   *  @return The partition (number) chosen to be spilled
    +   */
    +  private int chooseAPartitionToFlush(int currPart) {
    +    if ( ! is2ndPhase ) { return currPart; } // 1st phase: just use the 
current partition
    +    int currPartSize = batchHolders[currPart].size();
    +    if ( currPartSize == 1 ) { currPartSize = -1; } // don't pick current 
if size is 1
    +    // first find the largest spilled partition
    +    int maxSizeSpilled = -1;
    +    int indexMaxSpilled = -1;
    +    for (int isp = 0; isp < numPartitions; isp++ ) {
    +      if ( isSpilled(isp) && maxSizeSpilled < batchHolders[isp].size() ) {
    +        maxSizeSpilled = batchHolders[isp].size();
    +        indexMaxSpilled = isp;
    +      }
    +    }
    +    // Give the current (if already spilled) some priority
    +    if ( isSpilled(currPart) && ( currPartSize + 1 >= maxSizeSpilled )) {
    +      maxSizeSpilled = currPartSize ;
    +      indexMaxSpilled = currPart;
    +    }
    +    // now find the largest non-spilled partition
    +    int maxSize = -1;
    +    int indexMax = -1;
    +    // Use the largest spilled (if found) as a base line, with a factor of 
4
    +    if ( indexMaxSpilled > -1 && maxSizeSpilled > 1 ) {
    +      indexMax = indexMaxSpilled;
    +      maxSize = 4 * maxSizeSpilled ;
    +    }
    +    for ( int insp = 0; insp < numPartitions; insp++) {
    +      if ( ! isSpilled(insp) && maxSize < batchHolders[insp].size() ) {
    +        indexMax = insp;
    +        maxSize = batchHolders[insp].size();
    +      }
    +    }
    +    // again - priority to the current partition
    +    if ( ! isSpilled(currPart) && (currPartSize + 1 >= maxSize) ) {
    +      return currPart;
    +    }
    +    if ( maxSize <= 1 ) { // Can not make progress by spilling a single 
batch!
    +      return -1; // try skipping this spill
    +    }
    +    return indexMax;
    +  }
    +
    +  /**
    +   * Iterate through the batches of the given partition, writing them to a 
file
    +   *
    +   * @param part The partition (number) to spill
    +   */
    +  private void spillAPartition(int part) {
    +
    +    ArrayList<BatchHolder> currPartition = batchHolders[part];
    +    rowsInPartition = 0;
    +    if ( EXTRA_DEBUG_SPILL ) {
    +      logger.debug("HashAggregate: Spilling partition {} current cycle {} 
part size {}", part, cycleNum, currPartition.size());
    +    }
    +
    +    if ( currPartition.size() == 0 ) { return; } // in case empty - 
nothing to spill
    +
    +    // If this is the first spill for this partition, create an output 
stream
    +    if ( ! isSpilled(part) ) {
    +
    +      spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? 
Integer.toString(cycleNum) : null);
    +
    +      try {
    +        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to open 
spill file: " + spillFiles[part]);
    +      }
    +    }
    +
    +    for (int currOutBatchIndex = 0; currOutBatchIndex < 
currPartition.size(); currOutBatchIndex++ ) {
    +
    +      // get the number of records in the batch holder that are pending 
output
    +      int numPendingOutput = 
currPartition.get(currOutBatchIndex).getNumPendingOutput();
    +
    +      rowsInPartition += numPendingOutput;  // for logging
    +      rowsSpilled += numPendingOutput;
    +
    +      allocateOutgoing(numPendingOutput);
    +
    +      currPartition.get(currOutBatchIndex).outputValues(outStartIdxHolder, 
outNumRecordsHolder);
    +      int numOutputRecords = outNumRecordsHolder.value;
    +
    +      this.htables[part].outputKeys(currOutBatchIndex, this.outContainer, 
outStartIdxHolder.value, outNumRecordsHolder.value);
    +
    +      // set the value count for outgoing batch value vectors
    +      /* int i = 0; */
    +      for (VectorWrapper<?> v : outgoing) {
    +        v.getValueVector().getMutator().setValueCount(numOutputRecords);
    +        /*
    +        // print out the first row to be spilled ( varchar, varchar, 
bigint )
    +        try {
    +          if (i++ < 2) {
    +            NullableVarCharVector vv = ((NullableVarCharVector) 
v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          } else {
    +            NullableBigIntVector vv = ((NullableBigIntVector) 
v.getValueVector());
    +            logger.info("FIRST ROW = {}", vv.getAccessor().get(0));
    +          }
    +        } catch (Exception e) { logger.info("While printing the first row 
- Got an exception = {}",e); }
    +        */
    +      }
    +
    +      outContainer.setRecordCount(numPendingOutput);
    +      WritableBatch batch = 
WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
    +      VectorAccessibleSerializable outputBatch = new 
VectorAccessibleSerializable(batch, allocator);
    +      Stopwatch watch = Stopwatch.createStarted();
    +      try {
    +        outputBatch.writeToStream(outputStream[part]);
    +      } catch (IOException e) {
    +        throw new DrillRuntimeException("Hash Aggregation failed to write 
to output stream: " + outputStream[part].toString());
    +      }
    +      outContainer.zeroVectors();
    +      logger.trace("HASH AGG: Took {} us to spill {} records", 
watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
    +    }
    +
    +    spilledBatchesCount[part] += currPartition.size(); // update count of 
spilled batches
    +
    +    logger.trace("HASH AGG: Spilled {} rows from {} batches of partition 
{}", rowsInPartition, currPartition.size(), part);
    +  }
    +
    +  private void addBatchHolder(int part) {
    +
         BatchHolder bh = newBatchHolder();
    -    batchHolders.add(bh);
    +    batchHolders[part].add(bh);
     
         if (EXTRA_DEBUG_1) {
    -      logger.debug("HashAggregate: Added new batch; num batches = {}.", 
batchHolders.size());
    +      logger.debug("HashAggregate: Added new batch; num batches = {}.", 
batchHolders[part].size());
         }
     
         bh.setup();
       }
     
    -  // Overridden in the generated class when created as plain Java code.
    -
    +  // These methods are overridden in the generated class when created as 
plain Java code.
       protected BatchHolder newBatchHolder() {
         return new BatchHolder();
       }
    +  protected SpilledRecordbatch newSpilledRecordBatch(String arg1, int 
arg2, FragmentContext arg4, BatchSchema arg5, OperatorContext arg6) {
    +    return new SpilledRecordbatch(arg1, arg2, arg4, arg5, arg6);
    +  }
     
    +  /**
    +   * Output the next batch from partition "nextPartitionToReturn"
    +   *
    +   * @return iteration outcome (e.g., OK, NONE ...)
    +   */
       @Override
       public IterOutcome outputCurrentBatch() {
    -    if (outBatchIndex >= batchHolders.size()) {
    -      this.outcome = IterOutcome.NONE;
    -      return outcome;
    +
    +    // when incoming was an empty batch, just finish up
    +    if ( schema == null ) {
    +      logger.trace("Incoming was empty; output is an empty batch.");
    +      this.outcome = IterOutcome.NONE; // no records were read
    +      allFlushed = true;
    +      return this.outcome;
         }
     
    -    // get the number of records in the batch holder that are pending 
output
    -    int numPendingOutput = 
batchHolders.get(outBatchIndex).getNumPendingOutput();
    +    // Initialization (covers the case of early output)
    +    ArrayList<BatchHolder> currPartition = batchHolders[earlyPartition];
    +    int currOutBatchIndex = outBatchIndex[earlyPartition];
    +    int partitionToReturn = earlyPartition;
    +
    +    if ( ! earlyOutput ) {
    +      // Update the next partition to return (if needed)
    +      // skip fully returned (or spilled) partitions
    +      while (nextPartitionToReturn < numPartitions) {
    +        //
    +        // If this partition was spilled - spill the rest of it and skip it
    +        //
    +        if ( isSpilled(nextPartitionToReturn) ) {
    +          spillAPartition(nextPartitionToReturn); // spill the rest
    +          SpilledPartition sp = new SpilledPartition();
    --- End diff --
    
    Can `SpilledPartition` take on some of the behavior that is inline here? 
Can it, for example, take a partition state instance and do the spilling? Can 
it handle the reading later?
    
    It is not yet entirely clear how this works, so take this suggestion with a 
grain of salt...


> Support Spill to Disk for the Hash Aggregate Operator
> -----------------------------------------------------
>
>                 Key: DRILL-5457
>                 URL: https://issues.apache.org/jira/browse/DRILL-5457
>             Project: Apache Drill
>          Issue Type: Improvement
>          Components: Execution - Relational Operators
>    Affects Versions: 1.10.0
>            Reporter: Boaz Ben-Zvi
>            Assignee: Boaz Ben-Zvi
>             Fix For: 1.11.0
>
>
> Support gradually spilling memory to disk when the available memory becomes 
> too small to allow in-memory work in the Hash Aggregate Operator.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to