DRILL-1253: Allocate new container for workspace vectors before exhausting the current container.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/90c4ef64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/90c4ef64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/90c4ef64

Branch: refs/heads/master
Commit: 90c4ef6471376cffd5ad792f2a270d34b7c8c346
Parents: d07fee4
Author: Mehant Baid <[email protected]>
Authored: Wed Aug 6 00:39:13 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Aug 6 16:44:22 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/aggregate/HashAggTemplate.java | 14 ++++++++++++--
 .../drill/exec/physical/impl/common/HashTable.java | 2 ++
 .../exec/physical/impl/common/HashTableTemplate.java | 6 ++++++
 3 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c4ef64/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 0a44f3a..eca42c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -120,6 +120,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     private int maxOccupiedIdx = -1;
     private int batchOutputCount = 0;
+    private int capacity = Integer.MAX_VALUE;
+    private boolean allocatedNextBatch = false;
+
     private BatchHolder() {
       aggrValuesContainer = new VectorContainer();
@@ -131,10 +134,10 @@ public abstract class HashAggTemplate implements HashAggregator {
         // Create a type-specific ValueVector for this value
         vector = TypeHelper.getNewVector(outputField, allocator) ;
         vector.allocateNew();
+        capacity = Math.min(capacity, vector.getValueCapacity());
         aggrValuesContainer.add(vector) ;
       }
-
     }
     private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
@@ -545,6 +548,14 @@ public abstract class HashAggTemplate implements HashAggregator {
       BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & HashTable.BATCH_MASK);
       int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
+      // Check if we have almost filled up the workspace vectors and add a batch if necessary
+      if ((idxWithinBatch == (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
+        htable.addNewKeyBatch();
+        addBatchHolder();
+        bh.allocatedNextBatch = true;
+      }
+
+
       if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
         if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values");
@@ -567,7 +578,6 @@ public abstract class HashAggTemplate implements HashAggregator {
         numGroupedRecords++;
         return true;
       }
-
     }
     logger.debug("HashAggr Put failed ! incomingRowIdx = {}, hash table size = {}.", incomingRowIdx, htable.size());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c4ef64/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 375836a..aff9751 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -62,6 +62,8 @@ public interface HashTable {
   public void clear();
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords);
+
+  public void addNewKeyBatch();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c4ef64/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 45b9852..e53ce05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -706,6 +706,12 @@ public abstract class HashTableTemplate implements HashTable {
     return vector;
   }
+  public void addNewKeyBatch() {
+    int numberOfBatches = batchHolders.size();
+    this.addBatchHolder();
+    freeIndex = numberOfBatches * BATCH_SIZE;
+  }
+
   // These methods will be code-generated in the context of the outer class
   protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
   protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) ;
