DRILL-1068: Reduce memory consumption of various operators.  Add more memory 
stats to query profile.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c3eea13c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c3eea13c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c3eea13c

Branch: refs/heads/master
Commit: c3eea13ce4c45e6bcd0b6863571e1561a9fa1931
Parents: 0dfeac8
Author: Jacques Nadeau <[email protected]>
Authored: Sat Jun 21 13:01:43 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Wed Jun 25 09:09:39 2014 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/FragmentStats.java    |  6 +-
 .../apache/drill/exec/ops/OperatorContext.java  |  2 +-
 .../apache/drill/exec/ops/OperatorStats.java    | 16 +++-
 .../org/apache/drill/exec/ops/SenderStats.java  | 11 +--
 .../drill/exec/physical/impl/BaseRootExec.java  |  4 +-
 .../exec/physical/impl/TopN/TopNBatch.java      |  6 +-
 .../impl/aggregate/HashAggTemplate.java         | 66 ++++++++--------
 .../impl/aggregate/StreamingAggBatch.java       |  5 +-
 .../impl/aggregate/StreamingAggTemplate.java    | 14 ++--
 .../impl/aggregate/StreamingAggregator.java     |  5 +-
 .../physical/impl/filter/FilterRecordBatch.java |  7 +-
 .../exec/physical/impl/join/HashJoinBatch.java  | 17 ++--
 .../impl/join/HashJoinProbeTemplate.java        | 82 ++++++++++++--------
 .../impl/mergereceiver/MergingRecordBatch.java  | 28 +++----
 .../PartitionSenderRootExec.java                |  4 +-
 .../partitionsender/PartitionerTemplate.java    |  7 +-
 .../exec/physical/impl/svremover/Copier.java    |  2 +-
 .../impl/svremover/CopierTemplate2.java         |  9 +--
 .../impl/svremover/CopierTemplate4.java         |  7 +-
 .../impl/svremover/RemovingRecordBatch.java     | 17 +---
 .../planner/fragment/SimpleParallelizer.java    |  3 +-
 .../drill/exec/record/VectorContainer.java      | 21 ++++-
 .../exec/vector/allocator/VectorAllocator.java  | 29 +++----
 23 files changed, 184 insertions(+), 184 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
index 19ac0aa..22872f9 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentStats.java
@@ -19,7 +19,7 @@ package org.apache.drill.exec.ops;
 
 import java.util.List;
 
-import org.apache.drill.exec.proto.BitControl.FragmentStatus;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.MinorFragmentProfile;
 
 import com.codahale.metrics.MetricRegistry;
@@ -46,8 +46,8 @@ public class FragmentStats {
     }
   }
 
-  public OperatorStats getOperatorStats(OpProfileDef profileDef){
-    OperatorStats stats = new OperatorStats(profileDef);
+  public OperatorStats getOperatorStats(OpProfileDef profileDef, 
BufferAllocator allocator){
+    OperatorStats stats = new OperatorStats(profileDef, allocator);
     if(profileDef.operatorType != -1){
       operators.add(stats);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index d62ea2f..2d46733 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -37,7 +37,7 @@ public class OperatorContext implements Closeable {
     this.popConfig = popConfig;
 
     OpProfileDef def = new OpProfileDef(popConfig.getOperatorId(), 
popConfig.getOperatorType(), getChildCount(popConfig));
-    this.stats = context.getStats().getOperatorStats(def);
+    this.stats = context.getStats().getOperatorStats(def, allocator);
   }
 
   public OperatorContext(PhysicalOperator popConfig, FragmentContext context, 
OperatorStats stats) throws OutOfMemoryException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
index dcb73c8..bd8d899 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorStats.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.ops;
 
-import org.apache.commons.collections.Buffer;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.proto.UserBitShared.MetricValue;
 import org.apache.drill.exec.proto.UserBitShared.OperatorProfile;
 import org.apache.drill.exec.proto.UserBitShared.StreamProfile;
@@ -30,6 +30,7 @@ public class OperatorStats {
 
   protected final int operatorId;
   protected final int operatorType;
+  private final BufferAllocator allocator;
 
   private IntLongOpenHashMap longMetrics = new IntLongOpenHashMap();
   private IntDoubleOpenHashMap doubleMetrics = new IntDoubleOpenHashMap();
@@ -53,12 +54,13 @@ public class OperatorStats {
 
   private long schemas;
 
-  public OperatorStats(OpProfileDef def){
-    this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount());
+  public OperatorStats(OpProfileDef def, BufferAllocator allocator){
+    this(def.getOperatorId(), def.getOperatorType(), def.getIncomingCount(), 
allocator);
   }
 
-  private OperatorStats(int operatorId, int operatorType, int inputCount) {
+  private OperatorStats(int operatorId, int operatorType, int inputCount, 
BufferAllocator allocator) {
     super();
+    this.allocator = allocator;
     this.operatorId = operatorId;
     this.operatorType = operatorType;
     this.recordsReceivedByInput = new long[inputCount];
@@ -126,6 +128,12 @@ public class OperatorStats {
         .setProcessNanos(processingNanos)
         .setWaitNanos(waitNanos);
 
+    if(allocator != null){
+      b.setLocalMemoryAllocated(allocator.getAllocatedMemory());
+    }
+
+
+
     addAllMetrics(b);
 
     return b.build();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
index c766632..5167edb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/SenderStats.java
@@ -17,21 +17,22 @@
  */
 package org.apache.drill.exec.ops;
 
+import java.util.List;
+
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
-import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
 import 
org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderStats;
+import org.apache.drill.exec.physical.impl.partitionsender.PartitionStatsBatch;
 import org.apache.drill.exec.proto.UserBitShared;
 
-import java.util.List;
-
 public class SenderStats extends OperatorStats {
 
   long minReceiverRecordCount = 0;
   long maxReceiverRecordCount = 0;
   int nSenders = 0;
 
-  public SenderStats(PhysicalOperator operator) {
-    super(new OpProfileDef(operator.getOperatorId(), 
operator.getOperatorType(), OperatorContext.getChildCount(operator)));
+  public SenderStats(PhysicalOperator operator, BufferAllocator allocator) {
+    super(new OpProfileDef(operator.getOperatorId(), 
operator.getOperatorType(), OperatorContext.getChildCount(operator)), 
allocator);
   }
 
   public void updatePartitionStats(List<? extends PartitionStatsBatch> 
outgoing) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
index 452052b..d56da51 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/BaseRootExec.java
@@ -32,9 +32,9 @@ public abstract class BaseRootExec implements RootExec {
   protected OperatorContext oContext = null;
 
   public BaseRootExec(FragmentContext context, PhysicalOperator config) throws 
OutOfMemoryException {
-    this.stats = new SenderStats(config);
-    context.getStats().addOperatorStats(this.stats);
     this.oContext = new OperatorContext(config, context, stats);
+    this.stats = new SenderStats(config, oContext.getAllocator());
+    context.getStats().addOperatorStats(this.stats);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
index 4a5d368..846d419 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/TopN/TopNBatch.java
@@ -219,14 +219,12 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     if (copier == null) {
       copier = RemovingRecordBatch.getGenerated4Copier(batch, context, 
oContext.getAllocator(),  newContainer, newBatch);
     } else {
-      List<VectorAllocator> allocators = Lists.newArrayList();
       for(VectorWrapper<?> i : batch){
 
         ValueVector v = TypeHelper.getNewVector(i.getField(), 
oContext.getAllocator());
         newContainer.add(v);
-        allocators.add(RemovingRecordBatch.getAllocator4(v));
       }
-      copier.setupRemover(context, batch, newBatch, allocators.toArray(new 
VectorAllocator[allocators.size()]));
+      copier.setupRemover(context, batch, newBatch);
     }
     SortRecordBatchBuilder builder = new 
SortRecordBatchBuilder(oContext.getAllocator(), MAX_SORT_BYTES);
     do {
@@ -372,7 +370,7 @@ public class TopNBatch extends AbstractRecordBatch<TopN> {
     public Iterator<VectorWrapper<?>> iterator() {
       return container.iterator();
     }
-    
+
     @Override
     public VectorContainer getOutgoingContainer() {
       throw new UnsupportedOperationException(String.format(" You should not 
call getOutgoingContainer() for class %s", this.getClass().getCanonicalName()));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 935bbb3..1afa5ae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -148,7 +148,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     private void clear() {
       aggrValuesContainer.clear();
     }
-    
+
     private int getNumGroups() {
       return maxOccupiedIdx + 1;
     }
@@ -156,7 +156,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     private int getOutputCount() {
       return batchOutputCount;
     }
-    
+
     // Code-generated methods (implemented in HashAggBatch)
 
     @RuntimeOverridden
@@ -242,7 +242,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
         if (EXTRA_DEBUG_1) logger.debug ("Starting outer loop of doWork()...");
         for (; underlyingIndex < incoming.getRecordCount(); incIndex()) {
           if(EXTRA_DEBUG_2) logger.debug("Doing loop with values underlying 
{}, current {}", underlyingIndex, currentIndex);
-          checkGroupAndAggrValues(currentIndex);
+          boolean success = checkGroupAndAggrValues(currentIndex);
+          assert success : "HashAgg couldn't copy values.";
         }
 
         if (EXTRA_DEBUG_1) logger.debug("Processed {} records", 
underlyingIndex);
@@ -273,7 +274,8 @@ public abstract class HashAggTemplate implements 
HashAggregator {
               if(incoming.getRecordCount() == 0){
                 continue;
               } else {
-                checkGroupAndAggrValues(currentIndex);
+                boolean success = checkGroupAndAggrValues(currentIndex);
+                assert success : "HashAgg couldn't copy values.";
                 incIndex();
 
                 if(EXTRA_DEBUG_1) logger.debug("Continuing outside loop");
@@ -282,16 +284,16 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
             case NONE:
               // outcome = out;
-              
+
               buildComplete = true;
-              
+
               updateStats(htable);
 
-              // output the first batch; remaining batches will be output 
+              // output the first batch; remaining batches will be output
               // in response to each next() call by a downstream operator
-              
+
               outputCurrentBatch();
-              
+
               // cleanup incoming batch since output of aggregation does not 
need
               // any references to the incoming
 
@@ -314,7 +316,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       if(first) first = !first;
     }
   }
-  
+
   private void allocateOutgoing(int numOutputRecords) {
 
     for (VectorAllocator a : keyAllocators) {
@@ -326,7 +328,7 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       if(EXTRA_DEBUG_2) logger.debug("Outgoing batch: Allocating {} with {} 
records.", a, numOutputRecords);
       a.alloc(numOutputRecords);
     }
-    
+
   }
 
   @Override
@@ -398,12 +400,12 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
     bh.setup();
   }
-  
+
   // output the keys and values for a particular batch holder
   private boolean outputKeysAndValues(int batchIdx) {
-    
+
     allocateOutgoing(batchIdx);
-    
+
     if (! this.htable.outputKeys(batchIdx)) {
       return false;
     }
@@ -412,14 +414,14 @@ public abstract class HashAggTemplate implements 
HashAggregator {
     }
 
     outBatchIndex = batchIdx+1;
-    
+
     if (outBatchIndex == batchHolders.size()) {
       allFlushed = true;
     }
-    
+
     return true;
   }
-  
+
   public IterOutcome outputCurrentBatch() {
     if (outBatchIndex >= batchHolders.size()) {
       this.outcome = IterOutcome.NONE;
@@ -433,35 +435,35 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       this.outcome = IterOutcome.NONE;
       return outcome;
     }
-    
+
     allocateOutgoing(batchOutputRecords);
-    
+
     boolean outputKeysStatus = this.htable.outputKeys(outBatchIndex) ;
-    boolean outputValuesStatus = 
batchHolders.get(outBatchIndex).outputValues(); 
+    boolean outputValuesStatus = 
batchHolders.get(outBatchIndex).outputValues();
     if (outputKeysStatus && outputValuesStatus) {
-      
+
       // set the value count for outgoing batch value vectors
       for(VectorWrapper<?> v : outgoing) {
         v.getValueVector().getMutator().setValueCount(batchOutputRecords);
       }
-      
+
       outputCount += batchOutputRecords;
-      
+
       if(first){
         this.outcome = IterOutcome.OK_NEW_SCHEMA;
       }else{
         this.outcome = IterOutcome.OK;
       }
-      
+
       logger.debug("HashAggregate: Output current batch index {} with {} 
records.", outBatchIndex, batchOutputRecords);
 
       lastBatchOutputCount = batchOutputRecords;
       outBatchIndex++;
       if (outBatchIndex == batchHolders.size()) {
         allFlushed = true;
-        
+
         logger.debug("HashAggregate: All batches flushed.");
-        
+
         // cleanup my internal state since there is nothing more to return
         this.cleanup();
       }
@@ -470,18 +472,18 @@ public abstract class HashAggTemplate implements 
HashAggregator {
       if (!outputValuesStatus) context.fail(new Exception("Failed to output 
values for current batch !"));
       this.outcome = IterOutcome.STOP;
     }
-    
+
     return this.outcome;
   }
-  
+
   public boolean allFlushed() {
     return allFlushed;
   }
-  
+
   public boolean buildComplete() {
     return buildComplete;
   }
-  
+
   public int numGroupedRecords() {
     return numGroupedRecords;
   }
@@ -545,12 +547,12 @@ public abstract class HashAggTemplate implements 
HashAggregator {
 
     return false;
   }
-  
+
   private void updateStats(HashTable htable) {
     htable.getStats(htStats);
     this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_BUCKETS, 
htStats.numBuckets);
     this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_ENTRIES, 
htStats.numEntries);
-    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, 
htStats.numResizing);      
+    this.stats.addLongStat(HashTableMetrics.HTABLE_NUM_RESIZING, 
htStats.numResizing);
   }
 
   // Code-generated methods (implemented in HashAggBatch)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
index 367d2c7..b587ad1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggBatch.java
@@ -168,7 +168,6 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
   private StreamingAggregator createAggregatorInternal() throws 
SchemaChangeException, ClassTransformationException, IOException{
     ClassGenerator<StreamingAggregator> cg = 
CodeGenerator.getRoot(StreamingAggTemplate.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
     container.clear();
-    List<VectorAllocator> allocators = Lists.newArrayList();
 
     LogicalExpression[] keyExprs = new 
LogicalExpression[popConfig.getKeys().length];
     LogicalExpression[] valueExprs = new 
LogicalExpression[popConfig.getExprs().length];
@@ -183,7 +182,6 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
       keyExprs[i] = expr;
       final MaterializedField outputField = 
MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, 
oContext.getAllocator());
-      allocators.add(VectorAllocator.getAllocator(vector, 50));
       keyOutputIds[i] = container.add(vector);
     }
 
@@ -194,7 +192,6 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
 
       final MaterializedField outputField = 
MaterializedField.create(ne.getRef(), expr.getMajorType());
       ValueVector vector = TypeHelper.getNewVector(outputField, 
oContext.getAllocator());
-      allocators.add(VectorAllocator.getAllocator(vector, 50));
       TypedFieldId id = container.add(vector);
       valueExprs[i] = new ValueVectorWriteExpression(id, expr, true);
     }
@@ -212,7 +209,7 @@ public class StreamingAggBatch extends 
AbstractRecordBatch<StreamingAggregate> {
 
     container.buildSchema(SelectionVectorMode.NONE);
     StreamingAggregator agg = context.getImplementationClass(cg);
-    agg.setup(context, incoming, this, allocators.toArray(new 
VectorAllocator[allocators.size()]));
+    agg.setup(context, incoming, this);
     return agg;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
index 3bd861d..4d6e7c4 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggTemplate.java
@@ -25,6 +25,7 @@ import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.RecordBatch.IterOutcome;
 import org.apache.drill.exec.record.VectorWrapper;
+import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 public abstract class StreamingAggTemplate implements StreamingAggregator {
@@ -44,18 +45,15 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
   private RecordBatch incoming;
   private BatchSchema schema;
   private StreamingAggBatch outgoing;
-  private VectorAllocator[] allocators;
   private FragmentContext context;
   private InternalBatch remainderBatch;
 
 
   @Override
-  public void setup(FragmentContext context, RecordBatch incoming, 
StreamingAggBatch outgoing, VectorAllocator[] allocators) throws 
SchemaChangeException {
-    this.allocators = allocators;
+  public void setup(FragmentContext context, RecordBatch incoming, 
StreamingAggBatch outgoing) throws SchemaChangeException {
     this.context = context;
     this.incoming = incoming;
     this.schema = incoming.getSchema();
-    this.allocators = allocators;
     this.outgoing = outgoing;
     setupInterior(incoming, outgoing);
     this.currentIndex = incoming.getRecordCount() == 0 ? 0 : 
this.getVectorIndex(underlyingIndex);
@@ -63,10 +61,8 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
 
 
   private void allocateOutgoing() {
-    for (VectorAllocator a : allocators) {
-      if(EXTRA_DEBUG) logger.debug("Allocating {} with {} records.", a, 20000);
-      a.alloc(20000);
-      if(EXTRA_DEBUG) logger.debug("Allocated {}", a);
+    for(VectorWrapper<?> w : outgoing){
+      w.getValueVector().allocateNew();
     }
   }
 
@@ -90,7 +86,7 @@ public abstract class StreamingAggTemplate implements 
StreamingAggregator {
   public AggOutcome doWork() {
     try{ // outside loop to ensure that first is set to false after the first 
run.
       outputCount = 0;
-      
+
       // if we're in the first state, allocate outgoing.
       if(first){
         allocateOutgoing();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
index 52f30ae..c624c9a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/StreamingAggregator.java
@@ -31,9 +31,8 @@ public interface StreamingAggregator {
   public static enum AggOutcome {
            RETURN_OUTCOME, CLEANUP_AND_RETURN, UPDATE_AGGREGATOR;
          }
-  
-  public abstract void setup(FragmentContext context, RecordBatch incoming, 
StreamingAggBatch outgoing,
-      VectorAllocator[] allocators) throws SchemaChangeException;
+
+  public abstract void setup(FragmentContext context, RecordBatch incoming, 
StreamingAggBatch outgoing) throws SchemaChangeException;
 
   public abstract IterOutcome getOutcome();
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
index d52d2e3..89a6d09 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/filter/FilterRecordBatch.java
@@ -55,7 +55,7 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
   public FilterRecordBatch(Filter pop, RecordBatch incoming, FragmentContext 
context) throws OutOfMemoryException {
     super(pop, context, incoming);
   }
-  
+
   @Override
   public FragmentContext getContext() {
     return context;
@@ -85,8 +85,8 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
 //      m.setValueCount(recordCount);
 //    }
   }
-  
-  
+
+
   @Override
   public void cleanup() {
     if(sv2 != null) sv2.clear();
@@ -128,7 +128,6 @@ public class FilterRecordBatch extends 
AbstractSingleRecordBatch<Filter>{
   protected Filterer generateSV4Filterer() throws SchemaChangeException {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final List<TransferPair> transfers = Lists.newArrayList();
-    final List<VectorAllocator> allocators = Lists.newArrayList();
     final ClassGenerator<Filterer> cg = 
CodeGenerator.getRoot(Filterer.TEMPLATE_DEFINITION4, 
context.getFunctionRegistry());
 
     final LogicalExpression expr = 
ExpressionTreeMaterializer.materialize(popConfig.getExpr(), incoming, 
collector, context.getFunctionRegistry());

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 1c028d0..bd0e23f 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -103,9 +103,6 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
     // Current batch index on the build side
     private int buildBatchIndex = 0;
 
-    // List of vector allocators
-    private List<VectorAllocator> allocators = null;
-
     // Schema of the build side
     private BatchSchema rightSchema = null;
 
@@ -261,8 +258,8 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
           }
         }
 
-        HashTableConfig htConfig = 
-            new 
HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
 
+        HashTableConfig htConfig =
+            new 
HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
             HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);
 
         // Create the chained hash table
@@ -346,7 +343,7 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
 
     public HashJoinProbe setupHashJoinProbe() throws 
ClassTransformationException, IOException {
 
-        allocators = new ArrayList<>();
+
 
         final CodeGenerator<HashJoinProbe> cg = 
CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
         ClassGenerator<HashJoinProbe> g = cg.getRoot();
@@ -374,7 +371,6 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
                 // Add the vector to our output container
                 ValueVector v = 
TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), 
outputType), context.getAllocator());
                 container.add(v);
-                allocators.add(RemovingRecordBatch.getAllocator4(v));
 
                 JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", 
new TypedFieldId(vv.getField().getType(), true, fieldId));
                 JVar outVV = g.declareVectorValueSetupAndMember("outgoing", 
new TypedFieldId(outputType, false, fieldId));
@@ -410,7 +406,6 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
 
                 ValueVector v = 
TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), 
outputType), oContext.getAllocator());
                 container.add(v);
-                allocators.add(RemovingRecordBatch.getAllocator4(v));
 
                 JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", 
new TypedFieldId(inputType, false, fieldId));
                 JVar outVV = g.declareVectorValueSetupAndMember("outgoing", 
new TypedFieldId(outputType, false, outputFieldId));
@@ -432,9 +427,9 @@ public class HashJoinBatch extends 
AbstractRecordBatch<HashJoinPOP> {
     }
 
     private void allocateVectors(){
-        for(VectorAllocator a : allocators){
-            a.alloc(RecordBatch.MAX_BATCH_SIZE);
-        }
+      for(VectorWrapper<?> v : container){
+        v.getValueVector().allocateNew();
+      }
     }
 
     public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, 
RecordBatch left, RecordBatch right) throws OutOfMemoryException {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
index a4cc662..21c4ae7 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinProbeTemplate.java
@@ -41,9 +41,11 @@ public abstract class HashJoinProbeTemplate implements 
HashJoinProbe {
 
   // Join type, INNER, LEFT, RIGHT or OUTER
   private JoinRelType joinType;
-  
+
   private HashJoinBatch outgoingJoinBatch = null;
 
+  private static final int TARGET_RECORDS_PER_BATCH = 4000;
+
   /* Helper class
    * Maintains linked list of build side records with the same key
    * Keeps information about which build records have a corresponding
@@ -91,14 +93,21 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
   }
 
   public void executeProjectRightPhase() {
-    while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsProcessed < 
recordsToProcess) {
-      boolean success = 
projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed++), 
outputRecords++);
-      assert success;
+    boolean success = true;
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsProcessed < 
recordsToProcess) {
+      success = 
projectBuildRecord(unmatchedBuildIndexes.get(recordsProcessed), outputRecords);
+      if(success){
+        recordsProcessed++;
+        outputRecords++;
+      }else{
+        if(outputRecords == 0) throw new IllegalStateException("Too big to 
fail.");
+        break;
+      }
     }
   }
 
   public void executeProbePhase() throws SchemaChangeException {
-    while (outputRecords < RecordBatch.MAX_BATCH_SIZE && recordsToProcess > 0) 
{
+    while (outputRecords < TARGET_RECORDS_PER_BATCH && recordsToProcess > 0) {
 
       // Check if we have processed all records in this batch we need to 
invoke next
       if (recordsProcessed == recordsToProcess) {
@@ -157,31 +166,34 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
              */
             hjHelper.setRecordMatched(currentCompositeIdx);
 
-            boolean success = projectBuildRecord(currentCompositeIdx, 
outputRecords);
-            assert success;
-            success = projectProbeRecord(recordsProcessed, outputRecords);
-            assert success;
-            outputRecords++;
+            boolean success = projectBuildRecord(currentCompositeIdx, 
outputRecords) //
+                &&  projectProbeRecord(recordsProcessed, outputRecords);
+            if(!success){
+              // we failed to project.  redo this record.
+              getNextRecord = false;
+            }else{
+              outputRecords++;
 
-            /* Projected single row from the build side with matching key but 
there
-             * may be more rows with the same key. Check if that's the case
-             */
-            currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
-            if (currentCompositeIdx == -1) {
-              /* We only had one row in the build side that matched the 
current key
-               * from the probe side. Drain the next row in the probe side.
+              /* Projected single row from the build side with matching key 
but there
+               * may be more rows with the same key. Check if that's the case
                */
-              recordsProcessed++;
+              currentCompositeIdx = hjHelper.getNextIndex(currentCompositeIdx);
+              if (currentCompositeIdx == -1) {
+                /* We only had one row in the build side that matched the 
current key
+                 * from the probe side. Drain the next row in the probe side.
+                 */
+                recordsProcessed++;
+              }
+              else {
+                /* There is more than one row with the same key on the build 
side
+                 * don't drain more records from the probe side till we have 
projected
+                 * all the rows with this key
+                 */
+                getNextRecord = false;
+              }
             }
-            else {
-              /* There is more than one row with the same key on the build side
-               * don't drain more records from the probe side till we have 
projected
-               * all the rows with this key
-               */
-              getNextRecord = false;
-            }
-        }
-          else { // No matching key
+
+        } else { // No matching key
 
             // If we have a left outer join, project the keys
             if (joinType == JoinRelType.LEFT || joinType == JoinRelType.FULL) {
@@ -190,12 +202,18 @@ public abstract class HashJoinProbeTemplate implements HashJoinProbe {
             }
             recordsProcessed++;
           }
-      }
-      else {
+      } else {
         hjHelper.setRecordMatched(currentCompositeIdx);
-        boolean success = projectBuildRecord(currentCompositeIdx, 
outputRecords);
-        assert success;
-        success = projectProbeRecord(recordsProcessed, outputRecords);
+        boolean success = projectBuildRecord(currentCompositeIdx, 
outputRecords) //
+            && projectProbeRecord(recordsProcessed, outputRecords);
+        if(!success){
+          if(outputRecords == 0){
+            throw new IllegalStateException("Record larger than single 
batch.");
+          }else{
+            // we've output some records but failed to output this one.  
return and wait for next call.
+            return;
+          }
+        }
         assert success;
         outputRecords++;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
index 9351844..a16d64c 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/mergereceiver/MergingRecordBatch.java
@@ -90,14 +90,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   private boolean hasRun = false;
   private boolean prevBatchWasFull = false;
   private boolean hasMoreIncoming = true;
-  private final int DEFAULT_ALLOC_RECORD_COUNT = 20000;
 
   private int outgoingPosition = 0;
   private int senderCount = 0;
   private RawFragmentBatch[] incomingBatches;
   private int[] batchOffsets;
   private PriorityQueue <Node> pqueue;
-  private List<VectorAllocator> allocators;
   private RawFragmentBatch emptyBatch = null;
   private boolean done = false;
 
@@ -117,7 +115,6 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
     super(config, context);
     this.fragProviders = fragProviders;
     this.context = context;
-    this.allocators = Lists.newArrayList();
     this.outgoingContainer = new VectorContainer();
   }
 
@@ -243,14 +240,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
 
         // allocate a new value vector
         ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), 
oContext.getAllocator());
-        VectorAllocator allocator = 
VectorAllocator.getAllocator(outgoingVector, 50);
-        allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT);
-        allocators.add(allocator);
+        outgoingVector.allocateNew();
         outgoingContainer.add(outgoingVector);
         ++vectorCount;
       }
 
-      logger.debug("Allocating {} outgoing vectors with {} values", 
vectorCount, DEFAULT_ALLOC_RECORD_COUNT);
 
       schema = bldr.build();
       if (schema != null && !schema.equals(schema)) {
@@ -295,11 +289,11 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
       }
       pqueue.poll();
 
-      if (isOutgoingFull()) {
-        // set a flag so that we reallocate on the next iteration
-        logger.debug("Outgoing vectors record batch size reached; breaking");
-        prevBatchWasFull = true;
-      }
+//      if (isOutgoingFull()) {
+//        // set a flag so that we reallocate on the next iteration
+//        logger.debug("Outgoing vectors record batch size reached; breaking");
+//        prevBatchWasFull = true;
+//      }
 
       if (node.valueIndex == batchLoaders[node.batchId].getRecordCount() - 1) {
         // reached the end of an incoming record batch
@@ -443,14 +437,12 @@ public class MergingRecordBatch extends AbstractRecordBatch<MergingReceiverPOP>
   }
 
   private void allocateOutgoing() {
-    for (VectorAllocator allocator : allocators) {
-      allocator.alloc(DEFAULT_ALLOC_RECORD_COUNT);
-    }
+    outgoingContainer.allocateNew();
   }
 
-  private boolean isOutgoingFull() {
-    return outgoingPosition == DEFAULT_ALLOC_RECORD_COUNT;
-  }
+//  private boolean isOutgoingFull() {
+//    return outgoingPosition == DEFAULT_ALLOC_RECORD_COUNT;
+//  }
 
   /**
    * Creates a generate class which implements the copy and compare methods.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index c4844d5..5fcfdcc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -139,7 +139,7 @@ public class PartitionSenderRootExec extends BaseRootExec {
           return false;
         }
         stats.updatePartitionStats(partitioner.getOutgoingBatches());
-        for (VectorWrapper v : incoming) {
+        for (VectorWrapper<?> v : incoming) {
           v.clear();
         }
         return true;
@@ -156,8 +156,6 @@ public class PartitionSenderRootExec extends BaseRootExec {
     final ErrorCollector collector = new ErrorCollectorImpl();
     final ClassGenerator<Partitioner> cg ;
 
-    boolean hyper = false;
-
     cg = CodeGenerator.getRoot(Partitioner.TEMPLATE_DEFINITION, 
context.getFunctionRegistry());
     ClassGenerator<Partitioner> cgInner = 
cg.getInnerGenerator("OutgoingRecordBatch");
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 6a26d30..0d967b5 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -212,8 +212,7 @@ public abstract class PartitionerTemplate implements Partitioner {
     private int recordCount;
     private int totalRecords;
     private OperatorStats stats;
-    private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
-    private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
+    private static final int DEFAULT_RECORD_BATCH_SIZE = 1000;
 
     private final StatusHandler statusHandler;
 
@@ -301,7 +300,7 @@ public abstract class PartitionerTemplate implements Partitioner {
       recordCount = 0;
       vectorContainer.zeroVectors();
       for (VectorWrapper<?> v : vectorContainer) {
-        VectorAllocator.getAllocator(v.getValueVector(), 
DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
+        v.getValueVector().allocateNew();
       }
       if (!statusHandler.isOk()) {
         throw new IOException(statusHandler.getException());
@@ -320,7 +319,7 @@ public abstract class PartitionerTemplate implements Partitioner {
 
         // allocate a new value vector
         ValueVector outgoingVector = TypeHelper.getNewVector(v.getField(), 
allocator);
-        VectorAllocator.getAllocator(outgoingVector, 
DEFAULT_VARIABLE_WIDTH_SIZE).alloc(DEFAULT_RECORD_BATCH_SIZE);
+        outgoingVector.allocateNew();
         vectorContainer.add(outgoingVector);
       }
       outSchema = bldr.build();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
index 0aab7b2..dfc37c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/Copier.java
@@ -27,7 +27,7 @@ public interface Copier {
   public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION2 = new 
TemplateClassDefinition<Copier>(Copier.class, CopierTemplate2.class);
   public static TemplateClassDefinition<Copier> TEMPLATE_DEFINITION4 = new 
TemplateClassDefinition<Copier>(Copier.class, CopierTemplate4.class);
 
-  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing, VectorAllocator[] allocators) throws 
SchemaChangeException;
+  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing) throws SchemaChangeException;
   public abstract int copyRecords(int index, int recordCount);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
index 387497c..5cc308a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate2.java
@@ -20,26 +20,21 @@ package org.apache.drill.exec.physical.impl.svremover;
 import javax.inject.Named;
 
 import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.record.selection.SelectionVector2;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 
 
 public abstract class CopierTemplate2 implements Copier{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierTemplate2.class);
 
   private SelectionVector2 sv2;
-  private RecordBatch incoming;
   private RecordBatch outgoing;
 
   @Override
-  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing, VectorAllocator[] allocators) throws 
SchemaChangeException{
+  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing) throws SchemaChangeException{
     this.sv2 = incoming.getSelectionVector2();
-    this.incoming = incoming;
     this.outgoing = outgoing;
     doSetup(context, incoming, outgoing);
   }
@@ -47,7 +42,7 @@ public abstract class CopierTemplate2 implements Copier{
   @Override
   public int copyRecords(int index, int recordCount){
     for(VectorWrapper<?> out : outgoing){
-      out.getValueVector().allocateNewSafe();
+      out.getValueVector().allocateNew();
     }
 
     int outgoingPosition = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
index b48a8fd..7a1c029 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/CopierTemplate4.java
@@ -31,13 +31,11 @@ public abstract class CopierTemplate4 implements Copier{
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(CopierTemplate4.class);
 
   private SelectionVector4 sv4;
-  private RecordBatch incoming;
   private RecordBatch outgoing;
 
 
   @Override
-  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing, VectorAllocator[] allocators) throws 
SchemaChangeException{
-    this.incoming = incoming;
+  public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing) throws SchemaChangeException{
     this.outgoing = outgoing;
     this.sv4 = incoming.getSelectionVector4();
     doSetup(context, incoming, outgoing);
@@ -46,9 +44,8 @@ public abstract class CopierTemplate4 implements Copier{
 
   @Override
   public int copyRecords(int index, int recordCount){
-//    logger.debug("Copying records.");
     for(VectorWrapper<?> out : outgoing){
-      out.getValueVector().allocateNewSafe();
+      out.getValueVector().allocateNew();
     }
 
     int outgoingPosition = 0;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
index 558ce63..42f2128 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java
@@ -195,7 +195,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     private List<ValueVector> out = Lists.newArrayList();
 
     @Override
-    public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing, VectorAllocator[] allocators){
+    public void setupRemover(FragmentContext context, RecordBatch incoming, 
RecordBatch outgoing){
       for(VectorWrapper<?> vv : incoming){
         TransferPair tp = vv.getValueVector().getTransferPair();
         pairs.add(tp);
@@ -220,7 +220,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
 
   private Copier getStraightCopier(){
     StraightCopier copier = new StraightCopier();
-    copier.setupRemover(context, incoming, this, null);
+    copier.setupRemover(context, incoming, this);
     container.addCollection(copier.getOut());
     return copier;
   }
@@ -237,7 +237,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       final CodeGenerator<Copier> cg = 
CodeGenerator.get(Copier.TEMPLATE_DEFINITION2, context.getFunctionRegistry());
       CopyUtil.generateCopies(cg.getRoot(), incoming, false);
       Copier copier = context.getImplementationClass(cg);
-      copier.setupRemover(context, incoming, this, null);
+      copier.setupRemover(context, incoming, this);
 
       return copier;
     } catch (ClassTransformationException | IOException e) {
@@ -262,7 +262,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
       final CodeGenerator<Copier> cg = 
CodeGenerator.get(Copier.TEMPLATE_DEFINITION4, context.getFunctionRegistry());
       CopyUtil.generateCopies(cg.getRoot(), batch, true);
       Copier copier = context.getImplementationClass(cg);
-      copier.setupRemover(context, batch, outgoing, null);
+      copier.setupRemover(context, batch, outgoing);
 
       return copier;
     } catch (ClassTransformationException | IOException e) {
@@ -275,15 +275,6 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect
     return WritableBatch.get(this);
   }
 
-  public static VectorAllocator getAllocator4(ValueVector outgoing){
-    if(outgoing instanceof FixedWidthVector){
-      return new FixedVectorAllocator((FixedWidthVector) outgoing);
-    }else if(outgoing instanceof VariableWidthVector ){
-      return new VariableEstimatedVector( (VariableWidthVector) outgoing, 250);
-    }else{
-      throw new UnsupportedOperationException();
-    }
-  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
index 0ce480d..cea5460 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/fragment/SimpleParallelizer.java
@@ -60,7 +60,8 @@ public class SimpleParallelizer {
   private double affinityFactor;
 
   public SimpleParallelizer(QueryContext context){
-    this.parallelizationThreshold = 
context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val;
+    long sliceTarget = 
context.getOptions().getOption(ExecConstants.SLICE_TARGET).num_val;
+    this.parallelizationThreshold = sliceTarget > 0 ? sliceTarget : 1;
     this.maxWidthPerNode = 
context.getOptions().getOption(ExecConstants.MAX_WIDTH_PER_NODE_KEY).num_val.intValue();
     this.maxGlobalWidth = 
context.getOptions().getOption(ExecConstants.MAX_WIDTH_GLOBAL_KEY).num_val.intValue();
     this.affinityFactor = 
context.getOptions().getOption(ExecConstants.AFFINITY_FACTOR_KEY).float_val.intValue();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
index 1f2c33a..49c7399 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/VectorContainer.java
@@ -42,7 +42,7 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
   private BatchSchema schema;
   private int recordCount = -1;
   private final OperatorContext oContext;
-  
+
   public VectorContainer() {
     this.oContext = null;
   }
@@ -74,12 +74,12 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
     add(vv, releasable);
   }
 
-  public <T extends ValueVector> T addOrGet(String name, MajorType type, 
Class<T> clazz){    
+  public <T extends ValueVector> T addOrGet(String name, MajorType type, 
Class<T> clazz){
     MaterializedField field = MaterializedField.create(name, type);
     ValueVector v = TypeHelper.getNewVector(field, 
this.oContext.getAllocator());
-    
+
     add(v);
-    
+
     if(clazz.isAssignableFrom(v.getClass())){
       return (T) v;
     }else{
@@ -252,5 +252,18 @@ public class VectorContainer extends AbstractMapVector implements Iterable<Vecto
     return this.wrappers.size();
   }
 
+  public void allocateNew(){
+    for (VectorWrapper<?> w : wrappers) {
+      w.getValueVector().allocateNew();
+    }
+  }
+
+  public boolean allocateNewSafe(){
+    for (VectorWrapper<?> w : wrappers) {
+      if(!w.getValueVector().allocateNewSafe()) return false;
+    }
+
+    return true;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3eea13c/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
index 77b6e1c..eb01bef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/allocator/VectorAllocator.java
@@ -22,22 +22,23 @@ import org.apache.drill.exec.vector.RepeatedVariableWidthVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
 
+@Deprecated
 public abstract class VectorAllocator{
   public abstract void alloc(int recordCount);
-  
-  public static VectorAllocator getAllocator(ValueVector in, ValueVector 
outgoing){
-    if(outgoing instanceof FixedWidthVector){
-      return new FixedVectorAllocator((FixedWidthVector) outgoing);
-    }else if(outgoing instanceof VariableWidthVector && in instanceof 
VariableWidthVector){
-      return new VariableVectorAllocator( (VariableWidthVector) in, 
(VariableWidthVector) outgoing);
-    } else if (outgoing instanceof RepeatedVariableWidthVector && in 
instanceof RepeatedVariableWidthVector) {
-      return new RepeatedVectorAllocator((RepeatedVariableWidthVector) in, 
(RepeatedVariableWidthVector) outgoing);
-    }else{
-      throw new UnsupportedOperationException();
-    }
-  }
-    
-  
+
+//  public static VectorAllocator getAllocator(ValueVector in, ValueVector 
outgoing){
+//    if(outgoing instanceof FixedWidthVector){
+//      return new FixedVectorAllocator((FixedWidthVector) outgoing);
+//    }else if(outgoing instanceof VariableWidthVector && in instanceof 
VariableWidthVector){
+//      return new VariableVectorAllocator( (VariableWidthVector) in, 
(VariableWidthVector) outgoing);
+//    } else if (outgoing instanceof RepeatedVariableWidthVector && in 
instanceof RepeatedVariableWidthVector) {
+//      return new RepeatedVectorAllocator((RepeatedVariableWidthVector) in, 
(RepeatedVariableWidthVector) outgoing);
+//    }else{
+//      throw new UnsupportedOperationException();
+//    }
+//  }
+
+  @Deprecated
   public static VectorAllocator getAllocator(ValueVector outgoing, int 
averageBytesPerVariable){
     if(outgoing instanceof FixedWidthVector){
       return new FixedVectorAllocator((FixedWidthVector) outgoing);

Reply via email to