GitHub user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r119736678
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 ---
    @@ -266,17 +508,138 @@ public void setup(HashAggregate hashAggrConfig, 
HashTableConfig htConfig, Fragme
           }
         }
     
    -    ChainedHashTable ht =
    +    spillSet = new SpillSet(context,hashAggrConfig, 
UserBitShared.CoreOperatorType.HASH_AGGREGATE);
    +    baseHashTable =
             new ChainedHashTable(htConfig, context, allocator, incoming, null 
/* no incoming probe */, outgoing);
    -    this.htable = ht.createAndSetupHashTable(groupByOutFieldIds);
    -
    +    this.groupByOutFieldIds = groupByOutFieldIds; // retain these for 
delayedSetup, and to allow recreating hash tables (after a spill)
         numGroupByOutFields = groupByOutFieldIds.length;
    -    batchHolders = new ArrayList<BatchHolder>();
    -    // First BatchHolder is created when the first put request is received.
     
         doSetup(incoming);
       }
     
    +  /**
    +   *  Delayed setup are the parts from setup() that can only be set after 
actual data arrives in incoming
    +   *  This data is used to compute the number of partitions.
    +   */
    +  private void delayedSetup() {
    +
    +    // Set the number of partitions from the configuration (raise to a 
power of two, if needed)
    +    numPartitions = 
context.getConfig().getInt(ExecConstants.HASHAGG_NUM_PARTITIONS_KEY);
    +    if ( numPartitions == 1 ) {
    +      canSpill = false;
    +      logger.warn("Spilling was disabled");
    +    }
    +    while (Integer.bitCount(numPartitions) > 1) { // in case not a power 
of 2
    +      numPartitions++;
    +    }
    +    if ( schema == null ) { estMaxBatchSize = 0; } // incoming was an 
empty batch
    +    else {
    +      // Estimate the max batch size; should use actual data (e.g. lengths 
of varchars)
    +      updateEstMaxBatchSize(incoming);
    +    }
    +    long memAvail = memoryLimit - allocator.getAllocatedMemory();
    +    if ( !canSpill ) { // single phase, or spill disabled by configuation
    +      numPartitions = 1; // single phase should use only a single 
partition (to save memory)
    +    } else { // two phase
    +      // Adjust down the number of partitions if needed - when the memory 
available can not hold as
    +      // many batches (configurable option), plus overhead (e.g. hash 
table, links, hash values))
    +      while ( numPartitions * ( estMaxBatchSize * minBatchesPerPartition + 
8 * 1024 * 1024) > memAvail ) {
    +        numPartitions /= 2;
    +        if ( numPartitions < 2) {
    +          if ( is2ndPhase ) { canSpill = false; } // 2nd phase needs at 
least 2 to make progress
    +          break;
    +        }
    +      }
    +    }
    +    logger.trace("{} phase. Number of partitions chosen: {}. {} spill", 
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",
    +        numPartitions, canSpill ? "Can" : "Cannot");
    +
    +    // The following initial safety check should be revisited once we can 
lower the number of rows in a batch
    +    // In cases of very tight memory -- need at least memory to process 
one batch, plus overhead (e.g. hash table)
    +    if ( numPartitions == 1 ) {
    +      // if too little memory - behave like the old code -- no memory 
limit for hash aggregate
    +      allocator.setLimit(10_000_000_000L);
    +    }
    +    // Based on the number of partitions: Set the mask and bit count
    +    partitionMask = numPartitions - 1; // e.g. 32 --> 0x1F
    +    bitsInMask = Integer.bitCount(partitionMask); // e.g. 0x1F -> 5
    +
    +    // Create arrays (one entry per partition)
    +    htables = new HashTable[numPartitions] ;
    +    batchHolders = (ArrayList<BatchHolder>[]) new 
ArrayList<?>[numPartitions] ;
    +    outBatchIndex = new int[numPartitions] ;
    +    outputStream = new OutputStream[numPartitions];
    +    spilledBatchesCount = new int[numPartitions];
    +    // spilledPaths = new Path[numPartitions];
    +    spillFiles = new String[numPartitions];
    +    spilledPartitionsList = new ArrayList<SpilledPartition>();
    +
    +    plannedBatches = numPartitions; // each partition should allocate its 
first batch
    +
    +    // initialize every (per partition) entry in the arrays
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      try {
    +        this.htables[i] = 
baseHashTable.createAndSetupHashTable(groupByOutFieldIds, numPartitions);
    +        this.htables[i].setMaxVarcharSize(maxColumnWidth);
    +      } catch (IllegalStateException ise) {} // ignore
    +      catch (Exception e) { throw new DrillRuntimeException(e); }
    +      this.batchHolders[i] = new ArrayList<BatchHolder>(); // First 
BatchHolder is created when the first put request is received.
    +    }
    +  }
    +  /**
    +   * get new incoming: (when reading spilled files like an "incoming")
    +   * @return The (newly replaced) incoming
    +   */
    +  @Override
    +  public RecordBatch getNewIncoming() { return incoming; }
    +
    +  private void initializeSetup(RecordBatch newIncoming) throws 
SchemaChangeException, ClassTransformationException, IOException {
    +    baseHashTable.updateIncoming(newIncoming); // after a spill - a new 
incoming
    +    this.incoming = newIncoming;
    +    nextPartitionToReturn = 0;
    +    for (int i = 0; i < numPartitions; i++ ) {
    +      htables[i].reinit(newIncoming);
    +      if ( batchHolders[i] != null) {
    +        for (BatchHolder bh : batchHolders[i]) {
    +          bh.clear();
    +        }
    +        batchHolders[i].clear();
    +        batchHolders[i] = new ArrayList<BatchHolder>();
    +      }
    +      outBatchIndex[i] = 0;
    +      outputStream[i] = null;
    +      spilledBatchesCount[i] = 0;
    +      // spilledPaths[i] = null;
    +      spillFiles[i] = null;
    +    }
    +  }
    +
    +  /**
    +   *  Update the estimated max batch size to be used in the Hash Aggr Op.
    +   *  using the record batch size to get the row width.
    +   * @param incoming
    +   */
    +  private void updateEstMaxBatchSize(RecordBatch incoming) {
    +    if ( estMaxBatchSize > 0 ) { return; }  // no handling of a schema (or 
varchar) change
    +    RecordBatchSizer sizer = new RecordBatchSizer(incoming);
    +    logger.trace("Incoming sizer: {}",sizer);
    +    // An empty batch only has the schema, can not tell actual length of 
varchars
    +    // else use the actual varchars length, each capped at 50 (to match 
the space allocation)
    +    estRowWidth = sizer.rowCount() == 0 ? sizer.stdRowWidth() : 
sizer.netRowWidthCap50();
    +    estMaxBatchSize = estRowWidth * MAX_BATCH_SIZE;
    +
    +    // Get approx max (varchar) column width to get better memory 
allocation
    +    maxColumnWidth = Math.max(sizer.maxSize(), 
VARIABLE_MIN_WIDTH_VALUE_SIZE);
    +    maxColumnWidth = Math.min(maxColumnWidth, 
VARIABLE_MAX_WIDTH_VALUE_SIZE);
    +
    +    logger.trace("{} phase. Estimated row width: {}  batch size: {}  
memory limit: {}  max column width: {}",
    +        
isTwoPhase?(is2ndPhase?"2nd":"1st"):"Single",estRowWidth,estMaxBatchSize,memoryLimit,maxColumnWidth);
    +
    +    if ( estMaxBatchSize > memoryLimit ) {
    +      logger.warn("HashAggregate: Estimated max batch size {} is larger 
than the memory limit {}",estMaxBatchSize,memoryLimit);
    --- End diff --
    
    True; this check flags only the more extreme cases, where the memory
available initially cannot hold even a single batch (which is easier to
explain in a warning message).
  However, note that after this code runs there is another check (in
delayedSetup()) that does add the overhead (as a hard-coded 8 MB) and issues
another warning (when the number of partitions drops to 1; this was added in
response to an earlier review comment; see above).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to