Support multiple output batches for hash aggr.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1726d734
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1726d734
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1726d734

Branch: refs/heads/master
Commit: 1726d734a8e7e90cdb12ad092c0b79eb6e4f3cb2
Parents: 3f21451
Author: Aman Sinha <[email protected]>
Authored: Thu May 15 16:41:24 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Thu Jun 5 09:36:38 2014 -0700

----------------------------------------------------------------------
 .../physical/impl/aggregate/HashAggBatch.java   |  24 +--
 .../impl/aggregate/HashAggTemplate.java         | 173 +++++++++++++++----
 .../physical/impl/aggregate/HashAggregator.java |   5 +
 .../exec/physical/impl/common/HashTable.java    |   2 +-
 .../physical/impl/common/HashTableTemplate.java |  16 ++
 .../apache/drill/exec/record/RecordBatch.java   |   4 +-
 6 files changed, 174 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
index d2800bd..4478938 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggBatch.java
@@ -18,18 +18,13 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.ErrorCollector;
 import org.apache.drill.common.expression.ErrorCollectorImpl;
-import org.apache.drill.common.expression.ExpressionPosition;
-import org.apache.drill.common.expression.FunctionCall;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.logical.data.NamedExpression;
-import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.exec.compile.sig.GeneratorMapping;
 import org.apache.drill.exec.compile.sig.MappingSet;
 import org.apache.drill.exec.exception.ClassTransformationException;
@@ -39,11 +34,8 @@ import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.ClassGenerator.BlockType;
 import org.apache.drill.exec.expr.ClassGenerator.HoldingContainer;
 import org.apache.drill.exec.expr.ExpressionTreeMaterializer;
-import org.apache.drill.exec.expr.HoldingContainerExpression;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.expr.ValueVectorWriteExpression;
-import org.apache.drill.exec.expr.holders.IntHolder;
-import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.config.HashAggregate;
 import org.apache.drill.exec.record.AbstractRecordBatch;
@@ -51,17 +43,12 @@ import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
-import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.apache.drill.exec.physical.impl.aggregate.HashAggregator.AggOutcome;
-import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
-import org.apache.drill.exec.physical.impl.common.HashTable;
-import org.apache.drill.exec.record.VectorWrapper;
 
-import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import com.sun.codemodel.JExpr;
 import com.sun.codemodel.JVar;
@@ -124,12 +111,16 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
         }
       }
 
-
       if (aggregator.allFlushed()) {
         return IterOutcome.NONE;
       }
 
-      logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());
+    if (aggregator.buildComplete() && ! aggregator.allFlushed()) {
+      // aggregation is complete and not all records have been output yet
+      return aggregator.outputCurrentBatch();    
+    }
+
+    logger.debug("Starting aggregator doWork; incoming record count = {} ", incoming.getRecordCount());   
 
       while(true){
         AggOutcome out = aggregator.doWork();
@@ -284,6 +275,9 @@ public class HashAggBatch extends AbstractRecordBatch<HashAggregate> {
 
   @Override
   public void cleanup() {
+    if (aggregator != null) {
+      aggregator.cleanup();
+    }
     super.cleanup();
     incoming.cleanup();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 039445b..b65acb0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -73,6 +73,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   private IterOutcome outcome;
   private int outputCount = 0;
   private int numGroupedRecords = 0;
+  private int outBatchIndex = 0;
   private RecordBatch incoming;
   private BatchSchema schema;
   private RecordBatch outgoing;
@@ -91,11 +92,13 @@ public abstract class HashAggTemplate implements HashAggregator {
 
   private MaterializedField[] materializedValueFields;
   private boolean allFlushed = false;
+  private boolean  buildComplete = false;
 
   public class BatchHolder {
 
     private VectorContainer aggrValuesContainer; // container for aggr values (workspace variables)
     int maxOccupiedIdx = -1;
+    int batchOutputCount = 0;
 
     private BatchHolder() {
 
@@ -120,15 +123,15 @@ public abstract class HashAggTemplate implements HashAggregator {
       return true;
     }
 
-    private void setup(int idx) {
+    private void setup() {
       setupInterior(incoming, outgoing, aggrValuesContainer);
     }
 
     private boolean outputValues() {
       for (int i = 0; i <= maxOccupiedIdx; i++) {
-        if (outputRecordValues(i, outputCount) ) {
-          if (EXTRA_DEBUG_2) logger.debug("Outputting values to {}", outputCount) ;
-          outputCount++;
+        if (outputRecordValues(i, batchOutputCount) ) {
+          if (EXTRA_DEBUG_2) logger.debug("Outputting values to batch index: {} output index: {}", batchOutputCount) ;
+          batchOutputCount++;
         } else {
           return false;
         }
@@ -139,7 +142,15 @@ public abstract class HashAggTemplate implements HashAggregator {
     private void clear() {
       aggrValuesContainer.clear();
     }
+    
+    private int getNumGroups() {
+      return maxOccupiedIdx + 1;
+    }
 
+    private int getOutputCount() {
+      return batchOutputCount;
+    }
+    
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -260,16 +271,29 @@ public abstract class HashAggTemplate implements HashAggregator {
               }
 
             case NONE:
-              outcome = out;
-              outputKeysAndValues() ;
-
-              // cleanup my internal state since there is nothing more to return
-              this.cleanup();
+              // outcome = out;
+              
+              buildComplete = true;
+              
+              // outputKeysAndValues() ;
+
+              // output the first batch; remaining batches will be output 
+              // in response to each next() call by a downstream operator
+              
+              // outputKeysAndValues(outBatchIndex);
+              outputCurrentBatch();
+
+              //if (isLastBatchOutput()) {
+                // cleanup my internal state since there is nothing more to return
+              //  this.cleanup();
+              // }
+              
               // cleanup incoming batch since output of aggregation does not need
               // any references to the incoming
 
               incoming.cleanup();
-              return setOkAndReturn();
+              // return setOkAndReturn();
+              return AggOutcome.RETURN_OUTCOME;
 
             case STOP:
             default:
@@ -286,24 +310,19 @@ public abstract class HashAggTemplate implements HashAggregator {
       if(first) first = !first;
     }
   }
-
-  private void allocateOutgoing() {
-
-    // At present, since we output all records at once, we create the outgoing batch
-    // with a size of numGroupedRecords..however this has to be restricted to max of 64K right
-    // now otherwise downstream operators will break.
-    // TODO: allow outputting arbitrarily large number of records in batches
-    assert (numGroupedRecords < Character.MAX_VALUE);
+  
+  private void allocateOutgoing(int numOutputRecords) {
 
     for (VectorAllocator a : keyAllocators) {
-      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
-      a.alloc(numGroupedRecords);
+      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numOutputRecords);
+      a.alloc(numOutputRecords);
     }
 
     for (VectorAllocator a : valueAllocators) {
-      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numGroupedRecords);
-      a.alloc(numGroupedRecords);
+      if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} records.", a, numOutputRecords);
+      a.alloc(numOutputRecords);
     }
+    
   }
 
   @Override
@@ -314,20 +333,25 @@ public abstract class HashAggTemplate implements HashAggregator {
   @Override
   public int getOutputCount() {
     return outputCount;
+    // return batchHolders.get(outBatchIndex).getOutputCount();
   }
 
   @Override
   public void cleanup(){
-    htable.clear();
-    htable = null;
+    if (htable != null) {
+      htable.clear();
+      htable = null;
+    }
     htIdxHolder = null;
     materializedValueFields = null;
 
-    for (BatchHolder bh : batchHolders) {
-      bh.clear();
+    if (batchHolders != null) {
+      for (BatchHolder bh : batchHolders) {
+        bh.clear();
+      }
+      batchHolders.clear();
+      batchHolders = null;
     }
-    batchHolders.clear();
-    batchHolders = null;
   }
 
   private AggOutcome tooBigFailure(){
@@ -368,29 +392,114 @@ public abstract class HashAggTemplate implements HashAggregator {
 
     if (EXTRA_DEBUG_1) logger.debug("HashAggregate: Added new batch; num batches = {}.", batchHolders.size());
 
-    int batchIdx = batchHolders.size() - 1;
-    bh.setup(batchIdx);
+    bh.setup();
   }
 
+  /*
   private boolean outputKeysAndValues() {
 
     allocateOutgoing();
 
-    this.htable.outputKeys();
+    int batchIdx = 0;
+    for (BatchHolder bh : batchHolders) {
+      if (! this.htable.outputKeys(batchIdx++)) {
+        return false;
+      }
+    }
 
     for (BatchHolder bh : batchHolders) {
       if (! bh.outputValues() ) {
         return false;
       }
     }
-
+    
     allFlushed = true ;
     return true;
   }
+*/
+  
+  // output the keys and values for a particular batch holder
+  private boolean outputKeysAndValues(int batchIdx) {
+    
+    allocateOutgoing(batchIdx);
+    
+    if (! this.htable.outputKeys(batchIdx)) {
+      return false;
+    }
+    if (! batchHolders.get(batchIdx).outputValues()) {
+      return false;
+    }
+
+    outBatchIndex = batchIdx+1;
+    
+    if (outBatchIndex == batchHolders.size()) {
+      allFlushed = true;
+    }
+    
+    return true;
+  }
+  
+  public IterOutcome outputCurrentBatch() {
+    if (outBatchIndex >= batchHolders.size()) {
+      this.outcome = IterOutcome.NONE;
+      return outcome;
+    }
 
+    // get the number of groups in the batch holder corresponding to this batch index
+    int batchOutputRecords = batchHolders.get(outBatchIndex).getNumGroups();
+    
+    if (batchOutputRecords == 0) {
+      this.outcome = IterOutcome.NONE;
+      return outcome;
+    }
+    
+    allocateOutgoing(batchOutputRecords);
+    
+    if (this.htable.outputKeys(outBatchIndex) 
+        && batchHolders.get(outBatchIndex).outputValues()) {
+      
+      // set the value count for outgoing batch value vectors
+      for(VectorWrapper<?> v : outgoing) {
+        v.getValueVector().getMutator().setValueCount(batchOutputRecords);
+      }
+      
+      outputCount += batchOutputRecords;
+      
+      if(first){
+        this.outcome = IterOutcome.OK_NEW_SCHEMA;
+      }else{
+        this.outcome = IterOutcome.OK;
+      }
+      
+      logger.debug("HashAggregate: Output current batch index {} with {} records.", outBatchIndex, batchOutputRecords);
+      
+      outBatchIndex++;
+      if (outBatchIndex == batchHolders.size()) {
+        allFlushed = true;
+        
+        logger.debug("HashAggregate: All batches flushed.");
+        
+        // cleanup my internal state since there is nothing more to return
+        this.cleanup();
+      }
+    } else {
+      this.outcome = IterOutcome.STOP;
+    }
+    
+    return this.outcome;
+  }
+  
   public boolean allFlushed() {
     return allFlushed;
   }
+  
+  public boolean buildComplete() {
+    return buildComplete;
+  }
+  
+  public int numGroupedRecords() {
+    return numGroupedRecords;
+  }
 
   // Check if a group is present in the hash table; if not, insert it in the hash table.
   // The htIdxHolder contains the index of the group in the hash table container; this same

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
index 9032f2a..9e6cdb9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggregator.java
@@ -57,4 +57,9 @@ public interface HashAggregator {
   public abstract void cleanup();
 
   public abstract boolean allFlushed();
+  
+  public abstract boolean buildComplete();
+  
+  public abstract IterOutcome outputCurrentBatch();
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
index e5959f2..46cb47d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java
@@ -58,7 +58,7 @@ public interface HashTable {
 
   public void clear();
 
-  public boolean outputKeys();
+  public boolean outputKeys(int batchIdx);
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
index 3a8e609..f2844ac 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java
@@ -157,6 +157,12 @@ public abstract class HashTableTemplate implements HashTable {
       int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK;
       boolean match = false;
 
+      if (currentIdxWithinBatch >= HashTable.BATCH_SIZE) {
+        logger.debug("Batch size = {}, incomingRowIdx = {}, currentIdxWithinBatch = {}.", HashTable.BATCH_SIZE, incomingRowIdx, currentIdxWithinBatch);
+      }
+      assert (currentIdxWithinBatch < HashTable.BATCH_SIZE);
+      assert (incomingRowIdx < HashTable.BATCH_SIZE);
+      
       if (isProbe)
         match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch);
       else
@@ -599,6 +605,7 @@ public abstract class HashTableTemplate implements HashTable {
     }
   }
 
+  /* 
   public boolean outputKeys() {
     for (BatchHolder bh : batchHolders) {
       if ( ! bh.outputKeys()) {
@@ -607,7 +614,16 @@ public abstract class HashTableTemplate implements HashTable {
     }
     return true;
   }
+  */
 
+  public boolean outputKeys(int batchIdx) {
+    assert batchIdx < batchHolders.size();
+    if (! batchHolders.get(batchIdx).outputKeys()) {
+      return false;
+    }
+    return true;
+  }
+  
   private IntVector allocMetadataVector(int size, int initialValue) {
     IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator);
     vector.allocateNew(size);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1726d734/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
index 60fdd4d..662deb6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatch.java
@@ -33,8 +33,8 @@ import org.apache.drill.exec.vector.ValueVector;
  */
 public interface RecordBatch extends VectorAccessible {
 
-  /* max batch size, limited by 2-byte-lentgh in SV2 : 65535 = 2^16 -1 */
-  public static final int MAX_BATCH_SIZE = 65535;
+  /* max batch size, limited by 2-byte-lentgh in SV2 : 65536 = 2^16 */
+  public static final int MAX_BATCH_SIZE = 65536;
 
   /**
    * Describes the outcome of a RecordBatch being incremented forward.

Reply via email to