Github user Ben-Zvi commented on a diff in the pull request:

    https://github.com/apache/drill/pull/822#discussion_r117801284
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
 ---
    @@ -544,86 +572,69 @@ private static int roundUpToPowerOf2(int number) {
         return rounded;
       }
     
    -  @Override
    -  public void put(int incomingRowIdx, IndexPointer htIdxHolder, int 
retryCount) {
    -    put(incomingRowIdx, htIdxHolder);
    +  public int getHashCode(int incomingRowIdx) throws SchemaChangeException {
    +    return getHashBuild(incomingRowIdx);
       }
     
    -  private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) {
    +  /** put() uses the hash code (from gethashCode() above) to insert the 
key(s) from the incoming
    +   * row into the hash table. The code selects the bucket in the 
startIndices, then the keys are
    +   * placed into the chained list - by storing the key values into a 
batch, and updating its
    +   * "links" member. Last it modifies the index holder to the batch offset 
so that the caller
    +   * can store the remaining parts of the row into a matching batch 
(outside the hash table).
    +   * Returning
    +   *
    +   * @param incomingRowIdx - position of the incoming row
    +   * @param htIdxHolder - to return batch + batch-offset (for caller to 
manage a matching batch)
    +   * @param hashCode - computed over the key(s) by calling getHashCode()
    +   * @return Status - the key(s) was ADDED or was already PRESENT
    +   */
    +  @Override
    +  public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int 
hashCode) throws SchemaChangeException {
     
    -    int hash = getHashBuild(incomingRowIdx);
    -    int i = getBucketIndex(hash, numBuckets());
    -    int startIdx = startIndices.getAccessor().get(i);
    +    int bucketIndex = getBucketIndex(hashCode, numBuckets());
    +    int startIdx = startIndices.getAccessor().get(bucketIndex);
         int currentIdx;
    -    int currentIdxWithinBatch;
    -    BatchHolder bh;
         BatchHolder lastEntryBatch = null;
         int lastEntryIdxWithinBatch = EMPTY_SLOT;
     
    +    // if startIdx is non-empty, follow the hash chain links until we find 
a matching
    +    // key or reach the end of the chain (and remember the last link there)
    +    for ( currentIdxHolder.value = startIdx;
    +          currentIdxHolder.value != EMPTY_SLOT;
    +          /* isKeyMatch() below also advances the currentIdxHolder to the 
next link */) {
     
    -    if (startIdx == EMPTY_SLOT) {
    -      // this is the first entry in this bucket; find the first available 
slot in the
    -      // container of keys and values
    -      currentIdx = freeIndex++;
    -      addBatchIfNeeded(currentIdx);
    +      // remember the current link, which would be the last when the next 
link is empty
    +      lastEntryBatch = batchHolders.get((currentIdxHolder.value >>> 16) & 
HashTable.BATCH_MASK);
    +      lastEntryIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
     
    -      if (EXTRA_DEBUG) {
    -        logger.debug("Empty bucket index = {}. incomingRowIdx = {}; 
inserting new entry at currentIdx = {}.", i,
    -            incomingRowIdx, currentIdx);
    +      if (lastEntryBatch.isKeyMatch(incomingRowIdx, currentIdxHolder, 
false)) {
    +        htIdxHolder.value = currentIdxHolder.value;
    +        return PutStatus.KEY_PRESENT;
           }
    -
    -      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, 
lastEntryIdxWithinBatch);
    -      // update the start index array
    -      startIndices.getMutator().setSafe(getBucketIndex(hash, 
numBuckets()), currentIdx);
    -      htIdxHolder.value = currentIdx;
    -      return PutStatus.KEY_ADDED;
         }
     
    -    currentIdx = startIdx;
    -    boolean found = false;
    -
    -    bh = batchHolders.get((currentIdx >>> 16) & BATCH_MASK);
    -    currentIdxHolder.value = currentIdx;
    -
    -    // if startIdx is non-empty, follow the hash chain links until we find 
a matching
    -    // key or reach the end of the chain
    -    while (true) {
    -      currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
    +    // no match was found, so insert a new entry
    +    currentIdx = freeIndex++;
    +    boolean addedBatch = addBatchIfNeeded(currentIdx);
     
    -      if (bh.isKeyMatch(incomingRowIdx, currentIdxHolder, false)) {
    -        htIdxHolder.value = currentIdxHolder.value;
    -        found = true;
    -        break;
    -      } else if (currentIdxHolder.value == EMPTY_SLOT) {
    -        lastEntryBatch = bh;
    -        lastEntryIdxWithinBatch = currentIdxWithinBatch;
    -        break;
    -      } else {
    -        bh = batchHolders.get((currentIdxHolder.value >>> 16) & 
HashTable.BATCH_MASK);
    -        lastEntryBatch = bh;
    -      }
    +    if (EXTRA_DEBUG) {
    +      logger.debug("No match was found for incomingRowIdx = {}; inserting 
new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
         }
     
    -    if (!found) {
    -      // no match was found, so insert a new entry
    -      currentIdx = freeIndex++;
    -      addBatchIfNeeded(currentIdx);
    +    insertEntry(incomingRowIdx, currentIdx, hashCode, lastEntryBatch, 
lastEntryIdxWithinBatch);
     
    -      if (EXTRA_DEBUG) {
    -        logger.debug("No match was found for incomingRowIdx = {}; 
inserting new entry at currentIdx = {}.", incomingRowIdx, currentIdx);
    -      }
    -
    -      insertEntry(incomingRowIdx, currentIdx, hash, lastEntryBatch, 
lastEntryIdxWithinBatch);
    -      htIdxHolder.value = currentIdx;
    -      return PutStatus.KEY_ADDED;
    +    // if there was no hash chain at this bucket, need to update the start 
index array
    +    if (startIdx == EMPTY_SLOT) {
    +      startIndices.getMutator().setSafe(getBucketIndex(hashCode, 
numBuckets()), currentIdx);
         }
    -
    -    return found ? PutStatus.KEY_PRESENT : PutStatus.KEY_ADDED;
    +    htIdxHolder.value = currentIdx;
    +    return  addedBatch ? PutStatus.NEW_BATCH_ADDED :
    +        ( freeIndex + 1 > batchHolders.size() * BATCH_SIZE ) ?
    --- End diff --
    
    The former: the last key in a batch. The Hash Aggr needs this information 
to initiate a check for memory pressure (it could calculate this itself, but it 
looks cleaner/simpler to get it as a special status from the hash table). 
Hopefully we'd never have to deal with batches of size 1 (one row), as that would 
create a conflict - both "last row" and "new batch" would apply to the same insertion. 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to