Repository: incubator-drill Updated Branches: refs/heads/master 79c1502c1 -> 33c28f624
DRILL-938: Set the links entry correctly when resizing when hash chains cross batch boundaries. Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0c9d4ebb Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0c9d4ebb Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0c9d4ebb Branch: refs/heads/master Commit: 0c9d4ebbf5e0c2a5f9b3a554f4a051568efa46e7 Parents: 79c1502 Author: Aman Sinha <[email protected]> Authored: Fri Jun 20 00:06:07 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 25 09:08:14 2014 -0700 ---------------------------------------------------------------------- .../physical/impl/common/HashTableTemplate.java | 84 ++++++++++++-------- 1 file changed, 53 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0c9d4ebb/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index f7cadf1..099d2ee 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -33,6 +33,7 @@ import org.apache.drill.exec.compile.sig.RuntimeOverridden; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.record.RecordBatch; +import org.apache.drill.exec.vector.BigIntVector; import org.apache.drill.exec.vector.IntVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.allocator.VectorAllocator; @@ -41,8 +42,9 @@ public abstract class HashTableTemplate implements HashTable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class); private static final boolean EXTRA_DEBUG = false; - + private static final int EMPTY_SLOT = -1; + // private final int MISSING_VALUE = 65544; // A hash 'bucket' consists of the start index to indicate start of a hash chain @@ -108,8 +110,12 @@ public abstract class HashTableTemplate implements HashTable { private int maxOccupiedIdx = -1; private int batchOutputCount = 0; + private int batchIndex = 0; + private BatchHolder(int idx) { + this.batchIndex = idx; + if (idx == 0) { // first batch holder can use the original htContainer htContainer = htContainerOrig; } else { // otherwise create a new one using the original's fields @@ -199,19 +205,19 @@ public abstract class HashTableTemplate implements HashTable { private void rehash(int numbuckets, IntVector newStartIndices, int batchStartIdx) { - if (EXTRA_DEBUG) logger.debug("Resizing and rehashing table..."); + logger.debug("Rehashing entries within the batch: {}; batchStartIdx = {}, total numBuckets in hash table = {}.", batchIndex, batchStartIdx, numbuckets); int size = links.getAccessor().getValueCount(); IntVector newLinks = allocMetadataVector(size, EMPTY_SLOT); IntVector newHashValues = allocMetadataVector(size, 0); - + for (int i = 0; i <= maxOccupiedIdx; i++) { int entryIdxWithinBatch = i; int entryIdx = entryIdxWithinBatch + batchStartIdx; int hash = hashValues.getAccessor().get(entryIdxWithinBatch); // get the already saved hash value int bucketIdx = getBucketIndex(hash, numbuckets); int newStartIdx = newStartIndices.getAccessor().get(bucketIdx); - + if (newStartIdx == EMPTY_SLOT) { // new bucket was empty newStartIndices.getMutator().setSafe(bucketIdx, entryIdx); // update the start index to point to entry newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); @@ -220,12 +226,19 @@ public abstract class HashTableTemplate implements HashTable { if (EXTRA_DEBUG) logger.debug("New bucket was empty. bucketIdx = {}, newStartIndices[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, bucketIdx, newStartIndices.getAccessor().get(bucketIdx), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); } else { - // follow the new table's hash chain until we encounter empty slot + // follow the new table's hash chain until we encounter empty slot. Note that the hash chain could + // traverse multiple batch holders, so make sure we are accessing the right batch holder. int idx = newStartIdx; int idxWithinBatch = 0; + BatchHolder bh = this; while (true) { - idxWithinBatch = idx & BATCH_MASK; - if (newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) { + if (idx != EMPTY_SLOT) { + idxWithinBatch = idx & BATCH_MASK; + int batchIdx = ((idx >>> 16) & BATCH_MASK); + bh = batchHolders.get(batchIdx); + } + + if (bh == this && newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) { newLinks.getMutator().setSafe(idxWithinBatch, entryIdx); newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); @@ -233,8 +246,20 @@ public abstract class HashTableTemplate implements HashTable { if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); break; + } else if (bh != this && bh.links.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) { + bh.links.getMutator().setSafe(idxWithinBatch, entryIdx); // update the link in the other batch + newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain + newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); + + if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); + + break; + } + if (bh == this) { + idx = newLinks.getAccessor().get(idxWithinBatch); + } else { + idx = bh.links.getAccessor().get(idxWithinBatch); } - idx = newLinks.getAccessor().get(idxWithinBatch); } } @@ -251,14 +276,8 @@ public abstract class HashTableTemplate implements HashTable { private boolean outputKeys() { /** for debugging - Object tmp = (htContainer).getValueAccessorById(0, BigIntVector.class).getValueVector(); - BigIntVector vv0 = null; - BigIntHolder holder = null; - - if (tmp != null) { - vv0 = ((BigIntVector) tmp); - holder = new BigIntHolder(); - } + BigIntVector vv0 = getValueVector(0); + BigIntHolder holder = new BigIntHolder(); */ for (int i = 0; i <= maxOccupiedIdx; i++) { @@ -296,6 +315,17 @@ public abstract class HashTableTemplate implements HashTable { links.clear(); hashValues.clear(); } + + // Only used for internal debugging. Get the value vector at a particular index from the htContainer. + // By default this assumes the VV is a BigIntVector. + private ValueVector getValueVector(int index) { + Object tmp = (htContainer).getValueAccessorById(BigIntVector.class, index).getValueVector(); + if (tmp != null) { + BigIntVector vv0 = ((BigIntVector) tmp); + return vv0; + } + return null; + } // These methods will be code-generated @@ -414,6 +444,7 @@ public abstract class HashTableTemplate implements HashTable { public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) { int hash = getHashBuild(incomingRowIdx); + hash = Math.abs(hash); int i = getBucketIndex(hash, numBuckets()); int startIdx = startIndices.getAccessor().get(i); int currentIdx; @@ -510,12 +541,14 @@ public abstract class HashTableTemplate implements HashTable { @Override public int containsKey(int incomingRowIdx, boolean isProbe) { int hash = isProbe ? getHashProbe(incomingRowIdx) : getHashBuild(incomingRowIdx); + hash = Math.abs(hash); int i = getBucketIndex(hash, numBuckets()); int currentIdx = startIndices.getAccessor().get(i); - - if (currentIdx == EMPTY_SLOT) - return -1; + + if (currentIdx == EMPTY_SLOT) { + return -1; + } BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & BATCH_MASK); currentIdxHolder.value = currentIdx; @@ -529,7 +562,7 @@ public abstract class HashTableTemplate implements HashTable { } else if (currentIdxHolder.value == EMPTY_SLOT) { break; } else { - bh = batchHolders.get( (currentIdxHolder.value >>> 16) & BATCH_MASK); + bh = batchHolders.get( (currentIdxHolder.value >>> 16) & BATCH_MASK); } } @@ -611,17 +644,6 @@ public abstract class HashTableTemplate implements HashTable { numResizing++; } - /* - public boolean outputKeys() { - for (BatchHolder bh : batchHolders) { - if ( ! bh.outputKeys()) { - return false; - } - } - return true; - } - */ - public boolean outputKeys(int batchIdx) { assert batchIdx < batchHolders.size(); if (! batchHolders.get(batchIdx).outputKeys()) {
