DRILL-1253: Allocate new container for workspace vectors before exhausting the current container.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/90c4ef64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/90c4ef64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/90c4ef64

Branch: refs/heads/master
Commit: 90c4ef6471376cffd5ad792f2a270d34b7c8c346
Parents: d07fee4
Author: Mehant Baid <[email protected]>
Authored: Wed Aug 6 00:39:13 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Aug 6 16:44:22 2014 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/aggregate/HashAggTemplate.java | 14 ++++++++++++--
 .../drill/exec/physical/impl/common/HashTable.java    |  2 ++
 .../exec/physical/impl/common/HashTableTemplate.java  |  6 ++++++
 3 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c4ef64/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 0a44f3a..eca42c9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -120,6 +120,9 @@ public abstract class HashAggTemplate implements HashAggregator {
     private int maxOccupiedIdx = -1;
     private int batchOutputCount = 0;
 
+    private int capacity = Integer.MAX_VALUE;
+    private boolean allocatedNextBatch = false;
+
     private BatchHolder() {
 
       aggrValuesContainer = new VectorContainer();
@@ -131,10 +134,10 @@ public abstract class HashAggTemplate implements HashAggregator {
         // Create a type-specific ValueVector for this value
         vector = TypeHelper.getNewVector(outputField, allocator) ;
         vector.allocateNew();
+        capacity = Math.min(capacity, vector.getValueCapacity());
 
         aggrValuesContainer.add(vector) ;
       }
-
     }
 
     private boolean updateAggrValues(int incomingRowIdx, int idxWithinBatch) {
@@ -545,6 +548,14 @@ public abstract class HashAggTemplate implements HashAggregator {
      BatchHolder bh = batchHolders.get( (currentIdx >>> 16) & HashTable.BATCH_MASK);
       int idxWithinBatch = currentIdx & HashTable.BATCH_MASK;
 
+      // Check if we have almost filled up the workspace vectors and add a batch if necessary
+      if ((idxWithinBatch ==  (bh.capacity - 1)) && (bh.allocatedNextBatch == false)) {
+        htable.addNewKeyBatch();
+        addBatchHolder();
+        bh.allocatedNextBatch = true;
+      }
+
+
       if (putStatus == HashTable.PutStatus.KEY_PRESENT) {
         if (EXTRA_DEBUG_2) logger.debug("Group-by key already present in hash table, updating the aggregate values");
 
@@ -567,7 +578,6 @@ public abstract class HashAggTemplate implements HashAggregator {
         numGroupedRecords++;
         return true;
       }
-
     }
 
     logger.debug("HashAggr Put failed ! incomingRowIdx = {}, hash table size = {}.", incomingRowIdx, htable.size());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c4ef64/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index 375836a..aff9751 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -62,6 +62,8 @@ public interface HashTable {
   public void clear();
 
   public boolean outputKeys(int batchIdx, VectorContainer outContainer, int outStartIndex, int numRecords);
+
+  public void addNewKeyBatch();
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/90c4ef64/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 45b9852..e53ce05 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -706,6 +706,12 @@ public abstract class HashTableTemplate implements HashTable {
     return vector;
   }
 
+  public void addNewKeyBatch() {
+    int numberOfBatches = batchHolders.size();
+    this.addBatchHolder();
+    freeIndex = numberOfBatches * BATCH_SIZE;
+  }
+
   // These methods will be code-generated in the context of the outer class
   protected abstract void doSetup(@Named("incomingBuild") RecordBatch incomingBuild, @Named("incomingProbe") RecordBatch incomingProbe);
   protected abstract int getHashBuild(@Named("incomingRowIdx") int incomingRowIdx) ;

Reply via email to